[Checkins] SVN: zc.async/trunk/s Changes for 1.4.1

Gary Poster gary at zope.com
Wed Jul 30 21:03:45 EDT 2008


Log message for revision 89070:
  Changes for 1.4.1
  
  - The new ``serial`` and ``parallel`` helpers did not allow the
    ``postprocess`` argument to be a partial closure, and were being naughty.
    Fixed.
  
  - Added tests and demos for advanced features of ``serial`` and ``parallel``.
  
  - More tweaks to the new Quickstart S5 document.
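
The first item is easier to follow with a small model. What follows is a minimal sketch, not zc.async code: ``SketchJob`` and ``run_postprocess`` are hypothetical stand-ins invented for illustration. It assumes only what the job.py hunks in this check-in show, namely that the finished jobs are now appended to whatever args the ``postprocess`` job already carries instead of replacing them.

```python
# Stand-in for illustration only; this is NOT zc.async.job.Job.
class SketchJob:
    """Hypothetical job: a callable plus a mutable list of positional args."""

    def __init__(self, func, *args):
        self.func = func
        self.args = list(args)
        self.result = None

    def __call__(self):
        self.result = self.func(*self.args)
        return self.result


def run_postprocess(postprocess, jobs):
    # Keep the pre-bound args and append the finished jobs, rather than
    # overwriting the args wholesale (which would drop 'foo' below).
    postprocess.args.extend(jobs)
    return postprocess()


def summarize(extra_info, *jobs):
    return extra_info, tuple(j.result for j in jobs)


# Three trivial inner jobs returning 0, 1 and 2; run them first.
jobs = [SketchJob(lambda n=n: n) for n in range(3)]
for j in jobs:
    j()

outcome = run_postprocess(SketchJob(summarize, "foo"), jobs)
print(outcome)  # ('foo', (0, 1, 2))
```

The pre-bound ``'foo'`` survives because the jobs are added after it, which is the behaviour the new README footnotes exercise.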
  
  

Changed:
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
  U   zc.async/trunk/src/zc/async/README.txt
  U   zc.async/trunk/src/zc/async/job.py

-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-07-30 22:22:56 UTC (rev 89069)
+++ zc.async/trunk/setup.py	2008-07-31 01:03:42 UTC (rev 89070)
@@ -71,7 +71,7 @@
 
 setup(
     name='zc.async',
-    version='1.4.0',
+    version='1.4.1',
     packages=find_packages('src'),
     package_dir={'':'src'},
     zip_safe=False,

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-07-30 22:22:56 UTC (rev 89069)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-07-31 01:03:42 UTC (rev 89070)
@@ -1,6 +1,17 @@
-1.4 (2008-07-30)
-================
+1.4.1 (2008-07-30)
+==================
 
+- The new ``serial`` and ``parallel`` helpers did not allow the
+  ``postprocess`` argument to be a partial closure, and were being naughty.
+  Fixed.
+
+- Added tests and demos for advanced features of ``serial`` and ``parallel``.
+
+- More tweaks to the new Quickstart S5 document.
+
+1.4.0 (2008-07-30)
+==================
+
 - Mentioned in ftesting.txt that Zope 3 users should use zope.app.testing
   3.4.2 or newer.  Also added a summary section at the beginning of that file.
 

Modified: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt	2008-07-30 22:22:56 UTC (rev 89069)
+++ zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt	2008-07-31 01:03:42 UTC (rev 89070)
@@ -89,6 +89,8 @@
 High-level Features
 ===================
 
+.. class:: incremental
+
 - easy to use
 
 - flexible configuration
@@ -112,9 +114,12 @@
     your asynchronous jobs. It is well-tested and has test helpers for you to
     use and test patterns for you to follow.
 
-An Experiment
-=============
+Let's Experiment!
+=================
 
+Installation with virtualenv
+============================
+
 To start, install |virtualenv|_ and create a virtual environment for our
 experiments.
 
@@ -123,12 +128,15 @@
     I prefer zc.buildout for production deployments, but virtualenv is very
     nice for quick experimentation.
 
+::
+
+    $ easy_install virtualenv
+    $ virtualenv quickstart
+
 Install zc.async in the virtual environment.
 
 ::
 
-    $ easy_install virtualenv
-    $ virtualenv quickstart
     $ cd quickstart/
     $ ./bin/easy_install zc.async
 
@@ -139,10 +147,19 @@
 Dependencies
 ============
 
-This installed the ZODB, Twisted, the basic Zope component architecture, and a
-few smaller packages.
+This installed several packages.
 
+- the ZODB, an object database from the Zope project;
 
+- Twisted, a framework for networked applications;
+
+- the component architecture from the Zope project;
+
+- and a few smaller packages.
+
+All of these, and zc.async, are distributed under BSD-like licenses such as
+LGPL and ZPL.
+
 .. class:: handout
 
     ::
@@ -197,8 +214,11 @@
 
     $ ./bin/python
 
-This will be our single client process.  You might have many.
+This will be our single client process.
 
+You might have many, each connecting to the main database server, and each able
+to perform and/or request zc.async jobs.
+
 Database Connection
 ===================
 
@@ -240,7 +260,7 @@
     >>> zc.async.configure.start(
     ...     db, poll_interval=1)
 
-Now the system is polling for jobs every second.
+Now the system has a ``dispatcher`` polling for jobs every second.
 
 The Queue
 =========
@@ -260,17 +280,15 @@
 
 Let's put a job in our queue.
 
->>> import time
+    >>> import time
     >>> j = q.put(time.time)
+
+It's not done yet.
+
     >>> j.result
     >>> j.status
     u'pending-status'
 
-We have to commit the transaction for the dispatcher to see the job.
-
-    >>> import transaction
-    >>> transaction.commit()
-
 .. class:: handout
 
     This is a silly example. Imagine instead that this was some really
@@ -282,6 +300,14 @@
     Or maybe this is a silly example.
 ..
 
+A Transaction
+=============
+
+We have to commit the transaction for the dispatcher to see the job.
+
+    >>> import transaction
+    >>> transaction.commit()
+
 A Result
 ========
 

Modified: zc.async/trunk/src/zc/async/README.txt
===================================================================
--- zc.async/trunk/src/zc/async/README.txt	2008-07-30 22:22:56 UTC (rev 89069)
+++ zc.async/trunk/src/zc/async/README.txt	2008-07-31 01:03:42 UTC (rev 89070)
@@ -786,6 +786,8 @@
     >>> job.result
     (0, 1, 2)
 
+[#extra_serial_tricks]_
+
 The ``parallel`` example we use below follows a similar pattern.
 
 Parallelized Work
@@ -838,7 +840,7 @@
     >>> job.result
     42
 
-Ta-da!
+Ta-da! [#extra_parallel_tricks]_
 
 Now, how did this work?  Let's look at a simple implementation directly.  We'll
 use a slightly different postprocess, that expects results directly rather than
@@ -1297,6 +1299,128 @@
     ...         assert False, 'never completed'
     ...
 
+.. [#extra_serial_tricks] The ``serial`` helper can accept a partial closure
+    for a ``postprocess`` argument.
+
+    >>> def postprocess(extra_info, *jobs):
+    ...     return extra_info, tuple(j.result for j in jobs)
+    ...
+    >>> job = queue.put(zc.async.job.serial(
+    ...     job_zero, job_one, job_two,
+    ...     postprocess=zc.async.job.Job(postprocess, 'foo')))
+    >>> transaction.commit()
+
+    >>> wait_repeatedly()
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
+
+    >>> job.result
+    ('foo', (0, 1, 2))
+
+    The list of jobs can be extended by adding them to the args of the job
+    returned by ``serial`` under these circumstances:
+
+    - before the job has started,
+
+    - by an inner job while it is running, or
+
+    - by any callback added to any inner job *before* that inner job has begun.
+
+    Here's an example.
+
+    >>> def postprocess(*jobs):
+    ...     return [j.result for j in jobs]
+    ...
+    >>> job = queue.put(zc.async.job.serial(postprocess=postprocess))
+    >>> def second_job():
+    ...     return 'second'
+    ...
+    >>> def third_job():
+    ...     return 'third'
+    ...
+    >>> def schedule_third(main_job, ignored):
+    ...     main_job.args.append(zc.async.job.Job(third_job))
+    ...
+    >>> def first_job(main_job):
+    ...     j = zc.async.job.Job(second_job)
+    ...     main_job.args.append(j)
+    ...     j.addCallback(zc.async.job.Job(schedule_third, main_job))
+    ...     return 'first'
+    ...
+    >>> job.args.append(zc.async.job.Job(first_job, job))
+    >>> transaction.commit()
+
+    >>> wait_repeatedly()
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
+
+    >>> job.result
+    ['first', 'second', 'third']
+
+    Be warned, these sorts of constructs allow infinite loops!
+
+.. [#extra_parallel_tricks] The ``parallel`` helper can accept a partial closure
+    for a ``postprocess`` argument.
+
+    >>> def postprocess(extra_info, *jobs):
+    ...     return extra_info, sum(j.result for j in jobs)
+    ...
+    >>> job = queue.put(zc.async.job.parallel(
+    ...     job_A, job_B, job_C,
+    ...     postprocess=zc.async.job.Job(postprocess, 'foo')))
+
+    >>> transaction.commit()
+
+    >>> wait_repeatedly()
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
+
+    >>> job.result
+    ('foo', 42)
+
+    The list of jobs can be extended by adding them to the args of the job
+    returned by ``parallel`` under these circumstances:
+
+    - before the job has started,
+
+    - by an inner job while it is running,
+
+    - by any callback added to any inner job *before* that inner job has begun.
+
+    Here's an example.
+
+    >>> def postprocess(*jobs):
+    ...     return [j.result for j in jobs]
+    ...
+    >>> job = queue.put(zc.async.job.parallel(postprocess=postprocess))
+    >>> def second_job():
+    ...     return 'second'
+    ...
+    >>> def third_job():
+    ...     return 'third'
+    ...
+    >>> def schedule_third(main_job, ignored):
+    ...     main_job.args.append(zc.async.job.Job(third_job))
+    ...
+    >>> def first_job(main_job):
+    ...     j = zc.async.job.Job(second_job)
+    ...     main_job.args.append(j)
+    ...     j.addCallback(zc.async.job.Job(schedule_third, main_job))
+    ...     return 'first'
+    ...
+    >>> job.args.append(zc.async.job.Job(first_job, job))
+    >>> transaction.commit()
+
+    >>> wait_repeatedly()
+    ... # doctest: +ELLIPSIS
+    TIME OUT...
+
+    >>> job.result
+    ['first', 'second', 'third']
+
+    As with ``serial``, be warned, these sorts of constructs allow infinite
+    loops!
+
 .. [#stop_usage_reactor]
 
     >>> pprint.pprint(dispatcher.getStatistics()) # doctest: +ELLIPSIS
@@ -1307,9 +1431,9 @@
      'shortest active': None,
      'shortest failed': (..., 'unnamed'),
      'shortest successful': (..., 'unnamed'),
-     'started': 34,
+     'started': 54,
      'statistics end': datetime.datetime(2006, 8, 10, 15, 44, 22, 211),
-     'statistics start': datetime.datetime(2006, 8, 10, 15, ...),
-     'successful': 32,
+     'statistics start': datetime.datetime(2006, 8, 10, 16, ...),
+     'successful': 52,
      'unknown': 0}
     >>> reactor.stop()

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-07-30 22:22:56 UTC (rev 89069)
+++ zc.async/trunk/src/zc/async/job.py	2008-07-31 01:03:42 UTC (rev 89070)
@@ -949,7 +949,6 @@
     result = Job(scheduler,
                  *(zc.async.interfaces.IJob(j) for j in jobs),
                  **dict(postprocess=postprocess))
-    postprocess.args = result.args # ...I guess this means I bless this muck
     return result
 
 def _queue_next(main_job, ix=0, ignored_result=None):
@@ -960,7 +959,9 @@
         queue.put(next)
         next.addCallback(Job(_queue_next, main_job, ix+1))
     else:
-        queue.put(main_job.kwargs['postprocess'])
+        postprocess = main_job.kwargs['postprocess']
+        postprocess.args.extend(jobs)
+        queue.put(postprocess)
 
 def _schedule_serial(*jobs, **kw):
     _queue_next(zc.async.local.getJob())
@@ -983,7 +984,9 @@
                             zc.async.interfaces.CALLBACKS):
             complete = False
     if complete:
-        queue.put(main_job.kwargs['postprocess'])
+        postprocess = main_job.kwargs['postprocess']
+        postprocess.args.extend(jobs)
+        queue.put(postprocess)
 
 def _schedule_parallel(*jobs, **kw):
     _queue_all(zc.async.local.getJob())
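
The README footnotes in this change describe extending the list of jobs while a ``serial`` job is running. The idea can be sketched in-process as below. This is a hedged illustration only: ``run_serial`` and the three task functions are hypothetical names, and the loop is synchronous, whereas zc.async queues each next job through the database and chains them with callbacks.

```python
# Hypothetical in-process stand-in; zc.async itself schedules jobs via a queue.
def run_serial(tasks, postprocess):
    """Run tasks in order, re-checking the list so appended tasks are seen."""
    results = []
    ix = 0
    while ix < len(tasks):  # len() is re-evaluated on every pass
        # Each task receives the task list so it can schedule a successor.
        results.append(tasks[ix](tasks))
        ix += 1
    return postprocess(results)


def third(tasks):
    return "third"


def second(tasks):
    tasks.append(third)  # extend the chain while it is running
    return "second"


def first(tasks):
    tasks.append(second)
    return "first"


ordered = run_serial([first], list)
print(ordered)  # ['first', 'second', 'third']
```

A task that always appends another task never lets the loop finish, which is the infinite-loop hazard the footnotes warn about.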


