[Checkins] SVN: zc.async/trunk/src/zc/async/ Make tests more robust across platforms. Small changes to engine module.

Gary Poster gary at zope.com
Thu Aug 17 21:37:26 EDT 2006


Log message for revision 69630:
  Make tests more robust across platforms.  Small changes to engine module.
  
  

Changed:
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/engine.py

-=-
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2006-08-17 23:23:19 UTC (rev 69629)
+++ zc.async/trunk/src/zc/async/README.txt	2006-08-18 01:37:24 UTC (rev 69630)
@@ -356,7 +356,8 @@
 asynchronously, make the callback with a partial.  The partial will get
 a reference to the data_manager used by the main partial.  It can create a
 partial, assign it to one of the data manager queues, and return the partial.
-Consider the following.
+Consider the following (we use a `resolve` function to let all of the pending
+calls resolve before the example proceeds [#resolve]_).
 
     >>> def multiply(*args):
     ...     res = 1
@@ -373,14 +374,7 @@
     >>> p_callback = p.addCallbacks(
     ...     zc.async.partial.Partial.bind(doCallbackWithPartial))
     >>> transaction.commit()
-    >>> import time
-    >>> for i in range(100):
-    ...     ignore = time_flies(5)
-    ...     time.sleep(0)
-    ...     t = transaction.begin()
-    ...     if p_callback.state == zc.async.interfaces.COMPLETED:
-    ...         break
-    ...
+    >>> resolve(p_callback)
     >>> p.result
     12
     >>> p.state == zc.async.interfaces.COMPLETED
@@ -412,6 +406,7 @@
     ...     partial.annotations[annotation_key] = value
     ...
     >>> import threading
+    >>> import sys
     >>> thread_lock = threading.Lock()
     >>> main_lock = threading.Lock()
     >>> acquired = thread_lock.acquire()
@@ -440,13 +435,7 @@
     >>> p.state == zc.async.interfaces.ACTIVE
     True
     >>> thread_lock.release()
-    >>> for i in range(100):
-    ...     ignore = time_flies(5)
-    ...     time.sleep(0)
-    ...     t = transaction.begin()
-    ...     if p.state == zc.async.interfaces.COMPLETED:
-    ...         break
-    ...
+    >>> resolve(p)
     >>> p.result
     42
     >>> thread_lock.release()
@@ -559,12 +548,13 @@
 .. [#setup] This is a bit more than standard set-up code for a ZODB test,
     because it sets up a multi-database.
 
-    >>> from ZODB.tests.util import DB # use conflict resolution test one XXX
+    >>> from zc.queue.tests import ConflictResolvingMappingStorage
+    >>> from ZODB import DB
     >>> class Factory(object):
     ...     def __init__(self, name):
     ...         self.name = name
     ...     def open(self):
-    ...         return DB()
+    ...         return DB(ConflictResolvingMappingStorage('test'))
     ...
     >>> import zope.app.appsetup.appsetup
     >>> db = zope.app.appsetup.appsetup.multi_database(
@@ -678,22 +668,32 @@
     ...         self._lock.release()
     ...     def addSystemEventTrigger(self, *args):
     ...         self.triggers.append(args) # 'before', 'shutdown', callable
+    ...     def _get_next(self, end):
+    ...         self._lock.acquire()
+    ...         try:
+    ...             if self.calls and self.calls[0][0] <= end:
+    ...                 return self.calls.pop(0)
+    ...         finally:
+    ...             self._lock.release()
     ...     def time_flies(self, time):
     ...         global _now
     ...         end = self.time + time
     ...         ct = 0
-    ...         while self.calls and self.calls[0][0] <= end:
-    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...         next = self._get_next(end)
+    ...         while next is not None:
+    ...             self.time, callable, args, kw = next
     ...             _now = _datetime(
     ...                 *(_start + datetime.timedelta(
     ...                     seconds=self.time)).__reduce__()[1])
     ...             callable(*args, **kw) # normally this would get try...except
     ...             ct += 1
+    ...             next = self._get_next(end)
     ...         self.time = end
     ...         return ct
     ...     def time_passes(self):
-    ...         if self.calls and self.calls[0][0] <= self.time:
-    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...         next = self._get_next(self.time)
+    ...         if next is not None:
+    ...             self.time, callable, args, kw = next
     ...             callable(*args, **kw)
     ...             return True
     ...         return False
@@ -723,6 +723,26 @@
     will be called with no arguments, so you must supply all necessary
     arguments for the callable on creation time.
 
+.. [#resolve]
+
+    >>> import time
+    >>> import ZODB.POSException
+    >>> def resolve(p):
+    ...     for i in range(100):
+    ...         t = transaction.begin()
+    ...         ignore = time_flies(5)
+    ...         time.sleep(0)
+    ...         t = transaction.begin()
+    ...         try:
+    ...             if (len(dm.thread) == 0 and
+    ...                 len(dm.workers.values()[0].thread) == 0 and
+    ...                 p.state == zc.async.interfaces.COMPLETED): 
+    ...                 break
+    ...         except ZODB.POSException.ReadConflictError:
+    ...             pass
+    ...     else:
+    ...         print 'Timed out'
+
 .. [#tear_down]
 
     >>> twisted.internet.reactor.callLater = oldCallLater
@@ -730,3 +750,12 @@
     >>> twisted.internet.reactor.addSystemEventTrigger = (
     ...     oldAddSystemEventTrigger)
     >>> datetime.datetime = old_datetime
+    >>> import zc.async.engine
+    >>> engine = zc.async.engine.engines[worker.UUID]
+    >>> while 1: # make sure all the threads are dead before we close down
+    ...     for t in engine._threads:
+    ...         if t.isAlive():
+    ...             break
+    ...     else:
+    ...         break
+    ...

Modified: zc.async/trunk/src/zc/async/engine.py
===================================================================
--- zc.async/trunk/src/zc/async/engine.py	2006-08-17 23:23:19 UTC (rev 69629)
+++ zc.async/trunk/src/zc/async/engine.py	2006-08-18 01:37:24 UTC (rev 69630)
@@ -20,6 +20,8 @@
     p()
     p.addCallback(zc.async.partial.Partial(remove, p.__parent__, p))
 
+engines = {}
+
 class Engine(object):
     # this intentionally does not have an interface.  It would be nicer if this
     # could be a Twisted service, part of the main Zope service, but that does
@@ -38,6 +40,8 @@
         self._threads = []
         self.UUID = uuid.uuid4() # this is supposed to distinguish this engine
         # instance from any others potentially wanting to work on the worker.
+        assert UUID not in engines
+        engines[UUID] = self
 
     def perform_thread(self):
         try:
@@ -109,8 +113,16 @@
             now = datetime.datetime.now(pytz.UTC)
             worker = datamanager.workers.get(self.workerUUID)
             if worker is not None:
-                if (worker.engineUUID is not None and
-                    worker.engineUUID != self.UUID):
+                if worker.engineUUID is None:
+                    worker.engineUUID = self.UUID
+                    try:
+                        tm.commit()
+                    except ZODB.POSException.TransactionError:
+                        # uh-oh.  Somebody else may be adding a worker for the
+                        # same UUID.  we'll just return for now, and figure that
+                        # the next go-round will report the problem.
+                        return # will call finally clause, including abort
+                elif worker.engineUUID != self.UUID:
                     # uh-oh.  Maybe another engine is in on the action?
                     time_of_death = (worker.last_ping + worker.ping_interval
                                      + worker.ping_death_interval)
@@ -118,6 +130,14 @@
                         # hm.  Looks like it's dead.
                         zc.async.datamanager.cleanDeadWorker(worker)
                         worker.engineUUID = self.UUID
+                        try:
+                            tm.commit()
+                        except ZODB.POSException.TransactionError:
+                            # uh-oh.  Somebody else may be adding a worker for
+                            # the same UUID.  we'll just return for now, and
+                            # figure that the next go-round will report the
+                            # problem.
+                            return # will call finally clause, including abort
                     else:
                         # this is some other engine's UUID,
                         # and it isn't dead (yet?).  Houston, we have a problem.
@@ -132,16 +152,6 @@
                             interval.days, interval.seconds,
                             interval.microseconds)
                         return # which will call the finally clause
-                else:
-                    worker.engineUUID = self.UUID
-                try:
-                    tm.commit()
-                except ZODB.POSException.TransactionError:
-                    tm.abort()
-                    # uh-oh.  Somebody else may be adding a worker for the
-                    # same UUID.  we'll just return for now, and figure that
-                    # the next go-round will report the problem.
-                    return # will call finally clause
             else:
                 worker = self.factory(self.workerUUID)
                 datamanager.workers.add(worker)
@@ -149,18 +159,17 @@
                 try:
                     tm.commit()
                 except ZODB.POSException.TransactionError:
-                    tm.abort()
                     # uh-oh.  Somebody else may be adding a worker for the
                     # same UUID.  we'll just return for now, and figure that
                     # the next go-round will report the problem.
-                    return # will call finally clause
+                    return # will call finally clause, including abort
             poll_seconds = worker.poll_seconds
             datamanager.checkSibling(worker.UUID)
             try:
                 tm.commit()
             except ZODB.POSException.TransactionError:
                 tm.abort()
-                # we'll retry next poll.
+                # we'll retry next poll.  Let's keep going.
             if (worker.completed.last_rotation +
                 worker.completed.rotation_interval) <= now:
                 worker.completed.rotate()
@@ -168,14 +177,13 @@
                     tm.commit()
                 except ZODB.POSException.TransactionError:
                     tm.abort()
-                    # we'll retry next poll.
+                    # we'll retry next poll.  Keep going.
             if worker.last_ping + worker.ping_interval <= now:
                 worker.last_ping = now
                 try:
                     tm.commit()
                 except ZODB.POSException.TransactionError:
                     # uh-oh: are there two engines working with the same worker?
-                    tm.abort() # and retry next time
                     logging.error(
                         "Transaction error for worker %s.  This should not "
                         "happen.", self.workerUUID)
@@ -219,7 +227,6 @@
                     tm.commit()
                 except ZODB.POSException.TransactionError:
                     # uh-oh: are there two engines working with the same worker?
-                    tm.abort() # and retry next time
                     logging.error(
                         "Transaction error for worker %s.  This should not "
                         "happen.", self.workerUUID)



More information about the Checkins mailing list