[Checkins] SVN: zc.async/trunk/src/zc/async/ move reusable testing
functions out to to zc.async.testing
Gary Poster
gary at zope.com
Thu Apr 24 16:56:47 EDT 2008
Log message for revision 85706:
move reusable testing functions out to to zc.async.testing
Changed:
U zc.async/trunk/src/zc/async/README.txt
U zc.async/trunk/src/zc/async/README_2.txt
U zc.async/trunk/src/zc/async/README_3.txt
U zc.async/trunk/src/zc/async/README_3b.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/dispatcher.txt
U zc.async/trunk/src/zc/async/monitor.txt
U zc.async/trunk/src/zc/async/subscribers.txt
U zc.async/trunk/src/zc/async/testing.py
-=-
Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -205,10 +205,10 @@
The ``put`` returned a job. Now we need to wait for the job to be
performed. We would normally do this by really waiting. For our
-examples, we will use a helper function called ``wait_for`` to wait for
-the job to be completed [#wait_for]_.
+examples, we will use a helper method on the testing reactor to ``wait_for``
+the job to be completed.
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
imagine this sent a message to another machine
We also could have used the method of a persistent object. Here's another
@@ -233,7 +233,7 @@
>>> job = queue.put(root['demo'].increase)
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> root['demo'].counter
1
@@ -279,16 +279,16 @@
>>> job.begin_after
datetime.datetime(2006, 8, 10, 15, 56, tzinfo=<UTC>)
>>> transaction.commit()
- >>> wait_for(job, attempts=2) # +5 virtual seconds
+ >>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
TIME OUT
- >>> wait_for(job, attempts=2) # +5 virtual seconds
+ >>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
TIME OUT
>>> datetime.datetime.now(pytz.UTC)
datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
>>> zc.async.testing.set_now(datetime.datetime(
... 2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
imagine this sent a message to another machine
>>> datetime.datetime.now(pytz.UTC) >= job.begin_after
True
@@ -353,7 +353,7 @@
>>> job.status == zc.async.interfaces.PENDING
True
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> job.result
'200 OK'
@@ -377,7 +377,7 @@
>>> job = queue.put(
... zc.async.job.Job(root['demo'].increase, 5))
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> root['demo'].counter
6
@@ -387,7 +387,7 @@
>>> job = queue.put(
... zc.async.job.Job(root['demo'].increase, value=10))
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> root['demo'].counter
16
@@ -405,7 +405,7 @@
...
>>> job = queue.put(I_am_a_bad_bad_function)
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> job.result
<zc.twist.Failure exceptions.NameError>
@@ -443,7 +443,7 @@
>>> job = queue.put(imaginaryNetworkCall)
>>> callback = job.addCallback(I_scribble_on_strings)
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> job.result
'200 OK'
>>> callback.result
@@ -460,7 +460,7 @@
>>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
>>> callback2 = callback1.addCallback(I_scribble_on_strings)
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> job.result
<zc.twist.Failure exceptions.NameError>
>>> callback1.result
@@ -544,6 +544,7 @@
...
>>> job = queue.put(annotateStatus)
>>> transaction.commit()
+ >>> import time
>>> def wait_for_annotation(job, key):
... reactor.time_flies(dispatcher.poll_interval) # starts thread
... for i in range(10):
@@ -567,7 +568,7 @@
>>> job.annotations['zc.async.test.flag'] = True
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> job.result
42
@@ -602,7 +603,7 @@
>>> job1.args.append(job2)
>>> job2.args.append(job1)
>>> transaction.commit()
- >>> wait_for(job1, job2)
+ >>> reactor.wait_for(job1, job2)
>>> job1.status == zc.async.interfaces.COMPLETED
True
>>> job2.status == zc.async.interfaces.COMPLETED
@@ -655,8 +656,8 @@
True
>>> job2.annotations['zc.async.test.flag'] = False
>>> transaction.commit()
- >>> wait_for(job1)
- >>> wait_for(job2)
+ >>> reactor.wait_for(job1)
+ >>> reactor.wait_for(job2)
>>> print job1.result
None
>>> print job2.result
@@ -702,9 +703,9 @@
...
>>> job = queue.put(first_job)
>>> transaction.commit()
- >>> wait_for(job, attempts=3)
+ >>> reactor.wait_for(job, attempts=3)
TIME OUT
- >>> wait_for(job, attempts=3)
+ >>> reactor.wait_for(job, attempts=3)
>>> job.result
42
@@ -778,13 +779,13 @@
>>> job = queue.put(main_job)
>>> transaction.commit()
- >>> wait_for(job, attempts=3)
+ >>> reactor.wait_for(job, attempts=3)
TIME OUT
- >>> wait_for(job, attempts=3)
+ >>> reactor.wait_for(job, attempts=3)
TIME OUT
- >>> wait_for(job, attempts=3)
+ >>> reactor.wait_for(job, attempts=3)
TIME OUT
- >>> wait_for(job, attempts=3)
+ >>> reactor.wait_for(job, attempts=3)
>>> job.result
42
@@ -819,7 +820,7 @@
...
>>> job = queue.put(delegator)
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> job.result
'200 OK'
@@ -1048,27 +1049,6 @@
3
>>> transaction.commit()
-.. [#wait_for] This is our helper function. It relies on the test fixtures
- set up in the previous footnote.
-
- >>> import time
- >>> def wait_for(*jobs, **kwargs):
- ... reactor.time_flies(dispatcher.poll_interval) # starts thread
- ... # now we wait for the thread
- ... for i in range(kwargs.get('attempts', 10)):
- ... while reactor.time_passes():
- ... pass
- ... transaction.begin()
- ... for j in jobs:
- ... if j.status != zc.async.interfaces.COMPLETED:
- ... break
- ... else:
- ... break
- ... time.sleep(0.1)
- ... else:
- ... print 'TIME OUT'
- ...
-
.. [#commit_for_multidatabase] We commit before we do the next step as a
good practice, in case the queue is from a different database than
the root. See the configuration sections for a discussion about
@@ -1087,7 +1067,7 @@
>>> job = queue.put(
... send_message, datetime.datetime(2006, 8, 10, 15, tzinfo=pytz.UTC))
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
imagine this sent a message to another machine
It's worth noting that this situation consitutes a small exception
@@ -1103,7 +1083,7 @@
>>> job = queue.put(
... send_message, datetime.datetime(2006, 7, 21, 12, tzinfo=pytz.UTC))
>>> transaction.commit()
- >>> wait_for(job)
+ >>> reactor.wait_for(job)
>>> job.result
<zc.twist.Failure zc.async.interfaces.AbortedError>
>>> import sys
Modified: zc.async/trunk/src/zc/async/README_2.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_2.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README_2.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -403,10 +403,10 @@
>>> thread.start()
The dispatcher should be starting up now. Let's wait for it to activate.
-We're using a test convenience, get_poll, defined in the footnotes
-[#get_poll]_.
+We're using a test convenience, get_poll, defined in the testing module.
- >>> poll = get_poll(0)
+ >>> from zc.async.testing import get_poll
+ >>> poll = get_poll(dispatcher, 0)
We're off! The events have been fired for registering and activating the
dispatcher. Therefore, our subscriber to add our agent has fired.
@@ -715,33 +715,13 @@
>>> quota.filled
False
-.. [#get_poll]
-
- >>> import time
- >>> def get_poll(count = None):
- ... if count is None:
- ... count = len(dispatcher.polls)
- ... for i in range(30):
- ... if len(dispatcher.polls) > count:
- ... return dispatcher.polls.first()
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'no poll!'
- ...
-
.. [#stop_config_reactor] We don't want the live dispatcher for our demos,
actually. See dispatcher.txt to see the live dispatcher actually in use.
+ So, here we'll stop the "real" reactor and switch to a testing one.
>>> reactor.callFromThread(reactor.stop)
- >>> for i in range(30):
- ... if not dispatcher.activated:
- ... break
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'dispatcher did not deactivate'
- ...
-
- Now, we'll restart with an explicit reactor.
+ >>> thread.join(3)
+ >>> assert not dispatcher.activated, 'dispatcher did not deactivate'
>>> import zc.async.testing
>>> reactor = zc.async.testing.Reactor()
Modified: zc.async/trunk/src/zc/async/README_3.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README_3.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -152,7 +152,7 @@
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import pprint
- >>> pprint.pprint(get_poll(0))
+ >>> pprint.pprint(get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
'error': None,
'len': 0,
@@ -297,24 +297,4 @@
>>> import zc.async.interfaces
>>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
- >>> import time
- >>> def get_poll(count=None): # just a helper used later, not processing
- ... if count is None:
- ... count = len(dispatcher.polls)
- ... for i in range(30):
- ... if len(dispatcher.polls) > count:
- ... return dispatcher.polls.first()
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'no poll!'
- ...
-
- >>> def wait_for_result(job): # just a helper used later, not processing
- ... for i in range(30):
- ... t = transaction.begin()
- ... if job.status == zc.async.interfaces.COMPLETED:
- ... return job.result
- ... time.sleep(0.5)
- ... else:
- ... assert False, 'job never completed'
- ...
+ >>> from zc.async.testing import get_poll, wait_for_result
Modified: zc.async/trunk/src/zc/async/README_3b.txt
===================================================================
--- zc.async/trunk/src/zc/async/README_3b.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/README_3b.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -39,7 +39,7 @@
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import pprint
- >>> pprint.pprint(get_poll(0))
+ >>> pprint.pprint(get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
'error': None,
'len': 0,
@@ -198,28 +198,8 @@
>>> import zc.async.interfaces
>>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
- >>> import time
- >>> def get_poll(count=None): # just a helper used later, not processing
- ... if count is None:
- ... count = len(dispatcher.polls)
- ... for i in range(30):
- ... if len(dispatcher.polls) > count:
- ... return dispatcher.polls.first()
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'no poll!'
- ...
+ >>> from zc.async.testing import get_poll, wait_for_result
- >>> def wait_for_result(job): # just a helper used later, not processing
- ... for i in range(30):
- ... t = transaction.begin()
- ... if job.status == zc.async.interfaces.COMPLETED:
- ... return job.result
- ... time.sleep(0.5)
- ... else:
- ... assert False, 'job never completed'
- ...
-
.. [#shutdown]
>>> import zc.async.dispatcher
@@ -227,14 +207,6 @@
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
>>> dispatcher.thread.join(3)
- >>> t = transaction.begin() # sync
- >>> import zope.component
- >>> import zc.async.interfaces
- >>> uuid = zope.component.getUtility(zc.async.interfaces.IUUID)
- >>> da = queue.dispatchers[uuid]
- >>> bool(da.activated)
- False
-
>>> db.close()
>>> db.databases['async'].close()
>>> import shutil
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -5,6 +5,16 @@
- show how to broadcast, maybe add conveniences
- show how to use with collapsing jobs (hint to future self: use external queue
to put in work, and have job(s) just pull what they can see from queue)
+- write tips and tricks
+ * avoid long transactions if possible. really avoid long transactions
+ involving frequently written objects. Discuss ramifications and
+ strategies.
+ * in zope.app.testing.functional tests, zc.async doesn't do well being
+ started in a layer's setup because then it is associated with the
+ wrapped layer DB, and the test is associated with the DemoStorage wrapper,
+ so that the test can see what zc.async does, but zc.async can't see what
+ the test does. The current workaround is to start the dispatcher in the
+ test or the test set up (but, again, *not* The layer set up).
For some other package, maybe:
Modified: zc.async/trunk/src/zc/async/dispatcher.txt
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/dispatcher.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -95,10 +95,10 @@
>>> thread.start()
The dispatcher should be starting up now. Let's wait for it to activate.
-We're using a test convenience, get_poll, defined in the footnotes
-[#get_poll]_.
+We're using a test convenience, get_poll, defined in zc.async.testing.
- >>> poll = get_poll(0)
+ >>> from zc.async.testing import get_poll
+ >>> poll = get_poll(dispatcher, 0)
>>> poll == {}
True
>>> initial <= poll.utc_timestamp <= datetime.datetime.utcnow()
@@ -121,7 +121,7 @@
Now the next poll will register and activate the dispatcher in the queue.
- >>> poll = get_poll()
+ >>> poll = get_poll(dispatcher)
This accomplished the following things.
@@ -201,7 +201,7 @@
The next poll will include the fact that it asked the 'main' agent for
a job.
- >>> poll = get_poll()
+ >>> poll = get_poll(dispatcher)
>>> import pprint
>>> pprint.pprint(dict(poll))
{'': {'main': {'active jobs': [],
@@ -217,8 +217,7 @@
11
We can actually get it to perform some jobs now. Here's a silly simple
-one. We use a test convenience, wait_for_result, defined in the footnotes
-[#wait_for_result]_.
+one. We use a test convenience, wait_for_result, defined in zc.async.testing.
>>> import operator
>>> job1 = queue.put(
@@ -227,6 +226,7 @@
None
>>> transaction.commit()
+ >>> from zc.async.testing import wait_for_result
>>> wait_for_result(job1)
42
@@ -378,8 +378,8 @@
An important caveat about this is that the annotations, whether a get or
a set, must not be persistent objects, or contain them directly or indirectly.
-We use a new test convenience , wait_for_annotation, defined in the footnotes
-[#wait_for_annotation]_.
+We use a new test convenience , wait_for_annotation, defined in the
+zc.async.testing.
>>> def annotation_func():
... zc.async.local.setLiveAnnotation('hello', 'from thread!')
@@ -390,7 +390,8 @@
...
>>> job4 = queue.put(annotation_func)
>>> transaction.commit()
-
+
+ >>> from zc.async.testing import wait_for_annotation
>>> wait_for_annotation(job4, 'hello')
'from thread!'
>>> job4.annotations['reply'] = 'HIYA'
@@ -458,33 +459,6 @@
>>> import zc.async.configure
>>> zc.async.configure.base()
-.. [#get_poll]
-
- >>> import time
- >>> def get_poll(count = None):
- ... if count is None:
- ... count = len(dispatcher.polls)
- ... for i in range(30):
- ... if len(dispatcher.polls) > count:
- ... return dispatcher.polls.first()
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'no poll!'
- ...
-
-.. [#wait_for_result]
-
- >>> import zc.async.interfaces
- >>> def wait_for_result(job):
- ... for i in range(30):
- ... t = transaction.begin()
- ... if job.status == zc.async.interfaces.COMPLETED:
- ... return job.result
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'job never completed'
- ...
-
.. [#getPollInfo] The dispatcher has a ``getPollInfo`` method that lets you
find this poll information also.
@@ -535,15 +509,3 @@
exceptions.TypeError: unsupported operand type(s) for *: 'int' and 'NoneType'
*--- End of Failure #... ---
<BLANKLINE>
-
-.. [#wait_for_annotation]
-
- >>> def wait_for_annotation(job, name):
- ... for i in range(30):
- ... t = transaction.begin()
- ... if name in job.annotations:
- ... return job.annotations[name]
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'TIMEOUT'
- ...
Modified: zc.async/trunk/src/zc/async/monitor.txt
===================================================================
--- zc.async/trunk/src/zc/async/monitor.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/monitor.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -316,24 +316,6 @@
>>> queue.dispatchers[dispatcher.UUID]['main'] = agent
>>> transaction.commit()
- >>> import time
- >>> def wait_for(*jobs, **kwargs):
- ... reactor.time_flies(dispatcher.poll_interval) # starts thread
- ... # now we wait for the thread
- ... for i in range(kwargs.get('attempts', 10)):
- ... while reactor.time_passes():
- ... pass
- ... transaction.begin()
- ... for j in jobs:
- ... if j.status != zc.async.interfaces.COMPLETED:
- ... break
- ... else:
- ... break
- ... time.sleep(0.1)
- ... else:
- ... print 'TIME OUT'
- ...
-
.. [#z3monitor_setup] This part actually sets up the monitoring.
>>> import zc.ngi.testing
Modified: zc.async/trunk/src/zc/async/subscribers.txt
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.txt 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/subscribers.txt 2008-04-24 20:56:43 UTC (rev 85706)
@@ -25,14 +25,16 @@
>>> zope.component.provideHandler(threaded_installer)
>>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
-Now a dispatcher is installed and running. (The get_poll helper is defined in
-the first footnote.)
+Now a dispatcher is installed and running. (The get_poll helper is a testing
+helper.)
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> dispatcher.poll_interval
0.5
- >>> get_poll(0)
+
+ >>> from zc.async.testing import get_poll
+ >>> get_poll(dispatcher, 0)
{}
The function also stashed the thread on the dispatcher, so we can write tests
@@ -120,7 +122,7 @@
database, with a queue, with a dispatcher, with an agent.
>>> import pprint
- >>> pprint.pprint(get_poll())
+ >>> pprint.pprint(get_poll(dispatcher))
{'': {'main': {'active jobs': [],
'error': None,
'len': 0,
@@ -220,15 +222,3 @@
>>> import zc.async.configure
>>> zc.async.configure.base()
-
- >>> import time
- >>> def get_poll(count = None):
- ... if count is None:
- ... count = len(dispatcher.polls)
- ... for i in range(30):
- ... if len(dispatcher.polls) > count:
- ... return dispatcher.polls.first()
- ... time.sleep(0.1)
- ... else:
- ... assert False, 'no poll!'
- ...
Modified: zc.async/trunk/src/zc/async/testing.py
===================================================================
--- zc.async/trunk/src/zc/async/testing.py 2008-04-24 19:18:26 UTC (rev 85705)
+++ zc.async/trunk/src/zc/async/testing.py 2008-04-24 20:56:43 UTC (rev 85706)
@@ -13,9 +13,12 @@
##############################################################################
import threading
import bisect
+import time
import datetime
import pytz
+import transaction
+import zc.async.interfaces
_now = None
@@ -116,14 +119,6 @@
# end reactor methods
- def _get_next(self, end):
- self._lock.acquire()
- try:
- if self.calls and self.calls[0][0] <= end:
- return self.calls.pop(0)
- finally:
- self._lock.release()
-
def start(self):
setUpDatetime()
self.started = True
@@ -134,6 +129,16 @@
self.started = False
tearDownDatetime()
+ # these are for tests
+
+ def _get_next(self, end):
+ self._lock.acquire()
+ try:
+ if self.calls and self.calls[0][0] <= end:
+ return self.calls.pop(0)
+ finally:
+ self._lock.release()
+
def time_flies(self, seconds):
if not self.started:
raise ValueError('not started')
@@ -158,3 +163,51 @@
callable(*args, **kw)
return True
return False
+
+ def wait_for(self, *jobs, **kwargs):
+ poll_interval = kwargs.get('poll_interval', 5)
+ self.time_flies(poll_interval) # starts thread
+ # now we wait for the thread
+ for i in range(kwargs.get('attempts', 10)):
+ while self.time_passes():
+ pass
+ transaction.begin()
+ for j in jobs:
+ if j.status != zc.async.interfaces.COMPLETED:
+ break
+ else:
+ break
+ time.sleep(0.1)
+ else:
+ print 'TIME OUT'
+
+# helper functions convenient for tests
+
+def get_poll(dispatcher, count=None):
+ if count is None:
+ count = len(dispatcher.polls)
+ for i in range(60):
+ if len(dispatcher.polls) > count:
+ return dispatcher.polls.first()
+ time.sleep(0.1)
+ else:
+ assert False, 'no poll!'
+
+def wait_for_result(job):
+ for i in range(60):
+ t = transaction.begin()
+ if job.status == zc.async.interfaces.COMPLETED:
+ return job.result
+ time.sleep(0.1)
+ else:
+ assert False, 'job never completed'
+
+
+def wait_for_annotation(job, name):
+ for i in range(60):
+ t = transaction.begin()
+ if name in job.annotations:
+ return job.annotations[name]
+ time.sleep(0.1)
+ else:
+ assert False, 'annotation never found'
More information about the Checkins
mailing list