[Checkins] SVN: zc.async/trunk/src/zc/async/ Make tests more robust
across platforms. Small changes to engine module.
Gary Poster
gary at zope.com
Thu Aug 17 21:37:26 EDT 2006
Log message for revision 69630:
Make tests more robust across platforms. Small changes to engine module.
Changed:
U zc.async/trunk/src/zc/async/README.txt
U zc.async/trunk/src/zc/async/engine.py
-=-
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt 2006-08-17 23:23:19 UTC (rev 69629)
+++ zc.async/trunk/src/zc/async/README.txt 2006-08-18 01:37:24 UTC (rev 69630)
@@ -356,7 +356,8 @@
asynchronously, make the callback with a partial. The partial will get
a reference to the data_manager used by the main partial. It can create a
partial, assign it to one of the data manager queues, and return the partial.
-Consider the following.
+Consider the following (we use a `resolve` function to let all of the pending
+calls resolve before the example proceeds [#resolve]_).
>>> def multiply(*args):
... res = 1
@@ -373,14 +374,7 @@
>>> p_callback = p.addCallbacks(
... zc.async.partial.Partial.bind(doCallbackWithPartial))
>>> transaction.commit()
- >>> import time
- >>> for i in range(100):
- ... ignore = time_flies(5)
- ... time.sleep(0)
- ... t = transaction.begin()
- ... if p_callback.state == zc.async.interfaces.COMPLETED:
- ... break
- ...
+ >>> resolve(p_callback)
>>> p.result
12
>>> p.state == zc.async.interfaces.COMPLETED
@@ -412,6 +406,7 @@
... partial.annotations[annotation_key] = value
...
>>> import threading
+ >>> import sys
>>> thread_lock = threading.Lock()
>>> main_lock = threading.Lock()
>>> acquired = thread_lock.acquire()
@@ -440,13 +435,7 @@
>>> p.state == zc.async.interfaces.ACTIVE
True
>>> thread_lock.release()
- >>> for i in range(100):
- ... ignore = time_flies(5)
- ... time.sleep(0)
- ... t = transaction.begin()
- ... if p.state == zc.async.interfaces.COMPLETED:
- ... break
- ...
+ >>> resolve(p)
>>> p.result
42
>>> thread_lock.release()
@@ -559,12 +548,13 @@
.. [#setup] This is a bit more than standard set-up code for a ZODB test,
because it sets up a multi-database.
- >>> from ZODB.tests.util import DB # use conflict resolution test one XXX
+ >>> from zc.queue.tests import ConflictResolvingMappingStorage
+ >>> from ZODB import DB
>>> class Factory(object):
... def __init__(self, name):
... self.name = name
... def open(self):
- ... return DB()
+ ... return DB(ConflictResolvingMappingStorage('test'))
...
>>> import zope.app.appsetup.appsetup
>>> db = zope.app.appsetup.appsetup.multi_database(
@@ -678,22 +668,32 @@
... self._lock.release()
... def addSystemEventTrigger(self, *args):
... self.triggers.append(args) # 'before', 'shutdown', callable
+ ... def _get_next(self, end):
+ ... self._lock.acquire()
+ ... try:
+ ... if self.calls and self.calls[0][0] <= end:
+ ... return self.calls.pop(0)
+ ... finally:
+ ... self._lock.release()
... def time_flies(self, time):
... global _now
... end = self.time + time
... ct = 0
- ... while self.calls and self.calls[0][0] <= end:
- ... self.time, callable, args, kw = self.calls.pop(0)
+ ... next = self._get_next(end)
+ ... while next is not None:
+ ... self.time, callable, args, kw = next
... _now = _datetime(
... *(_start + datetime.timedelta(
... seconds=self.time)).__reduce__()[1])
... callable(*args, **kw) # normally this would get try...except
... ct += 1
+ ... next = self._get_next(end)
... self.time = end
... return ct
... def time_passes(self):
- ... if self.calls and self.calls[0][0] <= self.time:
- ... self.time, callable, args, kw = self.calls.pop(0)
+ ... next = self._get_next(self.time)
+ ... if next is not None:
+ ... self.time, callable, args, kw = next
... callable(*args, **kw)
... return True
... return False
@@ -723,6 +723,26 @@
will be called with no arguments, so you must supply all necessary
arguments for the callable at creation time.
+.. [#resolve]
+
+ >>> import time
+ >>> import ZODB.POSException
+ >>> def resolve(p):
+ ... for i in range(100):
+ ... t = transaction.begin()
+ ... ignore = time_flies(5)
+ ... time.sleep(0)
+ ... t = transaction.begin()
+ ... try:
+ ... if (len(dm.thread) == 0 and
+ ... len(dm.workers.values()[0].thread) == 0 and
+ ... p.state == zc.async.interfaces.COMPLETED):
+ ... break
+ ... except ZODB.POSException.ReadConflictError:
+ ... pass
+ ... else:
+ ... print 'Timed out'
+
.. [#tear_down]
>>> twisted.internet.reactor.callLater = oldCallLater
@@ -730,3 +750,12 @@
>>> twisted.internet.reactor.addSystemEventTrigger = (
... oldAddSystemEventTrigger)
>>> datetime.datetime = old_datetime
+ >>> import zc.async.engine
+ >>> engine = zc.async.engine.engines[worker.UUID]
+ >>> while 1: # make sure all the threads are dead before we close down
+ ... for t in engine._threads:
+ ... if t.isAlive():
+ ... break
+ ... else:
+ ... break
+ ...
Modified: zc.async/trunk/src/zc/async/engine.py
===================================================================
--- zc.async/trunk/src/zc/async/engine.py 2006-08-17 23:23:19 UTC (rev 69629)
+++ zc.async/trunk/src/zc/async/engine.py 2006-08-18 01:37:24 UTC (rev 69630)
@@ -20,6 +20,8 @@
p()
p.addCallback(zc.async.partial.Partial(remove, p.__parent__, p))
+engines = {}
+
class Engine(object):
# this intentionally does not have an interface. It would be nicer if this
# could be a Twisted service, part of the main Zope service, but that does
@@ -38,6 +40,8 @@
self._threads = []
self.UUID = uuid.uuid4() # this is supposed to distinguish this engine
# instance from any others potentially wanting to work on the worker.
+ assert UUID not in engines
+ engines[UUID] = self
def perform_thread(self):
try:
@@ -109,8 +113,16 @@
now = datetime.datetime.now(pytz.UTC)
worker = datamanager.workers.get(self.workerUUID)
if worker is not None:
- if (worker.engineUUID is not None and
- worker.engineUUID != self.UUID):
+ if worker.engineUUID is None:
+ worker.engineUUID = self.UUID
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ # uh-oh. Somebody else may be adding a worker for the
+ # same UUID. we'll just return for now, and figure that
+ # the next go-round will report the problem.
+ return # will call finally clause, including abort
+ elif worker.engineUUID != self.UUID:
# uh-oh. Maybe another engine is in on the action?
time_of_death = (worker.last_ping + worker.ping_interval
+ worker.ping_death_interval)
@@ -118,6 +130,14 @@
# hm. Looks like it's dead.
zc.async.datamanager.cleanDeadWorker(worker)
worker.engineUUID = self.UUID
+ try:
+ tm.commit()
+ except ZODB.POSException.TransactionError:
+ # uh-oh. Somebody else may be adding a worker for
+ # the same UUID. we'll just return for now, and
+ # figure that the next go-round will report the
+ # problem.
+ return # will call finally clause, including abort
else:
# this is some other engine's UUID,
# and it isn't dead (yet?). Houston, we have a problem.
@@ -132,16 +152,6 @@
interval.days, interval.seconds,
interval.microseconds)
return # which will call the finally clause
- else:
- worker.engineUUID = self.UUID
- try:
- tm.commit()
- except ZODB.POSException.TransactionError:
- tm.abort()
- # uh-oh. Somebody else may be adding a worker for the
- # same UUID. we'll just return for now, and figure that
- # the next go-round will report the problem.
- return # will call finally clause
else:
worker = self.factory(self.workerUUID)
datamanager.workers.add(worker)
@@ -149,18 +159,17 @@
try:
tm.commit()
except ZODB.POSException.TransactionError:
- tm.abort()
# uh-oh. Somebody else may be adding a worker for the
# same UUID. we'll just return for now, and figure that
# the next go-round will report the problem.
- return # will call finally clause
+ return # will call finally clause, including abort
poll_seconds = worker.poll_seconds
datamanager.checkSibling(worker.UUID)
try:
tm.commit()
except ZODB.POSException.TransactionError:
tm.abort()
- # we'll retry next poll.
+ # we'll retry next poll. Let's keep going.
if (worker.completed.last_rotation +
worker.completed.rotation_interval) <= now:
worker.completed.rotate()
@@ -168,14 +177,13 @@
tm.commit()
except ZODB.POSException.TransactionError:
tm.abort()
- # we'll retry next poll.
+ # we'll retry next poll. Keep going.
if worker.last_ping + worker.ping_interval <= now:
worker.last_ping = now
try:
tm.commit()
except ZODB.POSException.TransactionError:
# uh-oh: are there two engines working with the same worker?
- tm.abort() # and retry next time
logging.error(
"Transaction error for worker %s. This should not "
"happen.", self.workerUUID)
@@ -219,7 +227,6 @@
tm.commit()
except ZODB.POSException.TransactionError:
# uh-oh: are there two engines working with the same worker?
- tm.abort() # and retry next time
logging.error(
"Transaction error for worker %s. This should not "
"happen.", self.workerUUID)
More information about the Checkins
mailing list