[Checkins] SVN: zc.async/trunk/src/zc/async/subscribers. `AgentInstaller` now accepts 'filter' as a parameter as well.

Satchidanand Haridas satchit at zope.com
Mon Oct 13 15:56:03 EDT 2008


Log message for revision 92165:
  `AgentInstaller` now accepts 'filter' as a parameter as well.
   
  

Changed:
  U   zc.async/trunk/src/zc/async/subscribers.py
  U   zc.async/trunk/src/zc/async/subscribers.txt

-=-
Modified: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py	2008-10-13 19:55:46 UTC (rev 92164)
+++ zc.async/trunk/src/zc/async/subscribers.py	2008-10-13 19:56:01 UTC (rev 92165)
@@ -107,7 +107,7 @@
         dispatcher.thread = thread = threading.Thread(target=start)
         thread.setDaemon(True)
         thread.start()
-    
+
         # The above is really sufficient. This signal registration, below, is
         # an optimization. The dispatcher, on its next run, will eventually
         # figure out that it is looking at a previous incarnation of itself if
@@ -120,11 +120,11 @@
             reactor.callFromThread(reactor.stop)
             thread.join(3)
             curr_sigint_handler(*args)
-    
+
         def handler(*args):
             reactor.callFromThread(reactor.stop)
             raise SystemExit()
-    
+
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, handler)
         # Catch Ctrl-Break in windows
@@ -145,17 +145,20 @@
         dispatcher = zc.async.dispatcher.Dispatcher(
             ev.database, poll_interval=self.poll_interval)
         dispatcher.activate(threaded=True)
-    
+
 twisted_dispatcher_installer = TwistedDispatcherInstaller()
 
 class AgentInstaller(object):
 
-    def __init__(self, agent_name, chooser=None, size=3, queue_names=None):
+    def __init__(self, agent_name, chooser=None, size=3, queue_names=None, filter=None):
         zope.component.adapter(
             zc.async.interfaces.IDispatcherActivated)(self)
         self.queue_names = queue_names
         self.agent_name = agent_name
+        if filter is not None and chooser is not None:
+            raise ValueError('cannot set both chooser and filter to non-None')
         self.chooser = chooser
+        self.filter = filter
         self.size = size
 
     def __call__(self, ev):
@@ -164,7 +167,7 @@
             dispatcher.parent.name in self.queue_names):
             if self.agent_name not in dispatcher:
                 dispatcher[self.agent_name] = zc.async.agent.Agent(
-                    chooser=self.chooser, size=self.size)
+                    chooser=self.chooser, filter=self.filter, size=self.size)
                 zc.async.utils.log.info(
                     'agent %r added to queue %r',
                     self.agent_name,

Modified: zc.async/trunk/src/zc/async/subscribers.txt
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.txt	2008-10-13 19:55:46 UTC (rev 92164)
+++ zc.async/trunk/src/zc/async/subscribers.txt	2008-10-13 19:56:01 UTC (rev 92165)
@@ -82,13 +82,13 @@
 the database generally opens once, but for the purposes of our example we will
 fire it again in a moment.
 
-While we're at it, we'll use the other handler: ``AgentInstaller``.  This
-class generates a subscriber that installs agents in the queues it finds when
-dispatcher agent activation events fire.  You must specify an agent name to
-use; and can specify a chooser (a way to choose the tasks this agent should
-perform), a size (the number of concurrent jobs this agent should hand out),
-and specific queue names in which the agent should be installed, defaulting to
-None, or all queues.
+While we're at it, we'll use the other handler: ``AgentInstaller``.  This class
+generates a subscriber that installs agents in the queues it finds when
+dispatcher agent activation events fire.  You must specify an agent name to use;
+and can specify a chooser (a way to choose the tasks this agent should perform),
+a filter (a more recent and better alternative to defining a chooser), a size (the
+number of concurrent jobs this agent should hand out), and specific queue names
+in which the agent should be installed, defaulting to None, or all queues.
 
 The agent_installer installs an agent named 'main' for the active dispatcher
 in all queues, with a default FIFO chooser.
@@ -102,6 +102,8 @@
     None
     >>> print zc.async.subscribers.agent_installer.chooser
     None
+    >>> print zc.async.subscribers.agent_installer.filter
+    None
     >>> zc.async.subscribers.agent_installer.size
     3
 
@@ -118,6 +120,7 @@
     ...     zc.async.subscribers.agent_installer)
     >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
 
+
 Now if we look in the database, we'll find a queues collection in another
 database, with a queue, with a dispatcher, with an agent.
 
@@ -146,6 +149,10 @@
     >>> bool(da.activated)
     True
 
+
+We discuss a few other use-cases for the AgentInstaller in a separate footnote
+[#filters_and_choosers]_.
+
 When an IQueues or IQueue is installed, an event is fired that provides the
 object being added, the container it is added to, and the name under which it
 is added.  Therefore, two ObjectAdded events have fired now, one for a queues
@@ -179,6 +186,10 @@
     <class 'zc.async.interfaces.ObjectAdded'>
     <class 'zc.async.interfaces.DispatcherRegistered'>
     <class 'zc.async.interfaces.DispatcherActivated'>
+    <class 'zope.component.interfaces.Unregistered'>
+    <class 'zc.async.interfaces.DispatcherActivated'>
+    <class 'zope.component.interfaces.Unregistered'>
+    <class 'zc.async.interfaces.DispatcherActivated'>
 
 
 Finally, we mentioned at the start that the threaded dispatcher installer also
@@ -223,11 +234,11 @@
     >>> import ZODB.FileStorage
     >>> storage = ZODB.FileStorage.FileStorage(
     ...     'main.fs', create=True)
-    
+
     >>> async_storage = ZODB.FileStorage.FileStorage(
     ...     'async.fs', create=True)
 
-    >>> from ZODB.DB import DB 
+    >>> from ZODB.DB import DB
     >>> databases[''] = db = DB(storage)
     >>> databases['async'] = async_db = DB(async_storage)
     >>> async_db.databases = db.databases = databases
@@ -236,3 +247,110 @@
 
     >>> import zc.async.configure
     >>> zc.async.configure.base()
+
+
+.. [#filters_and_choosers] AgentInstaller also takes a number of other
+    parameters such as 'chooser' and 'filter' that set up the agent to handle
+    only certain kinds of jobs. Both of these parameters expect a callable that
+    will be used to select jobs that the agent will handle. Note that in the
+    following we do not demonstrate the functionality of filters or
+    choosers. That is discussed in detail in agent.txt. But we do look at the
+    functionality of the installer itself.
+
+    We first create an agent that will only accept jobs if the callable is a
+    method named 'mock_work'.
+
+    >>> def mock_work_filter(job):
+    ...     return job.callable.__name__ == 'mock_work'
+    >>> filtering_agent_installer = zc.async.subscribers.AgentInstaller(
+    ...     'filtering_agent', filter=mock_work_filter)
+
+
+    >>> isinstance(filtering_agent_installer,
+    ...            zc.async.subscribers.AgentInstaller)
+    True
+    >>> filtering_agent_installer.agent_name
+    'filtering_agent'
+    >>> print filtering_agent_installer.queue_names
+    None
+    >>> print filtering_agent_installer.chooser
+    None
+    >>> filtering_agent_installer.filter.__name__
+    'mock_work_filter'
+    >>> filtering_agent_installer.size
+    3
+
+    We now register the new agent-installer, unregistering the earlier one at
+    the same time.
+
+    >>> zope.component.getGlobalSiteManager().unregisterHandler(
+    ...     zc.async.subscribers.agent_installer)
+    True
+    >>> zope.component.provideHandler(filtering_agent_installer)
+
+    Since the queue has already been installed and we want our new agent
+    installer to run, we fire a DispatcherActivated event, which is the event
+    that our installer subscribes to.
+
+    >>> zope.event.notify(zc.async.interfaces.DispatcherActivated(da))
+
+    Let's check the new agent is set up correctly:
+
+    >>> 'filtering_agent' in da
+    True
+    >>> da['filtering_agent'].filter.__name__
+    'mock_work_filter'
+    >>> print da['filtering_agent'].chooser
+    None
+
+
+    A chooser can be used instead of a filter, but using the latter is preferred
+    as the zc.async monitoring code provides hooks for filters (and not
+    choosers). We now create an agent that uses a chooser instead of a filter.
+
+    >>> def mock_work_chooser(agent):
+    ...     return agent.queue.claim(lambda j: j.callable == mock_work)
+    ...
+
+    >>> choosing_agent_installer = zc.async.subscribers.AgentInstaller(
+    ...     'choosing_agent', chooser=mock_work_chooser)
+
+
+    >>> isinstance(choosing_agent_installer,
+    ...            zc.async.subscribers.AgentInstaller)
+    True
+    >>> choosing_agent_installer.agent_name
+    'choosing_agent'
+    >>> print choosing_agent_installer.queue_names
+    None
+    >>> print choosing_agent_installer.filter
+    None
+    >>> choosing_agent_installer.chooser.__name__
+    'mock_work_chooser'
+    >>> choosing_agent_installer.size
+    3
+
+    Like before, we register the new handler and check the agents are set up
+    correctly.
+
+    >>> zope.component.getGlobalSiteManager().unregisterHandler(
+    ...     filtering_agent_installer)
+    True
+    >>> zope.component.provideHandler(choosing_agent_installer)
+    >>> zope.event.notify(zc.async.interfaces.DispatcherActivated(da))
+
+    >>> 'choosing_agent' in da
+    True
+    >>> da['choosing_agent'].chooser.__name__
+    'mock_work_chooser'
+    >>> print da['choosing_agent'].filter
+    None
+
+    A chooser and a filter cannot both be provided at the same time.
+
+    >>> corrupt_installer = zc.async.subscribers.AgentInstaller(
+    ...     'filtering_agent', chooser=mock_work_chooser,
+    ...     filter=mock_work_filter)
+    Traceback (most recent call last):
+    ...
+    ValueError: cannot set both chooser and filter to non-None
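
For readers without a zc.async checkout, the constructor check introduced in this revision can be approximated in isolation. The following is only a minimal sketch: `SketchAgentInstaller` is a hypothetical stand-in that copies the argument handling from the diff above and leaves out the zope.component adapter registration and the `__call__` logic, and the body of `mock_work_chooser` is a placeholder rather than a real chooser.

```python
class SketchAgentInstaller(object):
    """Hypothetical stand-in for AgentInstaller (no zope wiring)."""

    def __init__(self, agent_name, chooser=None, size=3, queue_names=None,
                 filter=None):
        # Same guard as in the diff: the two selection mechanisms are
        # mutually exclusive, so reject the call if both are supplied.
        if filter is not None and chooser is not None:
            raise ValueError('cannot set both chooser and filter to non-None')
        self.queue_names = queue_names
        self.agent_name = agent_name
        self.chooser = chooser
        self.filter = filter
        self.size = size


def mock_work_filter(job):
    # Accept a job only if its callable is named 'mock_work'.
    return job.callable.__name__ == 'mock_work'


def mock_work_chooser(agent):
    # Placeholder body (assumption): a real chooser would claim a job
    # from agent.queue; that is not needed to exercise the constructor.
    return None


# Supplying only a filter is accepted; chooser stays None.
installer = SketchAgentInstaller('filtering_agent', filter=mock_work_filter)

# Supplying both raises ValueError with the message from the diff.
try:
    SketchAgentInstaller('filtering_agent', chooser=mock_work_chooser,
                         filter=mock_work_filter)
except ValueError as exc:
    error_message = str(exc)
else:
    error_message = None
```

Because the check runs before any attribute is assigned, a rejected call leaves no partially configured installer behind.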


