[Checkins] SVN: zc.async/trunk/src/zc/async/ Initial checkin. Ready
for experimentation, but not yet prime time.
Gary Poster
gary at zope.com
Tue Aug 15 16:33:11 EDT 2006
Log message for revision 69535:
Initial checkin. Ready for experimentation, but not yet prime time.
Changed:
A zc.async/trunk/src/zc/async/README.txt
A zc.async/trunk/src/zc/async/__init__.py
A zc.async/trunk/src/zc/async/adapters.py
A zc.async/trunk/src/zc/async/datamanager.py
A zc.async/trunk/src/zc/async/datamanager.txt
A zc.async/trunk/src/zc/async/engine.py
A zc.async/trunk/src/zc/async/i18n.py
A zc.async/trunk/src/zc/async/instanceuuid.py
A zc.async/trunk/src/zc/async/interfaces.py
A zc.async/trunk/src/zc/async/partial.py
A zc.async/trunk/src/zc/async/partial.txt
A zc.async/trunk/src/zc/async/partials_and_transactions.txt
A zc.async/trunk/src/zc/async/rwproperty.py
A zc.async/trunk/src/zc/async/rwproperty.txt
A zc.async/trunk/src/zc/async/subscribers.py
A zc.async/trunk/src/zc/async/tests.py
-=-
Added: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/README.txt 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,620 @@
+========
+zc.async
+========
+
+The zc.async package provides a way to make asynchronous application
+calls.
+
+Calls are handled by worker processes, each of which is typically a
+standard Zope process, identified by a UUID [#uuid]_, that
+may simultaneously perform other tasks (such as handle standard web
+requests). Each worker is responsible for claiming and performing calls
+in its main thread or additional threads. To have multiple workers on
+the same queue of tasks, share the database with ZEO.
+
+Pending calls and worker data objects are stored in a data manager
+object in the ZODB. This is typically stored in the root of the ZODB,
+alongside the application object, with a key of 'zc.async.datamanager',
+but the adapter that obtains the data manager can be replaced to point
+to a different location.
+
+Worker data objects have queues representing potential or current tasks
+for the worker, in the main thread or a secondary thread. Each worker
+has a virtual loop, part of the Twisted or asyncore main loop, for every
+worker process, which is responsible for responding to system calls
+(like pings) and for claiming pending main thread calls by moving them
+from the datamanager async queue to their own. Each worker thread queue
+also represents spots for claiming and performing pending thread
+calls.
+
+Set Up
+======
+
+By default, zc.async expects to have an object in the root of
+the ZODB, alongside the application object, with a key of
+'zc.async.datamanager'. The package includes subscribers to
+zope.app.appsetup.interfaces.IDatabaseOpenedEvent that sets an instance
+up in this location if one does not exist.
+
+One version of the subscriber expects to put the object in the same
+database as the main application (`basicInstallerAndNotifier`), and the
+other version expects to put the object in a secondary database, with a
+reference to it in the main database (`installerAndNotifier`). The
+second approach keeps the database churn generated by zc.async, which
+can be significant, separate from your main data. You can use either
+(or your own); the first version is the default, since it requires no
+additional set-up. When this documentation is run as a test, it is run
+twice, once with each setup. To accomodate this, in our example below
+we appear to pull the "installerAndNotifier" out of the air: it is
+installed as a global when the test is run. You will want to use one of
+the two subscribers mentioned above, or roll your own.
+
+XXX explain possible gotchas if you run the separate database (i.e., you may
+have to explicitly add objects to connections if you create an object for the
+main database and put it as a partial callable or argument in the same
+transaction).
+
+Let's assume we have a reference to a database named `db`, a connection
+named `conn`, a `root`, and an application in the 'app' key
+[#setup]_. If we provide a handler, fire the event and examine the
+root, we will see the new datamanager.
+
+ >>> import zope.component
+ >>> import zc.async.subscribers
+ >>> zope.component.provideHandler(installerAndNotifier) # see above
+ ... # for explanation of where installerAndNotifier came from
+ >>> import zope.event
+ >>> import zope.app.appsetup.interfaces
+ >>> zope.event.notify(zope.app.appsetup.interfaces.DatabaseOpened(db))
+ >>> import transaction
+ >>> t = transaction.begin()
+ >>> root['zc.async.datamanager'] # doctest: +ELLIPSIS
+ <zc.async.datamanager.DataManager object at ...>
+
+The default adapter from persistent object to datamanager will get us
+the same result.
+
+ >>> import zc.async.adapters
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.defaultDataManagerAdapter)
+ >>> import zc.async.interfaces
+ >>> zc.async.interfaces.IDataManager(app) # doctest: +ELLIPSIS
+ <zc.async.datamanager.DataManager object at ...>
+
+Normally, each process discovers or creates its UUID and registers
+itself with the data manager as a worker. This would have happened when
+the data manager was announced as available in the InstallerAndNotifier
+above.
+
+ >>> from zope.component import eventtesting
+ >>> evs = eventtesting.getEvents(
+ ... zc.async.interfaces.IDataManagerAvailableEvent)
+ >>> evs # doctest: +ELLIPSIS
+ [<zc.async.interfaces.DataManagerAvailable object at ...>]
+
+So now we would have had a subscriber that installed the worker in the
+data manager. But right now there are no workers, just because we
+didn't want to talk about the next step yet.
+
+ >>> len(zc.async.interfaces.IDataManager(app).workers)
+ 0
+
+Let's install the subscriber we need and refire the event. Our worker
+will have a UUID created for it, and then it will be installed with the
+UUID as key. We can't actually use the same event because it has an
+object from a different connection, so we'll recreate it.
+
+ >>> zope.component.provideHandler(
+ ... zc.async.subscribers.installTwistedEngine)
+ >>> zope.event.notify(
+ ... zc.async.interfaces.DataManagerAvailable(
+ ... root['zc.async.datamanager']))
+ >>> time_passes()
+ True
+ >>> t = transaction.begin() # sync
+ >>> len(zc.async.interfaces.IDataManager(app).workers)
+ 1
+ >>> zc.async.interfaces.IDataManager(app).workers.values()[0]
+ ... # doctest: +ELLIPSIS
+ <zc.async.datamanager.Worker object at ...>
+ >>> (zc.async.interfaces.IDataManager(app).workers.values()[0].engineUUID
+ ... is not None)
+ True
+
+The new UUID, in hex, is stored in INSTANCE_HOME/etc/uuid.txt
+
+ >>> import uuid
+ >>> import os
+ >>> f = open(os.path.join(
+ ... os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt'))
+ >>> uuid_hex = f.readline().strip()
+ >>> f.close()
+ >>> uuid = uuid.UUID(uuid_hex)
+ >>> worker = zc.async.interfaces.IDataManager(app).workers[uuid]
+ >>> worker.UUID == uuid
+ True
+
+The file is intended to stay in the instance home as a persistent identifier
+of this particular worker.
+
+Our worker has `thread` and `reactor` jobs, with all jobs available.
+
+ >>> worker.thread.size
+ 1
+ >>> worker.reactor.size
+ 4
+ >>> len(worker.thread)
+ 0
+ >>> len(worker.reactor)
+ 0
+
+We now have a simple set up: a data manager with a single worker. Let's start
+making some asynchronous calls!
+
+Basic Usage: IManager.add
+=========================
+
+The simplest case is simple to perform: pass a persistable callable to the
+manager's .add method.
+
+ >>> from zc.async import interfaces
+ >>> dm = zc.async.interfaces.IDataManager(app)
+ >>> def send_message():
+ ... print "imagine this sent a message to another machine"
+ >>> partial = dm.reactor.put(send_message)
+ >>> transaction.commit()
+
+Now a few cycles need to pass in order to have the job performed. We'll
+use a helper function called `time_flies` to simulate the asynchronous
+cycles necessary for the manager and workers to perform the task.
+
+ >>> count = time_flies(dm.workers.values()[0].poll_seconds)
+ imagine this sent a message to another machine
+
+You can also pass a datetime.datetime to schedule the call: the
+safest thing to use is a UTC timezone. The datetime is interpreted as a
+UTC datetime.
+
+ >>> t = transaction.begin()
+ >>> import datetime
+ >>> import pytz
+ >>> datetime.datetime.now(pytz.UTC)
+ datetime.datetime(2006, 8, 10, 15, 44, 27, 211, tzinfo=<UTC>)
+ >>> partial = dm.reactor.put(
+ ... send_message, datetime.datetime(
+ ... 2006, 8, 10, 15, 45, tzinfo=pytz.UTC))
+ >>> partial.begin_after
+ datetime.datetime(2006, 8, 10, 15, 45, tzinfo=<UTC>)
+ >>> transaction.commit()
+ >>> count = time_flies(10)
+ >>> count = time_flies(10)
+ >>> count = time_flies(10)
+ >>> count = time_flies(5)
+ imagine this sent a message to another machine
+ >>> datetime.datetime.now(pytz.UTC)
+ datetime.datetime(2006, 8, 10, 15, 45, 2, 211, tzinfo=<UTC>)
+
+If you set a time that has already passed, it will be run as if it had
+been set to run immediately.
+
+ >>> t = transaction.begin()
+ >>> partial = dm.reactor.put(
+ ... send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
+ >>> transaction.commit()
+ >>> count = time_flies(5)
+ imagine this sent a message to another machine
+
+The `add` method of the thread and reactor queues is the manager's
+entire application API. Other methods are used to introspect, but are
+not needed for basic usage. We will examine the introspection API below
+(`Manager Introspection`_), and will discuss an advanced feature of the
+`add` method (`Specifying Workers`), but let's explore some more usage
+patterns first.
+
+Typical Usage: zc.async.Partial
+================================
+
+...(currently tests and discussion are in partial.txt and datamanager.txt.
+We need user-friendly docs, as well as stress tests. The remainder of the
+below is somewhat unedited and incomplete at the moment)...
+
+ >>> t = transaction.begin()
+ >>> import zc.async
+ >>> import persistent
+ >>> import transaction
+ >>> import zc.async.partial
+ >>> class Demo(persistent.Persistent):
+ ... counter = 0
+ ... def increase(self, value=1):
+ ... self.counter += value
+ ...
+ >>> app['demo'] = Demo()
+ >>> transaction.commit() # XXX example of gotcha for multiple databases:
+ ... # connection.add or commit before adding to partial
+ >>> app['demo'].counter
+ 0
+ >>> partial = dm.reactor.put(
+ ... zc.async.partial.Partial(app['demo'].increase))
+ >>> transaction.commit()
+ >>> count = time_flies(5)
+
+We need to commit the transaction in our connection so that we get the
+changes in other connections (beginning and committing transactions sync
+connections).
+
+ >>> app['demo'].counter
+ 0
+ >>> t = transaction.begin()
+ >>> app['demo'].counter
+ 1
+
+The deferred class can take arguments and keyword arguments for the
+wrapped callable as well, similar to Python 2.5's `partial`. For this
+use case, though, realize that the partial will be called with no
+arguments, so you must supply all necessary arguments for the callable
+on creation time.
+
+ >>> partial = dm.reactor.put(
+ ... zc.async.partial.Partial(app['demo'].increase, 5))
+ >>> transaction.commit()
+ >>> count = time_flies(5)
+ >>> t = transaction.begin()
+ >>> app['demo'].counter
+ 6
+ >>> partial = dm.reactor.put(
+ ... zc.async.partial.Partial(app['demo'].increase, value=10))
+ >>> transaction.commit()
+ >>> count = time_flies(5)
+ >>> t = transaction.begin()
+ >>> app['demo'].counter
+ 16
+
+Optimized Usage
+===============
+
+Writing a task that doesn't need a ZODB connection
+--------------------------------------------------
+
+...Twisted reactor tasks are best for this...
+
+...also could have different IPartial implementation sets self as
+ACTIVE, commits and closes connection, calls f with args, and when
+result returns, gets connection again and sets value on it, changes
+state, and performs callbacks, sets state...
+
+Multiple ZEO workers
+--------------------
+
+...
+
+Catching and Fixing Errors
+==========================
+
+...call installed during InstallTwistedWorker to check on worker...
+
+...worker finds another process already installed with same UUID; could be
+shutdown error (ghost of self) or really another process...show engineUUID...
+some discussion already in datamanager.txt...
+
+Gotchas
+=======
+
+...some callbacks may still be working when partial is completed. Therefore
+partial put in `completed` for worker so that it can have a chance to run to
+completion (in addition to other goals, like being able to look at
+
+Patterns
+========
+
+Partials That Need a Certain Environment
+----------------------------------------
+
+...Partial that needs a certain environment: wrap partial in partial. Outer
+partial is responsible for checking if environment is good; if so, run inner
+partial, and if not, create a new outer partial, copy over our excluded worker
+UUIDs, add this worker UUID, set perform_after to adjusted value,
+and schedule it...
+
+Callbacks That Want to be Performed by a Worker
+-----------------------------------------------
+
+Callbacks are called immediately, whether they be within the call to the
+partial, or within the `addCallbacks` call. If you want the job to be done
+asynchronously, make the callback with a partial. The partial will get
+a reference to the data_manager used by the main partial. It can create a
+partial, assign it to one of the data manager queues, and return the partial.
+Consider the following.
+
+ >>> def multiply(*args):
+ ... res = 1
+ ... for a in args:
+ ... res *= a
+ ... return res
+ ...
+ >>> def doCallbackWithPartial(partial, res):
+ ... p = zc.async.partial.Partial(multiply, 2, res)
+ ... zc.async.interfaces.IDataManager(partial).thread.put(p)
+ ... return p
+ ...
+ >>> p = dm.thread.put(zc.async.partial.Partial(multiply, 3, 4))
+ >>> p_callback = p.addCallbacks(
+ ... zc.async.partial.Partial.bind(doCallbackWithPartial))
+ >>> transaction.commit()
+ >>> import time
+ >>> for i in range(100):
+ ... ignore = time_flies(5)
+ ... time.sleep(0)
+ ... t = transaction.begin()
+ ... if p_callback.state == zc.async.interfaces.COMPLETED:
+ ... break
+ ...
+ >>> p.result
+ 12
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+ >>> p_callback.state == zc.async.interfaces.COMPLETED
+ True
+ >>> p_callback.result
+ 24
+
+Progress Reports
+----------------
+
+Using zc.twist.Partial, or by managing your own connections
+otherwise, you can send messages back during a long-running connection.
+For instance, imagine you wanted to annotate a partial with progress
+messages, while not actually committing the main work.
+
+Here's an example of one way of getting this to work. We can use the partial's
+annotations, which are not touched by the partial code and are a separate
+persistent object, so can be changed concurrently without conflict errors.
+
+We'll run the partial within a threaded worker. The callable could use
+twisted.internet.reactor.callFromThread to get the change to be made.
+Parts of the twist.Partial machinery expect to be called in the main
+thread, where the twisted reactor is running.
+
+ >>> import twisted.internet.reactor
+ >>> def setAnnotation(partial, annotation_key, value):
+ ... partial.annotations[annotation_key] = value
+ ...
+ >>> import threading
+ >>> thread_lock = threading.Lock()
+ >>> main_lock = threading.Lock()
+ >>> acquired = thread_lock.acquire()
+ >>> acquired = main_lock.acquire()
+ >>> def callWithProgressReport(partial):
+ ... print "do some work"
+ ... print "more work"
+ ... print "about half done"
+ ... twisted.internet.reactor.callFromThread(zc.twist.Partial(
+ ... setAnnotation, partial, 'zc.async.partial_txt.half_done', True))
+ ... main_lock.release()
+ ... acquired = thread_lock.acquire()
+ ... return 42
+ ...
+ >>> p = dm.thread.put(zc.async.partial.Partial.bind(callWithProgressReport))
+ >>> transaction.commit()
+ >>> ignore = time_flies(5) # get the reactor to kick for main call
+ do some work
+ more work
+ about half done
+ >>> ignore = time_flies(5) # get the reactor to kick for progress report
+ >>> acquired = main_lock.acquire()
+ >>> t = transaction.begin() # sync
+ >>> p.annotations.get('zc.async.partial_txt.half_done')
+ True
+ >>> p.state == zc.async.interfaces.ACTIVE
+ True
+ >>> thread_lock.release()
+ >>> for i in range(100):
+ ... ignore = time_flies(5)
+ ... time.sleep(0)
+ ... t = transaction.begin()
+ ... if p.state == zc.async.interfaces.COMPLETED:
+ ... break
+ ...
+ >>> p.result
+ 42
+ >>> thread_lock.release()
+ >>> main_lock.release()
+
+Expiration
+----------
+
+If you want your call to expire after a certain amount of time, keep
+track of time yourself, and return a failure if you go over. The
+partial does not offer special support for this use case.
+
+Stopping the Engine
+-------------------
+
+The subscriber that sets up the async engine within the Twisted reactor also
+sets up a tearDown trigger. We can look in our faux reactor and get it.
+
+ >>> len(faux.triggers)
+ 1
+ >>> len(faux.triggers[0])
+ 3
+ >>> faux.triggers[0][:2]
+ ('before', 'shutdown')
+ >>> dm.workers.values()[0].engineUUID is not None
+ True
+ >>> d = faux.triggers[0][2]()
+ >>> t = transaction.begin()
+ >>> dm.workers.values()[0].engineUUID is None
+ True
+
+[#tear_down]_
+
+=========
+Footnotes
+=========
+
+.. [#uuid] UUIDs are generated by http://zesty.ca/python/uuid.html, as
+ incorporated in Python 2.5. They are expected to be found in
+ os.path.join(os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt');
+ this file will be created and populated with a new UUID if it does
+ not exist.
+
+.. [#setup] This is a bit more than standard set-up code for a ZODB test,
+ because it sets up a multi-database.
+
+ >>> from ZODB.tests.util import DB # use conflict resolution test one XXX
+ >>> class Factory(object):
+ ... def __init__(self, name):
+ ... self.name = name
+ ... def open(self):
+ ... return DB()
+ ...
+ >>> import zope.app.appsetup.appsetup
+ >>> db = zope.app.appsetup.appsetup.multi_database(
+ ... (Factory('main'), Factory('zc.async')))[0][0]
+ >>> conn = db.open()
+ >>> root = conn.root()
+ >>> import zope.app.folder # import rootFolder
+ >>> app = root['Application'] = zope.app.folder.rootFolder()
+ >>> import transaction
+ >>> transaction.commit()
+
+ You must have two adapter registrations: IConnection to
+ ITransactionManager, and IPersistent to IConnection. We will also
+ register IPersistent to ITransactionManager because the adapter is
+ designed for it.
+
+ >>> from zc.twist import transactionManager, connection
+ >>> import zope.component
+ >>> zope.component.provideAdapter(transactionManager)
+ >>> zope.component.provideAdapter(connection)
+ >>> import ZODB.interfaces
+ >>> zope.component.provideAdapter(
+ ... transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+ We need to be able to get data manager partials for functions and methods;
+ normal partials for functions and methods; and a data manager for a partial.
+ Here are the necessary registrations.
+
+ >>> import zope.component
+ >>> import types
+ >>> import zc.async.interfaces
+ >>> import zc.async.partial
+ >>> import zc.async.adapters
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.method_to_datamanagerpartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.function_to_datamanagerpartial)
+ >>> zope.component.provideAdapter( # partial -> datamanagerpartial
+ ... zc.async.adapters.DataManagerPartial,
+ ... provides=zc.async.interfaces.IDataManagerPartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.partial_to_datamanager)
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.FunctionType,),
+ ... provides=zc.async.interfaces.IPartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.MethodType,),
+ ... provides=zc.async.interfaces.IPartial)
+ ...
+
+ A monkeypatch, removed in another footnote below.
+
+ >>> import datetime
+ >>> import pytz
+ >>> old_datetime = datetime.datetime
+ >>> def set_now(dt):
+ ... global _now
+ ... _now = _datetime(*dt.__reduce__()[1])
+ ...
+ >>> class _datetime(old_datetime):
+ ... @classmethod
+ ... def now(klass, tzinfo=None):
+ ... if tzinfo is None:
+ ... return _now.replace(tzinfo=None)
+ ... else:
+ ... return _now.astimezone(tzinfo)
+ ... def astimezone(self, tzinfo):
+ ... return _datetime(
+ ... *super(_datetime,self).astimezone(tzinfo).__reduce__()[1])
+ ... def replace(self, *args, **kwargs):
+ ... return _datetime(
+ ... *super(_datetime,self).replace(
+ ... *args, **kwargs).__reduce__()[1])
+ ... def __repr__(self):
+ ... raw = super(_datetime, self).__repr__()
+ ... return "datetime.datetime%s" % (
+ ... raw[raw.index('('):],)
+ ... def __reduce__(self):
+ ... return (argh, super(_datetime, self).__reduce__()[1])
+ >>> def argh(*args, **kwargs):
+ ... return _datetime(*args, **kwargs)
+ ...
+ >>> datetime.datetime = _datetime
+ >>> _start = _now = datetime.datetime(
+ ... 2006, 8, 10, 15, 44, 22, 211, pytz.UTC)
+
+ We monkeypatch twisted.internet.reactor (and replace it below).
+
+ >>> import twisted.internet.reactor
+ >>> import threading
+ >>> import bisect
+ >>> class FauxReactor(object):
+ ... def __init__(self):
+ ... self.time = 0
+ ... self.calls = []
+ ... self.triggers = []
+ ... self._lock = threading.Lock()
+ ... def callLater(self, delay, callable, *args, **kw):
+ ... res = (delay + self.time, callable, args, kw)
+ ... self._lock.acquire()
+ ... bisect.insort(self.calls, res)
+ ... self._lock.release()
+ ... # normally we're supposed to return something but not needed
+ ... def callFromThread(self, callable, *args, **kw):
+ ... self._lock.acquire()
+ ... bisect.insort(
+ ... self.calls,
+ ... (self.time, callable, args, kw))
+ ... self._lock.release()
+ ... def addSystemEventTrigger(self, *args):
+ ... self.triggers.append(args) # 'before', 'shutdown', callable
+ ... def time_flies(self, time):
+ ... global _now
+ ... end = self.time + time
+ ... ct = 0
+ ... while self.calls and self.calls[0][0] <= end:
+ ... self.time, callable, args, kw = self.calls.pop(0)
+ ... _now = _datetime(
+ ... *(_start + datetime.timedelta(
+ ... seconds=self.time)).__reduce__()[1])
+ ... callable(*args, **kw) # normally this would get try...except
+ ... ct += 1
+ ... self.time = end
+ ... return ct
+ ... def time_passes(self):
+ ... if self.calls and self.calls[0][0] <= self.time:
+ ... self.time, callable, args, kw = self.calls.pop(0)
+ ... callable(*args, **kw)
+ ... return True
+ ... return False
+ ...
+ >>> faux = FauxReactor()
+ >>> oldCallLater = twisted.internet.reactor.callLater
+ >>> oldCallFromThread = twisted.internet.reactor.callFromThread
+ >>> oldAddSystemEventTrigger = (
+ ... twisted.internet.reactor.addSystemEventTrigger)
+ >>> twisted.internet.reactor.callLater = faux.callLater
+ >>> twisted.internet.reactor.callFromThread = faux.callFromThread
+ >>> twisted.internet.reactor.addSystemEventTrigger = (
+ ... faux.addSystemEventTrigger)
+ >>> time_flies = faux.time_flies
+ >>> time_passes = faux.time_passes
+
+.. [#tear_down]
+
+ >>> twisted.internet.reactor.callLater = oldCallLater
+ >>> twisted.internet.reactor.callFromThread = oldCallFromThread
+ >>> twisted.internet.reactor.addSystemEventTrigger = (
+ ... oldAddSystemEventTrigger)
+ >>> datetime.datetime = old_datetime
Property changes on: zc.async/trunk/src/zc/async/README.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/__init__.py
===================================================================
Property changes on: zc.async/trunk/src/zc/async/__init__.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/adapters.py
===================================================================
--- zc.async/trunk/src/zc/async/adapters.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/adapters.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,121 @@
+import types
+import datetime
+import pytz
+import persistent
+import persistent.interfaces
+import zope.interface
+import zope.component
+
+import zc.async.interfaces
+import zc.async.subscribers
+import zc.set
+from zc.async import rwproperty
+
+
+ at zope.component.adapter(persistent.interfaces.IPersistent)
+ at zope.interface.implementer(zc.async.interfaces.IDataManager)
+def defaultDataManagerAdapter(obj):
+ return obj._p_jar.root()[zc.async.subscribers.NAME]
+
+
+ at zope.component.adapter(zc.async.interfaces.IPartial)
+ at zope.interface.implementer(zc.async.interfaces.IDataManager)
+def partial_to_datamanager(partial):
+ p = partial.__parent__
+ while (p is not None and
+ not zc.async.interfaces.IDataManager.providedBy(p)):
+ p = getattr(p, '__parent__', None)
+ return p
+
+
+class TransparentDescriptor(object):
+ def __init__(self, src_name, value_name, readonly=False):
+ self.src_name = src_name
+ self.value_name = value_name
+ self.readonly = readonly
+
+ def __get__(self, obj, klass=None):
+ if obj is None:
+ return self
+ src = getattr(obj, self.src_name)
+ return getattr(src, self.value_name)
+
+ def __set__(self, obj, value):
+ if self.readonly:
+ raise AttributeError
+ src = getattr(obj, self.src_name)
+ setattr(src, self.value_name, value)
+
+
+class DataManagerPartialData(persistent.Persistent):
+
+ workerUUID = assignerUUID = thread = None
+ _begin_by = _begin_after = None
+
+ def __init__(self, partial):
+ self.__parent__ = self.partial = partial
+ self.selectedUUIDs = zc.set.Set()
+ self.excludedUUIDs = zc.set.Set()
+
+ @property
+ def begin_after(self):
+ return self._begin_after
+ @rwproperty.setproperty
+ def begin_after(self, value):
+ if self.assignerUUID is not None:
+ raise RuntimeError(
+ 'can only change begin_after before partial is assigned')
+ if value is not None:
+ if value.tzinfo is None:
+ raise ValueError('cannot use timezone-naive values')
+ else:
+ value = value.astimezone(pytz.UTC)
+ self._begin_after = value
+
+ @property
+ def begin_by(self):
+ return self._begin_by
+ @rwproperty.setproperty
+ def begin_by(self, value):
+ if self.partial.state != zc.async.interfaces.PENDING:
+ raise RuntimeError(
+ 'can only change begin_by value of PENDING partial')
+ if value is not None:
+ if value < datetime.timedelta():
+ raise ValueError('negative values are not allowed')
+ self._begin_by = value
+
+KEY = 'zc.async.datamanagerpartial'
+
+
+class DataManagerPartial(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.IDataManagerPartial)
+ zope.component.adapts(zc.async.interfaces.IPartial)
+
+ def __init__(self, partial):
+ self._data = partial
+ if KEY not in partial.annotations:
+ partial.annotations[KEY] = DataManagerPartialData(partial)
+ self._extra = partial.annotations[KEY]
+
+ for nm in zc.async.interfaces.IPartial.names(True):
+ if nm == '__parent__':
+ readonly = False
+ else:
+ readonly = True
+ locals()[nm] = TransparentDescriptor('_data', nm, readonly)
+ for nm in ('workerUUID', 'assignerUUID', 'thread', 'begin_after',
+ 'begin_by'):
+ locals()[nm] = TransparentDescriptor('_extra', nm)
+ for nm in ('selectedUUIDs', 'excludedUUIDs'):
+ locals()[nm] = TransparentDescriptor('_extra', nm, True)
+
+ at zope.component.adapter(types.MethodType)
+ at zope.interface.implementer(zc.async.interfaces.IDataManagerPartial)
+def method_to_datamanagerpartial(m):
+ return DataManagerPartial(zc.async.partial.Partial(m))
+
+ at zope.component.adapter(types.FunctionType)
+ at zope.interface.implementer(zc.async.interfaces.IDataManagerPartial)
+def function_to_datamanagerpartial(f):
+ return DataManagerPartial(zc.async.partial.Partial(f))
Property changes on: zc.async/trunk/src/zc/async/adapters.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/datamanager.py
===================================================================
--- zc.async/trunk/src/zc/async/datamanager.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/datamanager.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,403 @@
+import datetime
+import bisect
+import pytz
+import persistent
+import ZODB.interfaces
+import BTrees.OOBTree
+import BTrees.Length
+import zope.interface
+import zope.component
+import zope.bforest
+import zc.queue
+
+import zc.async.interfaces
+
+
+def simpleWrapper(name):
+ def wrapper(self, *args, **kwargs):
+ return getattr(self._data, name)(*args, **kwargs)
+ return wrapper
+
+class Workers(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.IWorkers)
+
+ def __init__(self):
+ self._data = BTrees.OOBTree.OOBTree()
+
+ for nm in ('__getitem__', 'get', '__len__', 'keys', 'values', 'items',
+ '__contains__', 'maxKey', 'minKey'):
+ locals()[nm] = simpleWrapper(nm)
+
+ def __iter__(self):
+ return iter(self._data)
+
+ def add(self, value):
+ value = zc.async.interfaces.IWorker(value)
+ if value.UUID is None:
+ raise ValueError("worker must have assigned UUID")
+ self._data[value.UUID] = value
+ value.__parent__ = self
+ return value
+
+ def remove(self, UUID):
+ ob = self._data.pop(UUID)
+ ob.__parent__ = None
+
+def cleanDeadWorker(worker):
+ dm = worker.__parent__.__parent__
+ assert zc.async.interfaces.IDataManager.providedBy(dm)
+ for queue, destination in (
+ (worker.thread, dm.thread), (worker.reactor, dm.reactor)):
+ while queue:
+ p = queue[0]
+ del queue[0]
+ if p.state == zc.async.interfaces.PENDING:
+ destination.put(p.__call__) # will wrap it
+ elif p.state == zc.async.interfaces.ACTIVE:
+ destination.put(p.fail)
+ elif p.state == zc.async.interfaces.CALLBACKS:
+ destination.put(p.resumeCallbacks)
+
+
+class PartialQueue(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.IPartialQueue)
+
+ def __init__(self, thread):
+ self.thread = thread
+ self._queue = zc.queue.CompositePersistentQueue()
+ self._held = BTrees.OOBTree.OOBTree()
+ self._length = BTrees.Length.Length(0)
+
+ def put(self, item, begin_after=None, begin_by=None):
+ item = zc.async.interfaces.IDataManagerPartial(item)
+ if item.assignerUUID is not None:
+ raise ValueError(
+ 'cannot add already-assigned partial')
+ now = datetime.datetime.now(pytz.UTC)
+ if begin_after is not None:
+ item.begin_after = begin_after
+ elif item.begin_after is None:
+ item.begin_after = now
+ if begin_by is not None:
+ item.begin_by = begin_by
+ elif item.begin_by is None:
+ item.begin_by = datetime.timedelta(hours=1)
+ item.assignerUUID = zope.component.getUtility(
+ zc.async.interfaces.IUUID, 'instance')
+ if item._p_jar is None:
+ # we need to do this if the partial will be stored in another
+ # database as well during this transaction. Also, _held storage
+ # disambiguates against the database_name and the _p_oid.
+ conn = ZODB.interfaces.IConnection(self)
+ conn.add(item)
+ if now == item.begin_after:
+ self._queue.put(item)
+ else:
+ self._held[
+ (item.begin_after,
+ item._p_jar.db().database_name,
+ item._p_oid)] = item
+ item.__parent__ = self
+ self._length.change(1)
+ return item
+
+ def _iter(self):
+ queue = self._queue
+ tree = self._held
+ q = enumerate(queue)
+ t = iter(tree.items())
+ q_pop = queue.pull
+ t_pop = tree.pop
+ def get_next(i):
+ try:
+ next = i.next()
+ except StopIteration:
+ active = False
+ next = (None, None)
+ else:
+ active = True
+ return active, next
+ q_active, (q_index, q_next) = get_next(q)
+ t_active, (t_index, t_next) = get_next(t)
+ while q_active and t_active:
+ if t_next.begin_after <= q_next.begin_after:
+ yield t_pop, t_index, t_next
+ t_active, (t_index, t_next) = get_next(t)
+ else:
+ yield q_pop, q_index, q_next
+ q_active, (q_index, q_next) = get_next(q)
+ if t_active:
+ yield t_pop, t_index, t_next
+ for (t_index, t_next) in t:
+ yield t_pop, t_index, t_next
+ elif q_active:
+ yield q_pop, q_index, q_next
+ for (q_index, q_next) in q:
+ yield q_pop, q_index, q_next
+
+ def pull(self, index=0):
+ if index >= self._length():
+ raise IndexError(index)
+ for i, (pop, ix, next) in enumerate(self._iter()):
+ if i == index:
+ tmp = pop(ix)
+ assert tmp is next
+ self._length.change(-1)
+ return next
+ assert False, 'programmer error: the length appears to be incorrect.'
+
+ def __len__(self):
+ return self._length()
+
+ def __iter__(self):
+ return (next for pop, ix, next in self._iter())
+
+ def __nonzero__(self):
+ return bool(self._length())
+
+ def __getitem__(self, index):
+ if index >= len(self):
+ raise IndexError(index)
+ return zc.queue.getitem(self, index)
+
+ def pullNext(self, uuid):
+ now = datetime.datetime.now(pytz.UTC)
+ for ix, p in enumerate(self.iterDue()):
+ if uuid not in p.excludedUUIDs and (
+ not p.selectedUUIDs or
+ uuid in p.selectedUUIDs):
+ return self.pull(ix)
+ elif (p.begin_after + p.begin_by) < now:
+ res = zc.async.interfaces.IDataManagerPartial(
+ self.pull(ix).fail)
+ res.__parent__ = self
+ res.begin_after = now
+ res.begin_by = datetime.timedelta(hours=1)
+ res.assignerUUID = zope.component.getUtility(
+ zc.async.interfaces.IUUID, 'instance')
+ return res
+
+ def iterDue(self):
+ now = datetime.datetime.now(pytz.UTC)
+ for partial in self:
+ if partial.begin_after > now:
+ break
+ yield partial
+
+
+class DataManager(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.IDataManager)
+
+ def __init__(self):
+ self.thread = PartialQueue(True)
+ self.thread.__parent__ = self
+ self.reactor = PartialQueue(False)
+ self.reactor.__parent__ = self
+ self.workers = Workers()
+ self.workers.__parent__ = self
+
+ def _getNextActiveSibling(self, uuid):
+ for worker in self.workers.values(min=uuid, excludemin=True):
+ if worker.engineUUID is not None:
+ return worker
+ for worker in self.workers.values(max=uuid, excludemax=True):
+ if worker.engineUUID is not None:
+ return worker
+
+ def checkSibling(self, uuid):
+ now = datetime.datetime.now(pytz.UTC)
+ next = self._getNextActiveSibling(uuid)
+ if next is not None and ((
+ next.last_ping + next.ping_interval + next.ping_death_interval)
+ < now):
+ # `next` is a dead worker.
+ next.engineUUID = None
+ self.thread.put(zc.async.partial.Partial(cleanDeadWorker, next))
+
+
+class SizedSequence(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.ISizedSequence)
+
+ def __init__(self, size):
+ self.size = size
+ self._data = zc.queue.PersistentQueue()
+ self._data.__parent__ = self
+
+ for nm in ('__len__', '__iter__', '__getitem__', '__nonzero__',
+ '_p_resolveConflict'):
+ locals()[nm] = simpleWrapper(nm)
+
+ def add(self, item):
+ if len(self._data) >= self.size:
+ raise zc.async.interfaces.FullError(self)
+ item.__parent__ = self
+ item.workerUUID = self.__parent__.UUID
+ self._data.put(item)
+ return item
+
+ def index(self, item):
+ for ix, i in enumerate(self):
+ if i is item:
+ return ix
+ raise ValueError("%r not in queue" % (item,))
+
+ def remove(self, item):
+ del self[self.index(item)]
+
+ def __delitem__(self, ix):
+ self._data.pull(ix)
+
+
+START = datetime.datetime(2006, 1, 1, tzinfo=pytz.UTC)
+
+def key(item):
+ dt = item.begin_after
+ diff = dt - START
+ return (-diff.days, -diff.seconds, -diff.microseconds,
+ item._p_jar.db().database_name, item._p_oid)
+
+def code(dt):
+ diff = dt - START
+ return (-diff.days, -diff.seconds, -diff.microseconds)
+
+
+class Completed(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.ICompletedCollection)
+ # sorts on begin_after from newest to oldest
+
+ __parent__ = None
+
+ def __init__(self,
+ rotation_interval=datetime.timedelta(hours=2),
+ buckets=6):
+ self._data = zope.bforest.OOBForest(count=buckets)
+ self.rotation_interval = rotation_interval
+ self.last_rotation = datetime.datetime.now(pytz.UTC)
+
+ def add(self, item):
+ self._data[key(item)] = item
+ item.__parent__ = self
+
+ def iter(self, start=None, stop=None):
+ sources = []
+ if start is not None:
+ start = code(start)
+ if stop is not None:
+ stop = code(stop)
+ for b in self._data.buckets:
+ i = iter(b.items(start, stop))
+ try:
+ n = i.next()
+ except StopIteration:
+ pass
+ else:
+ sources.append([n, i])
+ sources.sort()
+ length = len(sources)
+ while length > 1:
+ src = sources.pop(0)
+ yield src[0][1]
+ try:
+ src[0] = src[1].next()
+ except StopIteration:
+ length -= 1
+ else:
+ bisect.insort(sources, src) # mildly interesting micro-
+ # optimisation note: this approach shaves off about 1/5 of
+ # an alternative approach that finds the lowest every time
+ # but does not insort.
+ if sources:
+ yield sources[0][0][1]
+ for k, v in sources[0][1]:
+ yield v
+
+ def __iter__(self):
+ return self._data.itervalues() # this takes more memory but the pattern
+ # is typically faster than the custom iter above (for relatively
+ # complete iterations of relatively small sets). The custom iter
+ # has the advantage of the start and stop code.
+
+ def first(self, start=None):
+ original = start
+ if start is not None:
+ start = code(start)
+ minKey = lambda bkt: bkt.minKey(start)
+ else:
+ minKey = lambda bkt: bkt.minKey()
+ i = iter(self._data.buckets)
+ bucket = i.next()
+ try:
+ key = minKey(bucket)
+ except ValueError:
+ key = None
+ for b in i:
+ try:
+ k = minKey(b)
+ except ValueError:
+ continue
+ if key is None or k < key:
+ bucket, key = b, k
+ if key is None:
+ raise ValueError(original)
+ return bucket[key]
+
+ def last(self, stop=None):
+ original = stop
+ if stop is not None:
+ stop = code(stop)
+ maxKey = lambda bkt: bkt.maxKey(stop)
+ else:
+ maxKey = lambda bkt: bkt.maxKey()
+ i = iter(self._data.buckets)
+ bucket = i.next()
+ try:
+ key = maxKey(bucket)
+ except ValueError:
+ key = None
+ for b in i:
+ try:
+ k = maxKey(b)
+ except ValueError:
+ continue
+ if key is None or k > key:
+ bucket, key = b, k
+ if key is None:
+ raise ValueError(original)
+ return bucket[key]
+
+ def __nonzero__(self):
+ for b in self._data.buckets:
+ try:
+ iter(b).next()
+ except StopIteration:
+ pass
+ else:
+ return True
+ return False
+
+ def __len__(self):
+ return len(self._data)
+
+ def rotate(self):
+ self._data.rotateBucket()
+ self.last_rotation = datetime.datetime.now(pytz.UTC)
+
+
+class Worker(persistent.Persistent):
+ zope.interface.implements(zc.async.interfaces.IWorker)
+
+ def __init__(self, UUID, reactor_size=4, thread_size=1, poll_seconds=5,
+ ping_interval=datetime.timedelta(minutes=1),
+ ping_death_interval=datetime.timedelta(seconds=30)):
+ self.reactor = SizedSequence(reactor_size)
+ self.reactor.__parent__ = self
+ self.thread = SizedSequence(thread_size)
+ self.thread.__parent__ = self
+ self.engineUUID = None
+ self.UUID = UUID
+ self.poll_seconds = poll_seconds
+ self.ping_interval = ping_interval
+ self.ping_death_interval = ping_death_interval
+ self.last_ping = datetime.datetime.now(pytz.UTC)
+ self.completed = Completed()
+ self.completed.__parent__ = self
Property changes on: zc.async/trunk/src/zc/async/datamanager.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/datamanager.txt
===================================================================
--- zc.async/trunk/src/zc/async/datamanager.txt 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/datamanager.txt 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,1222 @@
+The datamanager module contains the queues that zc.async clients use to
+deposit jobs, and the queues that workers use to put jobs they are working on.
+
+The main datamanager object simply has a queue for thread jobs, a queue for
+reactor jobs, and a mapping of workers. It starts out empty [#setUp]_.
+
+ >>> import zc.async.datamanager
+ >>> dm = root['zc.async.datamanager'] = zc.async.datamanager.DataManager()
+ >>> import transaction
+ >>> transaction.commit()
+ >>> len(dm.workers)
+ 0
+ >>> len(dm.thread)
+ 0
+ >>> len(dm.reactor)
+ 0
+
+As shown in the README.txt of this package, the data manager will typically
+be registered as an adapter to persistent objects that provides
+zc.async.interfaces.IDataManager [#verify]_.
+
+Workers
+=======
+
+When it is installed, workers register themselves. Workers typically
+get their UUID from the instanceuuid module in this package, but we will
+generate our own here.
+
+ >>> import uuid
+ >>> worker1 = zc.async.datamanager.Worker(uuid.uuid1())
+ >>> res = dm.workers.add(worker1)
+ >>> dm.workers[worker1.UUID] is worker1
+ True
+ >>> res is worker1
+ True
+
+The `workers` object has a mapping read API, with `items`, `values`, `keys`,
+`__len__`, `__getitem__`, and `get` [#check_workers_mapping]_. You remove
+workers with their UUID [#check_UUID_equivalence]_.
+
+ >>> dm.workers.remove(worker1.UUID)
+ >>> len(dm.workers)
+ 0
+
+Let's add the worker back. Notice that the __parent__ is None when it is out
+of the workers, but set to the workers object when it is inside. Since the
+workers object also has a __parent__ reference to its parent, the data manager,
+the worker has a link back to the datamanager.
+
+ >>> worker1.__parent__ # None
+ >>> res = dm.workers.add(worker1)
+ >>> worker1.__parent__ is dm.workers
+ True
+ >>> dm.workers.__parent__ is dm
+ True
+
+Each worker has several other attributes. We'll look at four now:
+`UUID`, which we have already seen; `thread`, a sequence of the thread
+jobs the worker is working on; `reactor`, a sequence of the reactor jobs
+the worker is working on; and `engineUUID`, a uuid of the engine that is
+in charge of running the worker, if any [#verify_worker]_.
+
+The two sequences are unusual in that they are sized: if len(sequence)
+== size, trying to put another item in the sequence raises
+zc.async.interfaces.FullError. By default, workers have a reactor size
+of 4, and a thread size of 1.
+
+ >>> worker1.thread.size
+ 1
+ >>> worker1.reactor.size
+ 4
+ >>> def multiply(*args):
+ ... res = 1
+ ... for a in args:
+ ... res *= a
+ ... return res
+ ...
+ >>> import zc.async.partial
+ >>> p1 = zc.async.partial.Partial(multiply, 2, 3)
+ >>> res = worker1.thread.add(p1)
+ >>> len(worker1.thread)
+ 1
+ >>> p2 = zc.async.partial.Partial(multiply, 5, 6)
+ >>> worker1.thread.add(p2)
+ ... # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ FullError: <zc.async.datamanager.SizedSequence object at ...>
+
+You can change the queue size.
+
+ >>> worker1.thread.size = 2
+ >>> res = worker1.thread.add(p2)
+ >>> len(worker1.thread)
+ 2
+
+Decreasing it beyond the current len is acceptable, and will only affect
+how many partials must be removed before new ones may be added.
+
+ >>> worker1.thread.size = 1
+ >>> len(worker1.thread)
+ 2
+
+You can also set it during instantiation of a worker: `reactor_size` and
+`thread_size` are optional arguments.
+
+ >>> worker2 = zc.async.datamanager.Worker(uuid.uuid1(), 2, 1)
+ >>> worker2.reactor.size
+ 2
+ >>> worker2.thread.size
+ 1
+
+We'll add the second worker to the data manager.
+
+ >>> res = dm.workers.add(worker2)
+ >>> len(dm.workers)
+ 2
+
+Engines claim workers by putting their UUID on them. Initially a worker has
+no engineUUID. We'll assign two (arbitrary) UUIDs.
+
+ >>> worker1.engineUUID
+ >>> worker1.engineUUID = uuid.uuid4()
+ >>> worker2.engineUUID = uuid.uuid4()
+
+This indicates that both workers are "open for business". A worker without an
+engine is a dead husk.
+
+We'll look at partials in workers more a little later
+[#remove_partials]_. Next we're going to look at partials in the
+data manager queues.
+
+Partials
+========
+
+Once a Zope has started, it will typically have at least one worker installed,
+with one virtual loop per worker checking the data manager for new jobs (see
+README.txt for integration examples). Now client code can start requesting
+that partials be done.
+
+Basic Story
+-----------
+
+Simplest use is to get the data manager and add a callable. As
+mentioned above, and demonstrated in README.txt, the typical way to get
+the data manager is to adapt a persistent context to IDataManager. We'll
+assume we already have the data manager, and that a utility providing
+zc.async.interfaces.IUUID named 'instance' is available
+[#setUp_UUID_utility]_.
+
+ >>> def send_message():
+ ... print "imagine this sent a message to another machine"
+ ...
+ >>> p = dm.thread.put(send_message)
+
+Now p is a partial wrapping the send_message call. It is specifically a
+data manager partial [#basic_data_manager_partial_checks]_.
+
+ >>> p.callable is send_message
+ True
+ >>> zc.async.interfaces.IDataManagerPartial.providedBy(p)
+ True
+
+The IDataManagerPartial interface extends IPartial and describes the
+interface needed for a partial added to a data manager. Here are the
+attributes on the interface.
+
+- Set automatically:
+
+ * assignerUUID (the UUID of the software instance that put the partial in
+ the queue)
+
+ * workerUUID (the UUID of the worker who claimed the partial)
+
+ * thread (None or bool: whether the partial was assigned to a thread (True)
+ or reactor (False) queue)
+
+- Potentially set by user, not honored for callbacks:
+
+ * selectedUUIDs (the UUIDs of workers that should work on the partial, as
+ selected by user)
+
+ * excludedUUIDs (the UUIDs of workers that should not work on the partial,
+ as selected by user)
+
+ * begin_after (a datetime.datetime with pytz.UTC timezone that specifies a
+ date and time to wait till running the partial; defaults to creation
+ time)
+
+ * begin_by (a datetime.timedelta of a duration after begin_after after
+ which workers should call `fail` on the partial; defaults to one hour)
+
+These are described in some more detail on the IDataManagerPartial
+interface.
+
+The thread queue contains the partial.
+
+ >>> len(dm.thread)
+ 1
+ >>> list(dm.thread) == [p]
+ True
+
+If you ask the data manager for all due jobs, it also includes the partial.
+
+ >>> list(dm.thread.iterDue()) == [p]
+ True
+
+The partial knows its __parent__ and can be used to obtain its data manager.
+
+ >>> zc.async.interfaces.IDataManager(p) is dm
+ True
+ >>> p.__parent__ is dm.thread
+ True
+
+The easiest for a worker to get a task is to call pullNext, passing its
+UUID. It will get the next available task that does not exclude it (and
+that includes it), removing it from the queue. If nothing is available,
+return None.
+
+ >>> res = dm.thread.pullNext(worker1.UUID)
+ >>> res is p
+ True
+ >>> len(dm.thread)
+ 0
+
+Once a partial has been put in a data manager, it is "claimed": trying to
+put it in another one (or back in the same one) will raise an error.
+
+ >>> dm.thread.put(p)
+ Traceback (most recent call last):
+ ...
+ ValueError: cannot add already-assigned partial
+
+If we remove the assignerUUID, we can put it back in.
+
+ >>> p.assignerUUID = None
+ >>> res = dm.thread.put(p)
+ >>> res is p
+ True
+ >>> len(dm.thread)
+ 1
+ >>> transaction.commit()
+
+In normal behavior, after client code has put the task in the thread queue,
+an engine (associated with a persistent worker in a one-to-one relationship,
+in which the worker is the persistent store for the transient, per-process
+engine) will claim and perform it like this (we'll do this from the
+perspective of worker 1).
+
+ >>> trans = transaction.begin()
+ >>> import Queue
+ >>> thread_queue = Queue.Queue(0)
+ >>> claimed = dm.workers[worker1.UUID].thread
+ >>> ct = 0
+ >>> while 1:
+ ... if len(claimed) < claimed.size:
+ ... next = dm.thread.pullNext(worker1.UUID)
+ ... if next is not None:
+ ... claimed.add(next)
+ ... database_name = next._p_jar.db().database_name
+ ... identifier = next._p_oid
+ ... try:
+ ... transaction.commit()
+ ... except ZODB.POSException.TransactionError:
+ ... transaction.abort()
+ ... ct += 1
+ ... if ct < 5: # in twisted, this would probably callLater
+ ... continue
+ ... else:
+ ... thread_queue.put((database_name, identifier))
+ ... ct = 0
+ ... continue # in twisted, this would probably callLater
+ ... break
+ ... # doctest: +ELLIPSIS
+ ...
+ <zc.async.adapters.DataManagerPartial object at ...>
+
+Now the worker 1 has claimed it.
+
+ >>> len(worker1.thread)
+ 1
+
+A thread in that worker will begin it,
+given the database name and _p_oid of the partial it should perform, and will
+do something like this.
+
+ >>> import thread
+ >>> database_name, identifier = thread_queue.get(False)
+ >>> claimed = dm.workers[worker1.UUID].thread # this would actually open a
+ ... # connection and get the worker thread queue object by id
+ >>> for p in claimed:
+ ... if (p._p_oid == identifier and
+ ... p._p_jar.db().database_name == database_name):
+ ... p.thread = thread.get_ident()
+ ... transaction.commit()
+ ... try:
+ ... p()
+ ... except ZODB.POSException.TransactionError:
+ ... transaction.abort()
+ ... p.fail()
+ ... while 1:
+ ... try:
+ ... claimed.remove(p)
+ ... claimed.__parent__.completed.add(p)
+ ... transaction.commit()
+ ... except ZODB.POSException.TransactionError:
+ ... transaction.abort() # retry forever!
+ ... else:
+ ... break
+ ... break
+ ...
+ imagine this sent a message to another machine
+
+And look, there's our message: the partial was called.
+
+The worker's thread list is empty, and the partial has a note of what thread
+ran it.
+
+ >>> len(worker1.thread)
+ 0
+ >>> p.thread == thread.get_ident()
+ True
+
+Notice also that the `completed` container now contains the partial.
+
+ >>> len(worker1.completed)
+ 1
+ >>> list(worker1.completed) == [p]
+ True
+
+The API of the completed container is still in flux [#test_completed]_.
+
+For Reactors
+------------
+
+If you are into Twisted programming, use the reactor queue. The story is
+very similar, so we'll go a bit quicker.
+
+ >>> import twisted.internet.defer
+ >>> import twisted.internet.reactor
+ >>> def twistedPartDeux(d):
+ ... d.callback(42)
+ ...
+ >>> def doSomethingInTwisted():
+ ... d = twisted.internet.defer.Deferred()
+ ... twisted.internet.reactor.callLater(0, twistedPartDeux, d)
+ ... return d
+ ...
+ >>> p = dm.reactor.put(doSomethingInTwisted)
+ >>> def arbitraryThingThatNeedsAConnection(folder, result):
+ ... folder['result'] = result
+ ...
+ >>> p_callback = p.addCallbacks(zc.async.partial.Partial(
+ ... arbitraryThingThatNeedsAConnection, root))
+ >>> transaction.commit()
+
+The engine might do something like this [#set_up_reactor]_.
+
+ >>> import zc.twist
+ >>> def remove(container, partial, result):
+ ... container.remove(partial)
+ ...
+ >>> def perform(p):
+ ... res = p()
+ ... p.addCallback(zc.async.partial.Partial(
+ ... remove, p.__parent__, p))
+ ... transaction.commit()
+ ...
+ >>> trans = transaction.begin()
+ >>> claimed = dm.workers[worker2.UUID].reactor
+ >>> ct = 0
+ >>> while 1:
+ ... if len(claimed) < claimed.size:
+ ... next = dm.reactor.pullNext(worker2.UUID)
+ ... if next is not None:
+ ... claimed.add(next)
+ ... partial = zc.twist.Partial(perform, next)
+ ... try:
+ ... transaction.commit()
+ ... except ZODB.POSException.TransactionError:
+ ... transaction.abort()
+ ... ct += 1
+ ... if ct < 5: # this would probably callLater really
+ ... continue
+ ... else:
+ ... twisted.internet.reactor.callLater(0, partial)
+ ... ct = 0
+ ... continue # this would probably callLater really
+ ... break
+ ... # doctest: +ELLIPSIS
+ ...
+ <zc.async.adapters.DataManagerPartial object at ...>
+
+Then the reactor would churn, and eventually we'd get our result. The
+execution should be something like this (where `time_passes` represents
+one tick of the Twisted reactor that you would normally not have to call
+explicitly--this is just for demonstration purposes).
+
+ >>> time_passes() # perform and doSomethingInTwisted
+ True
+ >>> trans = transaction.begin()
+ >>> p.result # None
+ >>> len(worker2.reactor)
+ 1
+ >>> time_passes() # twistedPartDeux and arbitraryThingThatNeedsAConnection
+ True
+ >>> trans = transaction.begin()
+ >>> p.result
+ 42
+ >>> root['result']
+ 42
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+ >>> len(worker2.reactor)
+ 0
+
+[#tear_down_reactor]_
+
+Held Calls
+----------
+
+Both of the examples so far request that jobs be done as soon as possible.
+It's also possible to request that jobs be done later. Let's assume we
+can control the current time generated by datetime.datetime.now with a
+`set_now` callable [#set_up_datetime]_. A partial added without any special
+calls gets a `begin_after` attribute of now.
+
+ >>> import datetime
+ >>> import pytz
+ >>> datetime.datetime.now(pytz.UTC)
+ datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
+ >>> res1 = dm.thread.put(
+ ... zc.async.partial.Partial(multiply, 3, 6))
+ ...
+ >>> res1.begin_after
+ datetime.datetime(2006, 8, 10, 15, 44, 22, 211, tzinfo=<UTC>)
+
+This means that it's immediately ready to be performed. `iterDue` shows this.
+
+ >>> list(dm.thread.iterDue()) == [res1]
+ True
+
+You can also specify a begin_after date when you make the call. Then it
+isn't due immediately.
+
+ >>> res2 = dm.thread.put(
+ ... zc.async.partial.Partial(multiply, 4, 6),
+ ... datetime.datetime(2006, 8, 10, 16, tzinfo=pytz.UTC))
+ ...
+ >>> len(dm.thread)
+ 2
+ >>> res2.begin_after
+ datetime.datetime(2006, 8, 10, 16, 0, tzinfo=<UTC>)
+ >>> list(dm.thread.iterDue()) == [res1]
+ True
+
+When the time passes, it is available. Partials are ordered by their
+begin_after dates.
+
+ >>> set_now(datetime.datetime(2006, 8, 10, 16, 0, 0, 1, tzinfo=pytz.UTC))
+ >>> list(dm.thread.iterDue()) == [res1, res2]
+ True
+
+Pre-dating (before now) makes the item come first (or in order with other
+pre-dated items.
+
+ >>> res3 = dm.thread.put(
+ ... zc.async.partial.Partial(multiply, 5, 6),
+ ... begin_after=datetime.datetime(2006, 8, 10, 15, 35, tzinfo=pytz.UTC))
+ ...
+ >>> res3.begin_after
+ datetime.datetime(2006, 8, 10, 15, 35, tzinfo=<UTC>)
+ >>> list(dm.thread) == [res3, res1, res2]
+ True
+ >>> list(dm.thread.iterDue()) == [res3, res1, res2]
+ True
+
+Other timezones are normalized to UTC.
+
+ >>> res4 = dm.thread.put(
+ ... zc.async.partial.Partial(multiply, 6, 6),
+ ... pytz.timezone('EST').localize(
+ ... datetime.datetime(2006, 8, 10, 11, 30)))
+ ...
+ >>> res4.begin_after
+ datetime.datetime(2006, 8, 10, 16, 30, tzinfo=<UTC>)
+ >>> list(dm.thread.iterDue()) == [res3, res1, res2]
+ True
+
+Naive timezones are not allowed.
+
+ >>> dm.thread.put(send_message, datetime.datetime(2006, 8, 10, 16, 15))
+ Traceback (most recent call last):
+ ...
+ ValueError: cannot use timezone-naive values
+
+Iteration, again, is based on begin_after, not the order added.
+
+ >>> res5 = dm.thread.put(
+ ... zc.async.partial.Partial(multiply, 7, 6),
+ ... datetime.datetime(2006, 8, 10, 16, 15, tzinfo=pytz.UTC))
+ ...
+ >>> list(dm.thread) == [res3, res1, res2, res5, res4]
+ True
+ >>> list(dm.thread.iterDue()) == [res3, res1, res2]
+ True
+
+pullNext only returns items that are due.
+
+ >>> dm.thread.pullNext(worker1.UUID) == res3
+ True
+ >>> dm.thread.pullNext(worker1.UUID) == res1
+ True
+ >>> dm.thread.pullNext(worker1.UUID) == res2
+ True
+ >>> dm.thread.pullNext(worker1.UUID) # None
+
+When it is due, begin_after also affects pullNext.
+
+ >>> set_now(datetime.datetime(2006, 8, 10, 16, 31, tzinfo=pytz.UTC))
+ >>> dm.thread.pullNext(worker1.UUID) == res5
+ True
+ >>> dm.thread.pullNext(worker1.UUID) == res4
+ True
+ >>> dm.thread.pullNext(worker1.UUID) # None
+
+Selecting and Excluding Workers
+-------------------------------
+
+Some use cases want to limit the workers that can perform a given partial,
+either by explicitly selecting or excluding certain workers. Here are some of
+those use cases:
+
+- You may want to divide up your workers by tasks: certain long running tasks
+ should only tie up one set of workers, so short tasks that users expect
+ more responsiveness from can use any available worker, including some that
+ are reserved for them.
+
+- You may only have the system resources necessary to perform a given task on
+ a certain set of your workers.
+
+- You may want to broadcast a message to all workers, so you need to generate
+ a task specifically for each. This would be an interesting way to build
+ a simple but potentially powerful WSGI reverse proxy that supported
+ invalidations, for instance.
+
+There are probably more, and even these have interesting variants. For
+instance, for the second, what if you don't know which workers are
+appropriate and need to test to find out? You could write a partial
+that contains a partial. The outer partial's job is to find a worker
+with an environment appropriate for the inner one. When it runs, if the
+worker's environment is appropriate, it performs the inner partial. If
+it is not, it creates a new outer partial wrapping its inner partial,
+specifies the current worker UUID as excluded, and schedules it to be
+called. If the next worker is also inappropriate, in creates a third
+outer wrapper that excludes both of the failed workers' UUIDs...and so
+on.
+
+To do this, you use worker UUIDs in a partial's selectedUUIDs and
+excludedUUIDs sets. An empty selectedUUIDs set is interpreted as a
+catch-all. For a worker to be able to perform a partial, it must not
+be in the excludedUUIDs and either selectedUUIDs is empty or it is within
+selectedUUIDs.
+
+Let's look at some examples. We'll assume the five partials we looked at
+above are all back in the dm.threads [#reinstate]_. We've already looked
+at partials with empty sets for excludedUUIDs and selectedUUIDs: every
+partial we've shown up to now has fit that description.
+
+(order is [res3, res1, res2, res5, res4])
+
+ >>> res3.selectedUUIDs.add(uuid.uuid1()) # nobody here
+ >>> res1.selectedUUIDs.add(worker1.UUID)
+ >>> res2.selectedUUIDs.update((worker1.UUID, worker2.UUID))
+ >>> res2.excludedUUIDs.add(worker2.UUID)
+ >>> res5.excludedUUIDs.add(worker2.UUID)
+ >>> res4.excludedUUIDs.update((worker2.UUID, worker1.UUID))
+
+Now poor worker2 can't get any work.
+
+ >>> dm.thread.pullNext(worker2.UUID) # None
+
+worker1 can get three of them: res1, res2, and res 5.
+
+ >>> dm.thread.pullNext(worker1.UUID) is res1
+ True
+ >>> dm.thread.pullNext(worker1.UUID) is res2
+ True
+ >>> dm.thread.pullNext(worker1.UUID) is res5
+ True
+ >>> dm.thread.pullNext(worker1.UUID) # None
+
+Now we have two jobs that can never be claimed (as long as we have only
+these two workers and the selected/excluded values are not changed.
+What happens to them?
+
+Never-Claimed Calls
+-------------------
+
+Sometimes, due to impossible worker selections or exclusions, or simply
+workers that are too busy, we need to give up and say that a given partial
+will not be run, and should fail. Partials have a begin_by attribute which
+controls approximately when this should happen. The begin_by value should
+be a non-negative datetime.timedelta, which is added to the begin_after value
+to determine when the partial should fail. If you put a partial in a
+data manager with no begin_by value set, the data manager sets it to one hour.
+
+ >>> res3.begin_by
+ datetime.timedelta(0, 3600)
+
+This can be changed.
+
+ >>> res3.begin_by = datetime.timedelta(hours=2)
+
+So how are they cancelled? `pullNext` will wrap any expired partial with
+another partial that calls the inner `fail` method. This will be handed to
+any worker.
+
+ >>> res3.begin_after + res3.begin_by
+ datetime.datetime(2006, 8, 10, 17, 35, tzinfo=<UTC>)
+ >>> res4.begin_after + res4.begin_by
+ datetime.datetime(2006, 8, 10, 17, 30, tzinfo=<UTC>)
+ >>> set_now(datetime.datetime(2006, 8, 10, 17, 32, tzinfo=pytz.UTC))
+ >>> p = dm.thread.pullNext(worker1.UUID)
+ >>> p()
+ >>> res4.state == zc.async.interfaces.COMPLETED
+ True
+ >>> print res4.result.getTraceback()
+ ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ zc.async.interfaces.AbortedError:
+ >>> dm.thread.pullNext(worker1.UUID) # None
+
+ >>> set_now(datetime.datetime(2006, 8, 10, 17, 37, tzinfo=pytz.UTC))
+ >>> p = dm.thread.pullNext(worker1.UUID)
+ >>> p()
+ >>> res3.state == zc.async.interfaces.COMPLETED
+ True
+ >>> print res3.result.getTraceback()
+ ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ zc.async.interfaces.AbortedError:
+ >>> dm.thread.pullNext(worker1.UUID) # None
+ >>> len(dm.thread)
+ 0
+
+Dead Workers
+------------
+
+What happens when an engine, driving a worker, dies? If it is the only
+engine/worker, that's the end: when the worker restarts it should clean
+out its old worker object and then proceed. But what if there are more
+than one simultaneous worker? How do we know to clean out the dead
+workers jobs?
+
+In addition to the jobs listed above, each worker virtual main loop has
+an additional task: be his brother's keeper. Each must update a ping
+date on its worker object at a given maximum interval, and check the
+next sibling. To support this story, the worker objects have a few more
+attributes we haven't talked about: `poll_seconds`, `ping_interval`,
+`ping_death_interval` and `last_ping`.
+
+ >>> worker1.poll_seconds
+ 5
+ >>> worker1.ping_interval
+ datetime.timedelta(0, 60)
+ >>> worker1.ping_death_interval
+ datetime.timedelta(0, 30)
+
+`pullNext` on a thread queue will return a partial to clean a worker when the
+next highest one by UUID (circling around to the lowest one when the uuid
+is the highest). So let's set a ping on worker1.
+
+ >>> worker1.last_ping = datetime.datetime.now(pytz.UTC)
+ >>> worker1.last_ping
+ datetime.datetime(2006, 8, 10, 17, 37, tzinfo=<UTC>)
+
+Let's put res1, res2, res3, res4, and res5 in worker1.
+
+ >>> len(worker1.thread)
+ 0
+ >>> worker1.thread.size = 3
+ >>> res1.excludedUUIDs.clear()
+ >>> res1.selectedUUIDs.clear()
+ >>> r = worker1.thread.add(res1) # PENDING
+ >>> res2._state = zc.async.interfaces.ACTIVE
+ >>> r = worker1.reactor.add(res2)
+ >>> r = worker1.thread.add(res3) # COMPLETED
+ >>> r = worker1.reactor.add(res4) # COMPLETED
+ >>> res5._state = zc.async.interfaces.CALLBACKS
+ >>> res5._result = res5.callable(*res5.args, **dict(res5.kwargs))
+ >>> r = worker1.thread.add(res5)
+
+While we are still within our acceptable time period, the `checkSibling`
+method will not do anything.
+
+ >>> len(dm.workers)
+ 2
+ >>> len(dm.thread)
+ 0
+ >>> set_now(worker1.last_ping + worker1.ping_interval)
+ >>> dm.checkSibling(worker2.UUID)
+ >>> len(dm.workers)
+ 2
+ >>> len(dm.thread)
+ 0
+
+We need to move now to after last_ping + ping_interval +
+ping_death_interval. Now when worker2 calls checkSibling on the data
+manager, worker1 will have engineUUID set to None, and a partial will be
+added to clean out the partials in worker 1.
+
+ >>> set_now(worker1.last_ping + worker1.ping_interval +
+ ... worker1.ping_death_interval + datetime.timedelta(seconds=1))
+ >>> worker1.engineUUID is not None
+ True
+ >>> worker2.engineUUID is not None
+ True
+ >>> dm.checkSibling(worker2.UUID)
+ >>> len(dm.workers)
+ 2
+ >>> worker1.engineUUID is not None
+ False
+ >>> worker2.engineUUID is not None
+ True
+ >>> len(dm.thread)
+ 1
+
+So worker2 can get the job and perform it.
+
+ >>> res = dm.thread.pullNext(worker2.UUID)
+ >>> partial = worker2.thread.add(res)
+ >>> len(worker1.thread)
+ 3
+ >>> len(worker1.reactor)
+ 2
+ >>> res()
+ >>> len(worker1.thread)
+ 0
+ >>> len(worker1.reactor)
+ 0
+ >>> r = dm.thread.pullNext(worker2.UUID)()
+ >>> r = dm.thread.pullNext(worker2.UUID)()
+ >>> r = dm.reactor.pullNext(worker2.UUID)()
+ >>> dm.thread.pullNext(worker2.UUID) # None
+ >>> dm.reactor.pullNext(worker2.UUID) # None
+
+ >>> res1.state == zc.async.interfaces.COMPLETED
+ True
+ >>> res2.state == zc.async.interfaces.COMPLETED
+ True
+ >>> res3.state == zc.async.interfaces.COMPLETED
+ True
+ >>> res4.state == zc.async.interfaces.COMPLETED
+ True
+ >>> res5.state == zc.async.interfaces.COMPLETED
+ True
+
+If you have multiple workers, it is strongly suggested that you get the
+associated servers connected to a shared time server.
+
+[#tear_down_datetime]_
+
+=========
+Footnotes
+=========
+
+.. [#setUp] We'll actually create the state that the text needs here.
+
+ >>> from ZODB.tests.util import DB
+ >>> db = DB()
+ >>> conn = db.open()
+ >>> root = conn.root()
+
+ You must have two adapter registrations: IConnection to
+ ITransactionManager, and IPersistent to IConnection. We will also
+ register IPersistent to ITransactionManager because the adapter is
+ designed for it.
+
+ >>> from zc.twist import transactionManager, connection
+ >>> import zope.component
+ >>> zope.component.provideAdapter(transactionManager)
+ >>> zope.component.provideAdapter(connection)
+ >>> import ZODB.interfaces
+ >>> zope.component.provideAdapter(
+ ... transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+ We need to be able to get data manager partials for functions and methods;
+ normal partials for functions and methods; and a data manager for a partial.
+ Here are the necessary registrations.
+
+ >>> import zope.component
+ >>> import types
+ >>> import zc.async.interfaces
+ >>> import zc.async.partial
+ >>> import zc.async.adapters
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.method_to_datamanagerpartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.function_to_datamanagerpartial)
+ >>> zope.component.provideAdapter( # partial -> datamanagerpartial
+ ... zc.async.adapters.DataManagerPartial,
+ ... provides=zc.async.interfaces.IDataManagerPartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.adapters.partial_to_datamanager)
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.FunctionType,),
+ ... provides=zc.async.interfaces.IPartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.MethodType,),
+ ... provides=zc.async.interfaces.IPartial)
+ ...
+
+.. [#verify] Verify data manager interface.
+
+ >>> from zope.interface.verify import verifyObject
+ >>> verifyObject(zc.async.interfaces.IDataManager, dm)
+ True
+ >>> verifyObject(zc.async.interfaces.IPartialQueue, dm.thread)
+ True
+ >>> verifyObject(zc.async.interfaces.IPartialQueue, dm.reactor)
+ True
+ >>> verifyObject(zc.async.interfaces.IWorkers, dm.workers)
+ True
+
+.. [#check_workers_mapping]
+
+ >>> len(dm.workers)
+ 1
+ >>> list(dm.workers.keys()) == [worker1.UUID]
+ True
+ >>> list(dm.workers) == [worker1.UUID]
+ True
+ >>> list(dm.workers.values()) == [worker1]
+ True
+ >>> list(dm.workers.items()) == [(worker1.UUID, worker1)]
+ True
+ >>> dm.workers.get(worker1.UUID) is worker1
+ True
+ >>> dm.workers.get(2) is None
+ True
+ >>> dm.workers[worker1.UUID] is worker1
+ True
+ >>> dm.workers[2]
+ Traceback (most recent call last):
+ ...
+ KeyError: 2
+
+.. [#check_UUID_equivalence] This is paranoid--it should be the responsibility
+ of the uuid.UUID class--but we'll check it anyway.
+
+ >>> equivalent_UUID = uuid.UUID(bytes=worker1.UUID.bytes)
+ >>> dm.workers[equivalent_UUID] is worker1
+ True
+ >>> dm.workers.remove(equivalent_UUID)
+ >>> len(dm.workers)
+ 0
+ >>> res = dm.workers.add(worker1)
+
+.. [#verify_worker]
+
+ >>> verifyObject(zc.async.interfaces.IWorker, worker1)
+ True
+ >>> verifyObject(zc.async.interfaces.ISizedSequence, worker1.thread)
+ True
+ >>> verifyObject(zc.async.interfaces.ISizedSequence, worker1.reactor)
+ True
+ >>> isinstance(worker1.UUID, uuid.UUID)
+ True
+
+.. [#remove_partials] We can remove our partials from a worker with
+ `remove`.
+
+ >>> worker1.thread.remove(p1)
+ >>> len(worker1.thread)
+ 1
+ >>> list(worker1.thread) == [p2]
+ True
+ >>> worker1.thread.remove(p2)
+ >>> len(worker1.thread)
+ 0
+
+ The remove method of the worker thread and reactor sequences
+ raises IndexError if you ask for the index of something that isn't
+ contained.
+
+ >>> worker1.thread.remove((2, 4)) # an iterable can surprise some
+ ... # naive string replacements, so we use this to verify we didn't
+ ... # fall into that trap.
+ Traceback (most recent call last):
+ ...
+ ValueError: (2, 4) not in queue
+
+.. [#setUp_UUID_utility] We need to provide an IUUID utility that
+ identifies the current instance.
+
+ >>> import uuid
+ >>> zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
+ >>> zope.component.provideUtility(
+ ... worker1.UUID, zc.async.interfaces.IUUID, 'instance')
+
+ Normally this would be the UUID instance in zc.async.instanceuuid.
+
+ While we're at it, we'll get "now" so we can compare it in the footnote
+ below.
+
+ >>> import datetime
+ >>> import pytz
+ >>> _before = datetime.datetime.now(pytz.UTC)
+
+.. [#basic_data_manager_partial_checks] Even though the functionality checks
+ belong elsewhere, here are a few default checks for the values.
+
+ >>> verifyObject(zc.async.interfaces.IDataManagerPartial, p)
+ True
+ >>> p.workerUUID # None
+ >>> isinstance(p.assignerUUID, uuid.UUID)
+ True
+ >>> p.selectedUUIDs
+ zc.set.Set([])
+ >>> p.excludedUUIDs
+ zc.set.Set([])
+ >>> _before <= p.begin_after <= datetime.datetime.now(pytz.UTC)
+ True
+ >>> p.begin_by
+ datetime.timedelta(0, 3600)
+ >>> p.thread # None
+
+.. [#test_completed] Here are some nitty-gritty tests of the completed
+ container.
+
+ >>> verifyObject(zc.async.interfaces.ICompletedCollection,
+ ... worker1.completed)
+ True
+ >>> bool(worker1.completed)
+ True
+ >>> len(worker2.completed)
+ 0
+ >>> bool(worker2.completed)
+ False
+ >>> list(worker1.completed.iter()) == [p]
+ True
+ >>> list(worker1.completed.iter(p.begin_after)) == [p]
+ True
+ >>> list(worker1.completed.iter(
+ ... p.begin_after - datetime.timedelta(seconds=1))) == []
+ True
+ >>> list(worker2.completed) == []
+ True
+ >>> list(worker2.completed.iter()) == []
+ True
+ >>> worker1.completed.first() is p
+ True
+ >>> worker1.completed.last() is p
+ True
+ >>> worker2.completed.first()
+ Traceback (most recent call last):
+ ...
+ ValueError: None
+ >>> worker2.completed.last()
+ Traceback (most recent call last):
+ ...
+ ValueError: None
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.first() is p
+ True
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.first() is p
+ True
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.first() is p
+ True
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.first() is p
+ True
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.first() is p
+ True
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.first()
+ Traceback (most recent call last):
+ ...
+ ValueError: None
+
+ Let's look at the completed collection with a few more partials in it.
+ The rotation means we can look at its behavior as the underlying buckets
+ are rotated out.
+
+ >>> root['coll_p0'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p1'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p2'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p3'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p4'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p5'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p6'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p7'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p8'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p9'] = zc.async.interfaces.IDataManagerPartial(send_message)
+ >>> root['coll_p0'].begin_after = datetime.datetime(
+ ... 2006, 1, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p1'].begin_after = datetime.datetime(
+ ... 2006, 2, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p2'].begin_after = datetime.datetime(
+ ... 2006, 3, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p3'].begin_after = datetime.datetime(
+ ... 2006, 4, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p4'].begin_after = datetime.datetime(
+ ... 2006, 5, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p5'].begin_after = datetime.datetime(
+ ... 2006, 6, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p6'].begin_after = datetime.datetime(
+ ... 2006, 7, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p7'].begin_after = datetime.datetime(
+ ... 2006, 8, 1, tzinfo=pytz.UTC)
+ >>> root['coll_p8'].begin_after = datetime.datetime(
+ ... 2006, 8, 2, tzinfo=pytz.UTC)
+ >>> root['coll_p9'].begin_after = datetime.datetime(
+ ... 2006, 8, 3, tzinfo=pytz.UTC)
+ >>> transaction.commit()
+ >>> worker1.completed.add(root['coll_p8'])
+ >>> worker1.completed.add(root['coll_p6'])
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.add(root['coll_p4'])
+ >>> worker1.completed.add(root['coll_p2'])
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.add(root['coll_p0'])
+ >>> worker1.completed.add(root['coll_p1'])
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.add(root['coll_p3'])
+ >>> worker1.completed.add(root['coll_p5'])
+ >>> worker1.completed.rotate()
+ >>> worker1.completed.add(root['coll_p7'])
+ >>> worker1.completed.add(root['coll_p9'])
+ >>> list(worker1.completed) == [
+ ... root['coll_p9'], root['coll_p8'], root['coll_p7'], root['coll_p6'],
+ ... root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> len(worker1.completed)
+ 10
+
+ The `iter` method can simply work like __iter__, but can also take starts
+ and stops for relatively efficient jumps.
+
+ >>> list(worker1.completed.iter()) == [
+ ... root['coll_p9'], root['coll_p8'], root['coll_p7'], root['coll_p6'],
+ ... root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> list(worker1.completed.iter(start=datetime.datetime(
+ ... 2006, 7, 15, tzinfo=pytz.UTC))) == [
+ ... root['coll_p6'],
+ ... root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> list(worker1.completed.iter(start=datetime.datetime(
+ ... 2006, 7, 1, tzinfo=pytz.UTC))) == [
+ ... root['coll_p6'],
+ ... root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> list(worker1.completed.iter(stop=datetime.datetime(
+ ... 2006, 7, 15, tzinfo=pytz.UTC))) == [
+ ... root['coll_p9'], root['coll_p8'], root['coll_p7']]
+ True
+ >>> list(worker1.completed.iter(stop=datetime.datetime(
+ ... 2006, 7, 1, tzinfo=pytz.UTC))) == [
+ ... root['coll_p9'], root['coll_p8'], root['coll_p7']]
+ True
+ >>> list(worker1.completed.iter(stop=datetime.datetime(
+ ... 2006, 6, 30, tzinfo=pytz.UTC))) == [
+ ... root['coll_p9'], root['coll_p8'], root['coll_p7'], root['coll_p6']]
+ True
+ >>> list(worker1.completed.iter(start=datetime.datetime(
+ ... 2006, 7, 1, tzinfo=pytz.UTC), stop=datetime.datetime(
+ ... 2006, 3, 1, tzinfo=pytz.UTC))) == [
+ ... root['coll_p6'], root['coll_p5'], root['coll_p4'], root['coll_p3']]
+ True
+
+ `first` and `last` give you the ability to find limits including given
+ start and stop points, respectively.
+
+ >>> worker1.completed.first() == root['coll_p9']
+ True
+ >>> worker1.completed.last() == root['coll_p0']
+ True
+ >>> worker1.completed.first(
+ ... datetime.datetime(2006, 7, 15, tzinfo=pytz.UTC)) == (
+ ... root['coll_p6'])
+ True
+ >>> worker1.completed.last(
+ ... datetime.datetime(2006, 7, 15, tzinfo=pytz.UTC)) == (
+ ... root['coll_p7'])
+ True
+
+ As you rotate the completed container, older items disappear.
+
+ >>> worker1.completed.rotate()
+ >>> list(worker1.completed.iter()) == [
+ ... root['coll_p9'], root['coll_p7'],
+ ... root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> worker1.completed.rotate() # no change
+ >>> list(worker1.completed.iter()) == [
+ ... root['coll_p9'], root['coll_p7'],
+ ... root['coll_p5'], root['coll_p4'], root['coll_p3'], root['coll_p2'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> worker1.completed.rotate()
+ >>> list(worker1.completed.iter()) == [
+ ... root['coll_p9'], root['coll_p7'],
+ ... root['coll_p5'], root['coll_p3'],
+ ... root['coll_p1'], root['coll_p0']]
+ True
+ >>> worker1.completed.rotate()
+ >>> list(worker1.completed.iter()) == [
+ ... root['coll_p9'], root['coll_p7'],
+ ... root['coll_p5'], root['coll_p3']]
+ True
+ >>> worker1.completed.rotate()
+ >>> list(worker1.completed.iter()) == [
+ ... root['coll_p9'], root['coll_p7']]
+ True
+ >>> worker1.completed.rotate()
+ >>> list(worker1.completed.iter()) == []
+ True
+ >>> transaction.commit()
+
+.. [#set_up_reactor] We monkeypatch twisted.internet.reactor
+ (and replace it below).
+
+ >>> import twisted.internet.reactor
+ >>> oldCallLater = twisted.internet.reactor.callLater
+ >>> import bisect
+ >>> class FauxReactor(object):
+ ... def __init__(self):
+ ... self.time = 0
+ ... self.calls = []
+ ... def callLater(self, delay, callable, *args, **kw):
+ ... res = (delay + self.time, callable, args, kw)
+ ... bisect.insort(self.calls, res)
+ ... # normally we're supposed to return something but not needed
+ ... def time_flies(self, time):
+ ... end = self.time + time
+ ... ct = 0
+ ... while self.calls and self.calls[0][0] <= end:
+ ... self.time, callable, args, kw = self.calls.pop(0)
+ ... callable(*args, **kw) # normally this would get try...except
+ ... ct += 1
+ ... self.time = end
+ ... return ct
+ ... def time_passes(self):
+ ... if self.calls and self.calls[0][0] <= self.time:
+ ... self.time, callable, args, kw = self.calls.pop(0)
+ ... callable(*args, **kw)
+ ... return True
+ ... return False
+ ...
+ >>> faux = FauxReactor()
+ >>> twisted.internet.reactor.callLater = faux.callLater
+ >>> time_flies = faux.time_flies
+ >>> time_passes = faux.time_passes
+
+.. [#tear_down_reactor]
+
+ >>> twisted.internet.reactor.callLater = oldCallLater
+
+.. [#set_up_datetime] A monkeypatch, removed in another footnote below.
+
+ >>> import datetime
+ >>> import pytz
+ >>> old_datetime = datetime.datetime
+ >>> def set_now(dt):
+ ... global _now
+ ... _now = _datetime(*dt.__reduce__()[1])
+ ...
+ >>> class _datetime(old_datetime):
+ ... @classmethod
+ ... def now(klass, tzinfo=None):
+ ... if tzinfo is None:
+ ... return _now.replace(tzinfo=None)
+ ... else:
+ ... return _now.astimezone(tzinfo)
+ ... def astimezone(self, tzinfo):
+ ... return _datetime(
+ ... *super(_datetime,self).astimezone(tzinfo).__reduce__()[1])
+ ... def replace(self, *args, **kwargs):
+ ... return _datetime(
+ ... *super(_datetime,self).replace(
+ ... *args, **kwargs).__reduce__()[1])
+ ... def __repr__(self):
+ ... raw = super(_datetime, self).__repr__()
+ ... return "datetime.datetime%s" % (
+ ... raw[raw.index('('):],)
+ ... def __reduce__(self):
+ ... return (argh, super(_datetime, self).__reduce__()[1])
+ >>> def argh(*args, **kwargs):
+ ... return _datetime(*args, **kwargs)
+ ...
+ >>> datetime.datetime = _datetime
+ >>> _now = datetime.datetime(2006, 8, 10, 15, 44, 22, 211, pytz.UTC)
+
+.. [#reinstate]
+
+ >>> for p in (res1, res2, res3, res4, res5):
+ ... p.assignerUUID = None
+ ... res = dm.thread.put(p)
+ ...
+ >>> list(dm.thread) == [res3, res1, res2, res5, res4]
+ True
+ >>> list(dm.thread.iterDue()) == [res3, res1, res2, res5, res4]
+ True
+
+.. [#tear_down_datetime]
+
+ >>> datetime.datetime = old_datetime
Property changes on: zc.async/trunk/src/zc/async/datamanager.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/engine.py
===================================================================
--- zc.async/trunk/src/zc/async/engine.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/engine.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,247 @@
+import uuid
+import Queue
+import thread
+import threading
+import datetime
+import logging
+import pytz
+import twisted.internet.reactor
+import ZODB.POSException
+import transaction
+import transaction.interfaces
+
+import zc.twist
+
+def remove(container, partial, result=None):
+ container.remove(partial)
+ container.__parent__.completed.add(partial)
+
+def perform(p):
+ p()
+ p.addCallback(zc.async.partial.Partial(remove, p.__parent__, p))
+
+class Engine(object):
+ # this intentionally does not have an interface. It would be nicer if this
+ # could be a Twisted service, part of the main Zope service, but that does
+ # not appear easy to arrange at the moment. Therefore we have a subscriber
+ # in subscribers.py that does custom set-up, using raw reactor code.
+ # Eventually I'd like to move this to a service interface, and tie it to
+ # the Zope service in the subscriber.
+
+ _needed = 0
+ alive = True
+
+ def __init__(self, UUID, factory):
+ self.workerUUID = UUID
+ self.factory = factory
+ self.thread_queue = Queue.Queue(0)
+ self._threads = []
+ self.UUID = uuid.uuid4() # this is supposed to distinguish this engine
+ # instance from any others potentially wanting to work on the worker.
+
+ def perform_thread(self):
+ try:
+ job = self.thread_queue.get()
+ while job is not None:
+ db, identifier = job
+ conn = db.open()
+ removal = None
+ try:
+ transaction.begin()
+ p = conn.get(identifier)
+ p.thread = thread.get_ident()
+ transaction.commit()
+ removal = zc.twist.Partial(remove, p.__parent__, p)
+ try:
+ p() # this does the committing and retrying, largely
+ except ZODB.POSException.TransactionError:
+ transaction.abort()
+ while 1:
+ try:
+ p.fail()
+ transaction.commit()
+ except ZODB.POSException.TransactionError:
+ transaction.abort() # retry forever (!)
+ else:
+ break
+ finally:
+ conn.close()
+ if removal is not None:
+ twisted.internet.reactor.callFromThread(removal)
+ job = self.thread_queue.get()
+ finally:
+ # this may cause some bouncing, but we don't ever want to end
+ # up with fewer than needed.
+ twisted.internet.reactor.callFromThread(self.set_threads)
+
+ def set_threads(self, needed=None):
+ # this should only be called from the main thread (otherwise it needs
+ # locks)
+ if needed is None:
+ needed = self._needed
+ else:
+ self._needed = needed
+ res = []
+ ct = 0
+ for t in self._threads:
+ if t.isAlive():
+ res.append(t)
+ ct += 1
+ self._threads[:] = res
+ if ct < needed:
+ for i in range(max(needed - ct, 0)):
+ t = threading.Thread(target=self.perform_thread)
+ self._threads.append(t)
+ t.start()
+ elif ct > needed:
+ # this may cause some bouncing, but hopefully nothing too bad.
+ for i in range(ct - needed):
+ self.thread_queue.put(None)
+
+ def poll(self, datamanager):
+ if not self.alive:
+ return
+ poll_seconds = 0.25
+ call = zc.twist.Partial(self.poll, datamanager)
+ try:
+ tm = transaction.interfaces.ITransactionManager(datamanager)
+ tm.begin()
+ now = datetime.datetime.now(pytz.UTC)
+ worker = datamanager.workers.get(self.workerUUID)
+ if worker is not None:
+ if (worker.engineUUID is not None and
+ worker.engineUUID != self.UUID):
+ # uh-oh. Maybe another engine is in on the action?
+ time_of_death = (worker.last_ping + worker.ping_interval
+ + worker.ping_death_interval)
+ if time_of_death < now:
+ # hm. Looks like it's dead.
+ zc.async.datamanager.cleanDeadWorker(worker)
+ worker.engineUUID = self.UUID
+ else:
+ # this is some other engine's UUID,
+ # and it isn't dead (yet?). Houston, we have a problem.
+ interval = time_of_death - now
+ logging.warning(
+ 'Another engine instance, %s, has claimed worker '
+ '%s. This engine instance, %s, is '
+ "deferring. The other engine will be "
+ "regarded dead and scheduled for removal after "
+ '%d days, %d seconds, and %d microseconds',
+ worker.engineUUID, worker.UUID, self.UUID,
+ interval.days, interval.seconds,
+ interval.microseconds)
+ return # which will call the finally clause
+ else:
+ worker.engineUUID = self.UUID
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ # uh-oh. Somebody else may be adding a worker for the
+ # same UUID. we'll just return for now, and figure that
+ # the next go-round will report the problem.
+ return # will call finally clause
+ else:
+ worker = self.factory(self.workerUUID)
+ datamanager.workers.add(worker)
+ worker.engineUUID = self.UUID
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ # uh-oh. Somebody else may be adding a worker for the
+ # same UUID. we'll just return for now, and figure that
+ # the next go-round will report the problem.
+ return # will call finally clause
+ poll_seconds = worker.poll_seconds
+ datamanager.checkSibling(worker.UUID)
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ # we'll retry next poll.
+ if (worker.completed.last_rotation +
+ worker.completed.rotation_interval) <= now:
+ worker.completed.rotate()
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ # we'll retry next poll.
+ if worker.last_ping + worker.ping_interval <= now:
+ worker.last_ping = now
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ # uh-oh: are there two engines working with the same worker?
+ tm.abort() # and retry next time
+ logging.error(
+ "Transaction error for worker %s. This should not "
+ "happen.", self.workerUUID)
+ return
+ def thread_size():
+ if len(datamanager.workers) == 1:
+ return 1
+ else:
+ return worker.thread_size
+ self.set_threads(thread_size())
+ while len(worker.thread) < thread_size():
+ p = datamanager.thread.pullNext(uuid)
+ if p is not None:
+ worker.thread.add(p)
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ else:
+ self.thread_queue.put((p._p_jar.db(), p._p_oid))
+ else:
+ break
+ self.set_threads(thread_size())
+ while len(worker.reactor) < worker.reactor.size:
+ p = datamanager.reactor.pullNext(uuid)
+ if p is not None:
+ worker.reactor.add(p)
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ else:
+ twisted.internet.reactor.callLater(
+ 0, zc.twist.Partial(perform, p))
+ else:
+ break
+ now = datetime.datetime.now(pytz.UTC)
+ if worker.last_ping + worker.ping_interval <= now:
+ worker.last_ping = now
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ # uh-oh: are there two engines working with the same worker?
+ tm.abort() # and retry next time
+ logging.error(
+ "Transaction error for worker %s. This should not "
+ "happen.", self.workerUUID)
+ return
+ finally:
+ tm.abort()
+ if self.alive:
+ twisted.internet.reactor.callLater(poll_seconds, call)
+
+ def tearDown(self, datamanager):
+ self.alive = False
+ self.set_threads(0)
+ try:
+ tm = transaction.interfaces.ITransactionManager(datamanager)
+ tm.begin()
+ worker = datamanager.workers.get(self.workerUUID)
+ if worker is not None:
+ worker.engineUUID = None
+ datamanager.thread.put(
+ zc.async.partial.Partial(
+ zc.async.datamanager.cleanDeadWorker, worker))
+ tm.commit()
+ finally:
+ tm.abort()
+
Property changes on: zc.async/trunk/src/zc/async/engine.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/i18n.py
===================================================================
--- zc.async/trunk/src/zc/async/i18n.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/i18n.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,32 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+"""I18N support for the site package.
+
+This defines a `MessageFactory` for the I18N domain for the
+`zc.async` package. This is normally used with this import::
+
+ from i18n import _
+
+The factory is then used normally. Two examples::
+
+ text = _('some internationalized text')
+ text = _('helpful-descriptive-message-id', 'default text')
+"""
+__docformat__ = "reStructuredText"
+
+
+from zope import i18nmessageid
+
+MessageFactory = _ = i18nmessageid.MessageFactory("zc.async")
Property changes on: zc.async/trunk/src/zc/async/i18n.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/trunk/src/zc/async/instanceuuid.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/instanceuuid.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,38 @@
+
+import os
+import uuid
+import zope.interface
+import zc.async.interfaces
+
+# test for this file is in tests.py
+
+msg = """
+------------------------------------------------------------------------
+The value above (and this file) is created and used by the zc.async
+package. It is intended to uniquely identify this software instance when
+it is used to start a zc.async worker process. This allows multiple
+workers to connect to a single database to do work. The software
+expects an instance home to only generate a single process.
+
+To get a new identifier for this software instance, delete this file and
+restart Zope (or more precisely, delete this file, restart Python, and
+import zc.async.instanceuuid). This file will be recreated with a new value.
+"""
+
+zope.interface.classImplements(uuid.UUID, zc.async.interfaces.IUUID)
+
+def getUUID():
+ file_name = os.path.join(
+ os.environ.get("INSTANCE_HOME"), 'etc', 'uuid.txt')
+ if os.path.exists(file_name):
+ f = open(file_name, 'r')
+ UUID = uuid.UUID(f.readline().strip())
+ f.close()
+ else:
+ UUID = uuid.uuid1()
+ f = open(file_name, 'w')
+ f.writelines((str(UUID), msg))
+ f.close()
+ return UUID
+
+UUID = getUUID()
Property changes on: zc.async/trunk/src/zc/async/instanceuuid.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/interfaces.py
===================================================================
--- zc.async/trunk/src/zc/async/interfaces.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/interfaces.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,358 @@
+import zope.interface
+import zope.interface.common.mapping
+import zope.interface.common.sequence
+import zope.component.interfaces
+import zc.queue.interfaces
+from zc.async.i18n import _
+
+PENDING = _('pending-state', 'Pending')
+ACTIVE = _('active-state', 'Active')
+CALLBACKS = _('callback-state', 'Performing Callbacks')
+COMPLETED = _('completed-state', 'Completed')
+
+class AbortedError(Exception):
+ """An explicit abort, as generated by the default behavior of
+ IPartial.fail"""
+
+class BadStateError(Exception):
+ """The partial is not in the state it should be for the call being made.
+ This is almost certainly a programmer error."""
+
+class IPartialFactory(zope.interface.Interface):
+
+ def __call__(self, call, *args, **kwargs):
+ """return an IPartial with the given call, args, and kwargs"""
+
+ def bind(self, call, *args, **kwargs):
+ """returns IPartial with the IPartial inserted as first value in args.
+ """
+
+class IPartial(zope.interface.Interface):
+
+ __parent__ = zope.interface.Attribute(
+ """The current canonical location of the partial""")
+
+ callable = zope.interface.Attribute(
+ """The callable object that should be called with *IPartial.args and
+ **IPartial.kwargs when the IPartial is called. Mutable.""")
+
+ args = zope.interface.Attribute(
+ """a peristent list of the args that should be applied to self.call.
+ May include persistent objects (though note that, if passing a method
+ is desired, it will typicall need to be wrapped in an IPartial).""")
+
+ kwargs = zope.interface.Attribute(
+ """a persistent mapping of the kwargs that should be applied to
+ self.call. May include persistent objects (though note that, if
+ passing a method is desired, it will typicall need to be wrapped
+ in an IPartial).""")
+
+ state = zope.interface.Attribute(
+ """One of constants defined in zc.async.interfaces: PENDING,
+ ACTIVE, CALLBACKS, COMPLETED. PENDING means not yet called.
+ ACTIVE means in the process of being called. CALLBACKS means in
+ the process of calling callbacks. COMPLETED means called.""")
+
+ result = zope.interface.Attribute(
+ """The result of the call. When state equals PENDING or ACTIVE, will
+ be None. When COMPLETED, will be a twisted.python.failure.Failure
+ describing the call failure or the successful result.""")
+
+ callbacks = zope.interface.Attribute(
+ """A mutable persistent list of the callback partials added by
+ addCallbacks.""")
+
+ unhandled_error = zope.interface.Attribute(
+ """A boolean: whether this partial has an unhandled error.
+ An unhandled error is defined as a Failure result on any callback
+ leaf node, or a Failure on this partial if this has no callbacks
+ or if it has one or more incomplete callback.""")
+
+ annotations = zope.interface.Attribute(
+ """An OOBTree that is available for metadata use.""")
+
+ def addCallbacks(success=None, failure=None):
+ """if success or failure is not None, adds a callback partial to
+ self.callbacks and returns the partial. Otherwise returns self.
+ success and failure must be None or adaptable to IPartial.
+ addCallbacks may be called multiple times. Each will be called
+ with the result of this partial. If callback is already in COMPLETED
+ state then the callback will be performed immediately."""
+
+ def addCallback(callback):
+ """callback will receive result (independent of whether it is a
+ success or a failure). callback must be adaptable to IPartial.
+ addCallback may be called multiple times. Each will be called
+ with the result of this partial. If callback is already in
+ COMPLETED state then the callback will be performed immediately."""
+
+ def __call__(*args, **kwargs):
+ """call the callable. Any given args are effectively appended to
+ self.args for the call, and any kwargs effectively update self.kwargs
+ for the call."""
+
+ def fail(e=AbortedError):
+ """Fail this partial, with option error e. May only be called when
+ partial is in PENDING or ACTIVE states, or else raises BadStateError.
+ If e is not provided,"""
+
+ def resumeCallbacks():
+ """Make all callbacks remaining for this partial. Any callbacks
+ that are in PENDING state should be called normally; any callbacks
+ in ACTIVE state should be `fail`ed; any callbacks in CALLBACKS state
+ should `resumeCallback`; and any callbacks in COMPLETED state should
+ be untouched. May only be called when partial is in CALLBACKS state.
+ State will be COMPLETED after this call."""
+
+class IDataManagerPartial(IPartial):
+ """An async partial with all the necessary knobs to by put in a
+ datamanager."""
+
+ workerUUID = zope.interface.Attribute(
+ """The UUID of the IWorker who is, or was, responsible for this
+ partial. None initially. Should be assigned by
+ IWorker.[reactor|thread].put.""")
+
+ assignerUUID = zope.interface.Attribute(
+ """The UUID of the software instance that was in charge when the
+ IPartial was put in an IPartialQueue. Should be assigned by
+ IPartialQueue.put.""")
+
+ selectedUUIDs = zope.interface.Attribute(
+ """a set of selected worker UUIDs. If it is empty, it is
+ interpreted as the set of all available workerUUIDs. Only
+ workers with UUIDs in the set may perform it.
+
+ If a worker would have selected this partial for a run, but the
+ difference of selected_workerUUIDs and excluded_workerUUIDs
+ stopped it, it is responsible for verifying that the effective
+ set of workerUUIDs intersects with the available workers; if the
+ intersection contains no possible workers, the worker should
+ call partial.fail().""")
+
+ excludedUUIDs = zope.interface.Attribute(
+ """a set of excluded worker UUIDs. Workers with UUIDs in this
+ set may not perform the partial.
+
+ If a worker would have selected this partial for a run, but the
+ difference of selected_workerUUIDs and excluded_workerUUIDs
+ stopped it, it is responsible for verifying that the effective
+ set of workerUUIDs intersects with the available workers; if the
+ intersection contains no possible workers, the worker should
+ call partial.fail().""")
+
+ begin_after = zope.interface.Attribute(
+ """A datetime.datetime in UTC of the first time when the
+ partial may run. Cannot be set after partial gets a data_manager.
+ """)
+
+ begin_by = zope.interface.Attribute(
+ """A datetime.timedelta of the duration after the begin_after
+ value after which the partial will fail, if it has not already
+ begun. Cannot be set after partial has begun.""")
+
+ thread = zope.interface.Attribute(
+ """None or thread.get_ident() of the worker who performs it. If a
+ reactor partial, must be None.""")
+
+class IPartialQueue(zc.queue.interfaces.IQueue):
+
+ __parent__ = zope.interface.Attribute(
+ """the IDataManager of which this is a part.""")
+
+ thread = zope.interface.Attribute(
+ """boolean of whether this is a thread or reactor queue""")
+
+ def put(item, begin_after=None, begin_by=None):
+ """Put an IPartial adapted from item into the queue. Returns IPartial.
+
+ Rememeber that IPartials are not guaranteed to be run in order
+ added to a queue. If you need sequencing, use
+ IPartial.addCallbacks.
+
+ item must be an IPartial, or be adaptable to that interface.
+ begin_after must be None (to leave the partial's current value) or a
+ datetime.datetime. begin_by must be None (to leave it alone) or a
+ datetime.timedelta of the duration after the begin_after.
+
+ If item.begin_after is None and begin_after is None, begin_after will
+ effectively be now. If item.begin_by is None and begin_by is None,
+ begin_by will effectively be datetime.timedelta(hours=1).
+
+ datetime.datetimes are suggested to be in UTC. Timezone-naive
+ datetimes will be interpreted as in UTC. Timezone-aware datetimes
+ will be converted to UTC, and errors because of this (such as
+ pytz ambiguity errors) will be raised.
+
+ When an IPartial is put in the queue, the queue puts the
+ begin_after time and begin_by duration on the partial,
+ and the UUID of the Zope instance that put the partial in the
+ queue on the `assignerUUID`.
+ """
+
+ def iterDue():
+ """return an iterable of all partials whose begin_after value is
+ less than or equal to now. Any expired partials (begin_after +
+ begin_by > datetime.datetime.now(pytz.UTC)) are also included.
+ """
+
+ def pullNext(UUID):
+ """returns first due job that is available for the given UUID,
+ removing it from the queue as appropriate; or None, if none are
+ available. Responsible for including jobs to fail expired
+ partials, and jobs to decomission dead workers for the next
+ highest worker (sorted by UUID) if its (last_ping +
+ ping_interval + ping_death_interval) < now. If this is the
+ highest worker UUID, cycles around to lowest."""
+
+class IWorkers(zope.interface.common.mapping.IEnumerableMapping):
+
+ __parent__ = zope.interface.Attribute(
+ """the IDataManager of which this is a part.""")
+
+ def add(value):
+ """add an IWorker with key of value.UUID. If value.UUID is None,
+ raise ValueError. Set value.__parent__ to the IWorkers."""
+
+ def remove(UUID):
+ """remove the registered IWorker with the give UUID. Raise KeyError
+ if such does not exist."""
+
+class IDataManager(zope.interface.Interface):
+ """Note that partials added to queues are not guaranteed to run in
+ the order added. For sequencing, use IPartial.addCallbacks."""
+
+ thread = zope.interface.Attribute(
+ """An IPartialQueue of IPartials that should be run in a thread.""")
+
+ reactor = zope.interface.Attribute(
+ """An IPartialQueue of IPartials that should be run in the main
+ loop (e.g., Twisted's main reactor loop).""")
+
+ workers = zope.interface.Attribute(
+ """An IWorkers of registered IWorker objects for this data manager;
+ these objects represent processes that are responsible for claiming
+ and performing the IPartials in the data manager.""")
+
+ def checkSibling(uuid):
+ """check the next sibling of uuid to see if it is dead, according
+ to its last_poll, and remove the engineUUID and schedule removal of its
+ partials if it is dead."""
+
+class IDataManagerAvailableEvent(zope.component.interfaces.IObjectEvent):
+ """object is data manager"""
+
+class DataManagerAvailable(zope.component.interfaces.ObjectEvent):
+ zope.interface.implements(IDataManagerAvailableEvent)
+
+class FullError(Exception):
+ """Container is full.
+ """
+
+class ISizedSequence(zope.interface.common.sequence.IFiniteSequence):
+ size = zope.interface.Attribute(
+ """an integer. If the queue's len >= size, put will raise
+ FullError""")
+
+ def add(item):
+ """same contract as IQueue.put, except if queue's len >= size, put will
+ raise FullError, and all objects get __parent__ set to the queue;
+ and it will only store partials."""
+
+ __parent__ = zope.interface.Attribute(
+ """a link to parent: an IWorker""")
+
+ def remove(item):
+ """remove item, or raise ValueError if item is not in queue"""
+
+ def __delitem__(index):
+ """delete item at index"""
+
+ def index(item):
+ """return index, or raise ValueError if item is not in queue"""
+
+class ICompletedCollection(zope.interface.Interface):
+ def __iter__():
+ """Iterate over partials in collection, from most recent `begin_after`
+ to oldest"""
+
+ def iter(start=None, stop=None):
+ """Iterate over partials in collection, starting and stopping with
+ given timezone-aware datetime values reasonably efficiently."""
+
+ def __len__():
+ """Return number of partials in collection"""
+
+ def add(partial):
+ """Add partial to collection and set __parent__ to the collection."""
+
+ __parent__ = zope.interface.Attribute(
+ """an IWorker""")
+
+ rotation_interval = zope.interface.Attribute(
+ """A datetime.timedelta of how often the buckets in the collection
+ should be rotated, to clean them out.""")
+
+ last_rotation = zope.interface.Attribute(
+ """A datetime.datetime in pytz.UTC of the last time a rotation was
+ performed (should initialize to the creation time).""")
+
+ def first(start=None):
+ """Return the first (most recent) partial in the collection, starting
+ with optional timezone-aware datetime."""
+
+ def last(stop=None):
+ """Return the last (oldest) partial in the collection, stopping
+ with optional timezone-aware datetime."""
+
+ def __nonzero__():
+ "whether collection contains any partials"
+
+ def rotate():
+ """rotate buckets, eliminating the ones added longest ago. Note that
+ this may be different than the ordering by begin_after."""
+
+class IWorker(zope.interface.Interface):
+
+ reactor = zope.interface.Attribute(
+ """An ISizedQueue of reactor partials currently being worked on by this
+ worker.""")
+
+ thread = zope.interface.Attribute(
+ """An ISizedQueue of thread partials currently being worked on by this
+ worker.""")
+
+ UUID = zope.interface.Attribute(
+ """The uuid.UUID that identifies this worker (where one instance ==
+ one process == one worker == one UUID).""")
+
+ engineUUID = zope.interface.Attribute(
+ """The uuid.UUID of the engine that is running this worker, or None.""")
+
+ last_ping = zope.interface.Attribute(
+ """the datetime.datetime in the pytz.UTC timezone of the last ping.
+ This date should be updated anytime a worker accepts a job in a
+ reactor or thread queue; and whenever, during a poll,
+ (last_ping + ping_interval) <= now.""")
+
+ poll_seconds = zope.interface.Attribute(
+ """The number of seconds between the end of one worker poll and the
+ start of the next.""")
+
+ ping_interval = zope.interface.Attribute(
+ """The approximate maximum datetime.timedelta between pings before
+ a new last_ping should be recorded.""")
+
+ ping_death_interval = zope.interface.Attribute(
+ """the datetime.timedelta after the last_ping + ping_interval that
+ signifies the workers death. That is, if (last_ping + ping_interval +
+ ping_death_interval) < now, the worker should be regarded as dead.
+ """)
+
+ completed = zope.interface.Attribute(
+ """The most recent partials completed by this worker, in the order
+ from most recent `begin_after` to oldest. ICompletedCollection.""")
+
+class IUUID(zope.interface.Interface):
+ """A marker interface for the API of Ka-Ping Yee's uuid.UUID class.
+ See http://zesty.ca/python/uuid.html """
Property changes on: zc.async/trunk/src/zc/async/interfaces.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/partial.py
===================================================================
--- zc.async/trunk/src/zc/async/partial.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/partial.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,246 @@
+
+import types
+
+import BTrees.OOBTree
+import ZODB.POSException
+import transaction.interfaces
+import persistent
+import persistent.list
+import persistent.mapping
+import twisted.internet.defer
+import twisted.python.failure
+import zope.interface
+import zc.queue
+
+import zc.async.interfaces
+import zc.twist
+from zc.async import rwproperty
+
+def success_or_failure(success, failure, res):
+ callable = None
+ if isinstance(res, twisted.python.failure.Failure):
+ if failure is not None:
+ callable = failure
+ elif success is not None:
+ callable = success
+ if callable is None:
+ return res
+ return callable(res)
+
+def completeStartedPartialArguments(partial, result):
+ if isinstance(result, twisted.python.failure.Failure):
+ for collection in (partial.args, partial.kwargs.values()):
+ for a in collection:
+ if (zc.async.interfaces.IPartial.providedBy(a) and
+ a.state not in (
+ zc.async.interfaces.PENDING,
+ zc.async.interfaces.COMPLETED)):
+ if a.state == zc.async.interfaces.ACTIVE:
+ a.fail()
+ elif a.state == zc.async.interfaces.CALLBACKS:
+ a.resumeCallbacks()
+ return result
+
+class Partial(persistent.Persistent):
+
+ zope.interface.implements(zc.async.interfaces.IPartial)
+ zope.interface.classProvides(zc.async.interfaces.IPartialFactory)
+
+ __parent__ = _callable_root = _callable_name = _result = None
+ _state = zc.async.interfaces.PENDING
+
+ def __init__(self, *args, **kwargs):
+ self.args = persistent.list.PersistentList(args)
+ self.callable = self.args.pop(0)
+ self.kwargs = persistent.mapping.PersistentMapping(kwargs)
+ self.callbacks = zc.queue.PersistentQueue()
+ self.annotations = BTrees.OOBTree.OOBTree()
+
+ @property
+ def result(self):
+ return self._result
+
+ @property
+ def state(self):
+ return self._state
+
+ @property
+ def unhandled_error(self):
+ if (self.state in (zc.async.interfaces.COMPLETED,
+ zc.async.interfaces.CALLBACKS) and
+ isinstance(self.result, twisted.python.failure.Failure)):
+ ct = 0
+ for c in self.callbacks:
+ if (c.state not in (zc.async.interfaces.COMPLETED,
+ zc.async.interfaces.CALLBACKS) or
+ c.unhandled_error):
+ return True
+ ct += 1
+ if not ct:
+ return True
+ return False
+
+ @classmethod
+ def bind(klass, *args, **kwargs):
+ res = klass(*args, **kwargs)
+ res.args.insert(0, res)
+ return res
+
+ @property
+ def callable(self):
+ if self._callable_name is None:
+ return self._callable_root
+ else:
+ return getattr(self._callable_root, self._callable_name)
+ @rwproperty.setproperty
+ def callable(self, value):
+ # can't pickle/persist methods by default as of this writing, so we
+ # add the sugar ourselves
+ if self.state != zc.async.interfaces.PENDING:
+ raise zc.async.interfaces.BadStateError(
+ 'can only set callable when a partial is in PENDING state')
+ if isinstance(value, types.MethodType):
+ self._callable_root = value.im_self
+ self._callable_name = value.__name__
+ else:
+ self._callable_root, self._callable_name = value, None
+
+ def addCallbacks(self, success=None, failure=None):
+ if success is not None or failure is not None:
+ if success is not None:
+ success = zc.async.interfaces.IPartial(success)
+ if failure is not None:
+ failure = zc.async.interfaces.IPartial(failure)
+ res = Partial(success_or_failure, success, failure)
+ if success is not None:
+ success.__parent__ = res
+ if failure is not None:
+ failure.__parent__ = res
+ self.addCallback(res)
+ # we need to handle the case of callbacks on the internal success/
+ # failure partials, to be safe.
+ abort_handler = zc.async.interfaces.IPartial(
+ completeStartedPartialArguments)
+ abort_handler.args.append(res)
+ res = res.addCallback(abort_handler)
+ else:
+ res = self
+ return res
+
+ def addCallback(self, callback):
+ callback = zc.async.interfaces.IPartial(callback)
+ self.callbacks.put(callback)
+ callback.__parent__ = self
+ if self.state == zc.async.interfaces.COMPLETED:
+ callback(self.result) # this commits transactions!
+ else:
+ self._p_changed = True # to try and fire conflict errors if
+ # our reading of self.state has changed beneath us
+ return callback
+
+ def __call__(self, *args, **kwargs):
+ if self.state != zc.async.interfaces.PENDING:
+ raise zc.async.interfaces.BadStateError(
+ 'can only call a partial in PENDING state')
+ tm = transaction.interfaces.ITransactionManager(self)
+ self._state = zc.async.interfaces.ACTIVE
+ tm.commit()
+ effective_args = list(args)
+ effective_args[0:0] = self.args
+ effective_kwargs = dict(self.kwargs)
+ effective_kwargs.update(kwargs)
+ return self._call_with_retry(
+ lambda: self.callable(*effective_args, **effective_kwargs))
+
+ def _call_with_retry(self, call):
+ ct = 0
+ tm = transaction.interfaces.ITransactionManager(self)
+ res = None
+ while 1:
+ try:
+ res = call()
+ if zc.async.interfaces.IPartial.providedBy(res):
+ res.addCallback(self._callback)
+ elif isinstance(res, twisted.internet.defer.Deferred):
+ res.addBoth(zc.twist.Partial(self._callback))
+ else:
+ if isinstance(res, twisted.python.failure.Failure):
+ res = zc.twist.sanitize(res)
+ self._result = res
+ self._state = zc.async.interfaces.CALLBACKS
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ ct += 1
+ if ct >= 5:
+ res = self._result = zc.twist.sanitize(
+ twisted.python.failure.Failure())
+ self._state = zc.async.interfaces.CALLBACKS
+ tm.commit()
+ self.resumeCallbacks()
+ else:
+ continue
+ except zc.twist.EXPLOSIVE_ERRORS:
+ tm.abort()
+ raise
+ except:
+ tm.abort()
+ res = self._result = zc.twist.sanitize(
+ twisted.python.failure.Failure())
+ self._state = zc.async.interfaces.CALLBACKS
+ tm.commit()
+ self.resumeCallbacks()
+ else:
+ if self.state == zc.async.interfaces.CALLBACKS:
+ self.resumeCallbacks()
+ return res
+
+ def _callback(self, res):
+ self._call_with_retry(lambda: res)
+
+ def fail(self, e=None):
+ if e is None:
+ e = zc.async.interfaces.AbortedError()
+ if self.state not in (zc.async.interfaces.PENDING,
+ zc.async.interfaces.ACTIVE):
+ raise zc.async.interfaces.BadStateError(
+ 'can only call fail on a partial in PENDING or ACTIVE states')
+ tm = transaction.interfaces.ITransactionManager(self)
+ self._result = zc.twist.sanitize(
+ twisted.python.failure.Failure(e))
+ self._state = zc.async.interfaces.CALLBACKS
+ tm.commit()
+ self.resumeCallbacks()
+
+ def resumeCallbacks(self):
+ if self.state != zc.async.interfaces.CALLBACKS:
+ raise zc.async.interfaces.BadStateError(
+ 'can only resumeCallbacks on a partial in CALLBACKS state')
+ callbacks = list(self.callbacks)
+ tm = transaction.interfaces.ITransactionManager(self)
+ length = 0
+ while 1:
+ for p in callbacks:
+ if p.state == zc.async.interfaces.PENDING:
+ p(self.result)
+ elif p.state == zc.async.interfaces.ACTIVE:
+ p.fail()
+ elif p.state == zc.async.interfaces.CALLBACKS:
+ p.resumeCallbacks()
+ # TODO: this shouldn't raise anything we want to catch, right?
+ # now, this should catch all the errors except EXPLOSIVE_ERRORS
+ # cleaning up dead partials should look something like the above.
+ tm.commit()
+ # it's possible that someone added some callbacks run until
+ # we're exhausted.
+ length += len(callbacks)
+ callbacks = list(self.callbacks)[length:]
+ if not callbacks:
+ try:
+ self._state = zc.async.interfaces.COMPLETED
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ tm.abort()
+ callbacks = list(self.callbacks)[length:]
+ else:
+ break # and return
Property changes on: zc.async/trunk/src/zc/async/partial.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/partial.txt
===================================================================
--- zc.async/trunk/src/zc/async/partial.txt 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/partial.txt 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,912 @@
+========
+Partials
+========
+
+What if you want to persist a reference to the method of a persistent
+object--you can't persist that normally in the ZODB, but that can be
+very useful, especially to store asynchronous calls. What if you want
+to act on the result of an asynchronous call that may be called later?
+The zc.async package offers an approach modelled loosely on the Twisted
+deferred code: `zc.async.partial.Partial`. To use it, simply wrap the
+callable--a method of a persistent object or a callable persistent
+object or a global function--in the partial. You can include ordered
+and keyword arguments to the partial, which may be persistent objects or
+simply pickleable objects.
+
+Unlike a normal partial, the result of the wrapped call goes on the
+partial's 'result' attribute, and the immediate return of the call might
+not be the end result. It could also be a failure, indicating an
+exception; or another partial, indicating that we are waiting to be
+called back by the second partial; or a twisted deferred, indicating
+that we are waiting to be called back by a twisted Deferred (see the
+`twist` module, also in this package). After you have the partial, you
+can then use a number of methods and attributes on the partial for
+further set up. Let's show the most basic use first, though.
+
+Note that, even though this looks like an interactive prompt, all
+functions and classes defined in this document act as if they were
+defined within a module. Classes and functions defined in an interactive
+prompt are normally not picklable, and the async Partial must work with
+picklable objects [#set_up]_.
+
+ >>> import zc.async.partial
+ >>> def call():
+ ... print 'hello world'
+ ... return 'my result'
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial(call)
+ >>> import transaction
+ >>> transaction.commit()
+
+Now we have a partial [#verify]_. We can see that the state is PENDING,
+call it, and then see that the function was called, and see the result on
+the partial.
+
+ >>> import zc.async.interfaces
+ >>> p.state == zc.async.interfaces.PENDING
+ True
+ >>> res = p()
+ hello world
+ >>> p.result
+ 'my result'
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+
+The result of the partial also happens to be the end result of the call,
+but as mentioned above, the partial may return a deferred or another partial.
+
+ >>> res
+ 'my result'
+
+We can also use a method of a persistent object. Imagine we have a ZODB
+root that we can put objects in to.
+
+ >>> import persistent
+ >>> class Demo(persistent.Persistent):
+ ... counter = 0
+ ... def increase(self, value=1):
+ ... self.counter += value
+ ...
+ >>> demo = root['demo'] = Demo()
+ >>> demo.counter
+ 0
+ >>> p = root['p'] = zc.async.partial.Partial(demo.increase)
+ >>> transaction.commit()
+ >>> p() # result is None
+ >>> demo.counter
+ 1
+
+So our two calls so far have returned direct successes. This one returns
+a failure, because the wrapped call raises an exception.
+
+ >>> def callFailure():
+ ... raise RuntimeError('Bad Things Happened Here')
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial(callFailure)
+ >>> transaction.commit()
+ >>> res = p()
+ >>> p.result
+ <twisted.python.failure.Failure exceptions.RuntimeError>
+
+These are standard twisted Failures, except that frames in the stored
+traceback have been converted to reprs, so that we don't keep references
+around when we pass the Failures around (over ZEO, for instance)
+[#no_live_frames]_. This doesn't stop us from getting nice tracebacks,
+though.
+
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ exceptions.RuntimeError: Bad Things Happened Here
+
+Note that all calls can return a failure explicitly, rather than raising
+an exception that the partial converts to an exception. However, there
+is an important difference in behavior. If a wrapped call raises an
+exception, the partial aborts the transaction; but if the wrapped call
+returns a failure, no abort occurs. Wrapped calls that explicitly return
+failures are thus responsible for any necessary transaction aborts. See
+the footnote for an example [#explicit_failure_example]_.
+
+Now let's return a partial. This generally represents a result that is waiting
+on another asynchronous persistent call, which would normally be called by
+a worker. We'll fire the second call ourselves for this demonstration.
+
+ >>> def innerCall():
+ ... return 42
+ ...
+ >>> ip = root['ip'] = zc.async.partial.Partial(innerCall)
+ >>> def callPartial():
+ ... return ip
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial(callPartial)
+ >>> transaction.commit()
+ >>> res = p()
+ >>> res is ip
+ True
+
+While we are waiting for the result, the state is ACTIVE.
+
+ >>> p.state == zc.async.interfaces.ACTIVE
+ True
+
+When we call the inner partial, the result will be placed on the outer partial.
+
+ >>> p.result # None
+ >>> res = ip()
+ >>> p.result
+ 42
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+
+This is accomplished with callbacks, discussed below in the Callbacks_ section.
+
+Now we'll return a deferred. The story is almost identical to the
+partial story, except that, in our demonstration, we must handle
+transactions, because the deferred story uses the `twist` module in
+this package to let the Twisted reactor communicate safely with the
+ZODB: see twist.txt for details.
+
+ >>> import twisted.internet.defer
+ >>> inner_d = twisted.internet.defer.Deferred()
+ >>> def callDeferred():
+ ... return inner_d
+ ...
+ >>> p = root['p2'] = zc.async.partial.Partial(callDeferred)
+ >>> transaction.commit()
+ >>> res = p()
+ >>> res is inner_d
+ True
+ >>> p.state == zc.async.interfaces.ACTIVE
+ True
+ >>> p.result # None
+
+After the deferred receives its result, we need to sync our connection to see
+it.
+
+ >>> inner_d.callback(42)
+ >>> p.result # still None; we need to sync our connection to see the result
+ >>> p.state == zc.async.interfaces.ACTIVE # it's completed, but need to sync
+ True
+ >>> trans = transaction.begin() # sync our connection
+ >>> p.result
+ 42
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+
+As the last step in looking at the basics, let's look at passing arguments
+into the partial. They can be persistent objects or generally picklable
+objects, and they can be ordered or keyword arguments.
+
+ >>> class PersistentDemo(persistent.Persistent):
+ ... def __init__(self, value=0):
+ ... self.value = value
+ ...
+ >>> root['demo2'] = PersistentDemo()
+ >>> import operator
+ >>> def argCall(ob, ob2=None, value=0, op=operator.add):
+ ... for o in (ob, ob2):
+ ... if o is not None:
+ ... o.value = op(o.value, value)
+ ...
+ >>> p = root['p3'] = zc.async.partial.Partial(
+ ... argCall, root['demo2'], value=4)
+ >>> transaction.commit()
+ >>> p()
+ >>> root['demo2'].value
+ 4
+
+And, of course, this is a partial: we can specify some arguments when the
+partial is made, and some when it is called.
+
+ >>> root['demo3'] = PersistentDemo(10)
+ >>> p = root['p3'] = zc.async.partial.Partial(
+ ... argCall, root['demo2'], value=4)
+ >>> transaction.commit()
+ >>> p(root['demo3'], op=operator.mul)
+ >>> root['demo2'].value
+ 16
+ >>> root['demo3'].value
+ 40
+
+This last feature makes partials possible to use for callbacks: our next
+topic.
+
+Callbacks
+---------
+
+The partial object can also be used to handle return values and
+exceptions from the call. The `addCallbacks` method enables the
+functionality. Its signature is (success=None, failure=None). It may
+be called multiple times, each time adding a success and/or failure
+callable that takes an end result: a value or a zc.async.Failure object,
+respectively. Failure objects are passed to failure callables, and
+any other results are passed to success callables.
+
+The return value of the success and failure callables is
+important for chains and for determining whether a partial had any
+errors that need logging, as we'll see below. The call to
+`addCallbacks` returns a partial, which can be used for chaining (see
+`Chaining Callbacks`_).
+
+Let's look at a simple example.
+
+ >>> def call(*args):
+ ... res = 1
+ ... for a in args:
+ ... res *= a
+ ... return res
+ ...
+ >>> def callback(res):
+ ... return 'the result is %r' % (res,)
+ ...
+ >>> p = root['p4'] = zc.async.partial.Partial(call, 2, 3)
+ >>> p_callback = p.addCallbacks(callback)
+ >>> transaction.commit()
+ >>> res = p(4)
+ >>> p.result
+ 24
+ >>> res
+ 24
+ >>> p_callback.result
+ 'the result is 24'
+
+We can now introduce another new concept: unhandled errors. A partial
+with a failure is considered to have an unhandled error if any leaf-node
+callback has a failure result, or if it itself has a failure result and
+has no callbacks. This convention, if followed, can be used to
+determine whether to highlight the partial as an error in logs or other
+situations. However, it is only a convention as far as the partial is
+concerned (other elements of the zc.async package may treat it more
+seriously).
+
+ >>> p.unhandled_error
+ False
+ >>> def error():
+ ... raise RuntimeError('Boo!')
+ ...
+ >>> p = root['p3'] = zc.async.partial.Partial(error)
+ >>> transaction.commit()
+ >>> f = p()
+ >>> p.result
+ <twisted.python.failure.Failure exceptions.RuntimeError>
+ >>> p.unhandled_error
+ True
+ >>> def handleRuntime(f):
+ ... f.trap(RuntimeError)
+ ...
+ >>> p_callback = p.addCallbacks(failure=handleRuntime)
+ >>> p_callback.state == zc.async.interfaces.COMPLETED
+ True
+ >>> p_callback.result # None
+ >>> p_callback.unhandled_error
+ False
+ >>> p.unhandled_error
+ False
+
+Here are some callback examples adding a success and a failure
+simultaneously. This one causes a success...
+
+ >>> def multiply(first, second, third=None):
+ ... res = first * second
+ ... if third is not None:
+ ... res *= third
+ ... return res
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+ >>> transaction.commit()
+ >>> def success(res):
+ ... print "success!", res
+ ...
+ >>> def failure(f):
+ ... print "failure.", f
+ ...
+ >>> p.addCallbacks(success, failure) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> res = p()
+ success! 15
+
+...and this one a failure.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+ >>> transaction.commit()
+ >>> p.addCallbacks(success, failure) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> res = p() # doctest: +ELLIPSIS
+ failure. [Failure instance: Traceback: exceptions.TypeError...]
+
+you can also add multiple callbacks.
+
+ >>> def also_success(val):
+ ... print "also a success!", val
+ ...
+ >>> def also_failure(f):
+ ... print "also a failure.", f
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+ >>> transaction.commit()
+ >>> p.addCallbacks(success) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> p.addCallbacks(also_success) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> res = p()
+ success! 15
+ also a success! 15
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+ >>> transaction.commit()
+ >>> p.addCallbacks(failure=failure) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> p.addCallbacks(failure=also_failure) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> res = p() # doctest: +ELLIPSIS
+ failure. [Failure instance: Traceback: exceptions.TypeError...]
+ also a failure. [Failure instance: Traceback: exceptions.TypeError...]
+
+Chaining Callbacks
+------------------
+
+Sometimes it's desirable to have a chain of callables, so that one callable
+effects the input of another. The returned partial from addCallables can
+be used for that purpose. Effectively, the logic for addCallables is this:
+
+ def success_or_failure(success, failure, res):
+ if zc.async.interfaces.IFailure.providedBy(res):
+ if failure is not None:
+ res = failure(res)
+ elif success is not None:
+ res = success(res)
+ return res
+
+ class Partial(...):
+ ...
+ def addCallbacks(self, success=None, failure=None):
+ if success is None and failure is None:
+ return
+ res = Partial(success_or_failure, success, failure)
+ self.callbacks.append(res)
+ return res
+
+Here's a simple chain, then. We multiply 5 * 3, then that result by 4, then
+print the result in the `success` function.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+ >>> transaction.commit()
+ >>> p.addCallbacks(zc.async.partial.Partial(multiply, 4)
+ ... ).addCallbacks(success) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> res = p()
+ success! 60
+
+A less artificial use case is to handle errors (like try...except) or do
+cleanup (like try...finally). Here's an example of handling errors.
+
+ >>> def handle_failure(f):
+ ... return 0
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+ >>> transaction.commit()
+ >>> p.addCallbacks(
+ ... failure=handle_failure).addCallbacks(success) # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+ >>> res = p()
+ success! 0
+
+If you recall our discussion of unhandled errors above, then you know
+this means that even though the top partial has a failure, unhandled_error
+is False.
+
+ >>> isinstance(p.result, twisted.python.failure.Failure)
+ True
+ >>> p.unhandled_error
+ False
+
+Callbacks on Completed Partial
+------------------------------
+
+When you add a callback to a partial that has been completed, it is performed
+immediately.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+ >>> transaction.commit()
+ >>> res = p()
+ >>> p.result
+ 10
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+ >>> p_callback = p.addCallbacks(zc.async.partial.Partial(multiply, 3))
+ >>> p_callback.result
+ 30
+ >>> p.state == zc.async.interfaces.COMPLETED
+ True
+
+Chaining Partials
+-----------------
+
+It's also possible to achieve a somewhat similar pattern by using a
+partial as a success or failure callable, and then add callbacks to the
+second partial. This differs from the other approach in that you are only
+adding callbacks to one side, success or failure, not the effective
+combined result.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+ >>> transaction.commit()
+ >>> p_callback = p.addCallbacks(success)
+ >>> p2 = zc.async.partial.Partial(multiply, 4)
+ >>> p_callback_2 = p.addCallbacks(p2)
+ >>> p_callback_3 = p2.addCallbacks(also_success)
+ >>> res = p()
+ success! 15
+ also a success! 60
+
+This can be used to handle failures, to some degree.
+
+ >>> def handle_failure(f):
+ ... return 0
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, None)
+ >>> transaction.commit()
+ >>> p_callback = p.addCallbacks(failure=failure)
+ >>> p2 = zc.async.partial.Partial(handle_failure)
+ >>> p_callback_2 = p.addCallbacks(failure=p2)
+ >>> p_callback_3 = p2.addCallbacks(success)
+ >>> res = p() # doctest: +ELLIPSIS
+ failure. [Failure instance: Traceback: exceptions.TypeError...]
+ success! 0
+
+Failing
+-------
+
+Speaking again of failures, it's worth discussing two other aspects of
+failing. One is that partials offer an explicit way to fail a call. It can
+be called when the partial is in PENDING or ACTIVE states. The primary use
+cases for this method are to cancel a partial that is overdue to start, and
+to cancel a partial that was in progress by a worker when the worker died
+(more on that below).
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+ >>> transaction.commit()
+ >>> p.fail()
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ zc.async.interfaces.AbortedError:
+
+`fail` calls all failure callbacks with the failure.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+ >>> p_callback = p.addCallbacks(failure=failure)
+ >>> transaction.commit()
+ >>> res = p.fail() # doctest: +ELLIPSIS
+ failure. [Failure instance: Traceback...zc.async.interfaces.AbortedError...]
+
+As seen above, it fails with zc.async.interfaces.AbortedError by default.
+You can also pass in a different error.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+ >>> transaction.commit()
+ >>> p.fail(RuntimeError('failed'))
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ exceptions.RuntimeError: failed
+
+As mentioned, if a worker dies when working on an active task, the active task
+should be aborted using `fail`, so the method also
+works if a partial is in the ACTIVE state. We'll reach under the covers
+to show this.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+ >>> p._state = zc.async.interfaces.ACTIVE
+ >>> transaction.commit()
+ >>> p.fail()
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ zc.async.interfaces.AbortedError:
+
+It won't work for failing tasks in COMPLETED or CALLBACKS state.
+
+ >>> p.fail()
+ Traceback (most recent call last):
+ ...
+ BadStateError: can only call fail on a partial in PENDING or ACTIVE states
+ >>> p._state = zc.async.interfaces.CALLBACKS
+ >>> p.fail()
+ Traceback (most recent call last):
+ ...
+ BadStateError: can only call fail on a partial in PENDING or ACTIVE states
+
+Using `resumeCallbacks`
+-----------------------
+
+So `fail` is the proper way to handle an active partial that was being
+worked on by a dead worker, but how does one handle a partial that was in the
+CALLBACKS state? The answer is to use resumeCallbacks. Any partial that is
+still pending will be called; any partial that is active will be failed;
+any partial that is in the middle of calling its own callbacks will have its
+`resumeCallbacks` called; and any partial that is completed will be ignored.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 2)
+ >>> p._result = 10
+ >>> p._state = zc.async.interfaces.CALLBACKS
+ >>> completed_p = zc.async.partial.Partial(multiply, 3)
+ >>> callbacks_p = zc.async.partial.Partial(multiply, 4)
+ >>> callbacks_p._result = 40
+ >>> callbacks_p._state = zc.async.interfaces.CALLBACKS
+ >>> sub_callbacks_p = callbacks_p.addCallbacks(
+ ... zc.async.partial.Partial(multiply, 2))
+ >>> active_p = zc.async.partial.Partial(multiply, 5)
+ >>> active_p._state = zc.async.interfaces.ACTIVE
+ >>> pending_p = zc.async.partial.Partial(multiply, 6)
+ >>> for _p in completed_p, callbacks_p, active_p, pending_p:
+ ... p.callbacks.put(_p)
+ ...
+ >>> transaction.commit()
+ >>> res = completed_p(10)
+ >>> p.resumeCallbacks()
+ >>> sub_callbacks_p.result
+ 80
+ >>> sub_callbacks_p.state == zc.async.interfaces.COMPLETED
+ True
+ >>> print active_p.result.getTraceback()
+ ... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ zc.async.interfaces.AbortedError:
+ >>> active_p.state == zc.async.interfaces.COMPLETED
+ True
+ >>> pending_p.result
+ 60
+ >>> pending_p.state == zc.async.interfaces.COMPLETED
+ True
+
+Introspecting and Mutating Arguments
+------------------------------------
+
+Partial arguments can be introspected and mutated.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 5, 3)
+ >>> transaction.commit()
+ >>> p.args
+ [5, 3]
+ >>> p.kwargs
+ {}
+ >>> p.kwargs['third'] = 2
+ >>> p()
+ 30
+
+This can allow wrapped callables to have a reference to the partial
+itself.
+
+ >>> def show(v):
+ ... print v
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial(show)
+ >>> transaction.commit()
+ >>> p.args.append(p)
+ >>> res = p() # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+
+A class method on Partial, `bind`, can simplify this. It puts the partial as
+the first argument to the callable, as if the callable were bound as a method
+on the partial.
+
+ >>> p = root['p'] = zc.async.partial.Partial.bind(show)
+ >>> transaction.commit()
+ >>> res = p() # doctest: +ELLIPSIS
+ <zc.async.partial.Partial object at ...>
+
+Result and State
+----------------
+
+Partials know about their state, and after a successful call also know
+their result, whether it is a Failure or another value. Possible states are
+the constants in zc.async.interfaces named PENDING, ACTIVE, CALLBACKS, and
+COMPLETED.
+
+ >>> def showState(partial, *ignore):
+ ... state = partial.state
+ ... for nm in 'PENDING', 'ACTIVE', 'CALLBACKS', 'COMPLETED':
+ ... val = getattr(zc.async.interfaces, nm)
+ ... if state == val:
+ ... print nm
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial.bind(showState)
+ >>> transaction.commit()
+ >>> p_callback = p.addCallbacks(zc.async.partial.Partial(showState, p))
+
+ >>> showState(p)
+ PENDING
+ >>> p.result # None
+ >>> res = p()
+ ACTIVE
+ CALLBACKS
+ >>> showState(p)
+ COMPLETED
+
+A partial may only be called when the state is PENDING: calling a
+partial again raises a BadStateError.
+
+ >>> p()
+ Traceback (most recent call last):
+ ...
+ BadStateError: can only call a partial in PENDING state
+
+Other similar restrictions include the following:
+
+- A partial may not call itself [#call_self]_.
+
+- Also, a partial's direct callback may not call the partial
+ [#callback_self]_.
+
+More Partial Introspection
+--------------------------
+
+We've already shown that it is possible to introspect unhandled_error,
+state, result, args, and kwargs. Two other aspects of the basic partial
+functionality are introspectable: callable and callbacks.
+
+The callable is the callable (function or method of a picklable object) that
+the partial will call. You can change it while the partial is in a pending
+state.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 2)
+ >>> p.callable is multiply
+ True
+ >>> p.callable = root['demo'].increase
+ >>> p.callable == root['demo'].increase
+ True
+ >>> transaction.commit()
+ >>> root['demo'].counter
+ 2
+ >>> res = p()
+ >>> root['demo'].counter
+ 4
+
+The callbacks are a queue of the callbacks added by addCallbacks (or the
+currently experimental and underdocumented addCallback). Currently the
+code may allow for direct mutation of the callbacks, but it is strongly
+suggested that you do not mutate the callbacks, especially not adding them
+except through addCallbacks or addCallback.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 2, 8)
+ >>> len(p.callbacks)
+ 0
+ >>> p_callback = p.addCallbacks(zc.async.partial.Partial(multiply, 5))
+ >>> len(p.callbacks)
+ 1
+
+When you use addCallbacks, you actually get a callback to your callback,
+for safety reasons. Specifically, when you use addCallbacks, the success
+and failure callbacks are actually arguments to another callback--the result
+of the `addCallbacks` call. If a worker dies while the partial is in
+progress, active argument partials should be cleaned up and will not be
+cleaned up automatically with the logic in `resumeCallbacks` (by design:
+this may not be desired behavior in all cases). Therefore we add a callback
+to the main callback that does this job. We return the subsidiary callback
+so that error handling is calculated more as expected (see the
+`unhandled_error` attribute).
+
+ >>> p.callbacks[0] is p_callback
+ False
+ >>> p.callbacks[0] is p_callback.__parent__
+ True
+
+`addCallback` does not have this characteristic (you are responsible for any
+internal partials, therefore).
+
+ >>> p_callback2 = zc.async.partial.Partial(multiply, 9)
+ >>> p_callback2 is p.addCallback(p_callback2)
+ True
+
+To continue with our example of introspecting the partial...
+
+ >>> len(p.callbacks)
+ 2
+ >>> p.callbacks[1] is p_callback2
+ True
+ >>> transaction.commit()
+ >>> res = p()
+ >>> p.result
+ 16
+ >>> p_callback.result
+ 80
+ >>> p_callback2.result
+ 144
+ >>> len(p.callbacks)
+ 2
+ >>> p.callbacks[0] is p_callback.__parent__
+ True
+ >>> p.callbacks[1] is p_callback2
+ True
+
+The __parent__ attribute should hold the immediate parent of a partial.
+This means that a pending partial will be within a data manager's queue;
+an active partial will be within a worker's queue (which is within a
+worker, which is within a workers container, which is within a data
+manager); and a callback will be within another partial (which may be
+intermediate to the top level partial, in which case __parent__ of the
+intermediate partial is the top level). Here's an example.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 3, 5)
+ >>> p_callback = zc.async.partial.Partial(multiply, 2)
+ >>> p_callback2 = p.addCallbacks(p_callback)
+ >>> p_callback.__parent__ is p_callback2.__parent__
+ True
+ >>> p_callback2.__parent__.__parent__ is p
+ True
+ >>> transaction.abort()
+
+=========
+Footnotes
+=========
+
+.. [#set_up] We'll actually create the state that the text needs here.
+
+ >>> from ZODB.tests.util import DB
+ >>> db = DB()
+ >>> conn = db.open()
+ >>> root = conn.root()
+
+ You must have two adapter registrations: IConnection to
+ ITransactionManager, and IPersistent to IConnection. We will also
+ register IPersistent to ITransactionManager because the adapter is
+ designed for it.
+
+ >>> from zc.twist import transactionManager, connection
+ >>> import zope.component
+ >>> zope.component.provideAdapter(transactionManager)
+ >>> zope.component.provideAdapter(connection)
+ >>> import ZODB.interfaces
+ >>> zope.component.provideAdapter(
+ ... transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+ The partial class can be registered as an adapter for
+ functions and methods. It needs to be for expected simple usage of
+ addCallbacks.
+
+ >>> import zope.component
+ >>> import types
+ >>> import zc.async.interfaces
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.FunctionType,),
+ ... provides=zc.async.interfaces.IDataManagerPartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.MethodType,),
+ ... provides=zc.async.interfaces.IDataManagerPartial)
+
+.. [#verify] Verify interface
+
+ >>> from zope.interface.verify import verifyObject
+ >>> verifyObject(zc.async.interfaces.IPartial, p)
+ True
+
+ Note that state and result are readonly.
+
+ >>> p.state = 1
+ Traceback (most recent call last):
+ ...
+ AttributeError: can't set attribute
+ >>> p.result = 1
+ Traceback (most recent call last):
+ ...
+ AttributeError: can't set attribute
+
+.. [#no_live_frames] Failures have two particularly dangerous bits: the
+ traceback and the stack. We use the __getstate__ code on Failures
+ to clean them up. This makes the traceback (`tb`) None...
+
+ >>> p.result.tb # None
+
+ ...and it makes all of the values in the stack--the locals and
+ globals-- into strings. The stack is a list of lists, in which each
+ internal list represents a frame, and contains five elements: the
+ code name (`f_code.co_name`), the code file (`f_code.co_filename`),
+ the line number (`f_lineno`), an items list of the locals, and an
+ items list for the globals. All of the values in the items list
+ would normally be objects, but are now strings.
+
+ >>> for (codename, filename, lineno, local_i, global_i) in p.result.stack:
+ ... for k, v in local_i:
+ ... assert isinstance(v, basestring), 'bad local %s' % (v,)
+ ... for k, v in global_i:
+ ... assert isinstance(v, basestring), 'bad global %s' % (v,)
+ ...
+
+ Here's a reasonable question. The Twisted Failure code has a
+ __getstate__ that cleans up the failure, and that's even what we are
+ using to sanitize the failure. If the failure is attached to a
+ partial and stored in the ZODB, it is going to be cleaned up anyway.
+ Why explicitly clean up the failure even before it is pickled?
+
+ The answer might be classified as paranoia. Just in case the failure
+ is kept around in memory longer--by being put on a deferred, or somehow
+ otherwise passed around--we want to eliminate any references to objects
+ in the connection as soon as possible.
+
+ Unfortunately, the __getstate__ code in the Twisted Failure can cause
+ some interaction problems for code that has a __repr__ with side effects--
+ like xmlrpclib, unfortunately. The `twist` module has a monkeypatch
+ for that particular problem, thanks to Florent Guillaume at Nuxeo, but
+ others may be discovered.
+
+.. [#explicit_failure_example] As the main text describes, if a call raises
+ an exception, the partial will abort the transaction; but if it
+ returns a failure explicitly, the call is responsible for making any
+ desired changes to the transaction (such as aborting) before the
+ partial calls commit. Compare. Here is a call that raises an
+ exception, and rolls back changes.
+
+ (Note that we are passing arguments to the partial, a topic that has
+ not yet been discussed in the text when this footnote is given: read
+ on a bit in the main text to see the details, if it seems surprising
+ or confusing.)
+
+ >>> def callAndRaise(ob):
+ ... ob.increase()
+ ... print ob.counter
+ ... raise RuntimeError
+ ...
+ >>> p = root['raise_exception_example'] = zc.async.partial.Partial(
+ ... callAndRaise, root['demo'])
+ >>> transaction.commit()
+ >>> root['demo'].counter
+ 1
+ >>> res = p() # this shows the result of the print in `callAndRaise` above.
+ 2
+ >>> root['demo'].counter # it was rolled back
+ 1
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ exceptions.RuntimeError:
+
+ Here is a call that returns a failure, and does not abort, even though
+ the partial result looks very similar.
+
+ >>> import twisted.python.failure
+ >>> def returnExplicitFailure(ob):
+ ... ob.increase()
+ ... try:
+ ... raise RuntimeError
+ ... except RuntimeError:
+ ... # we could have just made and returned a failure without the
+ ... # try/except, but this is intended to make crystal clear that
+ ... # exceptions are irrelevant if you catch them and return a
+ ... # failure
+ ... return twisted.python.failure.Failure()
+ ...
+ >>> p = root['explicit_failure_example'] = zc.async.partial.Partial(
+ ... returnExplicitFailure, root['demo'])
+ >>> transaction.commit()
+ >>> res = p()
+ >>> root['demo'].counter # it was not rolled back automatically
+ 2
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ exceptions.RuntimeError:
+
+.. [#call_self] Here's a partial trying to call itself.
+
+ >>> def call(obj, *ignore):
+ ... return obj()
+ ...
+ >>> p = root['p'] = zc.async.partial.Partial.bind(call)
+ >>> transaction.commit()
+ >>> res = p()
+ >>> print p.result.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ zc.async.interfaces.BadStateError: can only call a partial in PENDING state
+
+.. [#callback_self] Here's a partial's callback trying to call the partial.
+
+ >>> p = root['p'] = zc.async.partial.Partial(multiply, 3, 4)
+ >>> p_callback = p.addCallbacks(
+ ... zc.async.partial.Partial(call, p)).addCallbacks(failure=failure)
+ >>> transaction.commit()
+ >>> res = p() # doctest: +ELLIPSIS
+ failure. [Failure instance: Traceback: zc.async.interfaces.BadStateError...]
+ >>> p.result # the main partial still ran to completion
+ 12
Property changes on: zc.async/trunk/src/zc/async/partial.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/partials_and_transactions.txt
===================================================================
--- zc.async/trunk/src/zc/async/partials_and_transactions.txt 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/partials_and_transactions.txt 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,294 @@
+This is a document for maintainers and for testing.
+
+Partials manage their own transactions when they are called. In normal
+use, this means that transactions are committed and aborted by the
+partial itself at the points marked "COMMIT" and "ABORT" in this list
+(other software components will make commits, just not the partial):
+
+- client creates a partial, puts it in a queue, and assigns callbacks to it
+ before it is run.
+- a worker claims a partial
+- a worker calls a partial
+- partial changes state to ACTIVE: COMMIT
+- partial runs the wrapped callable, stores the result on its "result"
+ attribute, changes the state to CALLBACKS, and tries to COMMIT.
+ * if there is a ZODB.POSException.TransactionError, abort and retry 5
+ times, after which ABORT, set a Failure on the result attribute,
+ COMMIT, and skip to `complete`_ step below.
+ * if there is a SystemExit, KeyboardInterrupt, or any non-TransactionError
+ ZODB.POSException.POSError (which includes all ZEO-related storage
+ errors) ABORT and raise.
+ * if there are any other exceptions, ABORT, set a Failure on the result
+ attribute, COMMIT, and skip to `complete`_ step below.
+- If the result of the wrapped callable is a partial or Twisted deferred,
+ add a callable for a method that sets the result, sets the state to
+ CALLBACKS, tries to commit as described above, and then proceeds with
+ the `complete`_ step. COMMIT and return.
+- _`complete`: for each callback (which is itself a partial), call it.
+ Each callback partial will commit as described here. The top partial
+ catches no errors while it runs the callbacks.
+- When all callbacks have been called, set state to COMPLETED and COMMIT.
+ if there is a ZODB.POSException.TransactionError, look in the callbacks to
+ see if there is a new one. If there is, perform it and try again; otherwise,
+ retry this forever, logging every time, because this should not happen
+ except in the case of a new additional callback.
+ logging retries: there should be no conflict errors, because no two
+ workers should be touching this partial.
+- If a callback is added to this completed partial, perform the callback
+ and COMMIT. If anything fails, including a ConflictError, just raise it.
+ Someone else should abort as necessary.
+- If a callback is added to a partial in any other state, set the partial's
+ _p_changed to True and commit so that we raise a ConflictError, check the
+ state again, and retry if the partial's state changed while we were
+ checking it.
+
+Note the following:
+- if a partial's wrapped callable returns a failure, that means that it
+ is taking responsiblity for any necessary abort: the partial will still
+ attempt to commit.
+- the state never changes out of COMPLETED even when a new callback is
+ added.
+- __call__ *can* raise a ConflictError; the only known way is to have two
+ workers start the same partial, which should not be possible in normal
+ zc.async usage.
+- addCallbacks may raise a ConflictError: this would happen, for instance,
+ when state is COMPLETED so callbacks are performed immediately.
+
+What could go wrong? In this list "T1" stands for one hypothetical
+thread, and "T2" stands for another hypothetical thread, often
+overlapping in time with T1.
+
+- T1 goes to CALLBACKS state and begins evaluating callbacks. T2 adds another
+ callback [#set_up]_. We need to be careful that the callback is executed.
+
+ >>> import threading
+ >>> _thread_lock = threading.Lock()
+ >>> _main_lock = threading.Lock()
+ >>> called = 0
+ >>> def safe_release(lock):
+ ... while not lock.locked():
+ ... pass
+ ... lock.release()
+ ...
+ >>> def locked_call(res=None):
+ ... global called
+ ... safe_release(_main_lock)
+ ... _thread_lock.acquire()
+ ... called += 1
+ ...
+ >>> def call_from_thread(p):
+ ... id = p._p_oid
+ ... def call():
+ ... conn = db.open()
+ ... p = conn.get(id)
+ ... p()
+ ... return call
+ ...
+ >>> _thread_lock.acquire()
+ True
+ >>> _main_lock.acquire()
+ True
+ >>> import zc.async.partial
+ >>> root['p'] = p = zc.async.partial.Partial(locked_call)
+ >>> p2 = p.addCallbacks(locked_call)
+ >>> import transaction
+ >>> transaction.commit()
+ >>> t = threading.Thread(target=call_from_thread(p))
+ >>> t.start()
+ >>> _main_lock.acquire()
+ True
+ >>> called
+ 0
+ >>> trans = transaction.begin()
+ >>> p.state == zc.async.interfaces.ACTIVE
+ True
+ >>> safe_release(_thread_lock)
+ >>> _main_lock.acquire()
+ True
+ >>> called # the main call
+ 1
+ >>> trans = transaction.begin()
+ >>> p.state == zc.async.interfaces.CALLBACKS
+ True
+ >>> p2 = p.addCallbacks(locked_call)
+ >>> transaction.commit()
+ >>> safe_release(_thread_lock)
+ >>> _main_lock.acquire()
+ True
+ >>> called # call back number one
+ 2
+ >>> safe_release(_thread_lock)
+ >>> safe_release(_thread_lock)
+ >>> while t.isAlive():
+ ... pass
+ ...
+ >>> called # call back number two
+ ... # (added while first callback was in progress)
+ 3
+ >>> _main_lock.release()
+
+- T1 goes to CALLBACKS state. In the split second between checking for
+ any remaining callbacks and changing state to COMPLETED, T2 adds a
+ callback and commits. T1 commits. T2 thinks that callbacks are still
+ being processed, so does not process the callback, but meanwhile the
+ state is being switched to COMPLETED, and the new callback is never
+ made. For this, we could turn off MVCC, but we don't want to do that
+ if we can help it because of efficiency. A better solution is to set
+ _p_changed in T2 on the partial, and commit; if there's a conflict
+ error, re-get the state because its change may have caused the
+ conflict.
+
+ >>> import sys
+ >>> class LockedSetter(object):
+ ... def __init__(self, name, condition, initial=None):
+ ... self.name = name
+ ... self.condition = condition
+ ... self.value = initial
+ ... def __get__(self, obj, typ=None):
+ ... if obj is None:
+ ... return self
+ ... return getattr(obj, '_z_locked_' + self.name, self.value)
+ ... def __set__(self, obj, value):
+ ... if self.condition(obj, value):
+ ... safe_release(_main_lock)
+ ... _thread_lock.acquire()
+ ... setattr(obj, '_z_locked_' + self.name, value)
+ ...
+ >>> import zc.async.partial
+ >>> class Partial(zc.async.partial.Partial):
+ ... _state = LockedSetter(
+ ... '_state',
+ ... lambda o, v: v == zc.async.interfaces.COMPLETED,
+ ... zc.async.interfaces.PENDING)
+ ...
+ >>> called = 0
+ >>> def call(res=None):
+ ... global called
+ ... called += 1
+ ...
+ >>> root['p2'] = p = Partial(call)
+ >>> transaction.commit()
+ >>> _thread_lock.acquire()
+ True
+ >>> _main_lock.acquire()
+ True
+ >>> t = threading.Thread(target=call_from_thread(p))
+ >>> t.start()
+ >>> _main_lock.acquire()
+ True
+ >>> trans = transaction.begin()
+ >>> called
+ 1
+ >>> p.state == zc.async.interfaces.CALLBACKS
+ True
+ >>> p2 = p.addCallbacks(call)
+ >>> transaction.commit()
+ >>> safe_release(_thread_lock)
+ >>> _main_lock.acquire()
+ True
+ >>> trans = transaction.begin()
+ >>> called
+ 2
+ >>> p.state == zc.async.interfaces.CALLBACKS
+ True
+ >>> safe_release(_thread_lock)
+ >>> safe_release(_thread_lock)
+ >>> while t.isAlive():
+ ... pass
+ ...
+ >>> _main_lock.release()
+
+ Note, because of this, addCallbacks can raise a ConflictError: it probably
+ means that the state changed out from under it. Just retry.
+
+- T1 is performing callbacks. T2 begins and adds a callback. T1 changes state
+ to COMPLETED and commits. T2 commits. If we don't handle it carefully,
+ the callback is never called. So we handle it carefully.
+
+ >>> _thread_lock.acquire()
+ True
+ >>> _main_lock.acquire()
+ True
+ >>> called = 0
+ >>> root['p3'] = p = zc.async.partial.Partial(call)
+ >>> p1 = p.addCallbacks(locked_call)
+ >>> transaction.commit()
+ >>> t = threading.Thread(target=call_from_thread(p))
+ >>> t.start()
+ >>> _main_lock.acquire()
+ True
+ >>> called
+ 1
+ >>> trans = transaction.begin()
+ >>> def call_and_unlock(res):
+ ... global called
+ ... called += 1
+ ...
+ >>> p2 = p.addCallbacks(call_and_unlock)
+ >>> safe_release(_thread_lock)
+ >>> safe_release(_thread_lock)
+ >>> while t.isAlive():
+ ... pass
+ ...
+ >>> called # the main call
+ 2
+ >>> transaction.commit() # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ ConflictError: database conflict error (..., class zc.async.partial.Partial)
+ >>> transaction.abort()
+ >>> p2 = p.addCallbacks(call_and_unlock)
+ >>> called
+ 3
+ >>> transaction.commit()
+ >>> _main_lock.release()
+
+- T1 adds a callback to COMPLETED state. It immediately runs the callback.
+ Simultaneously, T2 adds a callback to COMPLETED state. No problem.
+
+- two workers might claim and start the same partial. This should
+ already be stopped by workers committing transactions after they claimed
+ them. This is considered to be a pathological case.
+
+- Generally, if a worker is determined to be dead, and its partials are
+ handed out to other workers, but the worker is actually alive, this can
+ be a serious problem. This is also considered to be a pathological case.
+
+=========
+Footnotes
+=========
+
+.. [#set_up] We'll actually create the state that the text needs here.
+
+ >>> from ZODB.tests.util import DB
+ >>> db = DB()
+ >>> conn = db.open()
+ >>> root = conn.root()
+
+ You must have two adapter registrations: IConnection to
+ ITransactionManager, and IPersistent to IConnection. We will also
+ register IPersistent to ITransactionManager because the adapter is
+ designed for it.
+
+ >>> from zc.twist import transactionManager, connection
+ >>> import zope.component
+ >>> zope.component.provideAdapter(transactionManager)
+ >>> zope.component.provideAdapter(connection)
+ >>> import ZODB.interfaces
+ >>> zope.component.provideAdapter(
+ ... transactionManager, adapts=(ZODB.interfaces.IConnection,))
+
+ We also need to adapt Function and Method to IPartial.
+
+ >>> import zc.async.partial
+ >>> import zc.async.interfaces
+ >>> import zope.component
+ >>> import types
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.FunctionType,),
+ ... provides=zc.async.interfaces.IPartial)
+ >>> zope.component.provideAdapter(
+ ... zc.async.partial.Partial,
+ ... adapts=(types.MethodType,),
+ ... provides=zc.async.interfaces.IPartial)
Property changes on: zc.async/trunk/src/zc/async/partials_and_transactions.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/rwproperty.py
===================================================================
--- zc.async/trunk/src/zc/async/rwproperty.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/rwproperty.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,75 @@
+# Read & write properties
+#
+# Copyright (c) 2006 by Philipp "philiKON" von Weitershausen
+# philikon at philikon.de
+#
+# Freely distributable under the terms of the Zope Public License, v2.1.
+#
+# See rwproperty.txt for detailed explanations
+#
+import sys
+
+__all__ = ['getproperty', 'setproperty', 'delproperty']
+
+class rwproperty(object):
+
+ def __new__(cls, func):
+ name = func.__name__
+
+ # ugly, but common hack
+ frame = sys._getframe(1)
+ locals = frame.f_locals
+
+ if name not in locals:
+ return cls.createProperty(func)
+
+ oldprop = locals[name]
+ if isinstance(oldprop, property):
+ return cls.enhanceProperty(oldprop, func)
+
+ raise TypeError("read & write properties cannot be mixed with "
+ "other attributes except regular property objects.")
+
+ # this might not be particularly elegant, but it's easy on the eyes
+
+ @staticmethod
+ def createProperty(func):
+ raise NotImplementedError
+
+ @staticmethod
+ def enhanceProperty(oldprop, func):
+ raise NotImplementedError
+
+class getproperty(rwproperty):
+
+ @staticmethod
+ def createProperty(func):
+ return property(func)
+
+ @staticmethod
+ def enhanceProperty(oldprop, func):
+ return property(func, oldprop.fset, oldprop.fdel)
+
+class setproperty(rwproperty):
+
+ @staticmethod
+ def createProperty(func):
+ return property(None, func)
+
+ @staticmethod
+ def enhanceProperty(oldprop, func):
+ return property(oldprop.fget, func, oldprop.fdel)
+
+class delproperty(rwproperty):
+
+ @staticmethod
+ def createProperty(func):
+ return property(None, None, func)
+
+ @staticmethod
+ def enhanceProperty(oldprop, func):
+ return property(oldprop.fget, oldprop.fset, func)
+
+if __name__ == "__main__":
+ import doctest
+ doctest.testfile('rwproperty.txt')
Property changes on: zc.async/trunk/src/zc/async/rwproperty.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/rwproperty.txt
===================================================================
--- zc.async/trunk/src/zc/async/rwproperty.txt 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/rwproperty.txt 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,140 @@
+Read & write properties
+========================
+
+:Author: Philipp von Weitershausen
+:Email: philikon at philikon.de
+:License: Zope Public License, v2.1
+
+Motivation
+----------
+
+Using method decorators and descriptors like ``property``, we can
+easily create computed attributes:
+
+ >>> class JamesBrown(object):
+ ... @property
+ ... def feel(self):
+ ... return self._feel
+
+An attribute like this cannot be written, though. You would have to
+do something like this:
+
+ >>> class JamesBrown(object):
+ ... def _getFeel(self):
+ ... return self._feel
+ ... def _setFeel(self, feel):
+ ... self._feel = feel
+ ... feel = property(_getFeel, _setFeel)
+
+The problem with this approach is that it leaves the getter and setter
+sitting around in the class namespace. It also lacks the compact
+spelling of a decorator solution. To cope with that, some people like
+to write:
+
+ >>> class JamesBrown(object):
+ ... @apply
+ ... def feel():
+ ... def get(self):
+ ... return self._feel
+ ... def set(self, feel):
+ ... self._feel = feel
+ ... return property(get, set)
+
+This spelling feels rather cumbersome, apart from the fact that
+``apply`` is `going to go away`_ in Python 3000.
+
+.. _going to go away: http://www.python.org/peps/pep-3000.html#id24
+
+
+Goal
+----
+
+There should be a way to declare a read & write property and still use
+the compact and easy decorator spelling. The read & write properties
+should be as easy to use as the read-only property. We explicitly
+don't want that immediately called function that really just helps us
+name the attribute and create a local scope for the getter and setter.
+
+
+Read & write property
+---------------------
+
+Read & write properties work like regular properties. You simply
+define a method and then apply a decorator, except that you now don't
+use ``@property`` but ``@getproperty`` to mark the getter and
+``@setproperty`` to mark the setter:
+
+ >>> from rwproperty import getproperty, setproperty
+ >>> class JamesBrown(object):
+ ... @getproperty
+ ... def feel(self):
+ ... return self._feel
+ ... @setproperty
+ ... def feel(self, feel):
+ ... self._feel = feel
+
+ >>> i = JamesBrown()
+ >>> i.feel
+ Traceback (most recent call last):
+ ...
+ AttributeError: 'JamesBrown' object has no attribute '_feel'
+
+ >>> i.feel = "good"
+ >>> i.feel
+ 'good'
+
+The order in which getters and setters are declared doesn't matter:
+
+ >>> from rwproperty import getproperty, setproperty
+ >>> class JamesBrown(object):
+ ... @setproperty
+ ... def feel(self, feel):
+ ... self._feel = feel
+ ... @getproperty
+ ... def feel(self):
+ ... return self._feel
+
+ >>> i = JamesBrown()
+ >>> i.feel = "good"
+ >>> i.feel
+ 'good'
+
+Of course, deleters are also possible:
+
+ >>> from rwproperty import delproperty
+ >>> class JamesBrown(object):
+ ... @setproperty
+ ... def feel(self, feel):
+ ... self._feel = feel
+ ... @getproperty
+ ... def feel(self):
+ ... return self._feel
+ ... @delproperty
+ ... def feel(self):
+ ... del self._feel
+
+ >>> i = JamesBrown()
+ >>> i.feel = "good"
+ >>> del i.feel
+ >>> i.feel
+ Traceback (most recent call last):
+ ...
+ AttributeError: 'JamesBrown' object has no attribute '_feel'
+
+
+Edge cases
+----------
+
+There might be a case where you're using a flavour of read & write
+properties and already have a non-property attribute of the same name
+defined:
+
+ >>> class JamesBrown(object):
+ ... feel = "good"
+ ... @getproperty
+ ... def feel(self):
+ ... return "so good"
+ ...
+ Traceback (most recent call last):
+ ...
+ TypeError: read & write properties cannot be mixed with other attributes except regular property objects.
Property changes on: zc.async/trunk/src/zc/async/rwproperty.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/subscribers.py
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/subscribers.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,94 @@
+import os
+import transaction
+import transaction.interfaces
+import ZODB.interfaces
+import twisted.internet.reactor
+import zope.component
+import zope.event
+import zope.app.appsetup.interfaces
+import zc.twist
+
+import zc.async.datamanager
+import zc.async.interfaces
+import zc.async.engine
+
+NAME = 'zc.async.datamanager'
+
+class InstallerAndNotifier(object):
+
+ def __init__(self, name=NAME,
+ factory=lambda *args: zc.async.datamanager.DataManager(),
+ get_folder=lambda r: r):
+ zope.component.adapter(
+ zope.app.appsetup.interfaces.IDatabaseOpenedEvent)(self)
+ self.name = name
+ self.factory = factory
+ self.get_folder = get_folder
+
+ def __call__(self, ev):
+ db = ev.database
+ tm = transaction.TransactionManager()
+ conn = db.open(transaction_manager=tm)
+ tm.begin()
+ try:
+ try:
+ root = conn.root()
+ folder = self.get_folder(root)
+ tm.commit()
+ if self.name not in folder:
+ folder[self.name] = self.factory(conn, folder)
+ if folder[self.name]._p_jar is None:
+ conn.add(folder[self.name])
+ elif not zc.async.interfaces.IDataManager.providedBy(
+ folder[self.name]):
+ raise RuntimeError(
+ 'IDataManager not found') # TODO better error
+ zope.event.notify(
+ zc.async.interfaces.DataManagerAvailable(folder[self.name]))
+ tm.commit()
+ except:
+ tm.abort()
+ raise
+ finally:
+ conn.close()
+
+basicInstallerAndNotifier = InstallerAndNotifier()
+
+class SeparateDBCreation(object):
+ def __init__(self, db_name='zc.async', name=NAME,
+ factory=zc.async.datamanager.DataManager,
+ get_folder=lambda r:r):
+ self.db_name = db_name
+ self.name = name
+ self.factory = factory
+ self.get_folder = get_folder
+
+ def __call__(self, conn, folder):
+ conn2 = conn.get_connection(self.db_name)
+ tm = transaction.interfaces.ITransactionManager(conn)
+ root = conn2.root()
+ folder = self.get_folder(root)
+ tm.commit()
+ if self.name in folder:
+ raise ValueError('data manager already exists in separate database',
+ self.db_name, folder, self.name)
+ dm = folder[self.name] = self.factory()
+ conn2.add(dm)
+ tm.commit()
+ return dm
+
+installerAndNotifier = InstallerAndNotifier(factory=SeparateDBCreation())
+
+ at zope.component.adapter(zc.async.interfaces.IDataManagerAvailableEvent)
+def installTwistedEngine(ev):
+ engine = zc.async.engine.Engine(
+ zope.component.getUtility(
+ zc.async.interfaces.IUUID, 'instance'),
+ zc.async.datamanager.Worker)
+ dm = ev.object
+ twisted.internet.reactor.callLater(
+ 0,
+ zc.twist.Partial(engine.poll, dm))
+ twisted.internet.reactor.addSystemEventTrigger(
+ 'before', 'shutdown', zc.twist.Partial(
+ engine.tearDown, dm))
Property changes on: zc.async/trunk/src/zc/async/subscribers.py
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.async/trunk/src/zc/async/tests.py
===================================================================
--- zc.async/trunk/src/zc/async/tests.py 2006-08-15 20:31:28 UTC (rev 69534)
+++ zc.async/trunk/src/zc/async/tests.py 2006-08-15 20:33:11 UTC (rev 69535)
@@ -0,0 +1,130 @@
+import os
+import shutil
+import unittest
+
+from zope.testing import doctest, module
+import zope.component
+import zope.component.testing
+import zope.component.eventtesting
+import zc.async.partial
+import zc.async.subscribers
+
+def modSetUp(test):
+ zope.component.testing.setUp(test)
+ module.setUp(test, 'zc.async.doctest_test')
+
+def modTearDown(test):
+ module.tearDown(test)
+ zope.component.testing.tearDown(test)
+
+def uuidSetUp(test):
+ test.globs['old_instance_home'] = os.environ.get("INSTANCE_HOME")
+ os.environ['INSTANCE_HOME'] = os.path.join(os.path.dirname(
+ zc.async.interfaces.__file__), '_test_tmp')
+ os.mkdir(os.environ['INSTANCE_HOME'])
+ os.mkdir(os.path.join(os.environ['INSTANCE_HOME'], 'etc'))
+
+def uuidTearDown(test):
+ shutil.rmtree(os.environ['INSTANCE_HOME'])
+ if test.globs['old_instance_home'] is None:
+ del os.environ['INSTANCE_HOME']
+ else:
+ os.environ['INSTANCE_HOME'] = test.globs['old_instance_home']
+ del test.globs['old_instance_home']
+
+def readmeSetUp(test):
+ modSetUp(test)
+ uuidSetUp(test)
+ zope.component.eventtesting.setUp(test)
+ test.globs['installerAndNotifier'] = (
+ zc.async.subscribers.basicInstallerAndNotifier)
+ from zc.async import instanceuuid
+ instanceuuid.UUID = instanceuuid.getUUID()
+ zope.component.provideUtility(instanceuuid.UUID, name='instance')
+
+def altReadmeSetUp(test):
+ modSetUp(test)
+ uuidSetUp(test)
+ zope.component.eventtesting.setUp(test)
+ test.globs['installerAndNotifier'] = (
+ zc.async.subscribers.installerAndNotifier)
+ from zc.async import instanceuuid
+ instanceuuid.UUID = instanceuuid.getUUID()
+ zope.component.provideUtility(instanceuuid.UUID, name='instance')
+
+def readmeTearDown(test):
+ r = test.globs.get('faux')
+ if r:
+ for when, eventname, callable in r.triggers:
+ if eventname == 'shutdown': # test didn't run to completion
+ # let's clean up
+ callable()
+ uuidTearDown(test)
+ modTearDown(test)
+
+def test_instanceuuid():
+ """This module provides access to a UUID that is intended to uniquely
+ identify this software instance. Read the `msg` value below for more
+ information.
+
+ The uuid is generated and then stashed in a file. It only works if
+ the INSTANCE_HOME environment variable is set to a folder that has an
+ `etc` folder in it--a standard Zope set up. For this test, we mock it
+ up in uuidSetUp and uuidTearDown below.
+
+ >>> import zc.async.instanceuuid
+ >>> import uuid
+ >>> isinstance(zc.async.instanceuuid.UUID, uuid.UUID)
+ True
+ >>> (zc.async.instanceuuid.getUUID() ==
+ ... zc.async.instanceuuid.UUID ==
+ ... zc.async.instanceuuid.getUUID())
+ True
+
+ uuid.UUIDs now provide zc.async.interfaces.IUUID
+
+ >>> import zc.async.interfaces
+ >>> zc.async.interfaces.IUUID.implementedBy(uuid.UUID)
+ True
+ >>> zc.async.interfaces.IUUID.providedBy(zc.async.instanceuuid.UUID)
+ True
+
+ That's a bit invasive, but now you can register the instance UUID as
+ a utility and get it back out as something that provides
+ zc.async.interfaces.IUUID.
+
+ >>> import zope.component
+ >>> zope.component.provideUtility(
+ ... zc.async.instanceuuid.UUID, name='instance')
+ >>> id = zope.component.getUtility(
+ ... zc.async.interfaces.IUUID, 'instance')
+ >>> id is zc.async.instanceuuid.UUID
+ True
+
+ (Unfortunately you can't register a utility to provide a class, or I
+ would have done that...though maybe that's not unfortunate :-) )
+
+ """
+
+def test_suite():
+ return unittest.TestSuite((
+ doctest.DocTestSuite(setUp=uuidSetUp, tearDown=uuidTearDown),
+ doctest.DocFileSuite(
+ 'partial.txt',
+ 'partials_and_transactions.txt',
+ 'datamanager.txt',
+ setUp=modSetUp, tearDown=modTearDown,
+ optionflags=doctest.INTERPRET_FOOTNOTES),
+ doctest.DocFileSuite(
+ 'README.txt',
+ setUp=readmeSetUp, tearDown=readmeTearDown,
+ optionflags=doctest.INTERPRET_FOOTNOTES),
+ doctest.DocFileSuite(
+ 'README.txt',
+ setUp=altReadmeSetUp, tearDown=readmeTearDown,
+ optionflags=doctest.INTERPRET_FOOTNOTES),
+ ))
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
Property changes on: zc.async/trunk/src/zc/async/tests.py
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the Checkins
mailing list