[Checkins] SVN: zc.async/trunk/src/zc/async/ Initial checkin. Ready for experimentation, but not yet prime time.

Gary Poster gary at zope.com
Tue Aug 15 16:33:11 EDT 2006


Log message for revision 69535:
  Initial checkin.  Ready for experimentation, but not yet prime time.
  

Changed:
  A   zc.async/trunk/src/zc/async/README.txt
  A   zc.async/trunk/src/zc/async/__init__.py
  A   zc.async/trunk/src/zc/async/adapters.py
  A   zc.async/trunk/src/zc/async/datamanager.py
  A   zc.async/trunk/src/zc/async/datamanager.txt
  A   zc.async/trunk/src/zc/async/engine.py
  A   zc.async/trunk/src/zc/async/i18n.py
  A   zc.async/trunk/src/zc/async/instanceuuid.py
  A   zc.async/trunk/src/zc/async/interfaces.py
  A   zc.async/trunk/src/zc/async/partial.py
  A   zc.async/trunk/src/zc/async/partial.txt
  A   zc.async/trunk/src/zc/async/partials_and_transactions.txt
  A   zc.async/trunk/src/zc/async/rwproperty.py
  A   zc.async/trunk/src/zc/async/rwproperty.txt
  A   zc.async/trunk/src/zc/async/subscribers.py
  A   zc.async/trunk/src/zc/async/tests.py

-=-
Added: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/README.txt	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,620 @@
+========
+zc.async
+========
+
+The zc.async package provides a way to make asynchronous application
+calls.
+
+Calls are handled by worker processes, each of which is typically a
+standard Zope process, identified by a UUID [#uuid]_, that may
+simultaneously perform other tasks (such as handling standard web
+requests).  Each worker is responsible for claiming and performing calls
+in its main thread or additional threads.  To have multiple workers on
+the same queue of tasks, share the database with ZEO.
+
+Pending calls and worker data objects are stored in a data manager
+object in the ZODB.  This is typically stored in the root of the ZODB,
+alongside the application object, with a key of 'zc.async.datamanager',
+but the adapter that obtains the data manager can be replaced to point
+to a different location.
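+
+For instance, here is a minimal sketch of a replacement adapter,
+modeled on the default one in adapters.py; the 'myapp.datamanager' root
+key is hypothetical::
+
+    import persistent.interfaces
+    import zope.component
+    import zope.interface
+    import zc.async.interfaces
+
+    @zope.component.adapter(persistent.interfaces.IPersistent)
+    @zope.interface.implementer(zc.async.interfaces.IDataManager)
+    def myDataManagerAdapter(obj):
+        # look the data manager up under a non-default root key
+        return obj._p_jar.root()['myapp.datamanager']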
+
+Worker data objects have queues representing potential or current tasks
+for the worker, in the main thread or a secondary thread.  Each worker
+process has a virtual loop, part of the Twisted or asyncore main loop,
+that is responsible for responding to system calls (like pings) and for
+claiming pending main-thread calls by moving them from the data
+manager's async queue to the worker's own.  Each worker thread queue
+also represents slots for claiming and performing pending thread
+calls.
+
+Set Up
+======
+
+By default, zc.async expects to have an object in the root of
+the ZODB, alongside the application object, with a key of
+'zc.async.datamanager'.  The package includes subscribers to
+zope.app.appsetup.interfaces.IDatabaseOpenedEvent that set an instance
+up in this location if one does not exist.
+
+One version of the subscriber expects to put the object in the same
+database as the main application (`basicInstallerAndNotifier`), and the
+other version expects to put the object in a secondary database, with a
+reference to it in the main database (`installerAndNotifier`).  The
+second approach keeps the database churn generated by zc.async, which
+can be significant, separate from your main data.  You can use either
+(or your own); the first version is the default, since it requires no
+additional set-up. When this documentation is run as a test, it is run
+twice, once with each setup.  To accommodate this, in our example below
+we appear to pull the "installerAndNotifier" out of the air: it is
+installed as a global when the test is run.  You will want to use one of
+the two subscribers mentioned above, or roll your own.
+
+XXX explain possible gotchas if you run the separate database (i.e., you may
+have to explicitly add objects to connections if you create an object for the
+main database and put it as a partial callable or argument in the same
+transaction).
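+
+A rough sketch of the workaround (`Demo` is the hypothetical persistent
+class used in the examples below)::
+
+    demo = app['demo'] = Demo()
+    conn.add(demo)  # or commit the transaction before the next line
+    dm.reactor.put(zc.async.partial.Partial(demo.increase))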
+
+Let's assume we have a reference to a database named `db`, a connection
+named `conn`, a `root`, and an application in the 'app' key
+[#setup]_.  If we provide a handler, fire the event and examine the
+root, we will see the new datamanager.
+
+    >>> import zope.component
+    >>> import zc.async.subscribers
+    >>> zope.component.provideHandler(installerAndNotifier) # see above
+    ... # for explanation of where installerAndNotifier came from
+    >>> import zope.event
+    >>> import zope.app.appsetup.interfaces
+    >>> zope.event.notify(zope.app.appsetup.interfaces.DatabaseOpened(db))
+    >>> import transaction
+    >>> t = transaction.begin()
+    >>> root['zc.async.datamanager'] # doctest: +ELLIPSIS
+    <zc.async.datamanager.DataManager object at ...>
+
+The default adapter from persistent object to datamanager will get us
+the same result.
+
+    >>> import zc.async.adapters
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.defaultDataManagerAdapter)
+    >>> import zc.async.interfaces
+    >>> zc.async.interfaces.IDataManager(app) # doctest: +ELLIPSIS
+    <zc.async.datamanager.DataManager object at ...>
+
+Normally, each process discovers or creates its UUID and registers
+itself with the data manager as a worker.  This would have happened when
+the data manager was announced as available by the installerAndNotifier
+above.
+
+    >>> from zope.component import eventtesting
+    >>> evs = eventtesting.getEvents(
+    ...     zc.async.interfaces.IDataManagerAvailableEvent)
+    >>> evs # doctest: +ELLIPSIS
+    [<zc.async.interfaces.DataManagerAvailable object at ...>]
+
+Normally a subscriber would have installed a worker in the data manager
+at this point.  Right now there are no workers, simply because we have
+not yet registered that subscriber.
+
+    >>> len(zc.async.interfaces.IDataManager(app).workers)
+    0
+
+Let's install the subscriber we need and refire the event.  Our worker
+will have a UUID created for it, and then it will be installed with the
+UUID as key.  We can't actually use the same event because it has an
+object from a different connection, so we'll recreate it.
+
+    >>> zope.component.provideHandler(
+    ...     zc.async.subscribers.installTwistedEngine)
+    >>> zope.event.notify(
+    ...     zc.async.interfaces.DataManagerAvailable(
+    ...         root['zc.async.datamanager']))
+    >>> time_passes()
+    True
+    >>> t = transaction.begin() # sync
+    >>> len(zc.async.interfaces.IDataManager(app).workers)
+    1
+    >>> zc.async.interfaces.IDataManager(app).workers.values()[0]
+    ... # doctest: +ELLIPSIS
+    <zc.async.datamanager.Worker object at ...>
+    >>> (zc.async.interfaces.IDataManager(app).workers.values()[0].engineUUID
+    ...  is not None)
+    True
+
+The new UUID, in hex, is stored in INSTANCE_HOME/etc/uuid.txt.
+
+    >>> import uuid
+    >>> import os
+    >>> f = open(os.path.join(
+    ...     os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt'))
+    >>> uuid_hex = f.readline().strip()
+    >>> f.close()
+    >>> uuid = uuid.UUID(uuid_hex)
+    >>> worker = zc.async.interfaces.IDataManager(app).workers[uuid]
+    >>> worker.UUID == uuid
+    True
+
+The file is intended to stay in the instance home as a persistent identifier
+of this particular worker.
+
+Our worker has `thread` and `reactor` queues, with all slots available.
+
+    >>> worker.thread.size
+    1
+    >>> worker.reactor.size
+    4
+    >>> len(worker.thread)
+    0
+    >>> len(worker.reactor)
+    0
+
+We now have a simple set up: a data manager with a single worker.  Let's start
+making some asynchronous calls!
+
+Basic Usage: IPartialQueue.put
+==============================
+
+The simplest case is straightforward: pass a persistable callable to the
+`put` method of one of the data manager's queues.
+
+    >>> from zc.async import interfaces
+    >>> dm = zc.async.interfaces.IDataManager(app)
+    >>> def send_message():
+    ...     print "imagine this sent a message to another machine"
+    >>> partial = dm.reactor.put(send_message)
+    >>> transaction.commit()
+
+Now a few cycles need to pass in order to have the job performed.  We'll
+use a helper function called `time_flies` to simulate the asynchronous
+cycles necessary for the manager and workers to perform the task.
+
+    >>> count = time_flies(dm.workers.values()[0].poll_seconds)
+    imagine this sent a message to another machine
+
+You can also pass a datetime.datetime to schedule the call.  The
+datetime must be timezone-aware (timezone-naive values are rejected),
+and is converted to, and stored as, a UTC datetime.
+
+    >>> t = transaction.begin()
+    >>> import datetime
+    >>> import pytz
+    >>> datetime.datetime.now(pytz.UTC)
+    datetime.datetime(2006, 8, 10, 15, 44, 27, 211, tzinfo=<UTC>)
+    >>> partial = dm.reactor.put(
+    ...     send_message, datetime.datetime(
+    ...         2006, 8, 10, 15, 45, tzinfo=pytz.UTC))
+    >>> partial.begin_after
+    datetime.datetime(2006, 8, 10, 15, 45, tzinfo=<UTC>)
+    >>> transaction.commit()
+    >>> count = time_flies(10)
+    >>> count = time_flies(10)
+    >>> count = time_flies(10)
+    >>> count = time_flies(5)
+    imagine this sent a message to another machine
+    >>> datetime.datetime.now(pytz.UTC)
+    datetime.datetime(2006, 8, 10, 15, 45, 2, 211, tzinfo=<UTC>)
+
+If you set a time that has already passed, it will be run as if it had
+been set to run immediately.
+
+    >>> t = transaction.begin()
+    >>> partial = dm.reactor.put(
+    ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+    >>> transaction.commit()
+    >>> count = time_flies(5)
+    imagine this sent a message to another machine
+
+The `put` method of the thread and reactor queues is the data manager's
+entire application API.  Other methods are used to introspect, but are
+not needed for basic usage.  We will examine the introspection API below
+(`Manager Introspection`_), and will discuss an advanced feature of the
+`put` method (`Specifying Workers`_), but let's explore some more usage
+patterns first.
+
+Typical Usage: zc.async.Partial
+===============================
+
+...(currently tests and discussion are in partial.txt and datamanager.txt.
+We need user-friendly docs, as well as stress tests.  The remainder of
+this section is somewhat unedited and incomplete at the moment)...
+
+    >>> t = transaction.begin()
+    >>> import zc.async
+    >>> import persistent
+    >>> import transaction
+    >>> import zc.async.partial
+    >>> class Demo(persistent.Persistent):
+    ...     counter = 0
+    ...     def increase(self, value=1):
+    ...         self.counter += value
+    ...
+    >>> app['demo'] = Demo()
+    >>> transaction.commit() # XXX example of gotcha for multiple databases:
+    ... # connection.add or commit before adding to partial
+    >>> app['demo'].counter
+    0
+    >>> partial = dm.reactor.put(
+    ...     zc.async.partial.Partial(app['demo'].increase))
+    >>> transaction.commit()
+    >>> count = time_flies(5)
+
+We need to begin a new transaction in our connection so that we see the
+changes made in other connections (beginning and committing transactions
+sync connections).
+
+    >>> app['demo'].counter
+    0
+    >>> t = transaction.begin()
+    >>> app['demo'].counter
+    1
+
+The Partial class can take arguments and keyword arguments for the
+wrapped callable as well, similar to Python 2.5's `partial`.  For this
+use case, though, realize that the partial will be called with no
+arguments, so you must supply all necessary arguments for the callable
+at creation time.
+
+    >>> partial = dm.reactor.put(
+    ...     zc.async.partial.Partial(app['demo'].increase, 5))
+    >>> transaction.commit()
+    >>> count = time_flies(5)
+    >>> t = transaction.begin()
+    >>> app['demo'].counter
+    6
+    >>> partial = dm.reactor.put(
+    ...     zc.async.partial.Partial(app['demo'].increase, value=10))
+    >>> transaction.commit()
+    >>> count = time_flies(5)
+    >>> t = transaction.begin()
+    >>> app['demo'].counter
+    16
+
+Optimized Usage
+===============
+
+Writing a task that doesn't need a ZODB connection
+--------------------------------------------------
+
+...Twisted reactor tasks are best for this...
+
+...also could have a different IPartial implementation that sets itself
+ACTIVE, commits and closes the connection, calls f with args, and, when
+the result returns, gets a connection again, sets the value, changes
+state, and performs callbacks...
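+
+A task that avoids the database entirely can simply return a Twisted
+deferred; compare the `doSomethingInTwisted` example in datamanager.txt.
+A sketch::
+
+    import twisted.internet.defer
+    import twisted.internet.reactor
+
+    def fireLater(d):
+        # deliver the result on a later reactor tick
+        d.callback(42)
+
+    def taskWithoutZODB():
+        # no persistent objects are touched while the deferred is
+        # outstanding, so no ZODB connection is held
+        d = twisted.internet.defer.Deferred()
+        twisted.internet.reactor.callLater(0, fireLater, d)
+        return d
+
+    partial = dm.reactor.put(taskWithoutZODB)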
+
+Multiple ZEO workers
+--------------------
+
+...
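+
+As a sketch (the host and port are assumptions): each worker process
+would open the shared database through ZEO, and the subscribers
+described above would then register a distinct worker, with its own
+instance UUID, in the shared data manager::
+
+    import ZEO.ClientStorage
+    import ZODB
+
+    storage = ZEO.ClientStorage.ClientStorage(('localhost', 8100))
+    db = ZODB.DB(storage)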
+
+Catching and Fixing Errors
+==========================
+
+...call installed during InstallTwistedWorker to check on worker...
+
+...worker finds another process already installed with same UUID; could be
+a shutdown error (ghost of self) or really another process...show
+engineUUID... some discussion already in datamanager.txt...
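+
+For reference, the data manager's `checkSibling` treats a sibling as
+dead when its pings have stopped; a sketch of the test it applies::
+
+    import datetime
+    import pytz
+
+    def is_dead(worker):
+        # mirrors the check in datamanager.DataManager.checkSibling
+        now = datetime.datetime.now(pytz.UTC)
+        return (worker.last_ping + worker.ping_interval +
+                worker.ping_death_interval) < now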
+
+Gotchas
+=======
+
+...some callbacks may still be working when the partial is completed.
+Therefore the partial is put in the worker's `completed` collection so
+that the callbacks have a chance to run to completion (in addition to
+other goals, like being able to look at recently completed partials)...
+
+Patterns
+========
+
+Partials That Need a Certain Environment
+----------------------------------------
+
+...Partial that needs a certain environment: wrap partial in partial.  Outer
+partial is responsible for checking if environment is good; if so, run inner
+partial, and if not, create a new outer partial, copy over our excluded worker
+UUIDs, add this worker's UUID, set begin_after to an adjusted value,
+and schedule it...
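+
+A sketch of that pattern.  `environmentIsGood` and the five-minute
+retry delay are hypothetical, and it assumes the excludedUUIDs set
+supports `add`::
+
+    import datetime
+    import pytz
+    import zc.async.interfaces
+    import zc.async.partial
+
+    def runIfEnvironmentOK(outer, inner):
+        if environmentIsGood():  # hypothetical per-worker predicate
+            return inner()
+        # wrong environment: exclude this worker and reschedule
+        outer = zc.async.interfaces.IDataManagerPartial(outer)
+        dm = zc.async.interfaces.IDataManager(outer)
+        retry = zc.async.interfaces.IDataManagerPartial(
+            zc.async.partial.Partial.bind(runIfEnvironmentOK, inner))
+        for uuid in outer.excludedUUIDs:
+            retry.excludedUUIDs.add(uuid)
+        retry.excludedUUIDs.add(outer.workerUUID)
+        return dm.thread.put(
+            retry,
+            datetime.datetime.now(pytz.UTC)
+            + datetime.timedelta(minutes=5))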
+
+Callbacks That Want to be Performed by a Worker
+-----------------------------------------------
+
+Callbacks are called immediately, whether within the call to the partial
+or within the `addCallbacks` call.  If you want the job to be done
+asynchronously, make the callback with a partial.  The callback can get
+a reference to the data manager used by the main partial.  It can create
+a partial, assign it to one of the data manager queues, and return the
+new partial.  Consider the following.
+
+    >>> def multiply(*args):
+    ...     res = 1
+    ...     for a in args:
+    ...         res *= a
+    ...     return res
+    ...
+    >>> def doCallbackWithPartial(partial, res):
+    ...     p = zc.async.partial.Partial(multiply, 2, res)
+    ...     zc.async.interfaces.IDataManager(partial).thread.put(p)
+    ...     return p
+    ...
+    >>> p = dm.thread.put(zc.async.partial.Partial(multiply, 3, 4))
+    >>> p_callback = p.addCallbacks(
+    ...     zc.async.partial.Partial.bind(doCallbackWithPartial))
+    >>> transaction.commit()
+    >>> import time
+    >>> for i in range(100):
+    ...     ignore = time_flies(5)
+    ...     time.sleep(0)
+    ...     t = transaction.begin()
+    ...     if p_callback.state == zc.async.interfaces.COMPLETED:
+    ...         break
+    ...
+    >>> p.result
+    12
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+    >>> p_callback.state == zc.async.interfaces.COMPLETED
+    True
+    >>> p_callback.result
+    24
+
+Progress Reports
+----------------
+
+Using zc.twist.Partial, or by otherwise managing your own connections,
+you can send messages back during a long-running call.  For instance,
+imagine you wanted to annotate a partial with progress messages, while
+not actually committing the main work.
+
+Here's an example of one way of getting this to work.  We can use the
+partial's annotations, which are not touched by the partial code and are
+a separate persistent object, so they can be changed concurrently
+without conflict errors.
+
+We'll run the partial within a threaded worker.  The callable uses
+twisted.internet.reactor.callFromThread to have the change made in the
+main thread: parts of the zc.twist.Partial machinery expect to be called
+in the main thread, where the Twisted reactor is running.
+
+    >>> import zc.twist
+    >>> import twisted.internet.reactor
+    >>> def setAnnotation(partial, annotation_key, value):
+    ...     partial.annotations[annotation_key] = value
+    ...
+    >>> import threading
+    >>> thread_lock = threading.Lock()
+    >>> main_lock = threading.Lock()
+    >>> acquired = thread_lock.acquire()
+    >>> acquired = main_lock.acquire()
+    >>> def callWithProgressReport(partial):
+    ...     print "do some work"
+    ...     print "more work"
+    ...     print "about half done"
+    ...     twisted.internet.reactor.callFromThread(zc.twist.Partial(
+    ...         setAnnotation, partial, 'zc.async.partial_txt.half_done', True))
+    ...     main_lock.release()
+    ...     acquired = thread_lock.acquire()
+    ...     return 42
+    ...
+    >>> p = dm.thread.put(zc.async.partial.Partial.bind(callWithProgressReport))
+    >>> transaction.commit()
+    >>> ignore = time_flies(5) # get the reactor to kick for main call
+    do some work
+    more work
+    about half done
+    >>> ignore = time_flies(5) # get the reactor to kick for progress report
+    >>> acquired = main_lock.acquire()
+    >>> t = transaction.begin() # sync
+    >>> p.annotations.get('zc.async.partial_txt.half_done')
+    True
+    >>> p.state == zc.async.interfaces.ACTIVE
+    True
+    >>> thread_lock.release()
+    >>> for i in range(100):
+    ...     ignore = time_flies(5)
+    ...     time.sleep(0)
+    ...     t = transaction.begin()
+    ...     if p.state == zc.async.interfaces.COMPLETED:
+    ...         break
+    ...
+    >>> p.result
+    42
+    >>> thread_lock.release()
+    >>> main_lock.release()
+
+Expiration
+----------
+
+If you want your call to expire after a certain amount of time, keep
+track of time yourself, and return a failure if you go over.  The
+partial does not offer special support for this use case.
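+
+A sketch of that approach (`do_the_real_work` and the deadline argument
+are hypothetical)::
+
+    import datetime
+    import pytz
+    import twisted.python.failure
+
+    def expiringCall(deadline):
+        # deadline is a UTC datetime supplied when the partial is made
+        if datetime.datetime.now(pytz.UTC) > deadline:
+            return twisted.python.failure.Failure(
+                RuntimeError('call expired'))
+        return do_the_real_work()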
+
+Stopping the Engine
+-------------------
+
+The subscriber that sets up the async engine within the Twisted reactor also
+sets up a tearDown trigger.  We can look in our faux reactor and get it.
+
+    >>> len(faux.triggers)
+    1
+    >>> len(faux.triggers[0])
+    3
+    >>> faux.triggers[0][:2]
+    ('before', 'shutdown')
+    >>> dm.workers.values()[0].engineUUID is not None
+    True
+    >>> d = faux.triggers[0][2]()
+    >>> t = transaction.begin()
+    >>> dm.workers.values()[0].engineUUID is None
+    True
+
+[#tear_down]_
+
+=========
+Footnotes
+=========
+
+.. [#uuid] UUIDs are generated by http://zesty.ca/python/uuid.html, as
+    incorporated in Python 2.5.  They are expected to be found in 
+    os.path.join(os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt');
+    this file will be created and populated with a new UUID if it does
+    not exist.
+
+.. [#setup] This is a bit more than standard set-up code for a ZODB test,
+    because it sets up a multi-database.
+
+    >>> from ZODB.tests.util import DB # use conflict resolution test one XXX
+    >>> class Factory(object):
+    ...     def __init__(self, name):
+    ...         self.name = name
+    ...     def open(self):
+    ...         return DB()
+    ...
+    >>> import zope.app.appsetup.appsetup
+    >>> db = zope.app.appsetup.appsetup.multi_database(
+    ...     (Factory('main'), Factory('zc.async')))[0][0]
+    >>> conn = db.open()
+    >>> root = conn.root()
+    >>> import zope.app.folder # import rootFolder
+    >>> app = root['Application'] = zope.app.folder.rootFolder()
+    >>> import transaction
+    >>> transaction.commit()
+
+    You must have two adapter registrations: IConnection to
+    ITransactionManager, and IPersistent to IConnection.  We will also
+    register IPersistent to ITransactionManager because the adapter is
+    designed for it.
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+    We need to be able to get data manager partials for functions and methods;
+    normal partials for functions and methods; and a data manager for a partial.
+    Here are the necessary registrations.
+
+    >>> import zope.component
+    >>> import types
+    >>> import zc.async.interfaces
+    >>> import zc.async.partial
+    >>> import zc.async.adapters
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.method_to_datamanagerpartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.function_to_datamanagerpartial)
+    >>> zope.component.provideAdapter( # partial -> datamanagerpartial
+    ...     zc.async.adapters.DataManagerPartial,
+    ...     provides=zc.async.interfaces.IDataManagerPartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.partial_to_datamanager)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IPartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IPartial)
+    ...
+    
+    A monkeypatch, removed in another footnote below.
+
+    >>> import datetime
+    >>> import pytz
+    >>> old_datetime = datetime.datetime
+    >>> def set_now(dt):
+    ...     global _now
+    ...     _now = _datetime(*dt.__reduce__()[1])
+    ...
+    >>> class _datetime(old_datetime):
+    ...     @classmethod
+    ...     def now(klass, tzinfo=None):
+    ...         if tzinfo is None:
+    ...             return _now.replace(tzinfo=None)
+    ...         else:
+    ...             return _now.astimezone(tzinfo)
+    ...     def astimezone(self, tzinfo):
+    ...         return _datetime(
+    ...             *super(_datetime,self).astimezone(tzinfo).__reduce__()[1])
+    ...     def replace(self, *args, **kwargs):
+    ...         return _datetime(
+    ...             *super(_datetime,self).replace(
+    ...                 *args, **kwargs).__reduce__()[1])
+    ...     def __repr__(self):
+    ...         raw = super(_datetime, self).__repr__()
+    ...         return "datetime.datetime%s" % (
+    ...             raw[raw.index('('):],)
+    ...     def __reduce__(self):
+    ...         return (argh, super(_datetime, self).__reduce__()[1])
+    >>> def argh(*args, **kwargs):
+    ...     return _datetime(*args, **kwargs)
+    ...
+    >>> datetime.datetime = _datetime
+    >>> _start = _now = datetime.datetime(
+    ...     2006, 8, 10, 15, 44, 22, 211, pytz.UTC)
+
+    We monkeypatch twisted.internet.reactor (and restore it in another
+    footnote below).
+
+    >>> import twisted.internet.reactor
+    >>> import threading
+    >>> import bisect
+    >>> class FauxReactor(object):
+    ...     def __init__(self):
+    ...         self.time = 0
+    ...         self.calls = []
+    ...         self.triggers = []
+    ...         self._lock = threading.Lock()
+    ...     def callLater(self, delay, callable, *args, **kw):
+    ...         res = (delay + self.time, callable, args, kw)
+    ...         self._lock.acquire()
+    ...         bisect.insort(self.calls, res)
+    ...         self._lock.release()
+    ...         # normally we're supposed to return something but not needed
+    ...     def callFromThread(self, callable, *args, **kw):
+    ...         self._lock.acquire()
+    ...         bisect.insort(
+    ...             self.calls,
+    ...             (self.time, callable, args, kw))
+    ...         self._lock.release()
+    ...     def addSystemEventTrigger(self, *args):
+    ...         self.triggers.append(args) # 'before', 'shutdown', callable
+    ...     def time_flies(self, time):
+    ...         global _now
+    ...         end = self.time + time
+    ...         ct = 0
+    ...         while self.calls and self.calls[0][0] <= end:
+    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...             _now = _datetime(
+    ...                 *(_start + datetime.timedelta(
+    ...                     seconds=self.time)).__reduce__()[1])
+    ...             callable(*args, **kw) # normally this would get try...except
+    ...             ct += 1
+    ...         self.time = end
+    ...         return ct
+    ...     def time_passes(self):
+    ...         if self.calls and self.calls[0][0] <= self.time:
+    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...             callable(*args, **kw)
+    ...             return True
+    ...         return False
+    ...
+    >>> faux = FauxReactor()
+    >>> oldCallLater = twisted.internet.reactor.callLater
+    >>> oldCallFromThread = twisted.internet.reactor.callFromThread
+    >>> oldAddSystemEventTrigger = (
+    ...     twisted.internet.reactor.addSystemEventTrigger)
+    >>> twisted.internet.reactor.callLater = faux.callLater
+    >>> twisted.internet.reactor.callFromThread = faux.callFromThread
+    >>> twisted.internet.reactor.addSystemEventTrigger = (
+    ...     faux.addSystemEventTrigger)
+    >>> time_flies = faux.time_flies
+    >>> time_passes = faux.time_passes
+
+.. [#tear_down]
+
+    >>> twisted.internet.reactor.callLater = oldCallLater
+    >>> twisted.internet.reactor.callFromThread = oldCallFromThread
+    >>> twisted.internet.reactor.addSystemEventTrigger = (
+    ...     oldAddSystemEventTrigger)
+    >>> datetime.datetime = old_datetime


Property changes on: zc.async/trunk/src/zc/async/README.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/__init__.py
===================================================================


Property changes on: zc.async/trunk/src/zc/async/__init__.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/adapters.py
===================================================================
--- zc.async/trunk/src/zc/async/adapters.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/adapters.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,121 @@
+import types
+import datetime
+import pytz
+import persistent
+import persistent.interfaces
+import zope.interface
+import zope.component
+
+import zc.async.interfaces
+import zc.async.partial
+import zc.async.subscribers
+import zc.set
+from zc.async import rwproperty
+
+
+@zope.component.adapter(persistent.interfaces.IPersistent)
+@zope.interface.implementer(zc.async.interfaces.IDataManager)
+def defaultDataManagerAdapter(obj):
+    return obj._p_jar.root()[zc.async.subscribers.NAME]
+
+
+@zope.component.adapter(zc.async.interfaces.IPartial)
+@zope.interface.implementer(zc.async.interfaces.IDataManager)
+def partial_to_datamanager(partial):
+    p = partial.__parent__
+    while (p is not None and
+           not zc.async.interfaces.IDataManager.providedBy(p)):
+        p = getattr(p, '__parent__', None)
+    return p
+
+
+class TransparentDescriptor(object):
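+    # Delegate attribute access: reads (and, unless readonly, writes)
+    # of the descriptor go to the `value_name` attribute of the object
+    # found at `src_name` on the instance.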
+    def __init__(self, src_name, value_name, readonly=False):
+        self.src_name = src_name
+        self.value_name = value_name
+        self.readonly = readonly
+
+    def __get__(self, obj, klass=None):
+        if obj is None:
+            return self
+        src = getattr(obj, self.src_name)
+        return getattr(src, self.value_name)
+
+    def __set__(self, obj, value):
+        if self.readonly:
+            raise AttributeError
+        src = getattr(obj, self.src_name)
+        setattr(src, self.value_name, value)
+
+
+class DataManagerPartialData(persistent.Persistent):
+
+    workerUUID = assignerUUID = thread = None
+    _begin_by = _begin_after = None
+
+    def __init__(self, partial):
+        self.__parent__ = self.partial = partial
+        self.selectedUUIDs = zc.set.Set()
+        self.excludedUUIDs = zc.set.Set()
+
+    @property
+    def begin_after(self):
+        return self._begin_after
+    @rwproperty.setproperty
+    def begin_after(self, value):
+        if self.assignerUUID is not None:
+            raise RuntimeError(
+                'can only change begin_after before partial is assigned')
+        if value is not None:
+            if value.tzinfo is None:
+                raise ValueError('cannot use timezone-naive values')
+            else:
+                value = value.astimezone(pytz.UTC)
+        self._begin_after = value
+
+    @property
+    def begin_by(self):
+        return self._begin_by
+    @rwproperty.setproperty
+    def begin_by(self, value):
+        if self.partial.state != zc.async.interfaces.PENDING:
+            raise RuntimeError(
+                'can only change begin_by value of PENDING partial')
+        if value is not None:
+            if value < datetime.timedelta():
+                raise ValueError('negative values are not allowed')
+        self._begin_by = value
+
+KEY = 'zc.async.datamanagerpartial'
+
+
+class DataManagerPartial(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.IDataManagerPartial)
+    zope.component.adapts(zc.async.interfaces.IPartial)
+
+    def __init__(self, partial):
+        self._data = partial
+        if KEY not in partial.annotations:
+            partial.annotations[KEY] = DataManagerPartialData(partial)
+        self._extra = partial.annotations[KEY]
+
+    for nm in zc.async.interfaces.IPartial.names(True):
+        if nm == '__parent__':
+            readonly = False
+        else:
+            readonly = True
+        locals()[nm] = TransparentDescriptor('_data', nm, readonly)
+    for nm in ('workerUUID', 'assignerUUID', 'thread', 'begin_after',
+               'begin_by'):
+        locals()[nm] = TransparentDescriptor('_extra', nm)
+    for nm in ('selectedUUIDs', 'excludedUUIDs'):
+        locals()[nm] = TransparentDescriptor('_extra', nm, True)
+
+@zope.component.adapter(types.MethodType)
+@zope.interface.implementer(zc.async.interfaces.IDataManagerPartial)
+def method_to_datamanagerpartial(m):
+    return DataManagerPartial(zc.async.partial.Partial(m))
+
+@zope.component.adapter(types.FunctionType)
+@zope.interface.implementer(zc.async.interfaces.IDataManagerPartial)
+def function_to_datamanagerpartial(f):
+    return DataManagerPartial(zc.async.partial.Partial(f))


Property changes on: zc.async/trunk/src/zc/async/adapters.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/datamanager.py
===================================================================
--- zc.async/trunk/src/zc/async/datamanager.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/datamanager.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,403 @@
+import datetime
+import bisect
+import pytz
+import persistent
+import ZODB.interfaces
+import BTrees.OOBTree
+import BTrees.Length
+import zope.interface
+import zope.component
+import zope.bforest
+import zc.queue
+
+import zc.async.interfaces
+import zc.async.partial
+
+
+def simpleWrapper(name):
+    def wrapper(self, *args, **kwargs):
+        return getattr(self._data, name)(*args, **kwargs)
+    return wrapper
+
+class Workers(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.IWorkers)
+
+    def __init__(self):
+        self._data = BTrees.OOBTree.OOBTree()
+
+    for nm in ('__getitem__', 'get', '__len__', 'keys', 'values', 'items',
+               '__contains__', 'maxKey', 'minKey'):
+        locals()[nm] = simpleWrapper(nm)
+
+    def __iter__(self):
+        return iter(self._data)
+
+    def add(self, value):
+        value = zc.async.interfaces.IWorker(value)
+        if value.UUID is None:
+            raise ValueError("worker must have assigned UUID")
+        self._data[value.UUID] = value
+        value.__parent__ = self
+        return value
+
+    def remove(self, UUID):
+        ob = self._data.pop(UUID)
+        ob.__parent__ = None
+
+def cleanDeadWorker(worker):
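+    # Requeue a dead worker's claimed partials on the data manager:
+    # pending partials are re-put whole, active ones are failed, and
+    # partials stuck performing callbacks have their callbacks resumed.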
+    dm = worker.__parent__.__parent__
+    assert zc.async.interfaces.IDataManager.providedBy(dm)
+    for queue, destination in (
+        (worker.thread, dm.thread), (worker.reactor, dm.reactor)):
+        while queue:
+            p = queue[0]
+            del queue[0]
+            if p.state == zc.async.interfaces.PENDING:
+                destination.put(p.__call__) # will wrap it
+            elif p.state == zc.async.interfaces.ACTIVE:
+                destination.put(p.fail)
+            elif p.state == zc.async.interfaces.CALLBACKS:
+                destination.put(p.resumeCallbacks)
+    
+
+class PartialQueue(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.IPartialQueue)
+
+    def __init__(self, thread):
+        self.thread = thread
+        self._queue = zc.queue.CompositePersistentQueue()
+        self._held = BTrees.OOBTree.OOBTree()
+        self._length = BTrees.Length.Length(0)
+
+    def put(self, item, begin_after=None, begin_by=None):
+        item = zc.async.interfaces.IDataManagerPartial(item)
+        if item.assignerUUID is not None:
+            raise ValueError(
+                'cannot add already-assigned partial')
+        now = datetime.datetime.now(pytz.UTC)
+        if begin_after is not None:
+            item.begin_after = begin_after
+        elif item.begin_after is None:
+            item.begin_after = now
+        if begin_by is not None:
+            item.begin_by = begin_by
+        elif item.begin_by is None:
+            item.begin_by = datetime.timedelta(hours=1)
+        item.assignerUUID = zope.component.getUtility(
+            zc.async.interfaces.IUUID, 'instance')
+        if item._p_jar is None:
+            # we need to do this if the partial will be stored in another
+            # database as well during this transaction.  Also, _held storage
+            # disambiguates against the database_name and the _p_oid.
+            conn = ZODB.interfaces.IConnection(self)
+            conn.add(item)
+        if now == item.begin_after:
+            self._queue.put(item)
+        else:
+            self._held[
+                (item.begin_after,
+                 item._p_jar.db().database_name,
+                 item._p_oid)] = item
+        item.__parent__ = self
+        self._length.change(1)
+        return item
+
+    def _iter(self):
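+        # Merge the due-now FIFO queue and the time-ordered _held tree
+        # into a single stream sorted by begin_after, yielding
+        # (pop callable, key or index, partial) triples so that pull()
+        # can remove exactly the item it returns.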
+        queue = self._queue
+        tree = self._held
+        q = enumerate(queue)
+        t = iter(tree.items())
+        q_pop = queue.pull
+        t_pop = tree.pop
+        def get_next(i):
+            try:
+                next = i.next()
+            except StopIteration:
+                active = False
+                next = (None, None)
+            else:
+                active = True
+            return active, next
+        q_active, (q_index, q_next) = get_next(q)
+        t_active, (t_index, t_next) = get_next(t)
+        while q_active and t_active:
+            if t_next.begin_after <= q_next.begin_after:
+                yield t_pop, t_index, t_next
+                t_active, (t_index, t_next) = get_next(t)
+            else:
+                yield q_pop, q_index, q_next
+                q_active, (q_index, q_next) = get_next(q)
+        if t_active:
+            yield t_pop, t_index, t_next
+            for (t_index, t_next) in t:
+                yield t_pop, t_index, t_next
+        elif q_active:
+            yield q_pop, q_index, q_next
+            for (q_index, q_next) in q:
+                yield q_pop, q_index, q_next
+
+    def pull(self, index=0):
+        if index >= self._length():
+            raise IndexError(index)
+        for i, (pop, ix, next) in enumerate(self._iter()):
+            if i == index:
+                tmp = pop(ix)
+                assert tmp is next
+                self._length.change(-1)
+                return next
+        assert False, 'programmer error: the length appears to be incorrect.'
+
+    def __len__(self):
+        return self._length()
+
+    def __iter__(self):
+        return (next for pop, ix, next in self._iter())
+
+    def __nonzero__(self):
+        return bool(self._length())
+
+    def __getitem__(self, index):
+        if index >= len(self):
+            raise IndexError(index)
+        return zc.queue.getitem(self, index)
+
+    def pullNext(self, uuid):
+        now = datetime.datetime.now(pytz.UTC)
+        for ix, p in enumerate(self.iterDue()):
+            if uuid not in p.excludedUUIDs and (
+                not p.selectedUUIDs or
+                uuid in p.selectedUUIDs):
+                return self.pull(ix)
+            elif (p.begin_after + p.begin_by) < now:
+                res = zc.async.interfaces.IDataManagerPartial(
+                        self.pull(ix).fail)
+                res.__parent__ = self
+                res.begin_after = now
+                res.begin_by = datetime.timedelta(hours=1)
+                res.assignerUUID = zope.component.getUtility(
+                    zc.async.interfaces.IUUID, 'instance')
+                return res
+
+    def iterDue(self):
+        now = datetime.datetime.now(pytz.UTC)
+        for partial in self:
+            if partial.begin_after > now:
+                break
+            yield partial
+
+
+class DataManager(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.IDataManager)
+
+    def __init__(self):
+        self.thread = PartialQueue(True)
+        self.thread.__parent__ = self
+        self.reactor = PartialQueue(False)
+        self.reactor.__parent__ = self
+        self.workers = Workers()
+        self.workers.__parent__ = self
+
+    def _getNextActiveSibling(self, uuid):
+        for worker in self.workers.values(min=uuid, excludemin=True):
+            if worker.engineUUID is not None:
+                return worker
+        for worker in self.workers.values(max=uuid, excludemax=True):
+            if worker.engineUUID is not None:
+                return worker
+
+    def checkSibling(self, uuid):
+        now = datetime.datetime.now(pytz.UTC)
+        next = self._getNextActiveSibling(uuid)
+        if next is not None and ((
+            next.last_ping + next.ping_interval + next.ping_death_interval)
+            < now):
+            # `next` is a dead worker.
+            next.engineUUID = None
+            self.thread.put(zc.async.partial.Partial(cleanDeadWorker, next))
+
+
+class SizedSequence(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.ISizedSequence)
+
+    def __init__(self, size):
+        self.size = size
+        self._data = zc.queue.PersistentQueue()
+        self._data.__parent__ = self
+
+    for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__',
+               '_p_resolveConflict'):
+        locals()[nm] = simpleWrapper(nm)
+
+    def add(self, item):
+        if len(self._data) >= self.size:
+            raise zc.async.interfaces.FullError(self)
+        item.__parent__ = self
+        item.workerUUID = self.__parent__.UUID
+        self._data.put(item)
+        return item
+
+    def index(self, item):
+        for ix, i in enumerate(self):
+            if i is item:
+                return ix
+        raise ValueError("%r not in queue" % (item,))
+
+    def remove(self, item):
+        del self[self.index(item)]
+
+    def __delitem__(self, ix):
+        self._data.pull(ix)
+
+
+START = datetime.datetime(2006, 1, 1, tzinfo=pytz.UTC)
+
+def key(item):
+    dt = item.begin_after
+    diff = dt - START
+    return (-diff.days, -diff.seconds, -diff.microseconds,
+            item._p_jar.db().database_name, item._p_oid)
+
+def code(dt):
+    diff = dt - START
+    return (-diff.days, -diff.seconds, -diff.microseconds)
+
+
+class Completed(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.ICompletedCollection)
+    # sorts on begin_after from newest to oldest
+
+    __parent__ = None
+
+    def __init__(self,
+                 rotation_interval=datetime.timedelta(hours=2),
+                 buckets=6):
+        self._data = zope.bforest.OOBForest(count=buckets)
+        self.rotation_interval = rotation_interval
+        self.last_rotation = datetime.datetime.now(pytz.UTC)
+
+    def add(self, item):
+        self._data[key(item)] = item
+        item.__parent__ = self
+
+    def iter(self, start=None, stop=None):
+        sources = []
+        if start is not None:
+            start = code(start)
+        if stop is not None:
+            stop = code(stop)
+        for b in self._data.buckets:
+            i = iter(b.items(start, stop))
+            try:
+                n = i.next()
+            except StopIteration:
+                pass
+            else:
+                sources.append([n, i])
+        sources.sort()
+        length = len(sources)
+        while length > 1:
+            src = sources.pop(0)
+            yield src[0][1]
+            try:
+                src[0] = src[1].next()
+            except StopIteration:
+                length -= 1
+            else:
+                bisect.insort(sources, src) # mildly interesting micro-
+                # optimisation note: this approach shaves off about 1/5 of
+                # an alternative approach that finds the lowest every time
+                # but does not insort.
+        if sources:
+            yield sources[0][0][1]
+            for k, v in sources[0][1]:
+                yield v
+
+    def __iter__(self):
+        return self._data.itervalues() # this takes more memory but the pattern
+        # is typically faster than the custom iter above (for relatively
+        # complete iterations of relatively small sets).  The custom iter
+        # has the advantage of the start and stop code.
+
+    def first(self, start=None):
+        original = start
+        if start is not None:
+            start = code(start)
+            minKey = lambda bkt: bkt.minKey(start)
+        else:
+            minKey = lambda bkt: bkt.minKey()
+        i = iter(self._data.buckets)
+        bucket = i.next()
+        try:
+            key = minKey(bucket)
+        except ValueError:
+            key = None
+        for b in i:
+            try:
+                k = minKey(b)
+            except ValueError:
+                continue
+            if key is None or k < key:
+                bucket, key = b, k
+        if key is None:
+            raise ValueError(original)
+        return bucket[key]
+
+    def last(self, stop=None):
+        original = stop
+        if stop is not None:
+            stop = code(stop)
+            maxKey = lambda bkt: bkt.maxKey(stop)
+        else:
+            maxKey = lambda bkt: bkt.maxKey()
+        i = iter(self._data.buckets)
+        bucket = i.next()
+        try:
+            key = maxKey(bucket)
+        except ValueError:
+            key = None
+        for b in i:
+            try:
+                k = maxKey(b)
+            except ValueError:
+                continue
+            if key is None or k > key:
+                bucket, key = b, k
+        if key is None:
+            raise ValueError(original)
+        return bucket[key]
+
+    def __nonzero__(self):
+        for b in self._data.buckets:
+            try:
+                iter(b).next()
+            except StopIteration:
+                pass
+            else:
+                return True
+        return False
+
+    def __len__(self):
+        return len(self._data)
+
+    def rotate(self):
+        self._data.rotateBucket()
+        self.last_rotation = datetime.datetime.now(pytz.UTC)
+
+
+class Worker(persistent.Persistent):
+    zope.interface.implements(zc.async.interfaces.IWorker)
+
+    def __init__(self, UUID, reactor_size=4, thread_size=1, poll_seconds=5,
+                 ping_interval=datetime.timedelta(minutes=1),
+                 ping_death_interval=datetime.timedelta(seconds=30)):
+        self.reactor = SizedSequence(reactor_size)
+        self.reactor.__parent__ = self
+        self.thread = SizedSequence(thread_size)
+        self.thread.__parent__ = self
+        self.engineUUID = None
+        self.UUID = UUID
+        self.poll_seconds = poll_seconds
+        self.ping_interval = ping_interval
+        self.ping_death_interval = ping_death_interval
+        self.last_ping = datetime.datetime.now(pytz.UTC)
+        self.completed = Completed()
+        self.completed.__parent__ = self


Property changes on: zc.async/trunk/src/zc/async/datamanager.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/datamanager.txt
===================================================================
--- zc.async/trunk/src/zc/async/datamanager.txt	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/datamanager.txt	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,1222 @@
+The datamanager module contains the queues that zc.async clients use to
+deposit jobs, and the queues in which workers put the jobs they are
+working on.
+
+The main datamanager object simply has a queue for thread jobs, a queue for
+reactor jobs, and a mapping of workers.  It starts out empty [#setUp]_.
+
+    >>> import zc.async.datamanager
+    >>> dm = root['zc.async.datamanager'] = zc.async.datamanager.DataManager()
+    >>> import transaction
+    >>> transaction.commit()
+    >>> len(dm.workers)
+    0
+    >>> len(dm.thread)
+    0
+    >>> len(dm.reactor)
+    0
+
+As shown in the README.txt of this package, the data manager will typically
+be registered as an adapter to persistent objects that provides
+zc.async.interfaces.IDataManager [#verify]_.  
+
+Workers
+=======
+
+When a worker process starts, it registers itself with the data
+manager.  Workers typically get their UUID from the instanceuuid module
+in this package, but we will generate our own here.
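+
+For reference, a sketch of what instanceuuid does, per the README.txt
+footnote: it reads INSTANCE_HOME/etc/uuid.txt, creating and populating
+the file with a new UUID first if necessary::
+
+    import os
+    import uuid
+
+    def get_instance_uuid():
+        # sketch only; see zc.async.instanceuuid for the real code
+        path = os.path.join(
+            os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt')
+        if not os.path.exists(path):
+            f = open(path, 'w')
+            f.write(str(uuid.uuid1()))
+            f.close()
+        f = open(path)
+        try:
+            return uuid.UUID(f.readline().strip())
+        finally:
+            f.close()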
+
+    >>> import uuid
+    >>> worker1 = zc.async.datamanager.Worker(uuid.uuid1())
+    >>> res = dm.workers.add(worker1)
+    >>> dm.workers[worker1.UUID] is worker1
+    True
+    >>> res is worker1
+    True
+
+The `workers` object has a mapping read API, with `items`, `values`, `keys`,
+`__len__`, `__getitem__`, and `get` [#check_workers_mapping]_.  You remove
+workers with their UUID [#check_UUID_equivalence]_.
+
+    >>> dm.workers.remove(worker1.UUID)
+    >>> len(dm.workers)
+    0
+
+Let's add the worker back.  Notice that the __parent__ is None when it is out
+of the workers, but set to the workers object when it is inside.  Since the
+workers object also has a __parent__ reference to its parent, the data manager,
+the worker has a link back to the datamanager.
+
+    >>> worker1.__parent__ # None
+    >>> res = dm.workers.add(worker1)
+    >>> worker1.__parent__ is dm.workers
+    True
+    >>> dm.workers.__parent__ is dm
+    True
+
+Each worker has several other attributes.  We'll look at four now:
+`UUID`, which we have already seen; `thread`, a sequence of the thread
+jobs the worker is working on; `reactor`, a sequence of the reactor jobs
+the worker is working on; and `engineUUID`, a uuid of the engine that is
+in charge of running the worker, if any [#verify_worker]_.
+
+The two sequences are unusual in that they are sized: if len(sequence)
+== size, trying to put another item in the sequence raises
+zc.async.interfaces.FullError.  By default, workers have a reactor size
+of 4, and a thread size of 1.
+
+    >>> worker1.thread.size
+    1
+    >>> worker1.reactor.size
+    4
+    >>> def multiply(*args):
+    ...     res = 1
+    ...     for a in args:
+    ...         res *= a
+    ...     return res
+    ...
+    >>> import zc.async.partial
+    >>> p1 = zc.async.partial.Partial(multiply, 2, 3)
+    >>> res = worker1.thread.add(p1)
+    >>> len(worker1.thread)
+    1
+    >>> p2 = zc.async.partial.Partial(multiply, 5, 6)
+    >>> worker1.thread.add(p2)
+    ... # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    FullError: <zc.async.datamanager.SizedSequence object at ...>
+
+You can change the queue size.
+
+    >>> worker1.thread.size = 2
+    >>> res = worker1.thread.add(p2)
+    >>> len(worker1.thread)
+    2
+
+Decreasing it beyond the current len is acceptable, and will only affect
+how many partials must be removed before new ones may be added.
+
+    >>> worker1.thread.size = 1
+    >>> len(worker1.thread)
+    2
+
+You can also set it during instantiation of a worker: `reactor_size` and
+`thread_size` are optional arguments.
+
+    >>> worker2 = zc.async.datamanager.Worker(uuid.uuid1(), 2, 1)
+    >>> worker2.reactor.size
+    2
+    >>> worker2.thread.size
+    1
+
+We'll add the second worker to the data manager.
+
+    >>> res = dm.workers.add(worker2)
+    >>> len(dm.workers)
+    2
+
+Engines claim workers by putting their UUID on them.  Initially a worker has
+no engineUUID.  We'll assign two (arbitrary) UUIDs.
+
+    >>> worker1.engineUUID
+    >>> worker1.engineUUID = uuid.uuid4()
+    >>> worker2.engineUUID = uuid.uuid4()
+
+This indicates that both workers are "open for business".  A worker without an
+engine is a dead husk.
+
+We'll look more at partials in workers a little later
+[#remove_partials]_.  Next we're going to look at partials in the
+data manager queues.
+
+Partials
+========
+
+Once a Zope has started, it will typically have at least one worker installed,
+with one virtual loop per worker checking the data manager for new jobs (see
+README.txt for integration examples).  Now client code can start requesting
+that partials be done.
+
+Basic Story
+-----------
+
+The simplest use is to get the data manager and put a callable in one
+of its queues.  As mentioned above, and demonstrated in README.txt, the
+typical way to get the data manager is to adapt a persistent context to
+IDataManager.  We'll assume we already have the data manager, and that a
+utility providing zc.async.interfaces.IUUID named 'instance' is
+available [#setUp_UUID_utility]_.
+
+    >>> def send_message():
+    ...     print "imagine this sent a message to another machine"
+    ...
+    >>> p = dm.thread.put(send_message)
+
+Now p is a partial wrapping the send_message call.  It is specifically a
+data manager partial [#basic_data_manager_partial_checks]_.
+
+    >>> p.callable is send_message
+    True
+    >>> zc.async.interfaces.IDataManagerPartial.providedBy(p)
+    True
+
+The IDataManagerPartial interface extends IPartial and describes the
+interface needed for a partial added to a data manager.  Here are the
+attributes on the interface.
+
+- Set automatically:
+
+  * assignerUUID (the UUID of the software instance that put the partial in
+    the queue)
+
+  * workerUUID (the UUID of the worker who claimed the partial)
+
+  * thread (None or bool: whether the partial was assigned to a thread (True)
+    or reactor (False) queue)
+
+- Potentially set by user, not honored for callbacks:
+
+  * selectedUUIDs (the UUIDs of workers that should work on the partial, as
+    selected by user)
+
+  * excludedUUIDs (the UUIDs of workers that should not work on the partial,
+    as selected by user)
+
+  * begin_after (a datetime.datetime with pytz.UTC timezone that specifies a
+    date and time to wait till running the partial; defaults to creation
+    time)
+
+  * begin_by (a datetime.timedelta of a duration after begin_after after
+    which workers should call `fail` on the partial; defaults to one hour)
+
+These are described in some more detail on the IDataManagerPartial
+interface.
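+
+For instance (a sketch, not run as part of these tests; it assumes the
+selectedUUIDs set supports `add`), a user might target a particular
+worker and allow a two-hour window::
+
+    p = zc.async.interfaces.IDataManagerPartial(
+        zc.async.partial.Partial(multiply, 2, 3))
+    p.selectedUUIDs.add(worker1.UUID)
+    dm.thread.put(
+        p,
+        begin_after=datetime.datetime(2006, 9, 1, tzinfo=pytz.UTC),
+        begin_by=datetime.timedelta(hours=2))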
+
+The thread queue contains the partial.
+
+    >>> len(dm.thread)
+    1
+    >>> list(dm.thread) == [p]
+    True
+
+If you ask the data manager for all due jobs, it also includes the partial.
+
+    >>> list(dm.thread.iterDue()) == [p]
+    True
+
+The partial knows its __parent__ and can be used to obtain its data manager.
+
+    >>> zc.async.interfaces.IDataManager(p) is dm
+    True
+    >>> p.__parent__ is dm.thread
+    True
+
+The easiest way for a worker to get a task is to call pullNext, passing
+its UUID.  This returns the next available task that does not exclude
+the worker (and that includes it, if the task selects particular
+workers), removing the task from the queue.  If nothing is available, it
+returns None.
+
+    >>> res = dm.thread.pullNext(worker1.UUID)
+    >>> res is p
+    True
+    >>> len(dm.thread)
+    0
+
+Once a partial has been put in a data manager, it is "claimed": trying to
+put it in another one (or back in the same one) will raise an error.
+
+    >>> dm.thread.put(p)
+    Traceback (most recent call last):
+    ...
+    ValueError: cannot add already-assigned partial
+
+If we remove the assignerUUID, we can put it back in.
+
+    >>> p.assignerUUID = None
+    >>> res = dm.thread.put(p)
+    >>> res is p
+    True
+    >>> len(dm.thread)
+    1
+    >>> transaction.commit()
+
+In normal behavior, after client code has put the task in the thread queue,
+an engine (associated with a persistent worker in a one-to-one relationship,
+in which the worker is the persistent store for the transient, per-process
+engine) will claim and perform it like this (we'll do this from the
+perspective of worker 1).
+
+    >>> trans = transaction.begin()
+    >>> import Queue
+    >>> thread_queue = Queue.Queue(0)
+    >>> claimed = dm.workers[worker1.UUID].thread
+    >>> ct = 0
+    >>> while 1:
+    ...     if len(claimed) < claimed.size:
+    ...         next = dm.thread.pullNext(worker1.UUID)
+    ...         if next is not None:
+    ...             claimed.add(next)
+    ...             database_name = next._p_jar.db().database_name
+    ...             identifier = next._p_oid
+    ...             try:
+    ...                 transaction.commit()
+    ...             except ZODB.POSException.TransactionError:
+    ...                 transaction.abort()
+    ...                 ct += 1
+    ...                 if ct < 5: # in twisted, this would probably callLater
+    ...                     continue
+    ...             else:
+    ...                 thread_queue.put((database_name, identifier))
+    ...                 ct = 0
+    ...                 continue # in twisted, this would probably callLater
+    ...     break
+    ... # doctest: +ELLIPSIS
+    ...
+    <zc.async.adapters.DataManagerPartial object at ...>
+
+Now worker 1 has claimed it.
+
+    >>> len(worker1.thread)
+    1
+
+A thread in that worker will begin it,
+given the database name and _p_oid of the partial it should perform, and will
+do something like this.
+
+    >>> import thread
+    >>> database_name, identifier = thread_queue.get(False)
+    >>> claimed = dm.workers[worker1.UUID].thread # this would actually open a
+    ... # connection and get the worker thread queue object by id
+    >>> for p in claimed:
+    ...     if (p._p_oid == identifier and
+    ...         p._p_jar.db().database_name == database_name):
+    ...         p.thread = thread.get_ident()
+    ...         transaction.commit()
+    ...         try:
+    ...             p()
+    ...         except ZODB.POSException.TransactionError:
+    ...             transaction.abort()
+    ...             p.fail()
+    ...         while 1:
+    ...             try:
+    ...                 claimed.remove(p)
+    ...                 claimed.__parent__.completed.add(p)
+    ...                 transaction.commit()
+    ...             except ZODB.POSException.TransactionError:
+    ...                 transaction.abort() # retry forever!
+    ...             else:
+    ...                 break
+    ...         break
+    ...
+    imagine this sent a message to another machine
+
+And look, there's our message: the partial was called.
+
+The worker's thread list is empty, and the partial has a note of what thread
+ran it.
+
+    >>> len(worker1.thread)
+    0
+    >>> p.thread == thread.get_ident()
+    True
+
+Notice also that the `completed` container now contains the partial.
+
+    >>> len(worker1.completed)
+    1
+    >>> list(worker1.completed) == [p]
+    True
+
+The API of the completed container is still in flux [#test_completed]_.
+
+For Reactors
+------------
+
+If you are into Twisted programming, use the reactor queue.  The story is
+very similar, so we'll go a bit quicker.
+
+    >>> import twisted.internet.defer
+    >>> import twisted.internet.reactor
+    >>> def twistedPartDeux(d):
+    ...     d.callback(42)
+    ...
+    >>> def doSomethingInTwisted():
+    ...     d = twisted.internet.defer.Deferred()
+    ...     twisted.internet.reactor.callLater(0, twistedPartDeux, d)
+    ...     return d
+    ...
+    >>> p = dm.reactor.put(doSomethingInTwisted)
+    >>> def arbitraryThingThatNeedsAConnection(folder, result):
+    ...     folder['result'] = result
+    ...
+    >>> p_callback = p.addCallbacks(zc.async.partial.Partial(
+    ...     arbitraryThingThatNeedsAConnection, root))
+    >>> transaction.commit()
+
+The engine might do something like this [#set_up_reactor]_.
+
+    >>> import zc.twist
+    >>> def remove(container, partial, result):
+    ...     container.remove(partial)
+    ...
+    >>> def perform(p):
+    ...     res = p()
+    ...     p.addCallback(zc.async.partial.Partial(
+    ...         remove, p.__parent__, p))
+    ...     transaction.commit()
+    ...
+    >>> trans = transaction.begin()
+    >>> claimed = dm.workers[worker2.UUID].reactor
+    >>> ct = 0
+    >>> while 1:
+    ...     if len(claimed) < claimed.size:
+    ...         next = dm.reactor.pullNext(worker2.UUID)
+    ...         if next is not None:
+    ...             claimed.add(next)
+    ...             partial = zc.twist.Partial(perform, next)
+    ...             try:
+    ...                 transaction.commit()
+    ...             except ZODB.POSException.TransactionError:
+    ...                 transaction.abort()
+    ...                 ct += 1
+    ...                 if ct < 5: # this would probably callLater really
+    ...                     continue
+    ...             else:
+    ...                 twisted.internet.reactor.callLater(0, partial)
+    ...                 ct = 0
+    ...                 continue # this would probably callLater really
+    ...     break
+    ... # doctest: +ELLIPSIS
+    ...
+    <zc.async.adapters.DataManagerPartial object at ...>
+
+Then the reactor would churn, and eventually we'd get our result.  The
+execution should be something like this (where `time_passes` represents
+one tick of the Twisted reactor that you would normally not have to call
+explicitly--this is just for demonstration purposes).
+
+    >>> time_passes() # perform and doSomethingInTwisted
+    True
+    >>> trans = transaction.begin()
+    >>> p.result # None
+    >>> len(worker2.reactor)
+    1
+    >>> time_passes() # twistedPartDeux and arbitraryThingThatNeedsAConnection
+    True
+    >>> trans = transaction.begin()
+    >>> p.result
+    42
+    >>> root['result']
+    42
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+    >>> len(worker2.reactor)
+    0
+
+[#tear_down_reactor]_
+
+Held Calls
+----------
+
+Both of the examples so far request that jobs be done as soon as possible.
+It's also possible to request that jobs be done later.  Let's assume we
+can control the current time generated by datetime.datetime.now with a
+`set_now` callable [#set_up_datetime]_.  A partial added without any special
+calls gets a `begin_after` attribute of now.
+
+    >>> import datetime
+    >>> import pytz
+    >>> datetime.datetime.now(pytz.UTC) 
+    datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
+    >>> res1 = dm.thread.put(
+    ...     zc.async.partial.Partial(multiply, 3, 6))
+    ...
+    >>> res1.begin_after
+    datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
+
+This means that it's immediately ready to be performed.  `iterDue` shows this.
+
+    >>> list(dm.thread.iterDue()) == [res1]
+    True
+
+You can also specify a begin_after date when you make the call.  Then it
+isn't due immediately.
+
+    >>> res2 = dm.thread.put(
+    ...     zc.async.partial.Partial(multiply, 4, 6),
+    ...     datetime.datetime(2006, 8, 10, 16, tzinfo=pytz.UTC))
+    ...
+    >>> len(dm.thread)
+    2
+    >>> res2.begin_after
+    datetime.datetime(2006, 8, 10, 16, 0, tzinfo=<UTC>)
+    >>> list(dm.thread.iterDue()) == [res1]
+    True
+
+When the time passes, it is available.  Partials are ordered by their
+begin_after dates.
+
+    >>> set_now(datetime.datetime(2006, 8, 10, 16, 0, 0, 1, tzinfo=pytz.UTC))
+    >>> list(dm.thread.iterDue()) == [res1, res2]
+    True
+
+Pre-dating (before now) makes the item come first (or in order with other
+pre-dated items).
+
+    >>> res3 = dm.thread.put(
+    ...     zc.async.partial.Partial(multiply, 5, 6),
+    ...     begin_after=datetime.datetime(2006, 8, 10, 15, 35, tzinfo=pytz.UTC))
+    ...
+    >>> res3.begin_after
+    datetime.datetime(2006, 8, 10, 15, 35, tzinfo=<UTC>)
+    >>> list(dm.thread) == [res3, res1, res2]
+    True
+    >>> list(dm.thread.iterDue()) == [res3, res1, res2]
+    True
+
+Other timezones are normalized to UTC.
+
+    >>> res4 = dm.thread.put(
+    ...     zc.async.partial.Partial(multiply, 6, 6),
+    ...     pytz.timezone('EST').localize(
+    ...         datetime.datetime(2006, 8, 10, 11, 30)))
+    ...
+    >>> res4.begin_after
+    datetime.datetime(2006, 8, 10, 16, 30, tzinfo=<UTC>)
+    >>> list(dm.thread.iterDue()) == [res3, res1, res2]
+    True
+
+Naive timezones are not allowed.
+
+    >>> dm.thread.put(send_message, datetime.datetime(2006, 8, 10, 16, 15))
+    Traceback (most recent call last):
+    ...
+    ValueError: cannot use timezone-naive values
+
+Iteration, again, is based on begin_after, not the order added.
+
+    >>> res5 = dm.thread.put(
+    ...     zc.async.partial.Partial(multiply, 7, 6),
+    ...     datetime.datetime(2006, 8, 10, 16, 15, tzinfo=pytz.UTC))
+    ...
+    >>> list(dm.thread) == [res3, res1, res2, res5, res4]
+    True
+    >>> list(dm.thread.iterDue()) == [res3, res1, res2]
+    True
+
+pullNext only returns items that are due.
+
+    >>> dm.thread.pullNext(worker1.UUID) == res3
+    True
+    >>> dm.thread.pullNext(worker1.UUID) == res1
+    True
+    >>> dm.thread.pullNext(worker1.UUID) == res2
+    True
+    >>> dm.thread.pullNext(worker1.UUID) # None
+
+Once items are due, their begin_after values also determine the order in
+which `pullNext` returns them.
+
+    >>> set_now(datetime.datetime(2006, 8, 10, 16, 31, tzinfo=pytz.UTC))
+    >>> dm.thread.pullNext(worker1.UUID) == res5
+    True
+    >>> dm.thread.pullNext(worker1.UUID) == res4
+    True
+    >>> dm.thread.pullNext(worker1.UUID) # None
+
+Selecting and Excluding Workers
+-------------------------------
+
+Some use cases want to limit the workers that can perform a given partial,
+either by explicitly selecting or excluding certain workers.  Here are some of
+those use cases:
+
+- You may want to divide up your workers by tasks: certain long running tasks
+  should only tie up one set of workers, so short tasks that users expect
+  more responsiveness from can use any available worker, including some that
+  are reserved for them.
+
+- You may only have the system resources necessary to perform a given task on
+  a certain set of your workers.
+
+- You may want to broadcast a message to all workers, so you need to generate
+  a task specifically for each.  This would be an interesting way to build
+  a simple but potentially powerful WSGI reverse proxy that supported
+  invalidations, for instance.
+
+There are probably more, and even these have interesting variants.  For
+instance, for the second, what if you don't know which workers are
+appropriate and need to test to find out?  You could write a partial
+that contains a partial.  The outer partial's job is to find a worker
+with an environment appropriate for the inner one.  When it runs, if the
+worker's environment is appropriate, it performs the inner partial.  If
+it is not, it creates a new outer partial wrapping its inner partial,
+specifies the current worker UUID as excluded, and schedules it to be
+called.  If the next worker is also inappropriate, in creates a third
+outer wrapper that excludes both of the failed workers' UUIDs...and so
+on.
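+
+As a rough sketch of that pattern (hedging freely: `bind` is the
+IPartialFactory classmethod that inserts the partial itself as the first
+argument; `environment_is_appropriate` is a stand-in for whatever
+application-level test you need; and we lean on the partial adapters
+registered in the setup footnote)::
+
+    import zc.async.interfaces
+    import zc.async.partial
+
+    def environment_is_appropriate():
+        # stand-in: replace with a real application-level test
+        return True
+
+    def probe_and_perform(self, inner):
+        # `self` is the outer partial, inserted as the first arg by `bind`
+        if environment_is_appropriate():
+            return inner()
+        # wrong worker: make a new outer partial that additionally excludes
+        # this worker's UUID, and reschedule it
+        retry = zc.async.interfaces.IDataManagerPartial(
+            zc.async.partial.Partial.bind(probe_and_perform, inner))
+        retry.excludedUUIDs.update(self.excludedUUIDs)
+        retry.excludedUUIDs.add(self.workerUUID)
+        zc.async.interfaces.IDataManager(self).thread.put(retry)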
+
+To do this, you use worker UUIDs in a partial's selectedUUIDs and
+excludedUUIDs sets.  An empty selectedUUIDs set is interpreted as a
+catch-all.  For a worker to be able to perform a partial, it must not
+be in the excludedUUIDs and either selectedUUIDs is empty or it is within
+selectedUUIDs.
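+
+That rule is compact enough to write down directly.  Here is a minimal
+sketch of the predicate a queue effectively applies (the function name
+is ours, purely for illustration; it is not part of the package API)::
+
+    def may_perform(worker_uuid, partial):
+        # exclusion always wins
+        if worker_uuid in partial.excludedUUIDs:
+            return False
+        # an empty selectedUUIDs set means "any worker"
+        if not partial.selectedUUIDs:
+            return True
+        return worker_uuid in partial.selectedUUIDs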
+
+Let's look at some examples.  We'll assume the five partials we looked at
+above are all back in the dm.threads [#reinstate]_.  We've already looked
+at partials with empty sets for excludedUUIDs and selectedUUIDs: every
+partial we've shown up to now has fit that description.
+
+(order is [res3, res1, res2, res5, res4])
+
+    >>> res3.selectedUUIDs.add(uuid.uuid1()) # nobody here
+    >>> res1.selectedUUIDs.add(worker1.UUID)
+    >>> res2.selectedUUIDs.update((worker1.UUID, worker2.UUID))
+    >>> res2.excludedUUIDs.add(worker2.UUID)
+    >>> res5.excludedUUIDs.add(worker2.UUID)
+    >>> res4.excludedUUIDs.update((worker2.UUID, worker1.UUID))
+
+Now poor worker2 can't get any work.
+
+    >>> dm.thread.pullNext(worker2.UUID) # None
+
+worker1 can get three of them: res1, res2, and res5.
+
+    >>> dm.thread.pullNext(worker1.UUID) is res1
+    True
+    >>> dm.thread.pullNext(worker1.UUID) is res2
+    True
+    >>> dm.thread.pullNext(worker1.UUID) is res5
+    True
+    >>> dm.thread.pullNext(worker1.UUID) # None
+
+Now we have two jobs that can never be claimed (as long as we have only
+these two workers and the selected/excluded values are not changed).
+What happens to them?
+
+Never-Claimed Calls
+-------------------
+
+Sometimes, due to impossible worker selections or exclusions, or simply
+workers that are too busy, we need to give up and say that a given partial
+will not be run, and should fail.  Partials have a begin_by attribute which
+controls approximately when this should happen.  The begin_by value should
+be a non-negative datetime.timedelta, which is added to the begin_after value
+to determine when the partial should fail.  If you put a partial in a
+data manager with no begin_by value set, the data manager sets it to one hour.
+
+    >>> res3.begin_by
+    datetime.timedelta(0, 3600)
+
+This can be changed.
+
+    >>> res3.begin_by = datetime.timedelta(hours=2)
+
+So how are they cancelled?  `pullNext` will wrap any expired partial with
+another partial that calls the inner `fail` method.  This will be handed to
+any worker.
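+
+Conceptually the wrapper need be nothing more exotic than a partial
+around the expired partial's own `fail` method; roughly (a sketch of the
+idea, not necessarily the literal implementation)::
+
+    expired_handler = zc.async.partial.Partial(expired_partial.fail)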
+
+    >>> res3.begin_after + res3.begin_by
+    datetime.datetime(2006, 8, 10, 17, 35, tzinfo=<UTC>)
+    >>> res4.begin_after + res4.begin_by
+    datetime.datetime(2006, 8, 10, 17, 30, tzinfo=<UTC>)
+    >>> set_now(datetime.datetime(2006, 8, 10, 17, 32, tzinfo=pytz.UTC))
+    >>> p = dm.thread.pullNext(worker1.UUID)
+    >>> p()
+    >>> res4.state == zc.async.interfaces.COMPLETED
+    True
+    >>> print res4.result.getTraceback()
+    ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    zc.async.interfaces.AbortedError:
+    >>> dm.thread.pullNext(worker1.UUID) # None
+
+    >>> set_now(datetime.datetime(2006, 8, 10, 17, 37, tzinfo=pytz.UTC))
+    >>> p = dm.thread.pullNext(worker1.UUID)
+    >>> p()
+    >>> res3.state == zc.async.interfaces.COMPLETED
+    True
+    >>> print res3.result.getTraceback()
+    ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    zc.async.interfaces.AbortedError:
+    >>> dm.thread.pullNext(worker1.UUID) # None
+    >>> len(dm.thread)
+    0
+
+Dead Workers
+------------
+
+What happens when an engine, driving a worker, dies?  If it is the only
+engine/worker, that's the end: when the worker restarts it should clean
+out its old worker object and then proceed.  But what if there is more
+than one simultaneous worker?  How do we know to clean out the dead
+worker's jobs?
+
+In addition to the jobs listed above, each worker virtual main loop has
+an additional task: to be its brother's keeper.  Each must update a ping
+date on its worker object at a given maximum interval, and check the
+next sibling.  To support this story, the worker objects have a few more
+attributes we haven't talked about: `poll_seconds`, `ping_interval`,
+`ping_death_interval`, and `last_ping`.
+
+    >>> worker1.poll_seconds
+    5
+    >>> worker1.ping_interval
+    datetime.timedelta(0, 60)
+    >>> worker1.ping_death_interval
+    datetime.timedelta(0, 30)
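+
+Putting those values together, the liveness rule a sibling applies can
+be written down directly.  A minimal sketch, using the attribute names
+above (`seems_dead` is our name, not the package's)::
+
+    import datetime
+    import pytz
+
+    def seems_dead(worker, now=None):
+        # presumed dead once the last ping is older than
+        # ping_interval + ping_death_interval allows
+        if now is None:
+            now = datetime.datetime.now(pytz.UTC)
+        return (worker.last_ping + worker.ping_interval +
+                worker.ping_death_interval) < now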
+
+`pullNext` on a thread queue will hand out a partial to clean up a dead
+worker when asked by the worker with the next highest UUID (circling
+around to the lowest when the dead worker's UUID is the highest).  So
+let's set a ping on worker1.
+
+    >>> worker1.last_ping = datetime.datetime.now(pytz.UTC)
+    >>> worker1.last_ping
+    datetime.datetime(2006, 8, 10, 17, 37, tzinfo=<UTC>)
+
+Let's put res1, res2, res3, res4, and res5 in worker1.
+
+    >>> len(worker1.thread)
+    0
+    >>> worker1.thread.size = 3
+    >>> res1.excludedUUIDs.clear()
+    >>> res1.selectedUUIDs.clear()
+    >>> r = worker1.thread.add(res1) # PENDING
+    >>> res2._state = zc.async.interfaces.ACTIVE
+    >>> r = worker1.reactor.add(res2)
+    >>> r = worker1.thread.add(res3) # COMPLETED
+    >>> r = worker1.reactor.add(res4) # COMPLETED
+    >>> res5._state = zc.async.interfaces.CALLBACKS
+    >>> res5._result = res5.callable(*res5.args, **dict(res5.kwargs))
+    >>> r = worker1.thread.add(res5)
+
+While we are still within our acceptable time period, the `checkSibling`
+method will not do anything.
+
+    >>> len(dm.workers)
+    2
+    >>> len(dm.thread)
+    0
+    >>> set_now(worker1.last_ping + worker1.ping_interval)
+    >>> dm.checkSibling(worker2.UUID)
+    >>> len(dm.workers)
+    2
+    >>> len(dm.thread)
+    0
+
+We need to move `now` to after last_ping + ping_interval +
+ping_death_interval. Then, when worker2 calls checkSibling on the data
+manager, worker1's engineUUID will be set to None, and a partial will be
+added to clean out the partials in worker1.
+
+    >>> set_now(worker1.last_ping + worker1.ping_interval +
+    ...         worker1.ping_death_interval + datetime.timedelta(seconds=1))
+    >>> worker1.engineUUID is not None
+    True
+    >>> worker2.engineUUID is not None
+    True
+    >>> dm.checkSibling(worker2.UUID)
+    >>> len(dm.workers)
+    2
+    >>> worker1.engineUUID is not None
+    False
+    >>> worker2.engineUUID is not None
+    True
+    >>> len(dm.thread)
+    1
+
+So worker2 can get the job and perform it.
+
+    >>> res = dm.thread.pullNext(worker2.UUID)
+    >>> partial = worker2.thread.add(res)
+    >>> len(worker1.thread)
+    3
+    >>> len(worker1.reactor)
+    2
+    >>> res()
+    >>> len(worker1.thread)
+    0
+    >>> len(worker1.reactor)
+    0
+    >>> r = dm.thread.pullNext(worker2.UUID)()
+    >>> r = dm.thread.pullNext(worker2.UUID)()
+    >>> r = dm.reactor.pullNext(worker2.UUID)()
+    >>> dm.thread.pullNext(worker2.UUID) # None
+    >>> dm.reactor.pullNext(worker2.UUID) # None
+    
+    >>> res1.state == zc.async.interfaces.COMPLETED
+    True
+    >>> res2.state == zc.async.interfaces.COMPLETED
+    True
+    >>> res3.state == zc.async.interfaces.COMPLETED
+    True
+    >>> res4.state == zc.async.interfaces.COMPLETED
+    True
+    >>> res5.state == zc.async.interfaces.COMPLETED
+    True
+
+If you have multiple workers, it is strongly suggested that you keep the
+associated servers synchronized via a shared time server.
+
+[#tear_down_datetime]_
+
+=========
+Footnotes
+=========
+
+.. [#setUp] We'll actually create the state that the text needs here.
+
+    >>> from ZODB.tests.util import DB
+    >>> db = DB()
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+    You must have two adapter registrations: IConnection to
+    ITransactionManager, and IPersistent to IConnection.  We will also
+    register IPersistent to ITransactionManager because the adapter is
+    designed for it.
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+    We need to be able to get data manager partials for functions and methods;
+    normal partials for functions and methods; and a data manager for a partial.
+    Here are the necessary registrations.
+
+    >>> import zope.component
+    >>> import types
+    >>> import zc.async.interfaces
+    >>> import zc.async.partial
+    >>> import zc.async.adapters
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.method_to_datamanagerpartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.function_to_datamanagerpartial)
+    >>> zope.component.provideAdapter( # partial -> datamanagerpartial
+    ...     zc.async.adapters.DataManagerPartial,
+    ...     provides=zc.async.interfaces.IDataManagerPartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.adapters.partial_to_datamanager)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IPartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IPartial)
+    ...
+
+.. [#verify] Verify data manager interface.
+
+    >>> from zope.interface.verify import verifyObject
+    >>> verifyObject(zc.async.interfaces.IDataManager, dm)
+    True
+    >>> verifyObject(zc.async.interfaces.IPartialQueue, dm.thread)
+    True
+    >>> verifyObject(zc.async.interfaces.IPartialQueue, dm.reactor)
+    True
+    >>> verifyObject(zc.async.interfaces.IWorkers, dm.workers)
+    True
+
+.. [#check_workers_mapping]
+
+    >>> len(dm.workers)
+    1
+    >>> list(dm.workers.keys()) == [worker1.UUID]
+    True
+    >>> list(dm.workers) == [worker1.UUID]
+    True
+    >>> list(dm.workers.values()) == [worker1]
+    True
+    >>> list(dm.workers.items()) == [(worker1.UUID, worker1)]
+    True
+    >>> dm.workers.get(worker1.UUID) is worker1
+    True
+    >>> dm.workers.get(2) is None
+    True
+    >>> dm.workers[worker1.UUID] is worker1
+    True
+    >>> dm.workers[2]
+    Traceback (most recent call last):
+    ...
+    KeyError: 2
+
+.. [#check_UUID_equivalence] This is paranoid--it should be the responsibility
+    of the uuid.UUID class--but we'll check it anyway.
+
+    >>> equivalent_UUID = uuid.UUID(bytes=worker1.UUID.bytes)
+    >>> dm.workers[equivalent_UUID] is worker1
+    True
+    >>> dm.workers.remove(equivalent_UUID)
+    >>> len(dm.workers)
+    0
+    >>> res = dm.workers.add(worker1)
+
+.. [#verify_worker]
+
+    >>> verifyObject(zc.async.interfaces.IWorker, worker1)
+    True
+    >>> verifyObject(zc.async.interfaces.ISizedSequence, worker1.thread)
+    True
+    >>> verifyObject(zc.async.interfaces.ISizedSequence, worker1.reactor)
+    True
+    >>> isinstance(worker1.UUID, uuid.UUID)
+    True
+
+.. [#remove_partials] We can remove our partials from a worker with
+    `remove`.
+
+    >>> worker1.thread.remove(p1)
+    >>> len(worker1.thread)
+    1
+    >>> list(worker1.thread) == [p2]
+    True
+    >>> worker1.thread.remove(p2)
+    >>> len(worker1.thread)
+    0
+
+    The remove method of the worker thread and reactor sequences
+    raises ValueError if you ask it to remove something that isn't
+    contained.
+
+    >>> worker1.thread.remove((2, 4)) # an iterable can surprise some
+    ... # naive string replacements, so we use this to verify we didn't
+    ... # fall into that trap.
+    Traceback (most recent call last):
+    ...
+    ValueError: (2, 4) not in queue
+
+.. [#setUp_UUID_utility] We need to provide an IUUID utility that
+    identifies the current instance.
+
+    >>> import uuid
+    >>> zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
+    >>> zope.component.provideUtility(
+    ...     worker1.UUID, zc.async.interfaces.IUUID, 'instance')
+
+    Normally this would be the UUID instance in zc.async.instanceuuid.
+
+    While we're at it, we'll get "now" so we can compare it in the footnote
+    below.
+
+    >>> import datetime
+    >>> import pytz
+    >>> _before = datetime.datetime.now(pytz.UTC)
+
+.. [#basic_data_manager_partial_checks] Even though the functionality checks
+    belong elsewhere, here are a few default checks for the values.
+
+    >>> verifyObject(zc.async.interfaces.IDataManagerPartial, p)
+    True
+    >>> p.workerUUID # None
+    >>> isinstance(p.assignerUUID, uuid.UUID)
+    True
+    >>> p.selectedUUIDs
+    zc.set.Set([])
+    >>> p.excludedUUIDs
+    zc.set.Set([])
+    >>> _before <= p.begin_after <= datetime.datetime.now(pytz.UTC)
+    True
+    >>> p.begin_by
+    datetime.timedelta(0, 3600)
+    >>> p.thread # None
+
+.. [#test_completed] Here are some nitty-gritty tests of the completed
+    container.
+
+    >>> verifyObject(zc.async.interfaces.ICompletedCollection,
+    ...              worker1.completed)
+    True
+    >>> bool(worker1.completed)
+    True
+    >>> len(worker2.completed)
+    0
+    >>> bool(worker2.completed)
+    False
+    >>> list(worker1.completed.iter()) == [p]
+    True
+    >>> list(worker1.completed.iter(p.begin_after)) == [p]
+    True
+    >>> list(worker1.completed.iter(
+    ...     p.begin_after - datetime.timedelta(seconds=1))) == []
+    True
+    >>> list(worker2.completed) == []
+    True
+    >>> list(worker2.completed.iter()) == []
+    True
+    >>> worker1.completed.first() is p
+    True
+    >>> worker1.completed.last() is p
+    True
+    >>> worker2.completed.first()
+    Traceback (most recent call last):
+    ...
+    ValueError: None
+    >>> worker2.completed.last()
+    Traceback (most recent call last):
+    ...
+    ValueError: None
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.first() is p
+    True
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.first() is p
+    True
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.first() is p
+    True
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.first() is p
+    True
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.first() is p
+    True
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.first()
+    Traceback (most recent call last):
+    ...
+    ValueError: None
+
+    Let's look at the completed collection with a few more partials in it.
+    The rotation means we can look at its behavior as the underlying buckets
+    are rotated out.
+
+    >>> root['coll_p0'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p1'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p2'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p3'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p4'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p5'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p6'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p7'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p8'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p9'] = zc.async.interfaces.IDataManagerPartial(send_message)
+    >>> root['coll_p0'].begin_after = datetime.datetime(
+    ...     2006, 1, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p1'].begin_after = datetime.datetime(
+    ...     2006, 2, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p2'].begin_after = datetime.datetime(
+    ...     2006, 3, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p3'].begin_after = datetime.datetime(
+    ...     2006, 4, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p4'].begin_after = datetime.datetime(
+    ...     2006, 5, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p5'].begin_after = datetime.datetime(
+    ...     2006, 6, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p6'].begin_after = datetime.datetime(
+    ...     2006, 7, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p7'].begin_after = datetime.datetime(
+    ...     2006, 8, 1, tzinfo=pytz.UTC)
+    >>> root['coll_p8'].begin_after = datetime.datetime(
+    ...     2006, 8, 2, tzinfo=pytz.UTC)
+    >>> root['coll_p9'].begin_after = datetime.datetime(
+    ...     2006, 8, 3, tzinfo=pytz.UTC)
+    >>> transaction.commit()
+    >>> worker1.completed.add(root['coll_p8'])
+    >>> worker1.completed.add(root['coll_p6'])
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.add(root['coll_p4'])
+    >>> worker1.completed.add(root['coll_p2'])
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.add(root['coll_p0'])
+    >>> worker1.completed.add(root['coll_p1'])
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.add(root['coll_p3'])
+    >>> worker1.completed.add(root['coll_p5'])
+    >>> worker1.completed.rotate()
+    >>> worker1.completed.add(root['coll_p7'])
+    >>> worker1.completed.add(root['coll_p9'])
+    >>> list(worker1.completed) == [
+    ...     root['coll_p9'], root['coll_p8'], root['coll_p7'], root['coll_p6'],
+    ...     root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> len(worker1.completed)
+    10
+
+    The `iter` method can simply work like __iter__, but can also take starts
+    and stops for relatively efficient jumps.
+
+    >>> list(worker1.completed.iter()) == [
+    ...     root['coll_p9'], root['coll_p8'], root['coll_p7'], root['coll_p6'],
+    ...     root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> list(worker1.completed.iter(start=datetime.datetime(
+    ...     2006, 7, 15, tzinfo=pytz.UTC))) == [
+    ...     root['coll_p6'],
+    ...     root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> list(worker1.completed.iter(start=datetime.datetime(
+    ...     2006, 7, 1, tzinfo=pytz.UTC))) == [
+    ...     root['coll_p6'],
+    ...     root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> list(worker1.completed.iter(stop=datetime.datetime(
+    ...     2006, 7, 15, tzinfo=pytz.UTC))) == [
+    ...     root['coll_p9'], root['coll_p8'], root['coll_p7']]
+    True
+    >>> list(worker1.completed.iter(stop=datetime.datetime(
+    ...     2006, 7, 1, tzinfo=pytz.UTC))) == [
+    ...     root['coll_p9'], root['coll_p8'], root['coll_p7']]
+    True
+    >>> list(worker1.completed.iter(stop=datetime.datetime(
+    ...     2006, 6, 30, tzinfo=pytz.UTC))) == [
+    ...     root['coll_p9'], root['coll_p8'], root['coll_p7'], root['coll_p6']]
+    True
+    >>> list(worker1.completed.iter(start=datetime.datetime(
+    ...     2006, 7, 1, tzinfo=pytz.UTC), stop=datetime.datetime(
+    ...     2006, 3, 1, tzinfo=pytz.UTC))) == [
+    ...     root['coll_p6'], root['coll_p5'], root['coll_p4'], root['coll_p3']]
+    True
+    
+    `first` and `last` give you the ability to find limits including given
+    start and stop points, respectively.
+    
+    >>> worker1.completed.first() == root['coll_p9']
+    True
+    >>> worker1.completed.last() == root['coll_p0']
+    True
+    >>> worker1.completed.first(
+    ...     datetime.datetime(2006, 7, 15, tzinfo=pytz.UTC)) == (
+    ...     root['coll_p6'])
+    True
+    >>> worker1.completed.last(
+    ...     datetime.datetime(2006, 7, 15, tzinfo=pytz.UTC)) == (
+    ...     root['coll_p7'])
+    True
+
+    As you rotate the completed container, older items disappear.
+
+    >>> worker1.completed.rotate()
+    >>> list(worker1.completed.iter()) == [
+    ...     root['coll_p9'], root['coll_p7'],
+    ...     root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> worker1.completed.rotate() # no change
+    >>> list(worker1.completed.iter()) == [
+    ...     root['coll_p9'], root['coll_p7'],
+    ...     root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> worker1.completed.rotate()
+    >>> list(worker1.completed.iter()) == [
+    ...     root['coll_p9'], root['coll_p7'],
+    ...     root['coll_p5'], root['coll_p3'],
+    ...     root['coll_p1'], root['coll_p0']]
+    True
+    >>> worker1.completed.rotate()
+    >>> list(worker1.completed.iter()) == [
+    ...     root['coll_p9'], root['coll_p7'],
+    ...     root['coll_p5'], root['coll_p3']]
+    True
+    >>> worker1.completed.rotate()
+    >>> list(worker1.completed.iter()) == [
+    ...     root['coll_p9'], root['coll_p7']]
+    True
+    >>> worker1.completed.rotate()
+    >>> list(worker1.completed.iter()) == []
+    True
+    >>> transaction.commit()
+
+.. [#set_up_reactor] We monkeypatch twisted.internet.reactor
+    (and restore it in a later footnote).
+
+    >>> import twisted.internet.reactor
+    >>> oldCallLater = twisted.internet.reactor.callLater
+    >>> import bisect
+    >>> class FauxReactor(object):
+    ...     def __init__(self):
+    ...         self.time = 0
+    ...         self.calls = []
+    ...     def callLater(self, delay, callable, *args, **kw):
+    ...         res = (delay + self.time, callable, args, kw)
+    ...         bisect.insort(self.calls, res)
+    ...         # callLater normally returns an IDelayedCall; not needed here
+    ...     def time_flies(self, time):
+    ...         end = self.time + time
+    ...         ct = 0
+    ...         while self.calls and self.calls[0][0] <= end:
+    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...             callable(*args, **kw) # normally this would get try...except
+    ...             ct += 1
+    ...         self.time = end
+    ...         return ct
+    ...     def time_passes(self):
+    ...         if self.calls and self.calls[0][0] <= self.time:
+    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...             callable(*args, **kw)
+    ...             return True
+    ...         return False
+    ...
+    >>> faux = FauxReactor()
+    >>> twisted.internet.reactor.callLater = faux.callLater
+    >>> time_flies = faux.time_flies
+    >>> time_passes = faux.time_passes
+
+.. [#tear_down_reactor]
+
+    >>> twisted.internet.reactor.callLater = oldCallLater
+
+.. [#set_up_datetime] A monkeypatch, removed in another footnote below.
+
+    >>> import datetime
+    >>> import pytz
+    >>> old_datetime = datetime.datetime
+    >>> def set_now(dt):
+    ...     global _now
+    ...     _now = _datetime(*dt.__reduce__()[1])
+    ...
+    >>> class _datetime(old_datetime):
+    ...     @classmethod
+    ...     def now(klass, tzinfo=None):
+    ...         if tzinfo is None:
+    ...             return _now.replace(tzinfo=None)
+    ...         else:
+    ...             return _now.astimezone(tzinfo)
+    ...     def astimezone(self, tzinfo):
+    ...         return _datetime(
+    ...             *super(_datetime,self).astimezone(tzinfo).__reduce__()[1])
+    ...     def replace(self, *args, **kwargs):
+    ...         return _datetime(
+    ...             *super(_datetime,self).replace(
+    ...                 *args, **kwargs).__reduce__()[1])
+    ...     def __repr__(self):
+    ...         raw = super(_datetime, self).__repr__()
+    ...         return "datetime.datetime%s" % (
+    ...             raw[raw.index('('):],)
+    ...     def __reduce__(self):
+    ...         return (argh, super(_datetime, self).__reduce__()[1])
+    >>> def argh(*args, **kwargs):
+    ...     return _datetime(*args, **kwargs)
+    ...
+    >>> datetime.datetime = _datetime
+    >>> _now = datetime.datetime(2006, 8, 10, 15, 44, 22, 211, pytz.UTC)
+
+.. [#reinstate]
+
+    >>> for p in (res1, res2, res3, res4, res5):
+    ...     p.assignerUUID = None
+    ...     res = dm.thread.put(p)
+    ...
+    >>> list(dm.thread) == [res3, res1, res2, res5, res4]
+    True
+    >>> list(dm.thread.iterDue()) == [res3, res1, res2, res5, res4]
+    True
+
+.. [#tear_down_datetime]
+
+    >>> datetime.datetime = old_datetime


Property changes on: zc.async/trunk/src/zc/async/datamanager.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/engine.py
===================================================================
--- zc.async/trunk/src/zc/async/engine.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/engine.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,247 @@
+import uuid
+import Queue
+import thread
+import threading
+import datetime
+import logging
+import pytz
+import twisted.internet.reactor
+import ZODB.POSException
+import transaction
+import transaction.interfaces
+
+import zc.twist
+import zc.async.partial
+import zc.async.datamanager
+
+def remove(container, partial, result=None):
+    container.remove(partial)
+    container.__parent__.completed.add(partial)
+    
+def perform(p):
+    p()
+    p.addCallback(zc.async.partial.Partial(remove, p.__parent__, p))
+
+class Engine(object):
+    # this intentionally does not have an interface.  It would be nicer if this
+    # could be a Twisted service, part of the main Zope service, but that does
+    # not appear easy to arrange at the moment.  Therefore we have a subscriber
+    # in subscribers.py that does custom set-up, using raw reactor code.
+    # Eventually I'd like to move this to a service interface, and tie it to
+    # the Zope service in the subscriber.
+
+    _needed = 0
+    alive = True
+
+    def __init__(self, UUID, factory):
+        self.workerUUID = UUID
+        self.factory = factory
+        self.thread_queue = Queue.Queue(0)
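+        # a None in thread_queue is a sentinel telling a worker thread to
+        # exit (see set_threads below)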
+        self._threads = []
+        self.UUID = uuid.uuid4() # this is supposed to distinguish this engine
+        # instance from any others potentially wanting to work on the worker.
+
+    def perform_thread(self):
+        try:
+            job = self.thread_queue.get()
+            while job is not None:
+                db, identifier = job
+                conn = db.open()
+                removal = None
+                try:
+                    transaction.begin()
+                    p = conn.get(identifier)
+                    p.thread = thread.get_ident()
+                    transaction.commit()
+                    removal = zc.twist.Partial(remove, p.__parent__, p)
+                    try:
+                        p() # this does the committing and retrying, largely
+                    except ZODB.POSException.TransactionError:
+                        transaction.abort()
+                        while 1:
+                            try:
+                                p.fail()
+                                transaction.commit()
+                            except ZODB.POSException.TransactionError:
+                                transaction.abort() # retry forever (!)
+                            else:
+                                break
+                finally:
+                    conn.close()
+                    if removal is not None:
+                        twisted.internet.reactor.callFromThread(removal)
+                job = self.thread_queue.get()
+        finally:
+            # this may cause some bouncing, but we don't ever want to end
+            # up with fewer than needed.
+            twisted.internet.reactor.callFromThread(self.set_threads)
+    
+    def set_threads(self, needed=None):
+        # this should only be called from the main thread (otherwise it needs
+        # locks)
+        if needed is None:
+            needed = self._needed
+        else:
+            self._needed = needed
+        res = []
+        ct = 0
+        for t in self._threads:
+            if t.isAlive():
+                res.append(t)
+                ct += 1
+        self._threads[:] = res
+        if ct < needed:
+            for i in range(max(needed - ct, 0)):
+                t = threading.Thread(target=self.perform_thread)
+                self._threads.append(t)
+                t.start()
+        elif ct > needed:
+            # this may cause some bouncing, but hopefully nothing too bad.
+            for i in range(ct - needed):
+                self.thread_queue.put(None)
+    
+    def poll(self, datamanager):
+        if not self.alive:
+            return
+        poll_seconds = 0.25
+        call = zc.twist.Partial(self.poll, datamanager)
+        try:
+            tm = transaction.interfaces.ITransactionManager(datamanager)
+            tm.begin()
+            now = datetime.datetime.now(pytz.UTC)
+            worker = datamanager.workers.get(self.workerUUID)
+            if worker is not None:
+                if (worker.engineUUID is not None and
+                    worker.engineUUID != self.UUID):
+                    # uh-oh.  Maybe another engine is in on the action?
+                    time_of_death = (worker.last_ping + worker.ping_interval
+                                     + worker.ping_death_interval)
+                    if time_of_death < now:
+                        # hm.  Looks like it's dead.
+                        zc.async.datamanager.cleanDeadWorker(worker)
+                        worker.engineUUID = self.UUID
+                    else:
+                        # this is some other engine's UUID,
+                        # and it isn't dead (yet?).  Houston, we have a problem.
+                        interval = time_of_death - now
+                        logging.warning(
+                            'Another engine instance, %s, has claimed worker '
+                            '%s.  This engine instance, %s, is '
+                            "deferring.  The other engine will be "
+                            "regarded dead and scheduled for removal after "
+                            '%d days, %d seconds, and %d microseconds',
+                            worker.engineUUID, worker.UUID, self.UUID,
+                            interval.days, interval.seconds,
+                            interval.microseconds)
+                        return # which will call the finally clause
+                else:
+                    worker.engineUUID = self.UUID
+                try:
+                    tm.commit()
+                except ZODB.POSException.TransactionError:
+                    tm.abort()
+                    # uh-oh.  Somebody else may be adding a worker for the
+                    # same UUID.  we'll just return for now, and figure that
+                    # the next go-round will report the problem.
+                    return # will call finally clause
+            else:
+                worker = self.factory(self.workerUUID)
+                datamanager.workers.add(worker)
+                worker.engineUUID = self.UUID
+                try:
+                    tm.commit()
+                except ZODB.POSException.TransactionError:
+                    tm.abort()
+                    # uh-oh.  Somebody else may be adding a worker for the
+                    # same UUID.  we'll just return for now, and figure that
+                    # the next go-round will report the problem.
+                    return # will call finally clause
+            poll_seconds = worker.poll_seconds
+            datamanager.checkSibling(worker.UUID)
+            try:
+                tm.commit()
+            except ZODB.POSException.TransactionError:
+                tm.abort()
+                # we'll retry next poll.
+            if (worker.completed.last_rotation +
+                worker.completed.rotation_interval) <= now:
+                worker.completed.rotate()
+                try:
+                    tm.commit()
+                except ZODB.POSException.TransactionError:
+                    tm.abort()
+                    # we'll retry next poll.
+            if worker.last_ping + worker.ping_interval <= now:
+                worker.last_ping = now
+                try:
+                    tm.commit()
+                except ZODB.POSException.TransactionError:
+                    # uh-oh: are there two engines working with the same worker?
+                    tm.abort() # and retry next time
+                    logging.error(
+                        "Transaction error for worker %s.  This should not "
+                        "happen.", self.workerUUID)
+                    return
+            def thread_size():
+                if len(datamanager.workers) == 1:
+                    return 1
+                else:
+                    return worker.thread_size
+            self.set_threads(thread_size())
+            while len(worker.thread) < thread_size():
+                p = datamanager.thread.pullNext(self.workerUUID)
+                if p is not None:
+                    worker.thread.add(p)
+                    try:
+                        tm.commit()
+                    except ZODB.POSException.TransactionError:
+                        tm.abort()
+                    else:
+                        self.thread_queue.put((p._p_jar.db(), p._p_oid))
+                else:
+                    break
+            self.set_threads(thread_size())
+            while len(worker.reactor) < worker.reactor.size:
+                p = datamanager.reactor.pullNext(self.workerUUID)
+                if p is not None:
+                    worker.reactor.add(p)
+                    try:
+                        tm.commit()
+                    except ZODB.POSException.TransactionError:
+                        tm.abort()
+                    else:
+                        twisted.internet.reactor.callLater(
+                            0, zc.twist.Partial(perform, p))
+                else:
+                    break
+            now = datetime.datetime.now(pytz.UTC)
+            if worker.last_ping + worker.ping_interval <= now:
+                worker.last_ping = now
+                try:
+                    tm.commit()
+                except ZODB.POSException.TransactionError:
+                    # uh-oh: are there two engines working with the same worker?
+                    tm.abort() # and retry next time
+                    logging.error(
+                        "Transaction error for worker %s.  This should not "
+                        "happen.", self.workerUUID)
+                    return
+        finally:
+            tm.abort()
+            if self.alive:
+                twisted.internet.reactor.callLater(poll_seconds, call)
+
+    def tearDown(self, datamanager):
+        self.alive = False
+        self.set_threads(0)
+        try:
+            tm = transaction.interfaces.ITransactionManager(datamanager)
+            tm.begin()
+            worker = datamanager.workers.get(self.workerUUID)
+            if worker is not None:
+                worker.engineUUID = None
+                datamanager.thread.put(
+                    zc.async.partial.Partial(
+                        zc.async.datamanager.cleanDeadWorker, worker))
+                tm.commit()
+        finally:
+            tm.abort()
+        


Property changes on: zc.async/trunk/src/zc/async/engine.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/i18n.py
===================================================================
--- zc.async/trunk/src/zc/async/i18n.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/i18n.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,32 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################  
+
+"""I18N support for the site package.
+
+This defines a `MessageFactory` for the I18N domain for the
+`zc.async` package.  This is normally used with this import::
+
+  from i18n import _
+
+The factory is then used normally.  Two examples::
+
+  text = _('some internationalized text')
+  text = _('helpful-descriptive-message-id', 'default text')
+"""
+__docformat__ = "reStructuredText"
+
+
+from zope import i18nmessageid
+
+MessageFactory = _ = i18nmessageid.MessageFactory("zc.async")


Property changes on: zc.async/trunk/src/zc/async/i18n.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/trunk/src/zc/async/instanceuuid.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/instanceuuid.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,38 @@
+
+import os
+import uuid
+import zope.interface
+import zc.async.interfaces
+
+# test for this file is in tests.py
+
+msg = """
+------------------------------------------------------------------------
+The value above (and this file) is created and used by the zc.async
+package. It is intended to uniquely identify this software instance when
+it is used to start a zc.async worker process.  This allows multiple
+workers to connect to a single database to do work.  The software
+expects an instance home to only generate a single process.
+
+To get a new identifier for this software instance, delete this file and
+restart Zope (or more precisely, delete this file, restart Python, and
+import zc.async.instanceuuid).  This file will be recreated with a new value.
+"""
+
+zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
+
+def getUUID():
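+    # assumes INSTANCE_HOME is set, as in a Zope instance environment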
+    file_name = os.path.join(
+        os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt')
+    if os.path.exists(file_name):
+        f = open(file_name, 'r')
+        UUID = uuid.UUID(f.readline().strip())
+        f.close()
+    else:
+        UUID = uuid.uuid1()
+        f = open(file_name, 'w')
+        f.writelines((str(UUID), msg))
+        f.close()
+    return UUID
+
+UUID = getUUID()


Property changes on: zc.async/trunk/src/zc/async/instanceuuid.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/interfaces.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,358 @@
+import zope.interface
+import zope.interface.common.mapping
+import zope.interface.common.sequence
+import zope.component.interfaces
+import zc.queue.interfaces
+from zc.async.i18n import _
+
+PENDING = _('pending-state', 'Pending')
+ACTIVE = _('active-state', 'Active')
+CALLBACKS = _('callback-state', 'Performing Callbacks')
+COMPLETED = _('completed-state', 'Completed')
+
+class AbortedError(Exception):
+    """An explicit abort, as generated by the default behavior of
+    IPartial.fail"""
+
+class BadStateError(Exception):
+    """The partial is not in the state it should be for the call being made.
+    This is almost certainly a programmer error."""
+
+class IPartialFactory(zope.interface.Interface):
+
+    def __call__(self, call, *args, **kwargs):
+        """return an IPartial with the given call, args, and kwargs"""
+
+    def bind(self, call, *args, **kwargs):
+        """returns IPartial with the IPartial inserted as first value in args.
+        """
+
+class IPartial(zope.interface.Interface):
+
+    __parent__ = zope.interface.Attribute(
+        """The current canonical location of the partial""")
+
+    callable = zope.interface.Attribute(
+        """The callable object that should be called with *IPartial.args and
+        **IPartial.kwargs when the IPartial is called.  Mutable.""")
+
+    args = zope.interface.Attribute(
+        """a peristent list of the args that should be applied to self.call.
+        May include persistent objects (though note that, if passing a method
+        is desired, it will typicall need to be wrapped in an IPartial).""")
+
+    kwargs = zope.interface.Attribute(
+        """a persistent mapping of the kwargs that should be applied to
+        self.call.  May include persistent objects (though note that, if
+        passing a method is desired, it will typically need to be wrapped
+        in an IPartial).""")
+
+    state = zope.interface.Attribute(
+        """One of constants defined in zc.async.interfaces: PENDING,
+        ACTIVE, CALLBACKS, COMPLETED.  PENDING means not yet called. 
+        ACTIVE means in the process of being called.  CALLBACKS means in
+        the process of calling callbacks.  COMPLETED means called.""")
+
+    result = zope.interface.Attribute(
+        """The result of the call.  When state equals PENDING or ACTIVE, will
+        be None.  When COMPLETED, will be a twisted.python.failure.Failure
+        describing the call failure, or else the successful result.""")
+
+    callbacks = zope.interface.Attribute(
+        """A mutable persistent list of the callback partials added by
+        addCallbacks.""")
+
+    unhandled_error = zope.interface.Attribute(
+        """A boolean: whether this partial has an unhandled error.
+        An unhandled error is defined as a Failure result on any callback
+        leaf node, or a Failure on this partial if it has no callbacks
+        or one or more incomplete callbacks.""")
+
+    annotations = zope.interface.Attribute(
+        """An OOBTree that is available for metadata use.""")
+
+    def addCallbacks(success=None, failure=None):
+        """if success or failure is not None, adds a callback partial to
+        self.callbacks and returns the partial.  Otherwise returns self.
+        success and failure must be None or adaptable to IPartial.
+        addCallbacks may be called multiple times.  Each will be called
+        with the result of this partial.  If this partial is already in the
+        COMPLETED state then the callbacks will be performed immediately."""
+
+    def addCallback(callback):
+        """callback will receive result (independent of whether it is a
+        success or a failure).  callback must be adaptable to IPartial.
+        addCallback may be called multiple times.  Each will be called
+        with the result of this partial.  If this partial is already in the
+        COMPLETED state then the callback will be performed immediately."""
+
+    def __call__(*args, **kwargs):
+        """call the callable.  Any given args are effectively appended to
+        self.args for the call, and any kwargs effectively update self.kwargs
+        for the call."""
+
+    def fail(e=AbortedError):
+        """Fail this partial with the optional error e.  May only be called
+        when the partial is in the PENDING or ACTIVE states, or else raises
+        BadStateError.  If e is not provided, AbortedError is used."""
+
+    def resumeCallbacks():
+        """Make all callbacks remaining for this partial.  Any callbacks
+        that are in PENDING state should be called normally; any callbacks
+        in ACTIVE state should be `fail`ed; any callbacks in CALLBACKS state
+        should `resumeCallbacks`; and any callbacks in COMPLETED state should
+        be untouched.  May only be called when partial is in CALLBACKS state.
+        State will be COMPLETED after this call."""
+
+class IDataManagerPartial(IPartial):
+    """An async partial with all the necessary knobs to by put in a
+    datamanager."""
+
+    workerUUID = zope.interface.Attribute(
+        """The UUID of the IWorker who is, or was, responsible for this
+        partial.  None initially.  Should be assigned by
+        IWorker.[reactor|thread].put.""")
+
+    assignerUUID = zope.interface.Attribute(
+        """The UUID of the software instance that was in charge when the
+        IPartial was put in an IPartialQueue.  Should be assigned by
+        IPartialQueue.put.""")
+
+    selectedUUIDs = zope.interface.Attribute(
+        """a set of selected worker UUIDs.  If it is empty, it is
+        interpreted as the set of all available workerUUIDs.  Only
+        workers with UUIDs in the set may perform it.
+
+        If a worker would have selected this partial for a run, but the
+        difference of selected_workerUUIDs and excluded_workerUUIDs
+        stopped it, it is responsible for verifying that the effective
+        set of workerUUIDs intersects with the available workers; if the
+        intersection contains no possible workers, the worker should
+        call partial.fail().""")
+
+    excludedUUIDs = zope.interface.Attribute(
+        """a set of excluded worker UUIDs.  Workers with UUIDs in this
+        set may not perform the partial.
+
+        If a worker would have selected this partial for a run, but the
+        difference of selected_workerUUIDs and excluded_workerUUIDs
+        stopped it, it is responsible for verifying that the effective
+        set of workerUUIDs intersects with the available workers; if the
+        intersection contains no possible workers, the worker should
+        call partial.fail().""")
+
+    begin_after = zope.interface.Attribute(
+        """A datetime.datetime in UTC of the first time when the
+        partial may run.  Cannot be set after partial gets a data_manager.
+        """)
+
+    begin_by = zope.interface.Attribute(
+        """A datetime.timedelta of the duration after the begin_after
+        value after which the partial will fail, if it has not already
+        begun.  Cannot be set after partial has begun.""")
+
+    thread = zope.interface.Attribute(
+        """None or thread.get_ident() of the worker who performs it.  If a
+        reactor partial, must be None.""")
+
+class IPartialQueue(zc.queue.interfaces.IQueue):
+
+    __parent__ = zope.interface.Attribute(
+        """the IDataManager of which this is a part.""")
+
+    thread = zope.interface.Attribute(
+        """boolean of whether this is a thread or reactor queue""")
+
+    def put(item, begin_after=None, begin_by=None):
+        """Put an IPartial adapted from item into the queue.  Returns IPartial.
+
+        Remember that IPartials are not guaranteed to be run in the order
+        added to a queue.  If you need sequencing, use
+        IPartial.addCallbacks.
+        
+        item must be an IPartial, or be adaptable to that interface.
+        begin_after must be None (to leave the partial's current value) or a 
+        datetime.datetime.  begin_by must be None (to leave it alone) or a
+        datetime.timedelta of the duration after the begin_after.
+
+        If item.begin_after is None and begin_after is None, begin_after will
+        effectively be now.  If item.begin_by is None and begin_by is None,
+        begin_by will effectively be datetime.timedelta(hours=1).
+
+        datetime.datetimes are suggested to be in UTC.  Timezone-naive
+        datetimes will be interpreted as in UTC.  Timezone-aware datetimes
+        will be converted to UTC, and errors because of this (such as
+        pytz ambiguity errors) will be raised.
+
+        When an IPartial is put in the queue, the queue puts the
+        begin_after time and begin_by duration on the partial,
+        and the UUID of the Zope instance that put the partial in the
+        queue on the `assignerUUID`.
+        """
+
+    def iterDue():
+        """return an iterable of all partials whose begin_after value is
+        less than or equal to now.  Any expired partials (begin_after +
+        begin_by < datetime.datetime.now(pytz.UTC)) are also included.
+        """
+
+    def pullNext(UUID):
+        """returns first due job that is available for the given UUID,
+        removing it from the queue as appropriate; or None, if none are
+        available. Responsible for including jobs to fail expired
+        partials, and jobs to decommission dead workers for the next
+        highest worker (sorted by UUID) if its (last_ping +
+        ping_interval + ping_death_interval) < now.  If this is the
+        highest worker UUID, cycles around to lowest."""
+
+class IWorkers(zope.interface.common.mapping.IEnumerableMapping):
+
+    __parent__ = zope.interface.Attribute(
+        """the IDataManager of which this is a part.""")
+
+    def add(value):
+        """add an IWorker with key of value.UUID.  If value.UUID is None,
+        raise ValueError.  Set value.__parent__ to the IWorkers."""
+
+    def remove(UUID):
+        """remove the registered IWorker with the give UUID.  Raise KeyError
+        if such does not exist."""
+
+class IDataManager(zope.interface.Interface):
+    """Note that partials added to queues are not guaranteed to run in
+    the order added.  For sequencing, use IPartial.addCallbacks."""
+
+    thread = zope.interface.Attribute(
+        """An IPartialQueue of IPartials that should be run in a thread.""")
+
+    reactor = zope.interface.Attribute(
+        """An IPartialQueue of IPartials that should be run in the main
+        loop (e.g., Twisted's main reactor loop).""")
+
+    workers = zope.interface.Attribute(
+        """An IWorkers of registered IWorker objects for this data manager;
+        these objects represent processes that are responsible for claiming
+        and performing the IPartials in the data manager.""")
+
+    def checkSibling(uuid):
+        """Check the next sibling of uuid to see if it is dead, according
+        to its last_ping, and, if it is dead, remove its engineUUID and
+        schedule removal of its partials."""
+
+class IDataManagerAvailableEvent(zope.component.interfaces.IObjectEvent):
+    """object is data manager"""
+
+class DataManagerAvailable(zope.component.interfaces.ObjectEvent):
+    zope.interface.implements(IDataManagerAvailableEvent)
+
+class FullError(Exception):
+    """Container is full.
+    """
+
+class ISizedSequence(zope.interface.common.sequence.IFiniteSequence):
+    size = zope.interface.Attribute(
+        """an integer.  If the sequence's len >= size, add will raise
+        FullError""")
+
+    def add(item):
+        """same contract as IQueue.put, except that if the sequence's len >=
+        size, add will raise FullError; all objects get __parent__ set to
+        the sequence; and it will only store partials."""
+
+    __parent__ = zope.interface.Attribute(
+        """a link to parent: an IWorker""")
+
+    def remove(item):
+        """remove item, or raise ValueError if item is not in queue"""
+
+    def __delitem__(index):
+        """delete item at index"""
+
+    def index(item):
+        """return index, or raise ValueError if item is not in queue"""
+
+class ICompletedCollection(zope.interface.Interface):
+    def __iter__():
+        """Iterate over partials in collection, from most recent `begin_after`
+        to oldest"""
+
+    def iter(start=None, stop=None):
+        """Iterate, reasonably efficiently, over partials in the collection,
+        starting and stopping with the given timezone-aware datetime
+        values."""
+
+    def __len__():
+        """Return number of partials in collection"""
+
+    def add(partial):
+        """Add partial to collection and set __parent__ to the collection."""
+
+    __parent__ = zope.interface.Attribute(
+        """an IWorker""")
+
+    rotation_interval = zope.interface.Attribute(
+        """A datetime.timedelta of how often the buckets in the collection
+        should be rotated, to clean them out.""")
+
+    last_rotation = zope.interface.Attribute(
+        """A datetime.datetime in pytz.UTC of the last time a rotation was
+        performed (should initialize to the creation time).""")
+
+    def first(start=None):
+        """Return the first (most recent) partial in the collection, starting
+        with optional timezone-aware datetime."""
+
+    def last(stop=None):
+        """Return the last (oldest) partial in the collection, stopping
+        with optional timezone-aware datetime."""
+
+    def __nonzero__():
+        "whether collection contains any partials"
+
+    def rotate():
+        """rotate buckets, eliminating the ones added longest ago.  Note that
+        this may be different from the ordering by begin_after."""
+
+class IWorker(zope.interface.Interface):
+
+    reactor = zope.interface.Attribute(
+        """An ISizedSequence of reactor partials currently being worked on
+        by this worker.""")
+
+    thread = zope.interface.Attribute(
+        """An ISizedSequence of thread partials currently being worked on
+        by this worker.""")
+
+    UUID = zope.interface.Attribute(
+        """The uuid.UUID that identifies this worker (where one instance ==
+        one process == one worker == one UUID).""")
+
+    engineUUID = zope.interface.Attribute(
+        """The uuid.UUID of the engine that is running this worker, or None.""")
+
+    last_ping = zope.interface.Attribute(
+        """the datetime.datetime in the pytz.UTC timezone of the last ping.
+        This date should be updated anytime a worker accepts a job in a
+        reactor or thread queue; and whenever, during a poll,
+        (last_ping + ping_interval) <= now.""")
+
+    poll_seconds = zope.interface.Attribute(
+        """The number of seconds between the end of one worker poll and the
+        start of the next.""")
+
+    ping_interval = zope.interface.Attribute(
+        """The approximate maximum datetime.timedelta between pings before
+        a new last_ping should be recorded.""")
+
+    ping_death_interval = zope.interface.Attribute(
+        """the datetime.timedelta after (last_ping + ping_interval) that
+        signifies the worker's death.  That is, if (last_ping + ping_interval
+        + ping_death_interval) < now, the worker should be regarded as dead.
+        """)
+
+    completed = zope.interface.Attribute(
+        """The most recent partials completed by this worker, in the order
+        from most recent `begin_after` to oldest.  ICompletedCollection.""")
+
+class IUUID(zope.interface.Interface):
+    """A marker interface for the API of Ka-Ping Yee's uuid.UUID class.
+    See http://zesty.ca/python/uuid.html """


Property changes on: zc.async/trunk/src/zc/async/interfaces.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/partial.py
===================================================================
--- zc.async/trunk/src/zc/async/partial.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/partial.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,246 @@
+
+import types
+
+import BTrees.OOBTree
+import ZODB.POSException
+import transaction.interfaces
+import persistent
+import persistent.list
+import persistent.mapping
+import twisted.internet.defer
+import twisted.python.failure
+import zope.interface
+import zc.queue
+
+import zc.async.interfaces
+import zc.twist
+from zc.async import rwproperty
+
+def success_or_failure(success, failure, res):
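+    # Dispatch a result to the appropriate handler: Failures go to
+    # `failure`, anything else to `success`; if the matching handler is
+    # None, pass the result through unchanged.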
+    callable = None
+    if isinstance(res, twisted.python.failure.Failure):
+        if failure is not None:
+            callable = failure
+    elif success is not None:
+        callable = success
+    if callable is None:
+        return res
+    return callable(res)
+
+def completeStartedPartialArguments(partial, result):
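+    # Used as a subsidiary callback by Partial.addCallbacks: if the result
+    # is a Failure, clean up any not-yet-completed partials among the
+    # arguments of the given (already started) partial.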
+    if isinstance(result, twisted.python.failure.Failure):
+        for collection in (partial.args, partial.kwargs.values()):
+            for a in collection:
+                if (zc.async.interfaces.IPartial.providedBy(a) and
+                    a.state not in (
+                        zc.async.interfaces.PENDING,
+                        zc.async.interfaces.COMPLETED)):
+                    if a.state == zc.async.interfaces.ACTIVE:
+                        a.fail()
+                    elif a.state == zc.async.interfaces.CALLBACKS:
+                        a.resumeCallbacks()
+    return result
+
+class Partial(persistent.Persistent):
+
+    zope.interface.implements(zc.async.interfaces.IPartial)
+    zope.interface.classProvides(zc.async.interfaces.IPartialFactory)
+
+    __parent__ = _callable_root = _callable_name = _result = None
+    _state = zc.async.interfaces.PENDING
+
+    def __init__(self, *args, **kwargs):
+        self.args = persistent.list.PersistentList(args)
+        self.callable = self.args.pop(0)
+        self.kwargs = persistent.mapping.PersistentMapping(kwargs)
+        self.callbacks = zc.queue.PersistentQueue()
+        self.annotations = BTrees.OOBTree.OOBTree()
+
+    @property
+    def result(self):
+        return self._result
+
+    @property
+    def state(self):
+        return self._state
+
+    @property
+    def unhandled_error(self):
+        if (self.state in (zc.async.interfaces.COMPLETED,
+                           zc.async.interfaces.CALLBACKS) and
+            isinstance(self.result, twisted.python.failure.Failure)):
+            ct = 0
+            for c in self.callbacks:
+                if (c.state not in (zc.async.interfaces.COMPLETED,
+                                    zc.async.interfaces.CALLBACKS) or
+                    c.unhandled_error):
+                    return True
+                ct += 1
+            if not ct:
+                return True
+        return False
+
+    @classmethod
+    def bind(klass, *args, **kwargs):
+        res = klass(*args, **kwargs)
+        res.args.insert(0, res)
+        return res
+
+    @property
+    def callable(self):
+        if self._callable_name is None:
+            return self._callable_root
+        else:
+            return getattr(self._callable_root, self._callable_name)
+    @rwproperty.setproperty
+    def callable(self, value):
+        # can't pickle/persist methods by default as of this writing, so we
+        # add the sugar ourselves
+        if self.state != zc.async.interfaces.PENDING:
+            raise zc.async.interfaces.BadStateError(
+                'can only set callable when a partial is in PENDING state')
+        if isinstance(value, types.MethodType):
+            self._callable_root = value.im_self
+            self._callable_name = value.__name__
+        else:
+            self._callable_root, self._callable_name = value, None
+
+    def addCallbacks(self, success=None, failure=None):
+        if success is not None or failure is not None:
+            if success is not None:
+                success = zc.async.interfaces.IPartial(success)
+            if failure is not None:
+                failure = zc.async.interfaces.IPartial(failure)
+            res = Partial(success_or_failure, success, failure)
+            if success is not None:
+                success.__parent__ = res
+            if failure is not None:
+                failure.__parent__ = res
+            self.addCallback(res)
+            # we need to handle the case of callbacks on the internal success/
+            # failure partials, to be safe.
+            abort_handler = zc.async.interfaces.IPartial(
+                completeStartedPartialArguments)
+            abort_handler.args.append(res)
+            res = res.addCallback(abort_handler)
+        else:
+            res = self
+        return res
+
+    def addCallback(self, callback):
+        callback = zc.async.interfaces.IPartial(callback)
+        self.callbacks.put(callback)
+        callback.__parent__ = self
+        if self.state == zc.async.interfaces.COMPLETED:
+            callback(self.result) # this commits transactions!
+        else:
+            self._p_changed = True # to try and fire conflict errors if
+            # our reading of self.state has changed beneath us
+        return callback
+
+    def __call__(self, *args, **kwargs):
+        if self.state != zc.async.interfaces.PENDING:
+            raise zc.async.interfaces.BadStateError(
+                'can only call a partial in PENDING state')
+        tm = transaction.interfaces.ITransactionManager(self)
+        self._state = zc.async.interfaces.ACTIVE
+        tm.commit()
+        effective_args = list(args)
+        effective_args[0:0] = self.args
+        effective_kwargs = dict(self.kwargs)
+        effective_kwargs.update(kwargs)
+        return self._call_with_retry(
+            lambda: self.callable(*effective_args, **effective_kwargs))
+
+    def _call_with_retry(self, call):
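+        # Run `call`, committing on success.  Retry up to five times on
+        # ZODB.POSException.TransactionError; after an abort, let
+        # EXPLOSIVE_ERRORS (e.g., SystemExit, KeyboardInterrupt, and
+        # ZEO/storage POSErrors) propagate; convert any other exception
+        # into a sanitized Failure stored as the result.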
+        ct = 0
+        tm = transaction.interfaces.ITransactionManager(self)
+        res = None
+        while 1:
+            try:
+                res = call()
+                if zc.async.interfaces.IPartial.providedBy(res):
+                    res.addCallback(self._callback)
+                elif isinstance(res, twisted.internet.defer.Deferred):
+                    res.addBoth(zc.twist.Partial(self._callback))
+                else:
+                    if isinstance(res, twisted.python.failure.Failure):
+                        res = zc.twist.sanitize(res)
+                    self._result = res
+                    self._state = zc.async.interfaces.CALLBACKS
+                tm.commit()
+            except ZODB.POSException.TransactionError:
+                tm.abort()
+                ct += 1
+                if ct >= 5:
+                    res = self._result = zc.twist.sanitize(
+                        twisted.python.failure.Failure())
+                    self._state = zc.async.interfaces.CALLBACKS
+                    tm.commit()
+                    self.resumeCallbacks()
+                else:
+                    continue
+            except zc.twist.EXPLOSIVE_ERRORS:
+                tm.abort()
+                raise
+            except:
+                tm.abort()
+                res = self._result = zc.twist.sanitize(
+                    twisted.python.failure.Failure())
+                self._state = zc.async.interfaces.CALLBACKS
+                tm.commit()
+                self.resumeCallbacks()
+            else:
+                if self.state == zc.async.interfaces.CALLBACKS:
+                    self.resumeCallbacks()
+            return res
+
+    def _callback(self, res):
+        self._call_with_retry(lambda: res)
+
+    def fail(self, e=None):
+        if e is None:
+            e = zc.async.interfaces.AbortedError()
+        if self.state not in (zc.async.interfaces.PENDING,
+                              zc.async.interfaces.ACTIVE):
+            raise zc.async.interfaces.BadStateError(
+                'can only call fail on a partial in PENDING or ACTIVE states')
+        tm = transaction.interfaces.ITransactionManager(self)
+        self._result = zc.twist.sanitize(
+            twisted.python.failure.Failure(e))
+        self._state = zc.async.interfaces.CALLBACKS
+        tm.commit()
+        self.resumeCallbacks()
+
+    def resumeCallbacks(self):
+        if self.state != zc.async.interfaces.CALLBACKS:
+            raise zc.async.interfaces.BadStateError(
+                'can only resumeCallbacks on a partial in CALLBACKS state')
+        callbacks = list(self.callbacks)
+        tm = transaction.interfaces.ITransactionManager(self)
+        length = 0
+        while 1:
+            for p in callbacks:
+                if p.state == zc.async.interfaces.PENDING:
+                    p(self.result)
+                elif p.state == zc.async.interfaces.ACTIVE:
+                    p.fail()
+                elif p.state == zc.async.interfaces.CALLBACKS:
+                    p.resumeCallbacks()
+                # TODO: this shouldn't raise anything we want to catch,
+                # right?  Right now each partial call catches everything
+                # except EXPLOSIVE_ERRORS itself, so nothing should escape
+                # here.  Cleaning up dead partials should look something
+                # like the state dispatch above.
+            tm.commit()
+            # it's possible that someone added some callbacks while we were
+            # working; run until we're exhausted.
+            length += len(callbacks)
+            callbacks = list(self.callbacks)[length:]
+            if not callbacks:
+                try:
+                    self._state = zc.async.interfaces.COMPLETED
+                    tm.commit()
+                except ZODB.POSException.TransactionError:
+                    tm.abort()
+                    callbacks = list(self.callbacks)[length:]
+                else:
+                    break # and return


Property changes on: zc.async/trunk/src/zc/async/partial.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/partial.txt
===================================================================
--- zc.async/trunk/src/zc/async/partial.txt	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/partial.txt	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,912 @@
+========
+Partials
+========
+
+What if you want to persist a reference to the method of a persistent
+object?  You can't do that normally in the ZODB, but it can be very
+useful, especially for storing asynchronous calls.  What if you want to
+act on the result of an asynchronous call that may be made later?  The
+zc.async package offers an approach modelled loosely on the Twisted
+deferred code: `zc.async.partial.Partial`.  To use it, simply wrap the
+callable--a method of a persistent object, a callable persistent
+object, or a global function--in the partial.  You can include ordered
+and keyword arguments to the partial, which may be persistent objects
+or simply pickleable objects.
+
+Unlike a normal partial, the result of the wrapped call goes on the
+partial's 'result' attribute, and the immediate return of the call might
+not be the end result.  It could also be a failure, indicating an
+exception; or another partial, indicating that we are waiting to be
+called back by the second partial; or a Twisted deferred, indicating
+that we are waiting to be called back via Twisted (see the `twist`
+module, also in this package).  After you have the partial, you can
+then use a number of methods and attributes on the partial for further
+set up.  Let's show the most basic use first, though.
+
+Note that, even though this looks like an interactive prompt, all
+functions and classes defined in this document act as if they were
+defined within a module.  Classes and functions defined in an interactive
+prompt are normally not picklable, and the async Partial must work with
+picklable objects [#set_up]_.
+
+    >>> import zc.async.partial
+    >>> def call():
+    ...     print 'hello world'
+    ...     return 'my result'
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial(call)
+    >>> import transaction
+    >>> transaction.commit()
+
+Now we have a partial [#verify]_.  We can see that the state is PENDING,
+call it, and then see that the function was called, and see the result on
+the partial.
+
+    >>> import zc.async.interfaces
+    >>> p.state == zc.async.interfaces.PENDING
+    True
+    >>> res = p()
+    hello world
+    >>> p.result
+    'my result'
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+
+The immediate return of the partial also happens to be the end result of
+the call here, but as mentioned above, the call may instead return a
+deferred or another partial.
+
+    >>> res
+    'my result'
+
+We can also use a method of a persistent object.  Imagine we have a ZODB
+root that we can put objects into.
+
+    >>> import persistent
+    >>> class Demo(persistent.Persistent):
+    ...     counter = 0
+    ...     def increase(self, value=1):
+    ...         self.counter += value
+    ...
+    >>> demo = root['demo'] = Demo()
+    >>> demo.counter
+    0
+    >>> p = root['p'] = zc.async.partial.Partial(demo.increase)
+    >>> transaction.commit()
+    >>> p() # result is None
+    >>> demo.counter
+    1
+
+So our two calls so far have returned direct successes.  This one returns
+a failure, because the wrapped call raises an exception.
+
+    >>> def callFailure():
+    ...     raise RuntimeError('Bad Things Happened Here')
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial(callFailure)
+    >>> transaction.commit()
+    >>> res = p()
+    >>> p.result
+    <twisted.python.failure.Failure exceptions.RuntimeError>
+
+These are standard twisted Failures, except that frames in the stored
+traceback have been converted to reprs, so that we don't keep references
+around when we pass the Failures around (over ZEO, for instance)
+[#no_live_frames]_.  This doesn't stop us from getting nice tracebacks,
+though.
+
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    exceptions.RuntimeError: Bad Things Happened Here
+
+Note that all calls can return a failure explicitly, rather than raising
+an exception that the partial converts to a failure.  However, there
+is an important difference in behavior.  If a wrapped call raises an
+exception, the partial aborts the transaction; but if the wrapped call
+returns a failure, no abort occurs.  Wrapped calls that explicitly return
+failures are thus responsible for any necessary transaction aborts.  See
+the footnote for an example [#explicit_failure_example]_.
+
+Now let's return a partial.  This generally represents a result that is waiting
+on another asynchronous persistent call, which would normally be called by
+a worker.  We'll fire the second call ourselves for this demonstration.
+
+    >>> def innerCall():
+    ...     return 42
+    ...
+    >>> ip = root['ip'] = zc.async.partial.Partial(innerCall)
+    >>> def callPartial():
+    ...     return ip
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial(callPartial)
+    >>> transaction.commit()
+    >>> res = p()
+    >>> res is ip
+    True
+
+While we are waiting for the result, the state is ACTIVE.
+
+    >>> p.state == zc.async.interfaces.ACTIVE
+    True
+
+When we call the inner partial, the result will be placed on the outer partial.
+
+    >>> p.result # None
+    >>> res = ip()
+    >>> p.result
+    42
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+
+This is accomplished with callbacks, discussed below in the Callbacks_ section.
+
+Now we'll return a deferred.  The story is almost identical to the
+partial story, except that, in our demonstration, we must handle
+transactions, because the deferred story uses the `twist` module in
+this package to let the Twisted reactor communicate safely with the
+ZODB: see twist.txt for details.
+
+    >>> import twisted.internet.defer
+    >>> inner_d = twisted.internet.defer.Deferred()
+    >>> def callDeferred():
+    ...     return inner_d
+    ...
+    >>> p = root['p2'] = zc.async.partial.Partial(callDeferred)
+    >>> transaction.commit()
+    >>> res = p()
+    >>> res is inner_d
+    True
+    >>> p.state == zc.async.interfaces.ACTIVE
+    True
+    >>> p.result # None
+
+After the deferred receives its result, we need to sync our connection to see
+it.
+
+    >>> inner_d.callback(42)
+    >>> p.result # still None; we need to sync our connection to see the result
+    >>> p.state == zc.async.interfaces.ACTIVE # it's completed, but need to sync
+    True
+    >>> trans = transaction.begin() # sync our connection
+    >>> p.result
+    42
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+
+As the last step in looking at the basics, let's look at passing arguments
+into the partial.  They can be persistent objects or generally picklable
+objects, and they can be ordered or keyword arguments.
+
+    >>> class PersistentDemo(persistent.Persistent):
+    ...     def __init__(self, value=0):
+    ...         self.value = value
+    ...
+    >>> root['demo2'] = PersistentDemo()
+    >>> import operator
+    >>> def argCall(ob, ob2=None, value=0, op=operator.add):
+    ...     for o in (ob, ob2):
+    ...         if o is not None:
+    ...             o.value = op(o.value, value)
+    ...
+    >>> p = root['p3'] = zc.async.partial.Partial(
+    ...     argCall, root['demo2'], value=4)
+    >>> transaction.commit()
+    >>> p()
+    >>> root['demo2'].value
+    4
+
+And, of course, this is a partial: we can specify some arguments when the
+partial is made, and some when it is called.
+
+    >>> root['demo3'] = PersistentDemo(10)
+    >>> p = root['p3'] = zc.async.partial.Partial(
+    ...     argCall, root['demo2'], value=4)
+    >>> transaction.commit()
+    >>> p(root['demo3'], op=operator.mul)
+    >>> root['demo2'].value
+    16
+    >>> root['demo3'].value
+    40
+
+This last feature makes it possible to use partials as callbacks: our
+next topic.
+
+Callbacks
+---------
+
+The partial object can also be used to handle return values and
+exceptions from the call.  The `addCallbacks` method enables this
+functionality.  Its signature is (success=None, failure=None).  It may
+be called multiple times, each time adding a success and/or failure
+callable that takes an end result: a value or a
+twisted.python.failure.Failure object, respectively.  Failure objects
+are passed to failure callables, and any other results are passed to
+success callables.
+
+The return value of the success and failure callables is
+important for chains and for determining whether a partial had any
+errors that need logging, as we'll see below.  The call to
+`addCallbacks` returns a partial, which can be used for chaining (see
+`Chaining Callbacks`_).
+
+Let's look at a simple example.
+
+    >>> def call(*args):
+    ...     res = 1
+    ...     for a in args:
+    ...         res *= a
+    ...     return res
+    ...
+    >>> def callback(res):
+    ...     return 'the result is %r' % (res,)
+    ...
+    >>> p = root['p4'] = zc.async.partial.Partial(call, 2, 3)
+    >>> p_callback = p.addCallbacks(callback)
+    >>> transaction.commit()
+    >>> res = p(4)
+    >>> p.result
+    24
+    >>> res
+    24
+    >>> p_callback.result
+    'the result is 24'
+
+We can now introduce another new concept: unhandled errors. A partial
+with a failure is considered to have an unhandled error if any leaf-node
+callback has a failure result, or if it itself has a failure result and
+has no callbacks.  This convention, if followed, can be used to
+determine whether to highlight the partial as an error in logs or other
+situations.  However, it is only a convention as far as the partial is
+concerned (other elements of the zc.async package may treat it more
+seriously).
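+
+Schematically, the convention amounts to a recursive check along these
+lines (a simplified sketch of the `unhandled_error` property that
+ignores state bookkeeping; `has_unhandled_error` is a hypothetical
+name):
+
+    def has_unhandled_error(p):
+        if not isinstance(p.result, twisted.python.failure.Failure):
+            return False
+        if not len(p.callbacks):
+            return True  # a failing leaf with no handlers
+        for callback in p.callbacks:
+            if has_unhandled_error(callback):
+                return True
+        return False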
+
+    >>> p.unhandled_error
+    False
+    >>> def error():
+    ...     raise RuntimeError('Boo!')
+    ...
+    >>> p = root['p3'] = zc.async.partial.Partial(error)
+    >>> transaction.commit()
+    >>> f = p()
+    >>> p.result
+    <twisted.python.failure.Failure exceptions.RuntimeError>
+    >>> p.unhandled_error
+    True
+    >>> def handleRuntime(f):
+    ...     f.trap(RuntimeError)
+    ...
+    >>> p_callback = p.addCallbacks(failure=handleRuntime)
+    >>> p_callback.state == zc.async.interfaces.COMPLETED
+    True
+    >>> p_callback.result # None
+    >>> p_callback.unhandled_error
+    False
+    >>> p.unhandled_error
+    False
+
+Here are some callback examples adding a success and a failure
+simultaneously.  This one causes a success...
+
+    >>> def multiply(first, second, third=None):
+    ...     res = first * second
+    ...     if third is not None:
+    ...         res *= third
+    ...     return res
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+    >>> transaction.commit()
+    >>> def success(res):
+    ...     print "success!", res
+    ...
+    >>> def failure(f):
+    ...     print "failure.", f
+    ...
+    >>> p.addCallbacks(success, failure) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> res = p()
+    success! 15
+
+...and this one a failure.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+    >>> transaction.commit()
+    >>> p.addCallbacks(success, failure) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> res = p() # doctest: +ELLIPSIS
+    failure. [Failure instance: Traceback: exceptions.TypeError...]
+
+You can also add multiple callbacks.
+
+    >>> def also_success(val):
+    ...     print "also a success!", val
+    ...
+    >>> def also_failure(f):
+    ...     print "also a failure.", f
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+    >>> transaction.commit()
+    >>> p.addCallbacks(success) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> p.addCallbacks(also_success) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> res = p()
+    success! 15
+    also a success! 15
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+    >>> transaction.commit()
+    >>> p.addCallbacks(failure=failure) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> p.addCallbacks(failure=also_failure) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> res = p() # doctest: +ELLIPSIS
+    failure. [Failure instance: Traceback: exceptions.TypeError...]
+    also a failure. [Failure instance: Traceback: exceptions.TypeError...]
+
+Chaining Callbacks
+------------------
+
+Sometimes it's desirable to have a chain of callables, so that the
+result of one callable becomes the input of the next.  The partial
+returned from addCallbacks can be used for that purpose.  Effectively,
+the logic for addCallbacks is this:
+
+    def success_or_failure(success, failure, res):
+        if zc.async.interfaces.IFailure.providedBy(res):
+            if failure is not None:
+                res = failure(res)
+        elif success is not None:
+            res = success(res)
+        return res
+
+    class Partial(...):
+        ...
+        def addCallbacks(self, success=None, failure=None):
+            if success is None and failure is None:
+                return self
+            res = Partial(success_or_failure, success, failure)
+            self.callbacks.put(res)
+            return res
+
+Here's a simple chain, then.  We multiply 5 * 3, then that result by 4, then
+print the result in the `success` function.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+    >>> transaction.commit()
+    >>> p.addCallbacks(zc.async.partial.Partial(multiply, 4)
+    ...               ).addCallbacks(success) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> res = p()
+    success! 60
+
+A less artificial use case is to handle errors (like try...except) or do
+cleanup (like try...finally).  Here's an example of handling errors.
+
+    >>> def handle_failure(f):
+    ...     return 0
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+    >>> transaction.commit()
+    >>> p.addCallbacks(
+    ...     failure=handle_failure).addCallbacks(success) # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+    >>> res = p()
+    success! 0
+
+If you recall our discussion of unhandled errors above, then you know
+this means that even though the top partial has a failure, unhandled_error
+is False.
+
+    >>> isinstance(p.result, twisted.python.failure.Failure)
+    True
+    >>> p.unhandled_error
+    False
+
+Callbacks on Completed Partial
+------------------------------
+
+When you add a callback to a partial that has been completed, it is performed
+immediately.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+    >>> transaction.commit()
+    >>> res = p()
+    >>> p.result
+    10
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+    >>> p_callback = p.addCallbacks(zc.async.partial.Partial(multiply, 3))
+    >>> p_callback.result
+    30
+    >>> p.state == zc.async.interfaces.COMPLETED
+    True
+
+Chaining Partials
+-----------------
+
+It's also possible to achieve a somewhat similar pattern by using a
+partial as a success or failure callable, and then adding callbacks to
+the second partial.  This differs from the other approach in that you
+are only adding callbacks to one side, success or failure, not to the
+effective combined result.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+    >>> transaction.commit()
+    >>> p_callback = p.addCallbacks(success)
+    >>> p2 = zc.async.partial.Partial(multiply, 4)
+    >>> p_callback_2 = p.addCallbacks(p2)
+    >>> p_callback_3 = p2.addCallbacks(also_success)
+    >>> res = p()
+    success! 15
+    also a success! 60
+
+This can be used to handle failures, to some degree.
+
+    >>> def handle_failure(f):
+    ...     return 0
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+    >>> transaction.commit()
+    >>> p_callback = p.addCallbacks(failure=failure)
+    >>> p2 = zc.async.partial.Partial(handle_failure)
+    >>> p_callback_2 = p.addCallbacks(failure=p2)
+    >>> p_callback_3 = p2.addCallbacks(success)
+    >>> res = p() # doctest: +ELLIPSIS
+    failure. [Failure instance: Traceback: exceptions.TypeError...]
+    success! 0
+
+Failing
+-------
+
+Speaking again of failures, it's worth discussing two other aspects of
+failing.  One is that partials offer an explicit way to fail a call.  It can
+be called when the partial is in PENDING or ACTIVE states.  The primary use
+cases for this method are to cancel a partial that is overdue to start, and
+to cancel a partial that was in progress by a worker when the worker died
+(more on that below).
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+    >>> transaction.commit()
+    >>> p.fail()
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    zc.async.interfaces.AbortedError:
+
+`fail` calls all failure callbacks with the failure.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+    >>> p_callback = p.addCallbacks(failure=failure)
+    >>> transaction.commit()
+    >>> res = p.fail() # doctest: +ELLIPSIS
+    failure. [Failure instance: Traceback...zc.async.interfaces.AbortedError...]
+
+As seen above, it fails with zc.async.interfaces.AbortedError by default.
+You can also pass in a different error.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+    >>> transaction.commit()
+    >>> p.fail(RuntimeError('failed'))
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    exceptions.RuntimeError: failed
+
+As mentioned, if a worker dies while working on an active task, the
+task should be aborted using `fail`, so the method also works when a
+partial is in the ACTIVE state.  We'll reach under the covers to show
+this.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+    >>> p._state = zc.async.interfaces.ACTIVE
+    >>> transaction.commit()
+    >>> p.fail()
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    zc.async.interfaces.AbortedError:
+
+It won't work for failing tasks in COMPLETED or CALLBACKS state.
+
+    >>> p.fail()
+    Traceback (most recent call last):
+    ...
+    BadStateError: can only call fail on a partial in PENDING or ACTIVE states
+    >>> p._state = zc.async.interfaces.CALLBACKS
+    >>> p.fail()
+    Traceback (most recent call last):
+    ...
+    BadStateError: can only call fail on a partial in PENDING or ACTIVE states
+
+Using `resumeCallbacks`
+-----------------------
+
+So `fail` is the proper way to handle an active partial that was being
+worked on by a dead worker, but how does one handle a partial that was
+in the CALLBACKS state?  The answer is to use `resumeCallbacks`.  Any
+callback that is still pending will be called; any callback that is
+active will be failed; any callback that is in the middle of calling its
+own callbacks will have its `resumeCallbacks` called; and any callback
+that is completed will be ignored.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+    >>> p._result = 10
+    >>> p._state = zc.async.interfaces.CALLBACKS
+    >>> completed_p = zc.async.partial.Partial(multiply, 3)
+    >>> callbacks_p = zc.async.partial.Partial(multiply, 4)
+    >>> callbacks_p._result = 40
+    >>> callbacks_p._state = zc.async.interfaces.CALLBACKS
+    >>> sub_callbacks_p = callbacks_p.addCallbacks(
+    ...     zc.async.partial.Partial(multiply, 2))
+    >>> active_p = zc.async.partial.Partial(multiply, 5)
+    >>> active_p._state = zc.async.interfaces.ACTIVE
+    >>> pending_p = zc.async.partial.Partial(multiply, 6)
+    >>> for _p in completed_p, callbacks_p, active_p, pending_p:
+    ...     p.callbacks.put(_p)
+    ...
+    >>> transaction.commit()
+    >>> res = completed_p(10)
+    >>> p.resumeCallbacks()
+    >>> sub_callbacks_p.result
+    80
+    >>> sub_callbacks_p.state == zc.async.interfaces.COMPLETED
+    True
+    >>> print active_p.result.getTraceback()
+    ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    zc.async.interfaces.AbortedError:
+    >>> active_p.state == zc.async.interfaces.COMPLETED
+    True
+    >>> pending_p.result
+    60
+    >>> pending_p.state == zc.async.interfaces.COMPLETED
+    True
+
+Introspecting and Mutating Arguments
+------------------------------------
+
+Partial arguments can be introspected and mutated.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+    >>> transaction.commit()
+    >>> p.args
+    [5, 3]
+    >>> p.kwargs
+    {}
+    >>> p.kwargs['third'] = 2
+    >>> p()
+    30
+
+This can allow wrapped callables to have a reference to the partial
+itself.
+
+    >>> def show(v):
+    ...     print v
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial(show)
+    >>> transaction.commit()
+    >>> p.args.append(p)
+    >>> res = p() # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+
+A class method on Partial, `bind`, can simplify this.  It puts the partial as
+the first argument to the callable, as if the callable were bound as a method
+on the partial.
+
+    >>> p = root['p'] = zc.async.partial.Partial.bind(show)
+    >>> transaction.commit()
+    >>> res = p() # doctest: +ELLIPSIS
+    <zc.async.partial.Partial object at ...>
+
+Result and State
+----------------
+
+Partials know about their state, and after a successful call also know
+their result, whether it is a Failure or another value.  Possible states are
+the constants in zc.async.interfaces named PENDING, ACTIVE, CALLBACKS, and
+COMPLETED.
+
+    >>> def showState(partial, *ignore):
+    ...     state = partial.state
+    ...     for nm in 'PENDING', 'ACTIVE', 'CALLBACKS', 'COMPLETED':
+    ...         val = getattr(zc.async.interfaces, nm)
+    ...         if state == val:
+    ...             print nm
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial.bind(showState)
+    >>> transaction.commit()
+    >>> p_callback = p.addCallbacks(zc.async.partial.Partial(showState, p))
+
+    >>> showState(p)
+    PENDING
+    >>> p.result # None
+    >>> res = p()
+    ACTIVE
+    CALLBACKS
+    >>> showState(p)
+    COMPLETED
+
+A partial may only be called when the state is PENDING: calling a
+partial again raises a BadStateError.
+
+    >>> p()
+    Traceback (most recent call last):
+    ...
+    BadStateError: can only call a partial in PENDING state
+
+Other similar restrictions include the following:
+
+- A partial may not call itself [#call_self]_.
+
+- Also, a partial's direct callback may not call the partial
+  [#callback_self]_.
+
+More Partial Introspection
+--------------------------
+
+We've already shown that it is possible to introspect unhandled_error,
+state, result, args, and kwargs.  Two other aspects of the basic partial
+functionality are introspectable: callable and callbacks.
+
+The callable is the callable (function or method of a picklable object) that
+the partial will call.  You can change it while the partial is in a pending
+state.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 2)
+    >>> p.callable is multiply
+    True
+    >>> p.callable = root['demo'].increase
+    >>> p.callable == root['demo'].increase
+    True
+    >>> transaction.commit()
+    >>> root['demo'].counter
+    2
+    >>> res = p()
+    >>> root['demo'].counter
+    4
+
+The callbacks are a queue of the callbacks added by addCallbacks (or by
+the currently experimental and underdocumented addCallback).  The code
+may currently allow direct mutation of the callbacks, but it is strongly
+suggested that you do not mutate them directly; in particular, add
+callbacks only through addCallbacks or addCallback.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 2, 8)
+    >>> len(p.callbacks)
+    0
+    >>> p_callback = p.addCallbacks(zc.async.partial.Partial(multiply, 5))
+    >>> len(p.callbacks)
+    1
+
+When you use addCallbacks, you actually get back a callback to your
+callback, for safety reasons.  Specifically, the success and failure
+callables you pass in become arguments to another partial--the one that
+addCallbacks places in the callback queue.  If a worker dies while that
+partial is in progress, its still-active argument partials should be
+cleaned up, and the logic in `resumeCallbacks` will not do so
+automatically (by design: that may not be the desired behavior in all
+cases).  Therefore addCallbacks adds a subsidiary callback to the main
+callback that does this job, and returns the subsidiary callback so that
+error handling is calculated more as expected (see the
+`unhandled_error` attribute).
+
+    >>> p.callbacks[0] is p_callback
+    False
+    >>> p.callbacks[0] is p_callback.__parent__
+    True
+
+`addCallback` does not have this characteristic (you are responsible for any
+internal partials, therefore).
+
+    >>> p_callback2 = zc.async.partial.Partial(multiply, 9)
+    >>> p_callback2 is p.addCallback(p_callback2)
+    True
+
+To continue with our example of introspecting the partial...
+
+    >>> len(p.callbacks)
+    2
+    >>> p.callbacks[1] is p_callback2
+    True
+    >>> transaction.commit()
+    >>> res = p()
+    >>> p.result
+    16
+    >>> p_callback.result
+    80
+    >>> p_callback2.result
+    144
+    >>> len(p.callbacks)
+    2
+    >>> p.callbacks[0] is p_callback.__parent__
+    True
+    >>> p.callbacks[1] is p_callback2
+    True
+
+The __parent__ attribute should hold the immediate parent of a partial. 
+This means that a pending partial will be within a data manager's queue;
+an active partial will be within a worker's queue (which is within a
+worker, which is within a workers container, which is within a data
+manager); and a callback will be within another partial (which may be
+intermediate to the top level partial, in which case __parent__ of the
+intermediate partial is the top level).  Here's an example.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 3, 5)
+    >>> p_callback = zc.async.partial.Partial(multiply, 2)
+    >>> p_callback2 = p.addCallbacks(p_callback)
+    >>> p_callback.__parent__ is p_callback2.__parent__
+    True
+    >>> p_callback2.__parent__.__parent__ is p
+    True
+    >>> transaction.abort()
+
+=========
+Footnotes
+=========
+
+.. [#set_up] We'll actually create the state that the text needs here.
+
+    >>> from ZODB.tests.util import DB
+    >>> db = DB()
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+    You must have two adapter registrations: IConnection to
+    ITransactionManager, and IPersistent to IConnection.  We will also
+    register IPersistent to ITransactionManager because the adapter is
+    designed for it.
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+    The partial class can be registered as an adapter for
+    functions and methods.  It needs to be for expected simple usage of
+    addCallbacks.
+
+    >>> import zope.component
+    >>> import types
+    >>> import zc.async.interfaces
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IDataManagerPartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IDataManagerPartial)
+
+.. [#verify] Verify interface
+
+    >>> from zope.interface.verify import verifyObject
+    >>> verifyObject(zc.async.interfaces.IPartial, p)
+    True
+    
+    Note that state and result are readonly.
+    
+    >>> p.state = 1
+    Traceback (most recent call last):
+    ...
+    AttributeError: can't set attribute
+    >>> p.result = 1
+    Traceback (most recent call last):
+    ...
+    AttributeError: can't set attribute
+
+.. [#no_live_frames] Failures have two particularly dangerous bits: the
+    traceback and the stack.  We use the __getstate__ code on Failures
+    to clean them up.  This makes the traceback (`tb`) None...
+    
+    >>> p.result.tb # None
+    
+    ...and it makes all of the values in the stack--the locals and
+    globals-- into strings.  The stack is a list of lists, in which each
+    internal list represents a frame, and contains five elements: the
+    code name (`f_code.co_name`), the code file (`f_code.co_filename`),
+    the line number (`f_lineno`), an items list of the locals, and an
+    items list for the globals.  All of the values in the items list
+    would normally be objects, but are now strings.
+    
+    >>> for (codename, filename, lineno, local_i, global_i) in p.result.stack:
+    ...     for k, v in local_i:
+    ...         assert isinstance(v, basestring), 'bad local %s' % (v,)
+    ...     for k, v in global_i:
+    ...         assert isinstance(v, basestring), 'bad global %s' % (v,)
+    ...
+    
+    Here's a reasonable question.  The Twisted Failure code has a
+    __getstate__ that cleans up the failure, and that's even what we are
+    using to sanitize the failure.  If the failure is attached to a
+    partial and stored in the ZODB, it is going to be cleaned up anyway.
+    Why explicitly clean up the failure even before it is pickled?
+
+    The answer might be classified as paranoia.  Just in case the failure
+    is kept around in memory longer--by being put on a deferred, or somehow
+    otherwise passed around--we want to eliminate any references to objects
+    in the connection as soon as possible.
+
+    Unfortunately, the __getstate__ code in the Twisted Failure can cause
+    some interaction problems for code that has a __repr__ with side
+    effects--like xmlrpclib.  The `twist` module has a monkeypatch for
+    that particular problem, thanks to Florent Guillaume at Nuxeo, but
+    others may be discovered.
+
+.. [#explicit_failure_example] As the main text describes, if a call raises
+    an exception, the partial will abort the transaction; but if it
+    returns a failure explicitly, the call is responsible for making any
+    desired changes to the transaction (such as aborting) before the
+    partial calls commit.  Compare.  Here is a call that raises an
+    exception, and rolls back changes.
+    
+    (Note that we are passing arguments to the partial, a topic that has
+    not yet been discussed in the text when this footnote is given: read
+    on a bit in the main text to see the details, if it seems surprising
+    or confusing.)
+
+    >>> def callAndRaise(ob):
+    ...     ob.increase()
+    ...     print ob.counter
+    ...     raise RuntimeError
+    ...
+    >>> p = root['raise_exception_example'] = zc.async.partial.Partial(
+    ...     callAndRaise, root['demo'])
+    >>> transaction.commit()
+    >>> root['demo'].counter
+    1
+    >>> res = p() # this shows the result of the print in `callAndRaise` above.
+    2
+    >>> root['demo'].counter # it was rolled back
+    1
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    exceptions.RuntimeError:
+
+    Here is a call that returns a failure, and does not abort, even though
+    the partial result looks very similar.
+
+    >>> import twisted.python.failure
+    >>> def returnExplicitFailure(ob):
+    ...     ob.increase()
+    ...     try:
+    ...         raise RuntimeError
+    ...     except RuntimeError:
+    ...         # we could have just made and returned a failure without the
+    ...         # try/except, but this is intended to make crystal clear that
+    ...         # exceptions are irrelevant if you catch them and return a
+    ...         # failure
+    ...         return twisted.python.failure.Failure()
+    ...
+    >>> p = root['explicit_failure_example'] = zc.async.partial.Partial(
+    ...     returnExplicitFailure, root['demo'])
+    >>> transaction.commit()
+    >>> res = p()
+    >>> root['demo'].counter # it was not rolled back automatically
+    2
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    exceptions.RuntimeError:
+
+.. [#call_self] Here's a partial trying to call itself.
+
+    >>> def call(obj, *ignore):
+    ...     return obj()
+    ...
+    >>> p = root['p'] = zc.async.partial.Partial.bind(call)
+    >>> transaction.commit()
+    >>> res = p()
+    >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    zc.async.interfaces.BadStateError: can only call a partial in PENDING state
+
+.. [#callback_self] Here's a partial's callback trying to call the partial.
+
+    >>> p = root['p'] = zc.async.partial.Partial(multiply, 3, 4)
+    >>> p_callback = p.addCallbacks(
+    ...     zc.async.partial.Partial(call, p)).addCallbacks(failure=failure)
+    >>> transaction.commit()
+    >>> res = p() # doctest: +ELLIPSIS
+    failure. [Failure instance: Traceback: zc.async.interfaces.BadStateError...]
+    >>> p.result # the main partial still ran to completion
+    12


Property changes on: zc.async/trunk/src/zc/async/partial.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/partials_and_transactions.txt
===================================================================
--- zc.async/trunk/src/zc/async/partials_and_transactions.txt	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/partials_and_transactions.txt	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,294 @@
+This is a document for maintainers and for testing.
+
+Partials manage their own transactions when they are called.  In normal
+use, this means that transactions are committed and aborted by the
+partial itself at the points marked "COMMIT" and "ABORT" in this list
+(other software components will make commits, just not the partial):
+
+- client creates a partial, puts it in a queue, and assigns callbacks to it
+  before it is run.
+- a worker claims a partial
+- a worker calls a partial
+- partial changes state to ACTIVE: COMMIT
+- partial runs the wrapped callable, stores the result on its "result"
+  attribute, changes the state to CALLBACKS, and tries to COMMIT.
+  * if there is a ZODB.POSException.TransactionError, ABORT and retry,
+    up to 5 times (see the sketch after this list); after the fifth
+    failure, ABORT, set a Failure on the result attribute, COMMIT, and
+    skip to the `complete`_ step below.
+  * if there is a SystemExit, KeyboardInterrupt, or any non-TransactionError
+    ZODB.POSException.POSError (which includes all ZEO-related storage
+    errors) ABORT and raise.
+  * if there are any other exceptions, ABORT, set a Failure on the result
+    attribute, COMMIT, and skip to `complete`_ step below.
+- If the result of the wrapped callable is a partial or Twisted deferred,
+  add a callback to it that sets the result, sets the state to CALLBACKS,
+  tries to commit as described above, and then proceeds with the
+  `complete`_ step.  COMMIT and return.
+- _`complete`: for each callback (which is itself a partial), call it.
+  Each callback partial will commit as described here.  The top partial
+  catches no errors while it runs the callbacks.
+- When all callbacks have been called, set state to COMPLETED and COMMIT.
+  If there is a ZODB.POSException.TransactionError, look in the callbacks
+  to see if a new one has been added.  If so, perform it and try again;
+  otherwise, retry this forever, logging every retry, because it should
+  not happen except when a new callback was added: there should be no
+  other conflict errors, since no two workers should be touching this
+  partial.
+- If a callback is added to this completed partial, perform the callback
+  and COMMIT.  If anything fails, including a ConflictError, just raise it.
+  Someone else should abort as necessary.
+- If a callback is added to a partial in any other state, set the partial's
+  _p_changed to True and commit, so that a ConflictError is raised if the
+  partial's state changed while we were checking it; in that case, check
+  the state again and retry.
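+
+The retry around the wrapped call is, schematically, as follows (a
+simplified sketch of `Partial._call_with_retry` in partial.py; constant
+names are abbreviated and the handling of inner partials and Twisted
+deferreds is elided):
+
+    ct = 0
+    while 1:
+        try:
+            self._result = call()           # run the wrapped callable
+            self._state = CALLBACKS
+            tm.commit()
+        except ZODB.POSException.TransactionError:
+            tm.abort()
+            ct += 1
+            if ct < 5:
+                continue                    # retry the call
+            self._result = twisted.python.failure.Failure()
+            self._state = CALLBACKS
+            tm.commit()                     # give up: record the failure
+        except EXPLOSIVE_ERRORS:            # SystemExit, KeyboardInterrupt,
+            tm.abort()                      # and non-conflict POSErrors
+            raise
+        except:
+            tm.abort()
+            self._result = twisted.python.failure.Failure()
+            self._state = CALLBACKS
+            tm.commit()                     # record the failure
+        self.resumeCallbacks()
+        break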
+
+Note the following:
+
+- if a partial's wrapped callable returns a failure, that means that it
+  is taking responsibility for any necessary abort: the partial will still
+  attempt to commit.
+- the state never changes out of COMPLETED even when a new callback is
+  added.
+- __call__ *can* raise a ConflictError; the only known way is to have two
+  workers start the same partial, which should not be possible in normal
+  zc.async usage.
+- addCallbacks may raise a ConflictError: this would happen, for instance,
+  when the state is COMPLETED, so that callbacks are performed immediately.
+  A retry sketch follows this list.
+
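+A client that adds a callback defensively might therefore retry on
+conflict, along these lines (a hypothetical sketch; `p` is a partial,
+`handler` is a stand-in callable, and `transaction` and
+ZODB.POSException are assumed to be imported):
+
+    while 1:
+        try:
+            cb = p.addCallbacks(handler)
+            transaction.commit()
+        except ZODB.POSException.ConflictError:
+            transaction.abort()             # state changed beneath us
+        else:
+            break
+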
+What could go wrong?  In this list "T1" stands for one hypothetical
+thread, and "T2" stands for another hypothetical thread, often
+overlapping in time with T1.
+
+- T1 goes to CALLBACKS state and begins evaluating callbacks.  T2 adds another
+  callback [#set_up]_.  We need to be careful that the callback is executed.
+
+    >>> import threading
+    >>> _thread_lock = threading.Lock()
+    >>> _main_lock = threading.Lock()
+    >>> called = 0
+    >>> def safe_release(lock):
+    ...     while not lock.locked():
+    ...         pass
+    ...     lock.release()
+    ...
+    >>> def locked_call(res=None):
+    ...     global called
+    ...     safe_release(_main_lock)
+    ...     _thread_lock.acquire()
+    ...     called += 1
+    ...
+    >>> def call_from_thread(p):
+    ...     id = p._p_oid
+    ...     def call():
+    ...         conn = db.open()
+    ...         p = conn.get(id)
+    ...         p()
+    ...     return call
+    ...
+    >>> _thread_lock.acquire()
+    True
+    >>> _main_lock.acquire()
+    True
+    >>> import zc.async.partial
+    >>> root['p'] = p = zc.async.partial.Partial(locked_call)
+    >>> p2 = p.addCallbacks(locked_call)
+    >>> import transaction
+    >>> transaction.commit()
+    >>> t = threading.Thread(target=call_from_thread(p))
+    >>> t.start()
+    >>> _main_lock.acquire()
+    True
+    >>> called
+    0
+    >>> trans = transaction.begin()
+    >>> p.state == zc.async.interfaces.ACTIVE
+    True
+    >>> safe_release(_thread_lock)
+    >>> _main_lock.acquire()
+    True
+    >>> called # the main call
+    1
+    >>> trans = transaction.begin()
+    >>> p.state == zc.async.interfaces.CALLBACKS
+    True
+    >>> p2 = p.addCallbacks(locked_call)
+    >>> transaction.commit()
+    >>> safe_release(_thread_lock)
+    >>> _main_lock.acquire()
+    True
+    >>> called # call back number one
+    2
+    >>> safe_release(_thread_lock)
+    >>> safe_release(_thread_lock)
+    >>> while t.isAlive():
+    ...     pass
+    ...
+    >>> called # call back number two
+    ...        # (added while first callback was in progress)
+    3
+    >>> _main_lock.release()
+
+- T1 goes to CALLBACKS state.  In the split second between checking for
+  any remaining callbacks and changing state to COMPLETED, T2 adds a
+  callback and commits.  T1 commits.  T2 thinks that callbacks are still
+  being processed, so does not process the callback, but meanwhile the
+  state is being switched to COMPLETED, and the new callback is never
+  made. For this, we could turn off MVCC, but we don't want to do that
+  if we can help it because of efficiency.  A better solution is to set
+  _p_changed in T2 on the partial, and commit; if there's a conflict
+  error, re-get the state because its change may have caused the
+  conflict.
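+
+  Roughly, the adder's side of that handshake looks like this (a
+  hypothetical sketch; the real logic is in partial.py's `addCallback`):
+
+      p._p_changed = True
+      try:
+          transaction.commit()
+      except ZODB.POSException.ConflictError:
+          transaction.abort()
+          # the partial's state may have changed beneath us: re-check
+          # it and retry as appropriate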
+
+    >>> import sys
+    >>> class LockedSetter(object):
+    ...     def __init__(self, name, condition, initial=None):
+    ...         self.name = name
+    ...         self.condition = condition
+    ...         self.value = initial
+    ...     def __get__(self, obj, typ=None):
+    ...         if obj is None:
+    ...             return self
+    ...         return getattr(obj, '_z_locked_' + self.name, self.value)
+    ...     def __set__(self, obj, value):
+    ...         if self.condition(obj, value):
+    ...             safe_release(_main_lock)
+    ...             _thread_lock.acquire()
+    ...         setattr(obj, '_z_locked_' + self.name, value)
+    ...
+    >>> import zc.async.partial
+    >>> class Partial(zc.async.partial.Partial):
+    ...     _state = LockedSetter(
+    ...         '_state',
+    ...         lambda o, v: v == zc.async.interfaces.COMPLETED,
+    ...         zc.async.interfaces.PENDING)
+    ...
+    >>> called = 0
+    >>> def call(res=None):
+    ...     global called
+    ...     called += 1
+    ...
+    >>> root['p2'] = p = Partial(call)
+    >>> transaction.commit()
+    >>> _thread_lock.acquire()
+    True
+    >>> _main_lock.acquire()
+    True
+    >>> t = threading.Thread(target=call_from_thread(p))
+    >>> t.start()
+    >>> _main_lock.acquire()
+    True
+    >>> trans = transaction.begin()
+    >>> called
+    1
+    >>> p.state == zc.async.interfaces.CALLBACKS
+    True
+    >>> p2 = p.addCallbacks(call)
+    >>> transaction.commit()
+    >>> safe_release(_thread_lock)
+    >>> _main_lock.acquire()
+    True
+    >>> trans = transaction.begin()
+    >>> called
+    2
+    >>> p.state == zc.async.interfaces.CALLBACKS
+    True
+    >>> safe_release(_thread_lock)
+    >>> safe_release(_thread_lock)
+    >>> t.join()
+    >>> _main_lock.release()
+
+  Note that, because of this, addCallbacks can raise a ConflictError:
+  this usually means that the partial's state changed out from under
+  it.  Just retry, as in the sketch below.
+
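+  A minimal sketch of that retry idiom (the helper name and retry
+  count are illustrative, not part of the package)::
+
+    import transaction
+    from ZODB.POSException import ConflictError
+
+    def add_callbacks_with_retry(partial, call, attempts=3):
+        # addCallbacks or the commit may raise ConflictError if the
+        # partial's state changes concurrently; abort and try again
+        for attempt in range(attempts):
+            try:
+                result = partial.addCallbacks(call)
+                transaction.commit()
+                return result
+            except ConflictError:
+                transaction.abort()
+        raise RuntimeError('could not add callback')
+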
+- T1 is performing callbacks.  T2 begins and adds a callback.  T1 changes state
+  to COMPLETED and commits.  T2 commits.  If we don't handle it carefully,
+  the callback is never called.  So we handle it carefully.
+
+    >>> _thread_lock.acquire()
+    True
+    >>> _main_lock.acquire()
+    True
+    >>> called = 0
+    >>> root['p3'] = p = zc.async.partial.Partial(call)
+    >>> p1 = p.addCallbacks(locked_call)
+    >>> transaction.commit()
+    >>> t = threading.Thread(target=call_from_thread(p))
+    >>> t.start()
+    >>> _main_lock.acquire()
+    True
+    >>> called
+    1
+    >>> trans = transaction.begin()
+    >>> def call_and_unlock(res):
+    ...     global called
+    ...     called += 1
+    ...
+    >>> p2 = p.addCallbacks(call_and_unlock)
+    >>> safe_release(_thread_lock)
+    >>> safe_release(_thread_lock)
+    >>> t.join()
+    >>> called # the main call
+    2
+    >>> transaction.commit() # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    ConflictError: database conflict error (..., class zc.async.partial.Partial)
+    >>> transaction.abort()
+    >>> p2 = p.addCallbacks(call_and_unlock)
+    >>> called
+    3
+    >>> transaction.commit()
+    >>> _main_lock.release()
+
+- T1 adds a callback to a partial in the COMPLETED state, which runs
+  the callback immediately.  Simultaneously, T2 adds a callback to the
+  same completed partial.  No problem.
+
+- Two workers might claim and start the same partial.  This should
+  already be prevented by workers committing their transactions after
+  claiming partials.  This is considered a pathological case.
+
+- Generally, if a worker is judged to be dead and its partials are
+  handed out to other workers while the worker is in fact still alive,
+  the result can be a serious problem.  This is also considered a
+  pathological case.
+
+=========
+Footnotes
+=========
+
+.. [#set_up] We'll actually create the state that the text needs here.
+
+    >>> from ZODB.tests.util import DB
+    >>> db = DB()
+    >>> conn = db.open()
+    >>> root = conn.root()
+
+    You must have two adapter registrations: IConnection to
+    ITransactionManager, and IPersistent to IConnection.  We will also
+    register IPersistent to ITransactionManager because the adapter is
+    designed for it.
+
+    >>> from zc.twist import transactionManager, connection
+    >>> import zope.component
+    >>> zope.component.provideAdapter(transactionManager)
+    >>> zope.component.provideAdapter(connection)
+    >>> import ZODB.interfaces
+    >>> zope.component.provideAdapter(
+    ...     transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+    We also need to adapt Function and Method to IPartial.
+
+    >>> import zc.async.partial
+    >>> import zc.async.interfaces
+    >>> import zope.component
+    >>> import types
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.FunctionType,),
+    ...     provides=zc.async.interfaces.IPartial)
+    >>> zope.component.provideAdapter(
+    ...     zc.async.partial.Partial,
+    ...     adapts=(types.MethodType,),
+    ...     provides=zc.async.interfaces.IPartial)


Property changes on: zc.async/trunk/src/zc/async/partials_and_transactions.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/rwproperty.py
===================================================================
--- zc.async/trunk/src/zc/async/rwproperty.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/rwproperty.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,75 @@
+# Read & write properties
+#
+# Copyright (c) 2006 by Philipp "philiKON" von Weitershausen
+#                       philikon@philikon.de
+#
+# Freely distributable under the terms of the Zope Public License, v2.1.
+#
+# See rwproperty.txt for detailed explanations
+#
+import sys
+
+__all__ = ['getproperty', 'setproperty', 'delproperty']
+
+class rwproperty(object):
+
+    def __new__(cls, func):
+        name = func.__name__
+
+        # ugly, but common hack
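+        # sys._getframe(1).f_locals is the namespace of the class body
+        # being executed, which lets us see whether an attribute of the
+        # same name has already been defined.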
+        frame = sys._getframe(1)
+        locals = frame.f_locals
+
+        if name not in locals:
+            return cls.createProperty(func)
+
+        oldprop = locals[name]
+        if isinstance(oldprop, property):
+            return cls.enhanceProperty(oldprop, func)
+
+        raise TypeError("read & write properties cannot be mixed with "
+                        "other attributes except regular property objects.")
+
+    # this might not be particularly elegant, but it's easy on the eyes
+
+    @staticmethod
+    def createProperty(func):
+        raise NotImplementedError
+
+    @staticmethod
+    def enhanceProperty(oldprop, func):
+        raise NotImplementedError
+
+class getproperty(rwproperty):
+
+    @staticmethod
+    def createProperty(func):
+        return property(func)
+
+    @staticmethod
+    def enhanceProperty(oldprop, func):
+        return property(func, oldprop.fset, oldprop.fdel)
+
+class setproperty(rwproperty):
+
+    @staticmethod
+    def createProperty(func):
+        return property(None, func)
+
+    @staticmethod
+    def enhanceProperty(oldprop, func):
+        return property(oldprop.fget, func, oldprop.fdel)
+
+class delproperty(rwproperty):
+
+    @staticmethod
+    def createProperty(func):
+        return property(None, None, func)
+
+    @staticmethod
+    def enhanceProperty(oldprop, func):
+        return property(oldprop.fget, oldprop.fset, func)
+
+if __name__ == "__main__":
+    import doctest
+    doctest.testfile('rwproperty.txt')


Property changes on: zc.async/trunk/src/zc/async/rwproperty.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/rwproperty.txt
===================================================================
--- zc.async/trunk/src/zc/async/rwproperty.txt	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/rwproperty.txt	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,140 @@
+Read & write properties
+========================
+
+:Author:   Philipp von Weitershausen
+:Email:    philikon@philikon.de
+:License:  Zope Public License, v2.1
+
+Motivation
+----------
+
+Using method decorators and descriptors like ``property``, we can
+easily create computed attributes:
+
+  >>> class JamesBrown(object):
+  ...     @property
+  ...     def feel(self):
+  ...         return self._feel
+
+An attribute like this cannot be written, though.  You would have to
+do something like this:
+
+  >>> class JamesBrown(object):
+  ...     def _getFeel(self):
+  ...         return self._feel
+  ...     def _setFeel(self, feel):
+  ...         self._feel = feel
+  ...     feel = property(_getFeel, _setFeel)
+
+The problem with this approach is that it leaves the getter and setter
+sitting around in the class namespace.  It also lacks the compact
+spelling of a decorator solution.  To cope with that, some people like
+to write:
+
+  >>> class JamesBrown(object):
+  ...     @apply
+  ...     def feel():
+  ...         def get(self):
+  ...             return self._feel
+  ...         def set(self, feel):
+  ...             self._feel = feel
+  ...         return property(get, set)
+
+This spelling feels rather cumbersome, apart from the fact that
+``apply`` is `going to go away`_ in Python 3000.
+
+.. _going to go away: http://www.python.org/peps/pep-3000.html#id24
+
+
+Goal
+----
+
+There should be a way to declare a read & write property while keeping
+the compact and easy decorator spelling.  Read & write properties
+should be as easy to use as read-only ones.  In particular, we want to
+avoid the immediately called function whose only purpose is to name
+the attribute and to create a local scope for the getter and setter.
+
+
+Read & write property
+---------------------
+
+Read & write properties work like regular properties.  You simply
+define a method and then apply a decorator, except that you now use
+``@getproperty`` instead of ``@property`` to mark the getter, and
+``@setproperty`` to mark the setter:
+
+  >>> from rwproperty import getproperty, setproperty
+  >>> class JamesBrown(object):
+  ...     @getproperty
+  ...     def feel(self):
+  ...         return self._feel
+  ...     @setproperty
+  ...     def feel(self, feel):
+  ...         self._feel = feel
+
+  >>> i = JamesBrown()
+  >>> i.feel
+  Traceback (most recent call last):
+  ...
+  AttributeError: 'JamesBrown' object has no attribute '_feel'
+
+  >>> i.feel = "good"
+  >>> i.feel
+  'good'
+
+The order in which getters and setters are declared doesn't matter:
+
+  >>> from rwproperty import getproperty, setproperty
+  >>> class JamesBrown(object):
+  ...     @setproperty
+  ...     def feel(self, feel):
+  ...         self._feel = feel
+  ...     @getproperty
+  ...     def feel(self):
+  ...         return self._feel
+
+  >>> i = JamesBrown()
+  >>> i.feel = "good"
+  >>> i.feel
+  'good'
+
+Of course, deleters are also possible:
+
+  >>> from rwproperty import delproperty
+  >>> class JamesBrown(object):
+  ...     @setproperty
+  ...     def feel(self, feel):
+  ...         self._feel = feel
+  ...     @getproperty
+  ...     def feel(self):
+  ...         return self._feel
+  ...     @delproperty
+  ...     def feel(self):
+  ...         del self._feel
+
+  >>> i = JamesBrown()
+  >>> i.feel = "good"
+  >>> del i.feel
+  >>> i.feel
+  Traceback (most recent call last):
+  ...
+  AttributeError: 'JamesBrown' object has no attribute '_feel'
+
+
+Edge cases
+----------
+
+There might be a case where you're using a flavour of read & write
+properties and already have a non-property attribute of the same name
+defined:
+
+  >>> class JamesBrown(object):
+  ...     feel = "good"
+  ...     @getproperty
+  ...     def feel(self):
+  ...         return "so good"
+  ...
+  Traceback (most recent call last):
+  ...
+  TypeError: read & write properties cannot be mixed with other attributes except regular property objects.
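+
+Mixing with a regular ``property`` object of the same name is fine,
+though: the read & write decorators simply enhance the existing
+property.  A quick illustrative sketch:
+
+  >>> class JamesBrown(object):
+  ...     @property
+  ...     def feel(self):
+  ...         return "so good"
+  ...     @setproperty
+  ...     def feel(self, feel):
+  ...         self._feel = feel
+
+  >>> i = JamesBrown()
+  >>> i.feel = "good"
+  >>> i._feel
+  'good'
+  >>> i.feel
+  'so good'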


Property changes on: zc.async/trunk/src/zc/async/rwproperty.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/subscribers.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,94 @@
+import os
+import transaction
+import transaction.interfaces
+import ZODB.interfaces
+import twisted.internet.reactor
+import zope.component
+import zope.event
+import zope.app.appsetup.interfaces
+import zc.twist
+
+import zc.async.datamanager
+import zc.async.interfaces
+import zc.async.engine
+
+NAME = 'zc.async.datamanager'
+
+class InstallerAndNotifier(object):
+
+    def __init__(self, name=NAME,
+                 factory=lambda *args: zc.async.datamanager.DataManager(),
+                 get_folder=lambda r: r):
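+        # mark this instance as adapting IDatabaseOpenedEvent (the
+        # decorator form, applied by hand) so it can be registered as
+        # a subscriber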
+        zope.component.adapter(
+            zope.app.appsetup.interfaces.IDatabaseOpenedEvent)(self)
+        self.name = name
+        self.factory = factory
+        self.get_folder = get_folder
+
+    def __call__(self, ev):
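+        # use a dedicated transaction manager and connection so that
+        # installation does not interfere with the thread's default
+        # transaction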
+        db = ev.database
+        tm = transaction.TransactionManager()
+        conn = db.open(transaction_manager=tm)
+        tm.begin()
+        try:
+            try:
+                root = conn.root()
+                folder = self.get_folder(root)
+                tm.commit()
+                if self.name not in folder:
+                    folder[self.name] = self.factory(conn, folder)
+                    if folder[self.name]._p_jar is None:
+                        conn.add(folder[self.name])
+                elif not zc.async.interfaces.IDataManager.providedBy(
+                    folder[self.name]):
+                    raise RuntimeError(
+                        'IDataManager not found') # TODO better error
+                zope.event.notify(
+                    zc.async.interfaces.DataManagerAvailable(folder[self.name]))
+                tm.commit()
+            except:
+                tm.abort()
+                raise
+        finally:
+            conn.close()
+
+basicInstallerAndNotifier = InstallerAndNotifier()
+
+class SeparateDBCreation(object):
+    def __init__(self, db_name='zc.async', name=NAME,
+                 factory=zc.async.datamanager.DataManager,
+                 get_folder=lambda r:r):
+        self.db_name = db_name
+        self.name = name
+        self.factory = factory
+        self.get_folder = get_folder
+
+    def __call__(self, conn, folder):
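+        # get a connection to the named secondary database of the
+        # multi-database; the data manager is created and stored there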
+        conn2 = conn.get_connection(self.db_name)
+        tm = transaction.interfaces.ITransactionManager(conn)
+        root = conn2.root()
+        folder = self.get_folder(root)
+        tm.commit()
+        if self.name in folder:
+            raise ValueError('data manager already exists in separate database',
+                             self.db_name, folder, self.name)
+        dm = folder[self.name] = self.factory()
+        conn2.add(dm)
+        tm.commit()
+        return dm
+
+installerAndNotifier = InstallerAndNotifier(factory=SeparateDBCreation())
+
+@zope.component.adapter(zc.async.interfaces.IDataManagerAvailableEvent)
+def installTwistedEngine(ev):
+    engine = zc.async.engine.Engine(
+        zope.component.getUtility(
+            zc.async.interfaces.IUUID, 'instance'),
+        zc.async.datamanager.Worker)
+    dm = ev.object
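+    # schedule the engine's first poll for as soon as the reactor is
+    # running, and arrange for tearDown to run just before reactor
+    # shutdown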
+    twisted.internet.reactor.callLater(
+        0,
+        zc.twist.Partial(engine.poll, dm))
+    twisted.internet.reactor.addSystemEventTrigger(
+        'before', 'shutdown', zc.twist.Partial(
+            engine.tearDown, dm))


Property changes on: zc.async/trunk/src/zc/async/subscribers.py
___________________________________________________________________
Name: svn:eol-style
   + native

Added: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py	2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/tests.py	2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,130 @@
+import os
+import shutil
+import unittest
+
+from zope.testing import doctest, module
+import zope.component
+import zope.component.testing
+import zope.component.eventtesting
+import zc.async.interfaces
+import zc.async.partial
+import zc.async.subscribers
+
+def modSetUp(test):
+    zope.component.testing.setUp(test)
+    module.setUp(test, 'zc.async.doctest_test')
+
+def modTearDown(test):
+    module.tearDown(test)
+    zope.component.testing.tearDown(test)
+
+def uuidSetUp(test):
+    test.globs['old_instance_home'] = os.environ.get("INSTANCE_HOME")
+    os.environ['INSTANCE_HOME'] = os.path.join(os.path.dirname(
+        zc.async.interfaces.__file__), '_test_tmp')
+    os.mkdir(os.environ['INSTANCE_HOME'])
+    os.mkdir(os.path.join(os.environ['INSTANCE_HOME'], 'etc'))
+
+def uuidTearDown(test):
+    shutil.rmtree(os.environ['INSTANCE_HOME'])
+    if test.globs['old_instance_home'] is None:
+        del os.environ['INSTANCE_HOME']
+    else:
+        os.environ['INSTANCE_HOME'] = test.globs['old_instance_home']
+    del test.globs['old_instance_home']
+
+def readmeSetUp(test):
+    modSetUp(test)
+    uuidSetUp(test)
+    zope.component.eventtesting.setUp(test)
+    test.globs['installerAndNotifier'] = (
+        zc.async.subscribers.basicInstallerAndNotifier)
+    from zc.async import instanceuuid
+    instanceuuid.UUID = instanceuuid.getUUID()
+    zope.component.provideUtility(instanceuuid.UUID, name='instance')
+
+def altReadmeSetUp(test):
+    modSetUp(test)
+    uuidSetUp(test)
+    zope.component.eventtesting.setUp(test)
+    test.globs['installerAndNotifier'] = (
+        zc.async.subscribers.installerAndNotifier)
+    from zc.async import instanceuuid
+    instanceuuid.UUID = instanceuuid.getUUID()
+    zope.component.provideUtility(instanceuuid.UUID, name='instance')
+
+def readmeTearDown(test):
+    r = test.globs.get('faux')
+    if r:
+        for when, eventname, callable in r.triggers:
+            if eventname == 'shutdown': # test didn't run to completion
+                # let's clean up
+                callable()
+    uuidTearDown(test)
+    modTearDown(test)
+
+def test_instanceuuid():
+    """This module provides access to a UUID that is intended to uniquely
+    identify this software instance.  Read the `msg` value below for more
+    information.
+    
+    The uuid is generated and then stashed in a file.  It only works if
+    the INSTANCE_HOME environment variable is set to a folder that has an
+    `etc` folder in it--a standard Zope set up.  For this test, we mock it
+    up in uuidSetUp and uuidTearDown below.
+    
+        >>> import zc.async.instanceuuid
+        >>> import uuid
+        >>> isinstance(zc.async.instanceuuid.UUID, uuid.UUID)
+        True
+        >>> (zc.async.instanceuuid.getUUID() ==
+        ...  zc.async.instanceuuid.UUID ==
+        ...  zc.async.instanceuuid.getUUID())
+        True
+
+    uuid.UUIDs now provide zc.async.interfaces.IUUID
+    
+        >>> import zc.async.interfaces
+        >>> zc.async.interfaces.IUUID.implementedBy(uuid.UUID)
+        True
+        >>> zc.async.interfaces.IUUID.providedBy(zc.async.instanceuuid.UUID)
+        True
+
+    That's a bit invasive, but now you can register the instance UUID as
+    a utility and get it back out as something that provides
+    zc.async.interfaces.IUUID.
+    
+        >>> import zope.component
+        >>> zope.component.provideUtility(
+        ...     zc.async.instanceuuid.UUID, name='instance')
+        >>> id = zope.component.getUtility(
+        ...     zc.async.interfaces.IUUID, 'instance')
+        >>> id is zc.async.instanceuuid.UUID
+        True
+
+    (Unfortunately you can't register a utility to provide a class, or I
+    would have done that...though maybe that's not unfortunate :-) )
+
+    """
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocTestSuite(setUp=uuidSetUp, tearDown=uuidTearDown),
+        doctest.DocFileSuite(
+            'partial.txt',
+            'partials_and_transactions.txt',
+            'datamanager.txt',
+            setUp=modSetUp, tearDown=modTearDown,
+            optionflags=doctest.INTERPRET_FOOTNOTES),
+        doctest.DocFileSuite(
+            'README.txt',
+            setUp=readmeSetUp, tearDown=readmeTearDown,
+            optionflags=doctest.INTERPRET_FOOTNOTES),
+        doctest.DocFileSuite(
+            'README.txt',
+            setUp=altReadmeSetUp, tearDown=readmeTearDown,
+            optionflags=doctest.INTERPRET_FOOTNOTES),
+        ))
+
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')


Property changes on: zc.async/trunk/src/zc/async/tests.py
___________________________________________________________________
Name: svn:eol-style
   + native


