[Zope3-checkins] CVS: Zope3/src/zope/app/services - event.py:1.8

Steve Alexander steve@cat-box.net
Mon, 3 Feb 2003 10:59:48 -0500


Update of /cvs-repository/Zope3/src/zope/app/services
In directory cvs.zope.org:/tmp/cvs-serv23130/src/zope/app/services

Modified Files:
	event.py 
Log Message:
Large event service reimplementation.


=== Zope3/src/zope/app/services/event.py 1.7 => 1.8 ===
--- Zope3/src/zope/app/services/event.py:1.7	Tue Jan 28 06:30:55 2003
+++ Zope3/src/zope/app/services/event.py	Mon Feb  3 10:59:16 2003
@@ -16,9 +16,11 @@
 $Id$
 """
 
+from __future__ import generators
 from zope.exceptions import NotFoundError
 
 from zope.app.interfaces.event import ISubscribingAware, IPublisher, IEvent
+from zope.app.interfaces.event import ISubscriber
 from zope.app.interfaces.traversing import ITraverser
 from zope.app.interfaces.services.event import ISubscriptionService
 from zope.app.interfaces.services.event import IEventChannel, IEventService
@@ -28,7 +30,7 @@
 from zope.component import ComponentLookupError
 from zope.app.component.nextservice import getNextService, queryNextService
 
-from zope.proxy.context import ContextMethod
+from zope.proxy.context import ContextMethod, ContextSuper
 from zope.proxy.introspection import removeAllProxies
 
 from zope.app.event.subs import Subscribable, SubscriptionTracker
@@ -38,31 +40,38 @@
     return getService(context, "Subscription")
 
 def subscribe(subscriber, event_type=IEvent, filter=None, context=None):
-    if context is None:
+    if context is None and not isinstance(subscriber, (int, str, unicode)):
         context = subscriber
     return getSubscriptionService(context).subscribe(
         subscriber, event_type, filter)
 
 def subscribeMany(subscriber, event_types=(IEvent,),
                   filter=None, context=None):
-    if context is None:
+    if context is None and not isinstance(subscriber, (int, str, unicode)):
         context = subscriber
     subscribe = getSubscriptionService(context).subscribe
     for event_type in event_types:
         subscribe(subscriber, event_type, filter)
 
-def unsubscribe(subscriber, event_type=None, filter=None, context=None):
-    if context is None:
+def unsubscribe(subscriber, event_type, filter=None, context=None):
+    if context is None and not isinstance(subscriber, (int, str, unicode)):
         context = subscriber
     return getSubscriptionService(context).unsubscribe(
         subscriber, event_type, filter)
 
-def listSubscriptions(subscriber, event_type=None, context=None):
-    if context is None:
+def unsubscribeAll(subscriber, event_type=IEvent, context=None):
+    if context is None and not isinstance(subscriber, (int, str, unicode)):
         context = subscriber
-    return getSubscriptionService(context).listSubscriptions(
+    return getSubscriptionService(context).unsubscribeAll(
         subscriber, event_type)
 
+def iterSubscriptions(subscriber=None, event_type=None, local_only=False,
+                      context=None):
+    if context is None and not isinstance(subscriber, (int, str, unicode)):
+        context = subscriber
+    return getSubscriptionService(context).iterSubscriptions(
+        subscriber, event_type, local_only)
+
 
 class EventChannel(Subscribable):
 
@@ -71,7 +80,7 @@
     # needs __init__ from zope.app.event.subs.Subscribable
 
     def _notify(clean_self, wrapped_self, event):
-        subscriptionsForEvent = clean_self._registry.getAllForObject(event) 
+        subscriptionsForEvent = clean_self._registry.getAllForObject(event)
         hubGet = getService(wrapped_self, "HubIds").getObject
         pathGet = getAdapter(wrapped_self, ITraverser).traverse
 
@@ -93,9 +102,17 @@
                     except NotFoundError:
                         badSubscribers[subscriber] = 1
                         continue
-                obj.notify(event)
+                # Get an ISubscriber adapter in the context of the object
+                # This is probably the right context to use.
+                #
+                # Using getAdapter rather than queryAdapter because if there
+                # is no ISubscriber adapter available, that is an application
+                # error that should be fixed. So, failing is appropriate, and
+                # adding this subscriber to badSubscribers is inappropriate.
+                getAdapter(obj, ISubscriber).notify(event)
 
         for subscriber in badSubscribers.keys():
+            # XXX the removal of these bad subscribers ought to be logged
             clean_self.unsubscribe(subscriber)
 
     def notify(wrapped_self, event):
@@ -133,6 +150,14 @@
     _subscribeToServiceInterface = IEvent
     _subscribeToServiceFilter = None
 
+    def subscribe(wrapped_self, reference, event_type=IEvent, filter=None):
+        if getattr(wrapped_self, "_v_ssecunbinding", None) is not None:
+            raise Exception(
+                'Cannot subscribe to a subscriber that is unbinding.')
+        return ContextSuper(ServiceSubscriberEventChannel, wrapped_self
+                ).subscribe(reference, event_type, filter)
+    subscribe = ContextMethod(subscribe)
+
     def bound(wrapped_self, name):
         "See IBindingAware"
         # Note: if a component is used for more than one service then
@@ -153,12 +178,27 @@
     def unbound(wrapped_self, name):
         "See IBindingAware"
         # see comment in "bound" above
+
         clean_self = removeAllProxies(wrapped_self)
-        getPath = getAdapter(wrapped_self, ITraverser).traverse
-        for subscription in clean_self._subscriptions:
-            subscribable = getPath(subscription[0])
-            subscribable.unsubscribe(wrapped_self)
-        clean_self._subscriptions = ()
+
+        # unsubscribe all subscriptions
+        hubIds = clean_self._hubIds
+        unsubscribeAll = wrapped_self.unsubscribeAll
+        try:
+            clean_self._v_ssecunbinding = True
+            while hubIds:
+                hubId = iter(hubIds).next()
+                unsubscribeAll(hubId, local_only=True)
+
+            paths = clean_self._paths
+            while paths:
+                path = iter(paths).next()
+                unsubscribeAll(path, local_only=True)
+        finally:
+            del clean_self._v_ssecunbinding
+
+        assert len(paths) == len(hubIds) == len(clean_self._registry) == 0
+
         clean_self._serviceName = None
     unbound = ContextMethod(unbound)
 
@@ -169,6 +209,8 @@
     * unsubscribe() asks the next higher service to unsubscribe if this
       service cannot.
 
+    * unsubscribeAll() does the same.
+
     * listSubscriptions() includes this service's subscriptions, and
       those of the next higher service.
     """
@@ -178,137 +220,79 @@
     _serviceName = None # should be replaced; usually done in "bound"
                         # method of a subclass that is IBindingAware
 
-    # uses (and needs) __init__ from zope.app.event.subs.Subscribable
-
-    def unsubscribe(wrapped_self, subscriber, event_type=None, filter=None):
-        originalSubscriber = subscriber
-        clean_self = removeAllProxies(wrapped_self)
-        subscribers, clean_subObj, subObj = clean_self._getSubscribers(
-            wrapped_self, subscriber)
+    # requires __init__ from zope.app.event.subs.Subscribable
 
+    def unsubscribe(wrapped_self, reference, event_type, filter=None):
+        # If the subscription is not found here, delegate to the next event
+        # service; re-raise NotFoundError only when there is no next service.
         try:
-            ev_sets = clean_self._getEventSets(subscribers)
+            ContextSuper(ServiceSubscribable, wrapped_self).unsubscribe(
+                reference, event_type, filter)
         except NotFoundError:
             next_service = queryNextService(wrapped_self,
-                                            clean_self._serviceName)
+                                            wrapped_self._serviceName)
             if next_service is not None:
-                next_service.unsubscribe(originalSubscriber,
-                                         event_type,
-                                         filter)
-            elif event_type is not None:
-                raise NotFoundError(originalSubscriber,
-                                    event_type,
-                                    filter)
-            return
-
-        # XXX need to check if subObj is not None?
-        subscribingaware  = queryAdapter(subObj, ISubscribingAware)
-
-        clean_self._p_changed = 1
-
-        if event_type is not None:
-            # we have to clean out one and only one subscription of this
-            # subscriber for event_type, filter (there may be more, even for
-            # this exact combination of subscriber, event_type, filter; we
-            # only delete *one*)
-            ev_type = event_type
-
-            # *** handle optimization: a subscription to IEvent is a
-            # subscription to all events; this is converted to 'None' so
-            # that the _registry can shortcut some of its tests
-            if event_type is IEvent:
-                ev_type = None
-            for (subscriber, subscriber_index), ev_set in ev_sets.items():
-                if ev_type in ev_set:
-                    subscriptions = clean_self._registry.get(ev_type)
-                    if subscriptions:
-                        try:
-                            subscriptions.remove((subscriber, filter))
-                        except ValueError:
-                            pass
-                        else:
-                            if subscribingaware:
-                                subscribingaware.unsubscribedFrom(
-                                    wrapped_self, event_type, filter)
-                            ev_set[ev_type] -= 1
-                            if ev_set[ev_type] < 1:
-                                for sub in subscriptions:
-                                    if sub[0] == subscriber:
-                                        break
-                                else:
-                                    if len(ev_set) > 1:
-                                        del ev_set[ev_type]
-                                    else: # len(ev_set) == 1
-                                        del clean_self._subscribers[
-                                            subscriber_index]
-                            break
+                next_service.unsubscribe(reference, event_type, filter)
             else:
-                next_service = queryNextService(wrapped_self,
-                                                clean_self._serviceName)
-                if next_service is not None:
-                    next_service.unsubscribe(originalSubscriber,
-                                             event_type,
-                                             filter)
-                else:
-                    raise NotFoundError(originalSubscriber, event_type, filter)
-        else:
-            # we have to clean all the event types out (ignoring filter)
-            clean_self._cleanAllForSubscriber(wrapped_self,
-                                              ev_sets,
-                                              subscribingaware,
-                                              subObj)
-            next_service = queryNextService(wrapped_self,
-                                            clean_self._serviceName)
-            if next_service is not None:
-                next_service.unsubscribe(originalSubscriber,
-                                         event_type,
-                                         filter)
+                raise
     unsubscribe = ContextMethod(unsubscribe)
 
-    def listSubscriptions(wrapped_self, subscriber, event_type=None):
-        clean_self = removeAllProxies(wrapped_self)
-        subscribers, clean_subObj, subObj = clean_self._getSubscribers(
-            wrapped_self, subscriber)
+    def unsubscribeAll(wrapped_self, reference, event_type=IEvent,
+                       local_only=False):
+        # unsubscribe all here and, unless local_only, from the next service
+
+        # n is the number of subscriptions removed
+        n = ContextSuper(ServiceSubscribable, wrapped_self).unsubscribeAll(
+            reference, event_type)
+        if not local_only:
+            next_service = queryNextService(wrapped_self,
+                                            wrapped_self._serviceName)
+            if next_service is not None:
+                n += next_service.unsubscribeAll(reference, event_type)
+        return n
+    unsubscribeAll = ContextMethod(unsubscribeAll)
+
+    def resubscribeByHubId(wrapped_self, reference):
+        n = ContextSuper(ServiceSubscribable, wrapped_self
+            ).resubscribeByHubId(reference)
+        next_service = queryNextService(wrapped_self,
+                                        wrapped_self._serviceName)
+        if next_service is not None:
+            n += next_service.resubscribeByHubId(reference)
+        return n
 
-        result=[]
-        if event_type:
-            ev_type = event_type
-            if event_type is IEvent:
-                ev_type = None # handle optimization
-            subscriptions = clean_self._registry.get(ev_type)
-            if subscriptions:
-                for sub in subscriptions:
-                    for subscriber in subscribers:
-                        if sub[0] == subscriber:
-                            result.append((event_type, sub[1]))
-        else:
-            try:
-                ev_sets = clean_self._getEventSets(subscribers)
-            except NotFoundError:
-                return result
-            for (subscriber, subscriber_index), ev_set in ev_sets.items():
-                for ev_type in ev_set:
-                    subscriptions = clean_self._registry.get(ev_type)
-                    if subscriptions:
-                        if ev_type is None:
-                            ev_type = IEvent
-                        for sub in subscriptions:
-                            if sub[0] == subscriber:
-                                result.append((ev_type, sub[1]))
-        next_service = queryNextService(wrapped_self, clean_self._serviceName)
+    def resubscribeByPath(wrapped_self, reference):
+        n = ContextSuper(ServiceSubscribable, wrapped_self
+            ).resubscribeByPath(reference)
+        next_service = queryNextService(wrapped_self,
+                                        wrapped_self._serviceName)
         if next_service is not None:
-            result.extend(
-                    next_service.listSubscriptions(subscriber, event_type)
-                    )
-        return result
-    listSubscriptions = ContextMethod(listSubscriptions)
+            n += next_service.resubscribeByPath(reference)
+        return n
 
+    def iterSubscriptions(wrapped_self, reference=None, event_type=IEvent,
+                          local_only=False):
+        'See ISubscriptionService'
+        subs = ContextSuper(ServiceSubscribable, wrapped_self
+                ).iterSubscriptions(reference, event_type)
+        for subscription in subs:
+            yield subscription
+
+        if not local_only:
+            next_service = queryNextService(wrapped_self,
+                                            wrapped_self._serviceName)
+            if next_service is not None:
+                for subscription in next_service.iterSubscriptions(
+                    reference, event_type):
+                    yield subscription
+    iterSubscriptions = ContextMethod(iterSubscriptions)
 
 
 class EventService(ServiceSubscriberEventChannel, ServiceSubscribable):
 
     __implements__ = (
         IEventService,
+        ISubscriptionService,
         ServiceSubscribable.__implements__,
         ServiceSubscriberEventChannel.__implements__
         )
@@ -333,7 +317,7 @@
 
         publishedEvents = getattr(clean_self, "_v_publishedEvents", None)
         if publishedEvents is None:
-            publishedEvents = clean_self._v_publishedEvents=[event]
+            publishedEvents = clean_self._v_publishedEvents = [event]
         else:
             publishedEvents.append(event)
         if (clean_self.isPromotableEvent(event)):
@@ -351,6 +335,7 @@
 
     def bound(wrapped_self, name):
         "See IBindingAware"
+        ContextSuper(EventService, wrapped_self).bound(name)
         if name == "Subscription":
             clean_self = removeAllProxies(wrapped_self)
             clean_self._serviceName = name # for LocalServiceSubscribable
@@ -363,13 +348,14 @@
                     es.subscribe(wrapped_self)
     bound = ContextMethod(bound)
 
-    # _unbound = ServiceSubscriberEventChannel.unbound # see comment below
-
     def unbound(wrapped_self, name):
         "See IBindingAware"
         if name == "Subscription":
             clean_self = removeAllProxies(wrapped_self)
             clean_self._v_unbinding = True
+            try:
+                ContextSuper(EventService, wrapped_self).unbound(name)
+
             # this flag is used by the unsubscribedFrom method (below) to
             # determine that it doesn't need to further unsubscribe beyond
             # what we're already doing.
@@ -385,36 +371,16 @@
             # ServiceSubscriberEventChannel:
             # 
             # start copy/paste
-            getPath = getAdapter(wrapped_self, ITraverser).traverse
-            for subscription in clean_self._subscriptions:
-                subscribable = getPath(subscription[0])
-                subscribable.unsubscribe(wrapped_self)
-            clean_self._subscriptions = ()
-            clean_self._serviceName = None
             # end copy/paste
-
-            for subscriber in clean_self._subscribers:
-                clean_self.__unsubscribeAllFromSelf(
-                        wrapped_self, subscriber[0])
-            # unset flag
-            clean_self._v_unbinding = None
+            finally:
+                # unset flag
+                del clean_self._v_unbinding
     unbound = ContextMethod(unbound)
 
-    def __unsubscribeAllFromSelf(clean_self, wrapped_self, subscriber):
-        subscribers, clean_subObj, subObj = clean_self._getSubscribers(
-            wrapped_self, subscriber)
-        ev_sets = clean_self._getEventSets(subscribers)
-        # XXX need to check if subObj is not None?
-        subscribingaware = queryAdapter(subObj, ISubscribingAware)
-
-        clean_self._p_changed = 1  # trigger persistence before change
-        clean_self._cleanAllForSubscriber(wrapped_self,
-                                          ev_sets,
-                                          subscribingaware,
-                                          subObj)
-
     def unsubscribedFrom(wrapped_self, subscribable, event_type, filter):
         "See ISubscribingAware"
+        ContextSuper(EventService, wrapped_self).unsubscribedFrom(
+            subscribable, event_type, filter)
         clean_self = removeAllProxies(wrapped_self)
         if getattr(clean_self, "_v_unbinding", None) is None:
             # we presumably have been unsubscribed from a higher-level
@@ -422,8 +388,6 @@
             # itself: we need to remove the higher level event service
             # from our subscriptions list and try to find another event
             # service to which to attach
-            ServiceSubscriberEventChannel.unsubscribedFrom(
-                clean_self, subscribable, event_type, filter)
             clean_subscribable = removeAllProxies(subscribable)
             if ISubscriptionService.isImplementedBy(
                 removeAllProxies(clean_subscribable)):