[Checkins] SVN: zc.async/branches/dev/src/zc/async/ another checkpoint; works now as part of a Zope 3 app.

Gary Poster gary at zope.com
Thu Apr 3 23:21:35 EDT 2008


Log message for revision 85091:
  another checkpoint; works now as part of a Zope 3 app.

Changed:
  U   zc.async/branches/dev/src/zc/async/README_2.txt
  U   zc.async/branches/dev/src/zc/async/TODO.txt
  U   zc.async/branches/dev/src/zc/async/agent.py
  A   zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
  A   zc.async/branches/dev/src/zc/async/configure.zcml
  U   zc.async/branches/dev/src/zc/async/dispatcher.py
  A   zc.async/branches/dev/src/zc/async/dispatcher.zcml
  U   zc.async/branches/dev/src/zc/async/instanceuuid.py
  U   zc.async/branches/dev/src/zc/async/interfaces.py
  U   zc.async/branches/dev/src/zc/async/queue.py
  U   zc.async/branches/dev/src/zc/async/subscribers.py
  U   zc.async/branches/dev/src/zc/async/tests.py
  U   zc.async/branches/dev/src/zc/async/z3tests.py

-=-
Modified: zc.async/branches/dev/src/zc/async/README_2.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_2.txt	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/README_2.txt	2008-04-04 03:21:17 UTC (rev 85091)
@@ -75,12 +75,13 @@
 
 The UUID we register here is a UUID of the instance, which is expected
 to uniquely identify the process when in production. It is stored in
-INSTANCE_HOME/etc/uuid.txt.
+the file specified by the ZC_ASYNC_UUID environment variable (or in
+``os.path.join(os.getcwd(), 'uuid.txt')`` if this is not specified, for easy
+experimentation).
 
     >>> import uuid
     >>> import os
-    >>> f = open(os.path.join(
-    ...     os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt'))
+    >>> f = open(os.environ["ZC_ASYNC_UUID"])
     >>> uuid_hex = f.readline().strip()
     >>> f.close()
     >>> uuid = uuid.UUID(uuid_hex)

Modified: zc.async/branches/dev/src/zc/async/TODO.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/TODO.txt	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/TODO.txt	2008-04-04 03:21:17 UTC (rev 85091)
@@ -1,10 +1,10 @@
-- Make a test showing the queues in another database.
-- Write the z3monitor code.
+
+- Write the z3monitor tests.
 - Write a stress test.
-- Make the README an introductory text, and move the current one to the side.
+- Finish the README_2 docs.
+- Write a way to use alone, with a separate zdaemon script.
 - Write a way to use with Zope 3 (in particular, something that waits for the
- database opened event that Zope 3 fires).  *Separate* from main code, with a
-  separate test runner and a separate list of eggs.
+ database opened event that Zope 3 fires).
 - remove subscribers.py, or make it part of the Zope 3 bit above.
 - write basic zcml (necessary adapters) and full (more policy) zcml
 
@@ -14,4 +14,5 @@
   section of the README).
 - Write a Zope 3 request/context munger that sets security context and site
   based on current values.
-- Maybe become friendly to Medusa.
+- queues should be pluggable like agent with filter
+- show how to broadcast, maybe add conveniences

Modified: zc.async/branches/dev/src/zc/async/agent.py
===================================================================
--- zc.async/branches/dev/src/zc/async/agent.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/agent.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -16,7 +16,9 @@
 
     zope.interface.implements(zc.async.interfaces.IAgent)
 
-    def __init__(self, chooser=chooseFirst, size=3):
+    def __init__(self, chooser=None, size=3):
+        if chooser is None:
+            chooser = chooseFirst
         self.chooser = chooser
         self.size = size
         self._data = zc.queue.PersistentQueue()

Added: zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml	2008-04-04 03:21:17 UTC (rev 85091)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configure xmlns="http://namespaces.zope.org/zope">
+    <include file="dispatcher.zcml" />
+    <subscriber handler=".subscribers.queue_installer" />
+    <subscriber handler=".subscribers.installThreadedDispatcher" />
+    <subscriber handler=".subscribers.agent_installer" />
+    <adapter factory="zc.async.queue.getDefaultQueue" />
+</configure>

Added: zc.async/branches/dev/src/zc/async/configure.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/configure.zcml	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/configure.zcml	2008-04-04 03:21:17 UTC (rev 85091)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configure xmlns="http://namespaces.zope.org/zope">
+    <utility component=".instanceuuid.UUID" />
+    <adapter factory="zc.twist.transactionManager" />
+    <adapter factory="zc.twist.transactionManager"
+             for="ZODB.interfaces.IConnection" />
+    <!-- this is usually handled in Zope applications by the
+         zope.app.keyreference.persistent.connectionOfPersistent adapter
+    <adapter factory="zc.twist.connection" /> -->
+    <adapter factory="zc.async.job.Job"
+             for="types.FunctionType"
+             provides="zc.async.interfaces.IJob" />
+    <adapter factory="zc.async.job.Job"
+             for="types.MethodType"
+             provides="zc.async.interfaces.IJob" />
+    <adapter factory="zc.async.job.Job"
+             for="zc.twist.METHOD_WRAPPER_TYPE"
+             provides="zc.async.interfaces.IJob" />
+</configure>

Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -193,12 +193,12 @@
 
 _dispatchers = {}
 
-def get(uuid, default=None):
+def get(uuid=None, default=None):
     if uuid is None:
         uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
     return _dispatchers.get(uuid, default)
 
-def pop(uuid):
+def pop(uuid=None):
     if uuid is None:
         uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
     return _dispatchers.pop(uuid)
@@ -304,8 +304,12 @@
                             da.deactivate()
                         else:
                             zc.async.utils.log.error(
-                                'UUID %s already activated in queue %s (oid %s): '
-                                'another process?',
+                                'UUID %s already activated in queue %s '
+                                '(oid %s): another process?  To stop '
+                                'poll attempts in this process, set '
+                                '``zc.async.dispatcher.get().activated = '
+                                "False``.  To stop polls permanently, don't "
+                                'start a zc.async.dispatcher!',
                                 self.UUID, queue.name, queue._p_oid)
                             continue
                     da.activate()

Added: zc.async/branches/dev/src/zc/async/dispatcher.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.zcml	                        (rev 0)
+++ zc.async/branches/dev/src/zc/async/dispatcher.zcml	2008-04-04 03:21:17 UTC (rev 85091)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configure xmlns="http://namespaces.zope.org/zope">
+    <include file="configure.zcml" />
+    <utility component=".monitor.async"
+             provides="zc.z3monitor.interfaces.IZ3MonitorPlugin" name="async" />
+    <!-- maybe could divide up queue_installer so queues collection is added
+         here? -->
+</configure>

Modified: zc.async/branches/dev/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/branches/dev/src/zc/async/instanceuuid.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/instanceuuid.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -10,10 +10,18 @@
 ------------------------------------------------------------------------
 The value above (and this file) is created and used by the zc.async
 package. It is intended to uniquely identify this software instance when
-it is used to start a zc.async worker process.  This allows multiple
-workers to connect to a single database to do work.  The software
-expects an instance home to only generate a single process.
+it is used to start a zc.async dispatcher.  This allows multiple
+dispatchers, each in its own software instance, to connect to a single
+database to do work.
 
+In order to decide where to look for this file (or to create it, if
+necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a file
+name.  If you are using Zope 3, you can set this in a zdaemon environment
+section of your zdaemon.conf.
+
+If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
+``os.path.join(os.getcwd(), 'uuid.txt')`` as the file name.
+
 To get a new identifier for this software instance, delete this file,
 restart Python, and import zc.async.instanceuuid.  This file will be
 recreated with a new value.
@@ -21,9 +29,12 @@
 
 zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
 
+key = 'ZC_ASYNC_UUID'
+
 def getUUID():
-    file_name = os.path.join(
-        os.environ["INSTANCE_HOME"], 'etc', 'uuid.txt')
+    file_name = os.environ.get(key)
+    if not file_name:
+        file_name = os.path.join(os.getcwd(), 'uuid.txt')
     if os.path.exists(file_name):
         f = open(file_name, 'r')
         UUID = uuid.UUID(f.readline().strip())

Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/interfaces.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -244,6 +244,7 @@
     def index(item):
         """return index, or raise ValueError if item is not in queue"""
 
+
 class IQueue(zc.queue.interfaces.IQueue):
 
     parent = zope.interface.Attribute(

Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/queue.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -213,7 +213,7 @@
         super(Quotas, self).pop(name)
 
 
-class Queue(persistent.Persistent):
+class Queue(zc.async.utils.Base):
     zope.interface.implements(zc.async.interfaces.IQueue)
 
     def __init__(self):

Modified: zc.async/branches/dev/src/zc/async/subscribers.py
===================================================================
--- zc.async/branches/dev/src/zc/async/subscribers.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/subscribers.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -1,29 +1,28 @@
-import os
+import threading
+import time
+import signal
 import transaction
-import transaction.interfaces
-import ZODB.interfaces
-import twisted.internet.reactor
+import twisted.internet.selectreactor
 import zope.component
-import zope.event
 import zope.app.appsetup.interfaces
 import zc.twist
 
-import zc.async.datamanager
 import zc.async.interfaces
-import zc.async.engine
+import zc.async.queue
+import zc.async.agent
+import zc.async.dispatcher
+import zc.async.utils
 
-NAME = 'zc.async.datamanager'
+class QueueInstaller(object):
 
-class InstallerAndNotifier(object):
-
-    def __init__(self, name=NAME,
-                 factory=lambda *args: zc.async.datamanager.DataManager(),
-                 get_folder=lambda r: r):
+    def __init__(self, queues=('',),
+                 factory=lambda *args: zc.async.queue.Queue(),
+                 db_name=None):
         zope.component.adapter(
             zope.app.appsetup.interfaces.IDatabaseOpenedEvent)(self)
-        self.name = name
+        self.db_name = db_name
         self.factory = factory
-        self.get_folder = get_folder
+        self.queues = queues
 
     def __call__(self, ev):
         db = ev.database
@@ -33,62 +32,81 @@
         try:
             try:
                 root = conn.root()
-                folder = self.get_folder(root)
-                tm.commit()
-                if self.name not in folder:
-                    folder[self.name] = self.factory(conn, folder)
-                    if folder[self.name]._p_jar is None:
-                        conn.add(folder[self.name])
-                elif not zc.async.interfaces.IDataManager.providedBy(
-                    folder[self.name]):
-                    raise RuntimeError(
-                        'IDataManager not found') # TODO better error
-                zope.event.notify(
-                    zc.async.interfaces.DataManagerAvailable(folder[self.name]))
-                tm.commit()
+                if zc.async.interfaces.KEY not in root:
+                    if self.db_name is not None:
+                        other = conn.get_connection(self.db_name)
+                        queues = other.root()[
+                            zc.async.interfaces.KEY] = zc.async.queue.Queues()
+                        other.add(queues)
+                    else:
+                        queues = zc.async.queue.Queues()
+                    root[zc.async.interfaces.KEY] = queues
+                    tm.commit()
+                    zc.async.utils.log.info('queues collection added')
+                else:
+                    queues = root[zc.async.interfaces.KEY]
+                for queue_name in self.queues:
+                    if queue_name not in queues:
+                        queues[queue_name] = self.factory(conn, queue_name)
+                        tm.commit()
+                        zc.async.utils.log.info('queue %r added', queue_name)
             except:
                 tm.abort()
                 raise
         finally:
             conn.close()
 
-basicInstallerAndNotifier = InstallerAndNotifier()
+queue_installer = QueueInstaller()
 
-class SeparateDBCreation(object):
-    def __init__(self, db_name='zc.async', name=NAME,
-                 factory=zc.async.datamanager.DataManager,
-                 get_folder=lambda r:r):
-        self.db_name = db_name
-        self.name = name
-        self.factory = factory
-        self.get_folder = get_folder
+@zope.component.adapter(zope.app.appsetup.interfaces.IDatabaseOpenedEvent)
+def installThreadedDispatcher(ev):
+    reactor = twisted.internet.selectreactor.SelectReactor()
+    # reactor._handleSignals()
+    curr_sigint_handler = signal.getsignal(signal.SIGINT)
+    def sigint_handler(*args):
+        reactor.callFromThread(reactor.stop)
+        time.sleep(0.5) # bah, a guess, but Works For Me (So Far)
+        curr_sigint_handler(*args)
+    def handler(*args):
+        reactor.callFromThread(reactor.stop)
+    signal.signal(signal.SIGINT, sigint_handler)
+    signal.signal(signal.SIGTERM, handler)
+    # Catch Ctrl-Break in windows
+    if getattr(signal, "SIGBREAK", None) is not None:
+        signal.signal(signal.SIGBREAK, handler)
+    dispatcher = zc.async.dispatcher.Dispatcher(ev.database, reactor)
+    def start():
+        dispatcher.activate()
+        reactor.run(installSignalHandlers=0)
+    thread = threading.Thread(target=start)
+    thread.setDaemon(True)
+    thread.start()
 
-    def __call__(self, conn, folder):
-        conn2 = conn.get_connection(self.db_name)
-        tm = transaction.interfaces.ITransactionManager(conn)
-        root = conn2.root()
-        folder = self.get_folder(root)
-        tm.commit()
-        if self.name in folder:
-            raise ValueError('data manager already exists in separate database',
-                             self.db_name, folder, self.name)
-        dm = folder[self.name] = self.factory()
-        conn2.add(dm)
-        tm.commit()
-        return dm
+class AgentInstaller(object):
 
-installerAndNotifier = InstallerAndNotifier(factory=SeparateDBCreation())
+    def __init__(self, agent_name='', chooser=None, size=3, queue_names=None):
+        zope.component.adapter(
+            zc.async.interfaces.IDispatcherActivated)(self)
+        self.queue_names = queue_names
+        self.agent_name = agent_name
+        self.chooser = chooser
+        self.size = size
 
-@zope.component.adapter(zc.async.interfaces.IDataManagerAvailableEvent)
-def installTwistedEngine(ev):
-    engine = zc.async.engine.Engine(
-        zope.component.getUtility(
-            zc.async.interfaces.IUUID, 'instance'),
-        zc.async.datamanager.Worker)
-    dm = ev.object
-    twisted.internet.reactor.callLater(
-        0,
-        zc.twist.Partial(engine.poll, dm))
-    twisted.internet.reactor.addSystemEventTrigger(
-        'before', 'shutdown', zc.twist.Partial(
-            engine.tearDown, dm))
+    def __call__(self, ev):
+        dispatcher = ev.object
+        if (self.queue_names is None or
+            dispatcher.parent.name in self.queue_names):
+            if self.agent_name not in dispatcher:
+                dispatcher[self.agent_name] = zc.async.agent.Agent(
+                    chooser=self.chooser, size=self.size)
+                zc.async.utils.log.info(
+                    'agent %r added to queue %r',
+                    self.agent_name,
+                    dispatcher.parent.name)
+            else:
+                zc.async.utils.log.info(
+                    'agent %r already in queue %r',
+                    self.agent_name,
+                    dispatcher.parent.name)
+
+agent_installer = AgentInstaller('main')
\ No newline at end of file

Modified: zc.async/branches/dev/src/zc/async/tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/tests.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/tests.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -1,5 +1,4 @@
 import os
-import shutil
 import unittest
 
 from zope.testing import doctest, module
@@ -11,23 +10,15 @@
 
 def uuidSetUp(test):
     import zc.async.interfaces
-    test.globs['old_instance_home'] = os.environ.get("INSTANCE_HOME")
-    os.environ['INSTANCE_HOME'] = os.path.join(os.path.dirname(
-        zc.async.interfaces.__file__), '_test_tmp')
-    os.mkdir(os.environ['INSTANCE_HOME'])
-    os.mkdir(os.path.join(os.environ['INSTANCE_HOME'], 'etc'))
+    os.environ['ZC_ASYNC_UUID'] = os.path.join(os.path.dirname(
+        zc.async.interfaces.__file__), 'uuid.txt')
     import zc.async.instanceuuid
     uuid = zc.async.instanceuuid.getUUID()
     if uuid != zc.async.instanceuuid.UUID: # test run changed it...
         zc.async.instanceuuid.UUID = uuid
 
 def uuidTearDown(test):
-    shutil.rmtree(os.environ['INSTANCE_HOME'])
-    if test.globs['old_instance_home'] is None:
-        del os.environ['INSTANCE_HOME']
-    else:
-        os.environ['INSTANCE_HOME'] = test.globs['old_instance_home']
-    del test.globs['old_instance_home']
+    os.remove(os.environ['ZC_ASYNC_UUID'])
 
 def modSetUp(test):
     uuidSetUp(test)

Modified: zc.async/branches/dev/src/zc/async/z3tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/z3tests.py	2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/z3tests.py	2008-04-04 03:21:17 UTC (rev 85091)
@@ -7,9 +7,9 @@
 def setUp(test):
     zc.async.tests.modSetUp(test)
     # make the uuid stable for these tests
-    f = open(os.path.join(
-        os.environ["INSTANCE_HOME"], 'etc', 'uuid.txt'), 'w')
-    f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',)) # ...random...
+    f = open(os.environ["ZC_ASYNC_UUID"], 'w')
+    # make this stable for test purposes
+    f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',))
     f.close()
     zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
 



More information about the Checkins mailing list