[Checkins] SVN: zc.async/branches/dev/src/zc/async/ another
checkpoint; works now as part of a Zope 3 app.
Gary Poster
gary at zope.com
Thu Apr 3 23:21:35 EDT 2008
Log message for revision 85091:
another checkpoint; works now as part of a Zope 3 app.
Changed:
U zc.async/branches/dev/src/zc/async/README_2.txt
U zc.async/branches/dev/src/zc/async/TODO.txt
U zc.async/branches/dev/src/zc/async/agent.py
A zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
A zc.async/branches/dev/src/zc/async/configure.zcml
U zc.async/branches/dev/src/zc/async/dispatcher.py
A zc.async/branches/dev/src/zc/async/dispatcher.zcml
U zc.async/branches/dev/src/zc/async/instanceuuid.py
U zc.async/branches/dev/src/zc/async/interfaces.py
U zc.async/branches/dev/src/zc/async/queue.py
U zc.async/branches/dev/src/zc/async/subscribers.py
U zc.async/branches/dev/src/zc/async/tests.py
U zc.async/branches/dev/src/zc/async/z3tests.py
-=-
Modified: zc.async/branches/dev/src/zc/async/README_2.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/README_2.txt 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/README_2.txt 2008-04-04 03:21:17 UTC (rev 85091)
@@ -75,12 +75,13 @@
The UUID we register here is a UUID of the instance, which is expected
to uniquely identify the process when in production. It is stored in
-INSTANCE_HOME/etc/uuid.txt.
+the file specified by the ZC_ASYNC_UUID environment variable (or in
+``os.join(os.getcwd(), 'uuid.txt')`` if this is not specified, for easy
+experimentation.
>>> import uuid
>>> import os
- >>> f = open(os.path.join(
- ... os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt'))
+ >>> f = open(os.environ["ZC_ASYNC_UUID"])
>>> uuid_hex = f.readline().strip()
>>> f.close()
>>> uuid = uuid.UUID(uuid_hex)
Modified: zc.async/branches/dev/src/zc/async/TODO.txt
===================================================================
--- zc.async/branches/dev/src/zc/async/TODO.txt 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/TODO.txt 2008-04-04 03:21:17 UTC (rev 85091)
@@ -1,10 +1,10 @@
-- Make a test showing the queues in another database.
-- Write the z3monitor code.
+
+- Write the z3monitor tests.
- Write a stress test.
-- Make the README an introductory text, and move the current one to the side.
+- Finish the README_2 docs.
+- Write a way to use alone, with a separate zdaemon script.
- Write a way to use with Zope 3 (in particular, something that waits for the
- database opened event that Zope 3 fires). *Separate* from main code, with a
- separate test runner and a separate list of eggs.
+ database opened event that Zope 3 fires).
- remove subscribers.py, or make it part of the Zope 3 bit above.
- write basic zcml (necessary adapters) and full (more policy) zcml
@@ -14,4 +14,5 @@
section of the README).
- Write a Zope 3 request/context munger that sets security context and site
based on current values.
-- Maybe become friendly to Medusa.
+- queues should be pluggable like agent with filter
+- show how to broadcast, maybe add conveniences
Modified: zc.async/branches/dev/src/zc/async/agent.py
===================================================================
--- zc.async/branches/dev/src/zc/async/agent.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/agent.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -16,7 +16,9 @@
zope.interface.implements(zc.async.interfaces.IAgent)
- def __init__(self, chooser=chooseFirst, size=3):
+ def __init__(self, chooser=None, size=3):
+ if chooser is None:
+ chooser = chooseFirst
self.chooser = chooser
self.size = size
self._data = zc.queue.PersistentQueue()
Added: zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml (rev 0)
+++ zc.async/branches/dev/src/zc/async/basic_dispatcher_policy.zcml 2008-04-04 03:21:17 UTC (rev 85091)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configure xmlns="http://namespaces.zope.org/zope">
+ <include file="dispatcher.zcml" />
+ <subscriber handler=".subscribers.queue_installer" />
+ <subscriber handler=".subscribers.installThreadedDispatcher" />
+ <subscriber handler=".subscribers.agent_installer" />
+ <adapter factory="zc.async.queue.getDefaultQueue" />
+</configure>
Added: zc.async/branches/dev/src/zc/async/configure.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/configure.zcml (rev 0)
+++ zc.async/branches/dev/src/zc/async/configure.zcml 2008-04-04 03:21:17 UTC (rev 85091)
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configure xmlns="http://namespaces.zope.org/zope">
+ <utility component=".instanceuuid.UUID" />
+ <adapter factory="zc.twist.transactionManager" />
+ <adapter factory="zc.twist.transactionManager"
+ for="ZODB.interfaces.IConnection" />
+ <!-- this is usually handled in Zope applications by the
+ zope.app.keyreference.persistent.connectionOfPersistent adapter
+ <adapter factory="zc.twist.connection" /> -->
+ <adapter factory="zc.async.job.Job"
+ for="types.FunctionType"
+ provides="zc.async.interfaces.IJob" />
+ <adapter factory="zc.async.job.Job"
+ for="types.MethodType"
+ provides="zc.async.interfaces.IJob" />
+ <adapter factory="zc.async.job.Job"
+ for="zc.twist.METHOD_WRAPPER_TYPE"
+ provides="zc.async.interfaces.IJob" />
+</configure>
Modified: zc.async/branches/dev/src/zc/async/dispatcher.py
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/dispatcher.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -193,12 +193,12 @@
_dispatchers = {}
-def get(uuid, default=None):
+def get(uuid=None, default=None):
if uuid is None:
uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
return _dispatchers.get(uuid, default)
-def pop(uuid):
+def pop(uuid=None):
if uuid is None:
uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
return _dispatchers.pop(uuid)
@@ -304,8 +304,12 @@
da.deactivate()
else:
zc.async.utils.log.error(
- 'UUID %s already activated in queue %s (oid %s): '
- 'another process?',
+ 'UUID %s already activated in queue %s '
+ '(oid %s): another process? To stop '
+ 'poll attempts in this process, set '
+ '``zc.async.dispatcher.get().activated = '
+ "False``. To stop polls permanently, don't "
+ 'start a zc.async.dispatcher!',
self.UUID, queue.name, queue._p_oid)
continue
da.activate()
Added: zc.async/branches/dev/src/zc/async/dispatcher.zcml
===================================================================
--- zc.async/branches/dev/src/zc/async/dispatcher.zcml (rev 0)
+++ zc.async/branches/dev/src/zc/async/dispatcher.zcml 2008-04-04 03:21:17 UTC (rev 85091)
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configure xmlns="http://namespaces.zope.org/zope">
+ <include file="configure.zcml" />
+ <utility component=".monitor.async"
+ provides="zc.z3monitor.interfaces.IZ3MonitorPlugin" name="async" />
+ <!-- maybe could divide up queue_installer so queues collection is added
+ here? -->
+</configure>
Modified: zc.async/branches/dev/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/branches/dev/src/zc/async/instanceuuid.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/instanceuuid.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -10,10 +10,18 @@
------------------------------------------------------------------------
The value above (and this file) is created and used by the zc.async
package. It is intended to uniquely identify this software instance when
-it is used to start a zc.async worker process. This allows multiple
-workers to connect to a single database to do work. The software
-expects an instance home to only generate a single process.
+it is used to start a zc.async dispatcher. This allows multiple
+dispatchers, each in its own software instance, to connect to a single
+database to do work.
+In order to decide where to look for this file (or to create it, if
+necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a file
+name. If you are using Zope 3, you can set this in a zdaemon environment
+section of your zdaemon.conf.
+
+If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
+``os.path.join(os.getgwd(), 'uuid.txt')`` as the file name.
+
To get a new identifier for this software instance, delete this file,
restart Python, and import zc.async.instanceuuid. This file will be
recreated with a new value.
@@ -21,9 +29,12 @@
zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
+key = 'ZC_ASYNC_UUID'
+
def getUUID():
- file_name = os.path.join(
- os.environ["INSTANCE_HOME"], 'etc', 'uuid.txt')
+ file_name = os.environ.get(key)
+ if not file_name:
+ file_name = os.path.join(os.getcwd(), 'uuid.txt')
if os.path.exists(file_name):
f = open(file_name, 'r')
UUID = uuid.UUID(f.readline().strip())
Modified: zc.async/branches/dev/src/zc/async/interfaces.py
===================================================================
--- zc.async/branches/dev/src/zc/async/interfaces.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/interfaces.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -244,6 +244,7 @@
def index(item):
"""return index, or raise ValueError if item is not in queue"""
+
class IQueue(zc.queue.interfaces.IQueue):
parent = zope.interface.Attribute(
Modified: zc.async/branches/dev/src/zc/async/queue.py
===================================================================
--- zc.async/branches/dev/src/zc/async/queue.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/queue.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -213,7 +213,7 @@
super(Quotas, self).pop(name)
-class Queue(persistent.Persistent):
+class Queue(zc.async.utils.Base):
zope.interface.implements(zc.async.interfaces.IQueue)
def __init__(self):
Modified: zc.async/branches/dev/src/zc/async/subscribers.py
===================================================================
--- zc.async/branches/dev/src/zc/async/subscribers.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/subscribers.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -1,29 +1,28 @@
-import os
+import threading
+import time
+import signal
import transaction
-import transaction.interfaces
-import ZODB.interfaces
-import twisted.internet.reactor
+import twisted.internet.selectreactor
import zope.component
-import zope.event
import zope.app.appsetup.interfaces
import zc.twist
-import zc.async.datamanager
import zc.async.interfaces
-import zc.async.engine
+import zc.async.queue
+import zc.async.agent
+import zc.async.dispatcher
+import zc.async.utils
-NAME = 'zc.async.datamanager'
+class QueueInstaller(object):
-class InstallerAndNotifier(object):
-
- def __init__(self, name=NAME,
- factory=lambda *args: zc.async.datamanager.DataManager(),
- get_folder=lambda r: r):
+ def __init__(self, queues=('',),
+ factory=lambda *args: zc.async.queue.Queue(),
+ db_name=None):
zope.component.adapter(
zope.app.appsetup.interfaces.IDatabaseOpenedEvent)(self)
- self.name = name
+ self.db_name = db_name
self.factory = factory
- self.get_folder = get_folder
+ self.queues = queues
def __call__(self, ev):
db = ev.database
@@ -33,62 +32,81 @@
try:
try:
root = conn.root()
- folder = self.get_folder(root)
- tm.commit()
- if self.name not in folder:
- folder[self.name] = self.factory(conn, folder)
- if folder[self.name]._p_jar is None:
- conn.add(folder[self.name])
- elif not zc.async.interfaces.IDataManager.providedBy(
- folder[self.name]):
- raise RuntimeError(
- 'IDataManager not found') # TODO better error
- zope.event.notify(
- zc.async.interfaces.DataManagerAvailable(folder[self.name]))
- tm.commit()
+ if zc.async.interfaces.KEY not in root:
+ if self.db_name is not None:
+ other = conn.get_connection(self.db_name)
+ queues = other.root()[
+ zc.async.interfaces.KEY] = zc.async.queue.Queues()
+ other.add(queues)
+ else:
+ queues = zc.async.queue.Queues()
+ root[zc.async.interfaces.KEY] = queues
+ tm.commit()
+ zc.async.utils.log.info('queues collection added')
+ else:
+ queues = root[zc.async.interfaces.KEY]
+ for queue_name in self.queues:
+ if queue_name not in queues:
+ queues[queue_name] = self.factory(conn, queue_name)
+ tm.commit()
+ zc.async.utils.log.info('queue %r added', queue_name)
except:
tm.abort()
raise
finally:
conn.close()
-basicInstallerAndNotifier = InstallerAndNotifier()
+queue_installer = QueueInstaller()
-class SeparateDBCreation(object):
- def __init__(self, db_name='zc.async', name=NAME,
- factory=zc.async.datamanager.DataManager,
- get_folder=lambda r:r):
- self.db_name = db_name
- self.name = name
- self.factory = factory
- self.get_folder = get_folder
+ at zope.component.adapter(zope.app.appsetup.interfaces.IDatabaseOpenedEvent)
+def installThreadedDispatcher(ev):
+ reactor = twisted.internet.selectreactor.SelectReactor()
+ # reactor._handleSignals()
+ curr_sigint_handler = signal.getsignal(signal.SIGINT)
+ def sigint_handler(*args):
+ reactor.callFromThread(reactor.stop)
+ time.sleep(0.5) # bah, a guess, but Works For Me (So Far)
+ curr_sigint_handler(*args)
+ def handler(*args):
+ reactor.callFromThread(reactor.stop)
+ signal.signal(signal.SIGINT, sigint_handler)
+ signal.signal(signal.SIGTERM, handler)
+ # Catch Ctrl-Break in windows
+ if getattr(signal, "SIGBREAK", None) is not None:
+ signal.signal(signal.SIGBREAK, handler)
+ dispatcher = zc.async.dispatcher.Dispatcher(ev.database, reactor)
+ def start():
+ dispatcher.activate()
+ reactor.run(installSignalHandlers=0)
+ thread = threading.Thread(target=start)
+ thread.setDaemon(True)
+ thread.start()
- def __call__(self, conn, folder):
- conn2 = conn.get_connection(self.db_name)
- tm = transaction.interfaces.ITransactionManager(conn)
- root = conn2.root()
- folder = self.get_folder(root)
- tm.commit()
- if self.name in folder:
- raise ValueError('data manager already exists in separate database',
- self.db_name, folder, self.name)
- dm = folder[self.name] = self.factory()
- conn2.add(dm)
- tm.commit()
- return dm
+class AgentInstaller(object):
-installerAndNotifier = InstallerAndNotifier(factory=SeparateDBCreation())
+ def __init__(self, agent_name='', chooser=None, size=3, queue_names=None):
+ zope.component.adapter(
+ zc.async.interfaces.IDispatcherActivated)(self)
+ self.queue_names = queue_names
+ self.agent_name = agent_name
+ self.chooser = chooser
+ self.size = size
- at zope.component.adapter(zc.async.interfaces.IDataManagerAvailableEvent)
-def installTwistedEngine(ev):
- engine = zc.async.engine.Engine(
- zope.component.getUtility(
- zc.async.interfaces.IUUID, 'instance'),
- zc.async.datamanager.Worker)
- dm = ev.object
- twisted.internet.reactor.callLater(
- 0,
- zc.twist.Partial(engine.poll, dm))
- twisted.internet.reactor.addSystemEventTrigger(
- 'before', 'shutdown', zc.twist.Partial(
- engine.tearDown, dm))
+ def __call__(self, ev):
+ dispatcher = ev.object
+ if (self.queue_names is None or
+ dispatcher.parent.name in self.queue_names):
+ if self.agent_name not in dispatcher:
+ dispatcher[self.agent_name] = zc.async.agent.Agent(
+ chooser=self.chooser, size=self.size)
+ zc.async.utils.log.info(
+ 'agent %r added to queue %r',
+ self.agent_name,
+ dispatcher.parent.name)
+ else:
+ zc.async.utils.log.info(
+ 'agent %r already in queue %r',
+ self.agent_name,
+ dispatcher.parent.name)
+
+agent_installer = AgentInstaller('main')
\ No newline at end of file
Modified: zc.async/branches/dev/src/zc/async/tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/tests.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/tests.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -1,5 +1,4 @@
import os
-import shutil
import unittest
from zope.testing import doctest, module
@@ -11,23 +10,15 @@
def uuidSetUp(test):
import zc.async.interfaces
- test.globs['old_instance_home'] = os.environ.get("INSTANCE_HOME")
- os.environ['INSTANCE_HOME'] = os.path.join(os.path.dirname(
- zc.async.interfaces.__file__), '_test_tmp')
- os.mkdir(os.environ['INSTANCE_HOME'])
- os.mkdir(os.path.join(os.environ['INSTANCE_HOME'], 'etc'))
+ os.environ['ZC_ASYNC_UUID'] = os.path.join(os.path.dirname(
+ zc.async.interfaces.__file__), 'uuid.txt')
import zc.async.instanceuuid
uuid = zc.async.instanceuuid.getUUID()
if uuid != zc.async.instanceuuid.UUID: # test run changed it...
zc.async.instanceuuid.UUID = uuid
def uuidTearDown(test):
- shutil.rmtree(os.environ['INSTANCE_HOME'])
- if test.globs['old_instance_home'] is None:
- del os.environ['INSTANCE_HOME']
- else:
- os.environ['INSTANCE_HOME'] = test.globs['old_instance_home']
- del test.globs['old_instance_home']
+ os.remove(os.environ['ZC_ASYNC_UUID'])
def modSetUp(test):
uuidSetUp(test)
Modified: zc.async/branches/dev/src/zc/async/z3tests.py
===================================================================
--- zc.async/branches/dev/src/zc/async/z3tests.py 2008-04-03 22:48:19 UTC (rev 85090)
+++ zc.async/branches/dev/src/zc/async/z3tests.py 2008-04-04 03:21:17 UTC (rev 85091)
@@ -7,9 +7,9 @@
def setUp(test):
zc.async.tests.modSetUp(test)
# make the uuid stable for these tests
- f = open(os.path.join(
- os.environ["INSTANCE_HOME"], 'etc', 'uuid.txt'), 'w')
- f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',)) # ...random...
+ f = open(os.environ["ZC_ASYNC_UUID"], 'w')
+ # make this stable for test purposes
+ f.writelines(('d10f43dc-ffdf-11dc-abd4-0017f2c49bdd',))
f.close()
zc.async.instanceuuid.UUID = zc.async.instanceuuid.getUUID()
More information about the Checkins
mailing list