[Checkins] SVN: zc.async/trunk/src/zc/async/ move reusable testing functions out to zc.async.testing

Gary Poster gary at zope.com
Thu Apr 24 16:56:47 EDT 2008


Log message for revision 85706:
  move reusable testing functions out to zc.async.testing

Changed:
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/README_2.txt
  U   zc.async/trunk/src/zc/async/README_3.txt
  U   zc.async/trunk/src/zc/async/README_3b.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/dispatcher.txt
  U   zc.async/trunk/src/zc/async/monitor.txt
  U   zc.async/trunk/src/zc/async/subscribers.txt
  U   zc.async/trunk/src/zc/async/testing.py

-=-
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -205,10 +205,10 @@
 
 The ``put`` returned a job.  Now we need to wait for the job to be
 performed.  We would normally do this by really waiting.  For our
-examples, we will use a helper function called ``wait_for`` to wait for
-the job to be completed [#wait_for]_.
+examples, we will use a helper method on the testing reactor to ``wait_for``
+the job to be completed.
 
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     imagine this sent a message to another machine
 
 We also could have used the method of a persistent object.  Here's another
@@ -233,7 +233,7 @@
     >>> job = queue.put(root['demo'].increase)
     >>> transaction.commit()
 
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> root['demo'].counter
     1
 
@@ -279,16 +279,16 @@
     >>> job.begin_after
     datetime.datetime(2006, 8, 10, 15, 56, tzinfo=<UTC>)
     >>> transaction.commit()
-    >>> wait_for(job, attempts=2) # +5 virtual seconds
+    >>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
     TIME OUT
-    >>> wait_for(job, attempts=2) # +5 virtual seconds
+    >>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
     TIME OUT
     >>> datetime.datetime.now(pytz.UTC)
     datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
 
     >>> zc.async.testing.set_now(datetime.datetime(
     ...     2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     imagine this sent a message to another machine
     >>> datetime.datetime.now(pytz.UTC) >= job.begin_after
     True
@@ -353,7 +353,7 @@
     >>> job.status == zc.async.interfaces.PENDING
     True
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> t = transaction.begin()
     >>> job.result
     '200 OK'
@@ -377,7 +377,7 @@
     >>> job = queue.put(
     ...     zc.async.job.Job(root['demo'].increase, 5))
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> t = transaction.begin()
     >>> root['demo'].counter
     6
@@ -387,7 +387,7 @@
     >>> job = queue.put(
     ...     zc.async.job.Job(root['demo'].increase, value=10))
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> t = transaction.begin()
     >>> root['demo'].counter
     16
@@ -405,7 +405,7 @@
     ...
     >>> job = queue.put(I_am_a_bad_bad_function)
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> t = transaction.begin()
     >>> job.result
     <zc.twist.Failure exceptions.NameError>
@@ -443,7 +443,7 @@
     >>> job = queue.put(imaginaryNetworkCall)
     >>> callback = job.addCallback(I_scribble_on_strings)
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> job.result
     '200 OK'
     >>> callback.result
@@ -460,7 +460,7 @@
     >>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
     >>> callback2 = callback1.addCallback(I_scribble_on_strings)
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> job.result
     <zc.twist.Failure exceptions.NameError>
     >>> callback1.result
@@ -544,6 +544,7 @@
     ...
     >>> job = queue.put(annotateStatus)
     >>> transaction.commit()
+    >>> import time
     >>> def wait_for_annotation(job, key):
     ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
     ...     for i in range(10):
@@ -567,7 +568,7 @@
 
     >>> job.annotations['zc.async.test.flag'] = True
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> job.result
     42
 
@@ -602,7 +603,7 @@
     >>> job1.args.append(job2)
     >>> job2.args.append(job1)
     >>> transaction.commit()
-    >>> wait_for(job1, job2)
+    >>> reactor.wait_for(job1, job2)
     >>> job1.status == zc.async.interfaces.COMPLETED
     True
     >>> job2.status == zc.async.interfaces.COMPLETED
@@ -655,8 +656,8 @@
     True
     >>> job2.annotations['zc.async.test.flag'] = False
     >>> transaction.commit()
-    >>> wait_for(job1)
-    >>> wait_for(job2)
+    >>> reactor.wait_for(job1)
+    >>> reactor.wait_for(job2)
     >>> print job1.result
     None
     >>> print job2.result
@@ -702,9 +703,9 @@
     ...
     >>> job = queue.put(first_job)
     >>> transaction.commit()
-    >>> wait_for(job, attempts=3)
+    >>> reactor.wait_for(job, attempts=3)
     TIME OUT
-    >>> wait_for(job, attempts=3)
+    >>> reactor.wait_for(job, attempts=3)
     >>> job.result
     42
 
@@ -778,13 +779,13 @@
 
     >>> job = queue.put(main_job)
     >>> transaction.commit()
-    >>> wait_for(job, attempts=3)
+    >>> reactor.wait_for(job, attempts=3)
     TIME OUT
-    >>> wait_for(job, attempts=3)
+    >>> reactor.wait_for(job, attempts=3)
     TIME OUT
-    >>> wait_for(job, attempts=3)
+    >>> reactor.wait_for(job, attempts=3)
     TIME OUT
-    >>> wait_for(job, attempts=3)
+    >>> reactor.wait_for(job, attempts=3)
     >>> job.result
     42
 
@@ -819,7 +820,7 @@
     ...
     >>> job = queue.put(delegator)
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> job.result
     '200 OK'
 
@@ -1048,27 +1049,6 @@
     3
     >>> transaction.commit()
 
-.. [#wait_for] This is our helper function.  It relies on the test fixtures
-    set up in the previous footnote.
-
-    >>> import time
-    >>> def wait_for(*jobs, **kwargs):
-    ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
-    ...     # now we wait for the thread
-    ...     for i in range(kwargs.get('attempts', 10)):
-    ...         while reactor.time_passes():
-    ...             pass
-    ...         transaction.begin()
-    ...         for j in jobs:
-    ...             if j.status != zc.async.interfaces.COMPLETED:
-    ...                 break
-    ...         else:
-    ...             break
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         print 'TIME OUT'
-    ...
-
 .. [#commit_for_multidatabase] We commit before we do the next step as a
     good practice, in case the queue is from a different database than
     the root.  See the configuration sections for a discussion about
@@ -1087,7 +1067,7 @@
     >>> job = queue.put(
     ...     send_message, datetime.datetime(2006, 8, 10, 15, tzinfo=pytz.UTC))
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     imagine this sent a message to another machine
 
     It's worth noting that this situation consitutes a small exception
@@ -1103,7 +1083,7 @@
     >>> job = queue.put(
     ...     send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
     >>> transaction.commit()
-    >>> wait_for(job)
+    >>> reactor.wait_for(job)
     >>> job.result
     <zc.twist.Failure zc.async.interfaces.AbortedError>
     >>> import sys

Modified: zc.async/trunk/src/zc/async/README_2.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_2.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README_2.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -403,10 +403,10 @@
     >>> thread.start()
 
 The dispatcher should be starting up now.  Let's wait for it to activate.
-We're using a test convenience, get_poll, defined in the footnotes
-[#get_poll]_.
+We're using a test convenience, get_poll, defined in the testing module.
 
-    >>> poll = get_poll(0)
+    >>> from zc.async.testing import get_poll
+    >>> poll = get_poll(dispatcher, 0)
 
 We're off!  The events have been fired for registering and activating the
 dispatcher.  Therefore, our subscriber to add our agent has fired.
@@ -715,33 +715,13 @@
     >>> quota.filled
     False
 
-.. [#get_poll]
-
-    >>> import time
-    >>> def get_poll(count = None):
-    ...     if count is None:
-    ...         count = len(dispatcher.polls)
-    ...     for i in range(30):
-    ...         if len(dispatcher.polls) > count:
-    ...             return dispatcher.polls.first()
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'no poll!'
-    ... 
-
 .. [#stop_config_reactor] We don't want the live dispatcher for our demos,
     actually.  See dispatcher.txt to see the live dispatcher actually in use.
+    So, here we'll stop the "real" reactor and switch to a testing one.
 
     >>> reactor.callFromThread(reactor.stop)
-    >>> for i in range(30):
-    ...     if not dispatcher.activated:
-    ...         break
-    ...     time.sleep(0.1)
-    ... else:
-    ...     assert False, 'dispatcher did not deactivate'
-    ...
-
-    Now, we'll restart with an explicit reactor.
+    >>> thread.join(3)
+    >>> assert not dispatcher.activated, 'dispatcher did not deactivate'
     
     >>> import zc.async.testing
     >>> reactor = zc.async.testing.Reactor()

Modified: zc.async/trunk/src/zc/async/README_3.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README_3.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -152,7 +152,7 @@
     >>> import zc.async.dispatcher
     >>> dispatcher = zc.async.dispatcher.get()
     >>> import pprint
-    >>> pprint.pprint(get_poll(0))
+    >>> pprint.pprint(get_poll(dispatcher, 0))
     {'': {'main': {'active jobs': [],
                    'error': None,
                    'len': 0,
@@ -297,24 +297,4 @@
     >>> import zc.async.interfaces
     >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
 
-    >>> import time
-    >>> def get_poll(count=None): # just a helper used later, not processing
-    ...     if count is None:
-    ...         count = len(dispatcher.polls)
-    ...     for i in range(30):
-    ...         if len(dispatcher.polls) > count:
-    ...             return dispatcher.polls.first()
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'no poll!'
-    ... 
-
-    >>> def wait_for_result(job): # just a helper used later, not processing
-    ...     for i in range(30):
-    ...         t = transaction.begin()
-    ...         if job.status == zc.async.interfaces.COMPLETED:
-    ...             return job.result
-    ...         time.sleep(0.5)
-    ...     else:
-    ...         assert False, 'job never completed'
-    ...
+    >>> from zc.async.testing import get_poll, wait_for_result

Modified: zc.async/trunk/src/zc/async/README_3b.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3b.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README_3b.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -39,7 +39,7 @@
     >>> import zc.async.dispatcher
     >>> dispatcher = zc.async.dispatcher.get()
     >>> import pprint
-    >>> pprint.pprint(get_poll(0))
+    >>> pprint.pprint(get_poll(dispatcher, 0))
     {'': {'main': {'active jobs': [],
                    'error': None,
                    'len': 0,
@@ -198,28 +198,8 @@
     >>> import zc.async.interfaces
     >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
 
-    >>> import time
-    >>> def get_poll(count=None): # just a helper used later, not processing
-    ...     if count is None:
-    ...         count = len(dispatcher.polls)
-    ...     for i in range(30):
-    ...         if len(dispatcher.polls) > count:
-    ...             return dispatcher.polls.first()
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'no poll!'
-    ... 
+    >>> from zc.async.testing import get_poll, wait_for_result
 
-    >>> def wait_for_result(job): # just a helper used later, not processing
-    ...     for i in range(30):
-    ...         t = transaction.begin()
-    ...         if job.status == zc.async.interfaces.COMPLETED:
-    ...             return job.result
-    ...         time.sleep(0.5)
-    ...     else:
-    ...         assert False, 'job never completed'
-    ...
-
 .. [#shutdown]
 
     >>> import zc.async.dispatcher
@@ -227,14 +207,6 @@
     >>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
     >>> dispatcher.thread.join(3)
 
-    >>> t = transaction.begin() # sync
-    >>> import zope.component
-    >>> import zc.async.interfaces
-    >>> uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
-    >>> da = queue.dispatchers[uuid]
-    >>> bool(da.activated)
-    False
-
     >>> db.close()
     >>> db.databases['async'].close()
     >>> import shutil

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -5,6 +5,16 @@
 - show how to broadcast, maybe add conveniences
 - show how to use with collapsing jobs (hint to future self: use external queue
   to put in work, and have job(s) just pull what they can see from queue)
+- write tips and tricks
+  * avoid long transactions if possible.  really avoid long transactions
+    involving frequently written objects.  Discuss ramifications and
+    strategies.
+  * in zope.app.testing.functional tests, zc.async doesn't do well being
+    started in a layer's setup because then it is associated with the
+    wrapped layer DB, and the test is associated with the DemoStorage wrapper,
+    so that the test can see what zc.async does, but zc.async can't see what
+    the test does.  The current workaround is to start the dispatcher in the
+    test or the test set up (but, again, *not* the layer set up).
 
 For some other package, maybe:
 

Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/dispatcher.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -95,10 +95,10 @@
     >>> thread.start()
 
 The dispatcher should be starting up now.  Let's wait for it to activate.
-We're using a test convenience, get_poll, defined in the footnotes
-[#get_poll]_.
+We're using a test convenience, get_poll, defined in zc.async.testing.
 
-    >>> poll = get_poll(0)
+    >>> from zc.async.testing import get_poll
+    >>> poll = get_poll(dispatcher, 0)
     >>> poll == {}
     True
     >>> initial <= poll.utc_timestamp <= datetime.datetime.utcnow()
@@ -121,7 +121,7 @@
 
 Now the next poll will register and activate the dispatcher in the queue.
 
-    >>> poll = get_poll()
+    >>> poll = get_poll(dispatcher)
 
 This accomplished the following things.
 
@@ -201,7 +201,7 @@
 The next poll will include the fact that it asked the 'main' agent for
 a job.
 
-    >>> poll = get_poll()
+    >>> poll = get_poll(dispatcher)
     >>> import pprint
     >>> pprint.pprint(dict(poll))
     {'': {'main': {'active jobs': [],
@@ -217,8 +217,7 @@
     11
 
 We can actually get it to perform some jobs now.  Here's a silly simple
-one.  We use a test convenience, wait_for_result, defined in the footnotes
-[#wait_for_result]_.
+one.  We use a test convenience, wait_for_result, defined in zc.async.testing.
 
     >>> import operator
     >>> job1 = queue.put(
@@ -227,6 +226,7 @@
     None
     >>> transaction.commit()
 
+    >>> from zc.async.testing import wait_for_result
     >>> wait_for_result(job1)
     42
 
@@ -378,8 +378,8 @@
 
 An important caveat about this is that the annotations, whether a get or
 a set, must not be persistent objects, or contain them directly or indirectly.
-We use a new test convenience , wait_for_annotation, defined in the footnotes
-[#wait_for_annotation]_.
+We use a new test convenience, wait_for_annotation, defined in
+zc.async.testing.
 
     >>> def annotation_func():
     ...     zc.async.local.setLiveAnnotation('hello', 'from thread!')
@@ -390,7 +390,8 @@
     ...
     >>> job4 = queue.put(annotation_func)
     >>> transaction.commit()
-    
+
+    >>> from zc.async.testing import wait_for_annotation
     >>> wait_for_annotation(job4, 'hello')
     'from thread!'
     >>> job4.annotations['reply'] = 'HIYA'
@@ -458,33 +459,6 @@
     >>> import zc.async.configure
     >>> zc.async.configure.base()
 
-.. [#get_poll]
-
-    >>> import time
-    >>> def get_poll(count = None):
-    ...     if count is None:
-    ...         count = len(dispatcher.polls)
-    ...     for i in range(30):
-    ...         if len(dispatcher.polls) > count:
-    ...             return dispatcher.polls.first()
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'no poll!'
-    ... 
-
-.. [#wait_for_result]
-
-    >>> import zc.async.interfaces
-    >>> def wait_for_result(job):
-    ...     for i in range(30):
-    ...         t = transaction.begin()
-    ...         if job.status == zc.async.interfaces.COMPLETED:
-    ...             return job.result
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'job never completed'
-    ...
-
 .. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
     find this poll information also.
     
@@ -535,15 +509,3 @@
     exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
     *--- End of Failure #... ---
     <BLANKLINE>
-
-.. [#wait_for_annotation]
-
-    >>> def wait_for_annotation(job, name):
-    ...     for i in range(30):
-    ...         t = transaction.begin()
-    ...         if name in job.annotations:
-    ...             return job.annotations[name]
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'TIMEOUT'
-    ...

Modified: zc.async/trunk/src/zc/async/monitor.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitor.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/monitor.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -316,24 +316,6 @@
     >>> queue.dispatchers[dispatcher.UUID]['main'] = agent
     >>> transaction.commit()
 
-    >>> import time
-    >>> def wait_for(*jobs, **kwargs):
-    ...     reactor.time_flies(dispatcher.poll_interval) # starts thread
-    ...     # now we wait for the thread
-    ...     for i in range(kwargs.get('attempts', 10)):
-    ...         while reactor.time_passes():
-    ...             pass
-    ...         transaction.begin()
-    ...         for j in jobs:
-    ...             if j.status != zc.async.interfaces.COMPLETED:
-    ...                 break
-    ...         else:
-    ...             break
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         print 'TIME OUT'
-    ...
-
 .. [#z3monitor_setup] This part actually sets up the monitoring.
 
     >>> import zc.ngi.testing

Modified: zc.async/trunk/src/zc/async/subscribers.txt
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.txt	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/subscribers.txt	2008-04-24 20:56:43 UTC (rev 85706)
@@ -25,14 +25,16 @@
     >>> zope.component.provideHandler(threaded_installer)
     >>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
 
-Now a dispatcher is installed and running.  (The get_poll helper is defined in
-the first footnote.)
+Now a dispatcher is installed and running.  (The get_poll helper is a testing
+helper.)
 
     >>> import zc.async.dispatcher
     >>> dispatcher = zc.async.dispatcher.get()
     >>> dispatcher.poll_interval
     0.5
-    >>> get_poll(0)
+
+    >>> from zc.async.testing import get_poll
+    >>> get_poll(dispatcher, 0)
     {}
 
 The function also stashed the thread on the dispatcher, so we can write tests
@@ -120,7 +122,7 @@
 database, with a queue, with a dispatcher, with an agent.
 
     >>> import pprint
-    >>> pprint.pprint(get_poll())
+    >>> pprint.pprint(get_poll(dispatcher))
     {'': {'main': {'active jobs': [],
                    'error': None,
                    'len': 0,
@@ -220,15 +222,3 @@
 
     >>> import zc.async.configure
     >>> zc.async.configure.base()
-
-    >>> import time
-    >>> def get_poll(count = None):
-    ...     if count is None:
-    ...         count = len(dispatcher.polls)
-    ...     for i in range(30):
-    ...         if len(dispatcher.polls) > count:
-    ...             return dispatcher.polls.first()
-    ...         time.sleep(0.1)
-    ...     else:
-    ...         assert False, 'no poll!'
-    ...

Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py	2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/testing.py	2008-04-24 20:56:43 UTC (rev 85706)
@@ -13,9 +13,12 @@
 ##############################################################################
 import threading
 import bisect
+import time
 import datetime
 
 import pytz
+import transaction
+import zc.async.interfaces
 
 
 _now = None
@@ -116,14 +119,6 @@
 
     # end reactor methods
 
-    def _get_next(self, end):
-        self._lock.acquire()
-        try:
-            if self.calls and self.calls[0][0] <= end:
-                return self.calls.pop(0)
-        finally:
-            self._lock.release()
-
     def start(self):
         setUpDatetime()
         self.started = True
@@ -134,6 +129,16 @@
         self.started = False
         tearDownDatetime()
 
+    # these are for tests
+
+    def _get_next(self, end):
+        self._lock.acquire()
+        try:
+            if self.calls and self.calls[0][0] <= end:
+                return self.calls.pop(0)
+        finally:
+            self._lock.release()
+
     def time_flies(self, seconds):
         if not self.started:
             raise ValueError('not started')
@@ -158,3 +163,51 @@
             callable(*args, **kw)
             return True
         return False
+
+    def wait_for(self, *jobs, **kwargs):
+        poll_interval = kwargs.get('poll_interval', 5)
+        self.time_flies(poll_interval) # starts thread
+        # now we wait for the thread
+        for i in range(kwargs.get('attempts', 10)):
+            while self.time_passes():
+                pass
+            transaction.begin()
+            for j in jobs:
+                if j.status != zc.async.interfaces.COMPLETED:
+                    break
+            else:
+                break
+            time.sleep(0.1)
+        else:
+            print 'TIME OUT'
+
+# helper functions convenient for tests
+
+def get_poll(dispatcher, count=None):
+    if count is None:
+        count = len(dispatcher.polls)
+    for i in range(60):
+        if len(dispatcher.polls) > count:
+            return dispatcher.polls.first()
+        time.sleep(0.1)
+    else:
+        assert False, 'no poll!'
+
+def wait_for_result(job):
+    for i in range(60):
+        t = transaction.begin()
+        if job.status == zc.async.interfaces.COMPLETED:
+            return job.result
+        time.sleep(0.1)
+    else:
+        assert False, 'job never completed'
+
+
+def wait_for_annotation(job, name):
+    for i in range(60):
+        t = transaction.begin()
+        if name in job.annotations:
+            return job.annotations[name]
+        time.sleep(0.1)
+    else:
+        assert False, 'annotation never found'



More information about the Checkins mailing list