[Checkins] SVN: zc.async/trunk/s - More doc additions (primarily work in the virtualenv quickstart)

Gary Poster gary at modernsongs.com
Sun Aug 17 00:08:20 EDT 2008


Log message for revision 89917:
  - More doc additions (primarily work in the virtualenv quickstart)
  
  - Fix support for Twisted installed reactor XXX NEEDS TEST
  
  - Fix retry behavior for parallel and serial jobs XXX NEEDS TEST
  
  - tweaked the uuid.txt to mention zdaemon/supervisor rather than Zope 3.
  
  
  

Changed:
  U   zc.async/trunk/setup.py
  U   zc.async/trunk/sphinx/.static/default.css
  U   zc.async/trunk/src/zc/async/CHANGES.txt
  U   zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
  U   zc.async/trunk/src/zc/async/TODO.txt
  U   zc.async/trunk/src/zc/async/dispatcher.py
  U   zc.async/trunk/src/zc/async/instanceuuid.py
  U   zc.async/trunk/src/zc/async/job.py

-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/setup.py	2008-08-17 04:08:16 UTC (rev 89917)
@@ -72,7 +72,7 @@
 
 setup(
     name='zc.async',
-    version='1.4.2a2',
+    version='1.4.2a3',
     namespace_packages=['zc'],
     packages=find_packages('src'),
     package_dir={'':'src'},

Modified: zc.async/trunk/sphinx/.static/default.css
===================================================================
--- zc.async/trunk/sphinx/.static/default.css	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/sphinx/.static/default.css	2008-08-17 04:08:16 UTC (rev 89917)
@@ -868,4 +868,9 @@
 
 div.sidebar p.sidebar-title tt {
 	background: transparent;
+}
+
+div#a-job div.highlight,
+div#make-a-file div.highlight {
+	background-color: #fff;
 }
\ No newline at end of file

Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/CHANGES.txt	2008-08-17 04:08:16 UTC (rev 89917)
@@ -18,6 +18,12 @@
 
 - Added zc.async.partial.Partial for backward compatibility purposes.
 
+- Fix support for Twisted installed reactor XXX NEEDS TEST
+
+- Fix retry behavior for parallel and serial jobs XXX NEEDS TEST
+
+- tweaked the uuid.txt to mention zdaemon/supervisor rather than Zope 3.
+
 1.4.1 (2008-07-30)
 ==================
 

Modified: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt	2008-08-17 04:08:16 UTC (rev 89917)
@@ -203,6 +203,17 @@
 A Job
 -----
 
+.. sidebar:: A Silly Example
+
+    This is a silly example. Imagine instead that this was some really
+    long-running job. Maybe you have lots of these jobs coming in, and you need
+    to have many machines to claim jobs and perform them, so that you can
+    scale. Maybe this job divides itself up into parallel or serial jobs, and
+    this parent job isn't done until all the children jobs run to completion.
+
+    Or maybe this is a silly example.
+..
+
 Let's put a job in our queue.  This silly example will return the current time.
 
     >>> import time
@@ -214,17 +225,6 @@
     >>> j.status
     u'pending-status'
 
-.. sidebar:: A Silly Example
-
-    This is a silly example. Imagine instead that this was some really
-    long-running job. Maybe you have lots of these jobs coming in, and you need
-    to have many machines to claim jobs and perform them, so that you can
-    scale. Maybe this job divides itself up into parallel or serial jobs, and
-    this parent job isn't done until all the children jobs run to completion.
-
-    Or maybe this is a silly example.
-..
-
 -------------
 A Transaction
 -------------
@@ -306,7 +306,7 @@
    means`_.
 
 We've now seen some simple examples from the standard library.  But how do you
-get your own work done?  How can you debug it?
+get your own work done?
 
 Let's say we want to write an approach a chestnut of a problem: use a `Monte
 Carlo simulation`_ (read "throwing darts and analyzing the results") to
@@ -323,7 +323,7 @@
 
 .. _`Hadoop`: http://hadoop.apache.org/core/
 
-.. _`calculate pi`: http://www.eveandersson.com/pi/monte-carlo-circle
+.. _`calculate pi`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html
 
 .. _`far more sophisticated means`: http://en.wikipedia.org/wiki/Computing_Ï€
 
@@ -387,45 +387,287 @@
 
 .. _MVCC: http://en.wikipedia.org/wiki/Multiversion_concurrency_control
 
+-------------
+Process UUIDs
+-------------
+
+Exit the Python interpreter (control-D).  Look around in the directory
+(``ls``): you should see a few database files, the key.pem you created, and a
+new file: ``uuid.txt``.  It should look something like this::
+
+    $ cat uuid.txt
+    afd1e0d0-52e1-11dd-879b-0017f2c49bdd
+    ------------------------------------------------------------------------
+    The value above (and this file) is created and used by the zc.async
+    package. It is intended to uniquely identify this software instance when
+    it is used to start a zc.async dispatcher.  This allows multiple
+    dispatchers, each in its own software instance, to connect to a single
+    database to do work.
+    
+    In order to decide where to look for this file (or to create it, if
+    necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a
+    file name.
+    
+    If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to
+    daemonize your process, you can set this in a zdaemon environment section
+    of your zdaemon.conf. Supervisor (http://supervisord.org/) also provides
+    this functionality. Other similar tools probably do as well.
+    
+    If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
+    ``os.path.join(os.getgwd(), 'uuid.txt')`` as the file name.
+    
+    To get a new identifier for this software instance, delete this file,
+    restart Python, and import zc.async.instanceuuid.  This file will be
+    recreated with a new value.
+
+That text is intended to be self-explanatory, so hopefully it made sense to
+you.  We'll handle these UUIDs more explicitly in a moment.
+
 -----------
 Make a File
 -----------
 
 Make a new Python file.  Let's call it ``pi.py``.
+Save this file in ``lib/python2.5/site-packages/``.
 
-At the top, include the same code we used in the interpreter above.
+Use the following for the file content.
 
+.. sidebar:: Code Walkthrough
+
+    This code walkthrough focuses on the elements needed to use |async|,
+    rather than the calculation of pi.  Numerous explanations of this Monte
+    Carlo simulation to calculate pi are on the internet.  I liked `this one`_.
+
+    We begin with some imports.
+
+    The ``generate_sample`` code will be run on multiple processes to produce
+    samples for our Monte Carlo function simulation.  It is ignorant of
+    |async|.
+    
+    Once the samples are all done, we'll reduce the results with
+    ``process_samples``.  It will return an approximation of pi, with accuracy
+    determined by the total size of the aggregated samples, assuming even
+    distribution of the random numbers.  As you'll see soon, we'll be using a
+    |async| convenience function for parallel jobs that gives all of the
+    completed jobs that have been running in parallel to this, the
+    postprocessing call. Therefore, ``process_samples`` gets the ``result`` of
+    ``generate_sample`` off of each job. Other than that, this function is
+    ignorant of |async|.
+    
+    The last code block should look similar to our previous example of starting
+    up a dispatcher, except this one uses the main, installed Twisted reactor,
+    rather than a threaded instance.  It creates the database, configures
+    zc.async, and starts the reactor.
+
 ::
 
+    import random
+    import math
+
     import ZEO.ClientStorage
     import ZODB
     import twisted.internet.reactor
 
-    twisted.internet.reactor.start()
+    import zc.async.configure
 
-    def generateSample():
-        pass XXX
+    def generate_sample(size=100000):
+        count = 0
+        for i in range(size):
+            if math.hypot(random.random(), random.random()) < 1:
+                count += 1
+        return count, size
 
-    def processSamples(*sample_jobs):
-        pass XXX
+    def process_samples(*sample_jobs):
+        count = 0 
+        size = 0
+        for j in sample_jobs:
+            count += j.result[0]
+            size += j.result[1]
+        return 4.0 * count / size
 
     if __name__ == '__main__':
         storage = ZEO.ClientStorage.ClientStorage(
             ('127.0.0.1', 9999))
         db = ZODB.DB(storage)
-    
-        import zc.async.configure
         zc.async.configure.base()
-    
         zc.async.configure.start(
             db, poll_interval=1, twisted=True)
+        twisted.internet.reactor.run()
 
+.. We'll need these defined when we run this as a test.
+
+    >>> import random
+    >>> import math
+
+    >>> def generate_sample(size=100000):
+    ...     count = 0
+    ...     for i in range(size):
+    ...         if math.hypot(random.random(), random.random()) < 1:
+    ...             count += 1
+    ...     return count, size
+    ... 
+    >>> def process_samples(*sample_jobs):
+    ...     count = 0 
+    ...     size = 0
+    ...     for j in sample_jobs:
+    ...         count += j.result[0]
+    ...         size += j.result[1]
+    ...     return 4.0 * count / size
+    ... 
+    >>> class StubModule(object):
+    ...     pass
+    ...
+    >>> pi = StubModule()
+    >>> pi.generate_sample = generate_sample
+    >>> pi.process_samples = process_samples
+
+.. _`this one`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html
+
+--------------------------------
+Our First Monte Carlo Experiment
+--------------------------------
+
+We'll need the ZEO server running.  If you've gone through this file from the
+start, it should still be running.  If not, use this::
+
+    $ ./bin/runzeo -a 9999 -f test.fs &
+
+Now, for our first experiment with the Monte Carlo simulation, we'll start a
+single worker process.
+
+Enter this in the terminal::
+
+    $ ./bin/python lib/python2.5/site-packages/pi.py &
+
+That will start our worker process.  Now let's start an interpreter.
+
+::
+
+    $ ./bin/python
+
+Now get a database and a connection, as we've seen before.  We'll also set up
+the base zc.async configuration.
+
+::
+
+    >>  import ZEO.ClientStorage
+    >>  import ZODB
+    >>  storage = ZEO.ClientStorage.ClientStorage(
+    ...     ('127.0.0.1', 9999))
+    >>  db = ZODB.DB(storage)
+    >>  conn = db.open()
+    >>  import zc.async.configure
+    >>  zc.async.configure.base()
+
+We don't have any adapters installed, so ``zc.async.interfaces.IQueue(conn)``
+won't work.  This will though, and still looks pretty good::
+
+    >>> import zc.async.queue
+    >>> q = zc.async.queue.getDefaultQueue(conn)
+
+Now we can start some jobs in parallel.
+
+::
+
+    >>  import pi
+    >>> import zc.async.job
+    >>> j = q.put(zc.async.job.parallel(
+    ...     pi.generate_sample, pi.generate_sample, pi.generate_sample,
+    ...     postprocess=pi.process_samples))
+    >>> import transaction
+    >>> transaction.commit()
+
+Wait a few seconds.  If the result is empty (None), begin the transaction again
+and check the result again.  Eventually, these next two lines should give you a
+similar result--an approximation of pi.
+
+.. This lets us "wait a second".
+
+    >>> zc.async.testing.wait_for_result(j) # doctest: +ELLIPSIS
+    3.1...
+
+..
+
+::
+
+    >>  _ = transaction.begin()
+    >>  j.result
+    3.1386666666666665
+
+Cool.
+
+--------
+Closures
+--------
+
+Sometimes, you want to pass arguments to your functions.  In this case, what if
+you want to pass a different ``size`` argument to ``generate_sample``?
+
+You can always pass a Job directly to the queue (or to the ``parallel`` helper
+function, in this case).  The job accepts arguments similarly to the Python 2.5
+``functools.partial``: Job(func, \*args, \*\*keywords).  This instantiates a new
+callable (a Job) with partial application of the given arguments and keywords.
+
+Let's try it.
+
+    >>> j = q.put(zc.async.job.parallel(
+    ...     zc.async.job.Job(pi.generate_sample, 1000000),
+    ...     zc.async.job.Job(pi.generate_sample, size=1000000),
+    ...     postprocess=pi.process_samples))
+    >>> transaction.commit()
+
+Wait a bit, again.  Retry these two lines until you get a result.  It should
+be well under a minute on most machines.
+
+.. This lets us "wait a second".
+
+    >>> zc.async.testing.wait_for_result(j, seconds=20) # doctest: +ELLIPSIS
+    3.1...
+
+..
+
+::
+
+    >>  _ = transaction.begin()
+    >>  j.result
+    3.1434359999999999
+
+Cool.
+
+-------------
+Configuration
+-------------
+
+Unfortunately, the parallel runs we've done so far would actually make the
+calculation go slower than if we did it in a single function call!  That's
+because we ran it in a single Python worker process, and the overhead of
+threads just makes the jobs a bit less efficient.  Threads help for some uses
+of |async|, but not this one.
+
+Let's assume you are running these experiments on a machine with two processor
+cores.  We should actually start two worker processes then, one for each core.
+We'll need to make sure each worker process has its own OID.
+
+Moreover, we need to configure each process to only take one
+``generate_sample`` job at a time.  Let's adjust our code to do that.
+
+Agents
+------
+
+A worker process has a central polling activity, called a ``dispatcher``.
+Dispatchers look in the database to ask their ``agent`` (or agents; think of it
+as a "`talent agent`_" or a "booking agent") to determine what they should get
+their threads to do.
+
+By default using the zc.async.configure helpers, each dispatcher is given a
+single agent that will choose the first job in the queue, and that wants to run
+no more than three jobs at a time.
+
+.. _`talent agent`: http://en.wikipedia.org/wiki/Talent_agent
+
 XXX
 ===
 
-Next need to discuss that callables must be picklable, so we need to switch
-from the interpreter to the filesystem.
-
 Talk about callbacks, and how that lets you respond to results.
 
 Talk briefly about failures, show the exceptions, and briefly mention logging

Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/TODO.txt	2008-08-17 04:08:16 UTC (rev 89917)
@@ -2,6 +2,8 @@
 - finish quickstart
 - write a zc.buildout quickstart
 - write a grok quickstart
+- write test for using installed twisted reactor
+- write test for interrupt retry of parallel/serial
 
 Improvements
 

Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/dispatcher.py	2008-08-17 04:08:16 UTC (rev 89917)
@@ -438,19 +438,21 @@
             self.reactor.callLater(self.poll_interval, self.directPoll)
 
     def _inThreadPoll(self, deferred):
+        self.conn = self.db.open()
         try:
             self.poll()
         finally:
+            self.conn.close()
             self.reactor.callFromThread(deferred.callback, None)
 
     def threadedPoll(self):
         if not self.activated:
             return
         deferred = twisted.internet.defer.Deferred()
-        self.reactor.callInThread(self._inThreadPoll, deferred)
         deferred.addCallback(
             lambda result: self.reactor.callLater(
                 self.poll_interval, self.threadedPoll))
+        self.reactor.callInThread(self._inThreadPoll, deferred)
 
     def activate(self, threaded=False):
         if self.activated:
@@ -463,8 +465,9 @@
         self.jobs.clear()
         # increase pool size to account for the dispatcher poll
         self.db.setPoolSize(self.db.getPoolSize() + 1)
-        self.conn = self.db.open() # we keep the same connection for all
-        # polls as an optimization
+        if not threaded:
+            self.conn = self.db.open() # we keep the same connection for all
+            # polls as an optimization
         if threaded:
             self.reactor.callWhenRunning(self.threadedPoll)
         else:

Modified: zc.async/trunk/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/trunk/src/zc/async/instanceuuid.py	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/instanceuuid.py	2008-08-17 04:08:16 UTC (rev 89917)
@@ -27,10 +27,14 @@
 database to do work.
 
 In order to decide where to look for this file (or to create it, if
-necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a file
-name.  If you are using Zope 3, you can set this in a zdaemon environment
-section of your zdaemon.conf.
+necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a
+file name.
 
+If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to
+daemonize your process, you can set this in a zdaemon environment section
+of your zdaemon.conf. Supervisor (http://supervisord.org/) also provides
+this functionality. Other similar tools probably do as well.
+
 If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
 ``os.path.join(os.getgwd(), 'uuid.txt')`` as the file name.
 

Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py	2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/job.py	2008-08-17 04:08:16 UTC (rev 89917)
@@ -985,8 +985,10 @@
         next.addCallback(Job(_queue_next, main_job, ix+1))
     else:
         postprocess = main_job.kwargs['postprocess']
-        postprocess.args.extend(jobs)
-        queue.put(postprocess)
+        if postprocess.status == zc.async.interfaces.NEW:
+            # will not be NEW if this is a retry
+            postprocess.args.extend(jobs)
+            queue.put(postprocess)
 
 def _schedule_serial(*jobs, **kw):
     _queue_next(zc.async.local.getJob())
@@ -1010,8 +1012,10 @@
             complete = False
     if complete:
         postprocess = main_job.kwargs['postprocess']
-        postprocess.args.extend(jobs)
-        queue.put(postprocess)
+        if postprocess.status == zc.async.interfaces.NEW:
+            # will not be NEW if this is a retry
+            postprocess.args.extend(jobs)
+            queue.put(postprocess)
 
 def _schedule_parallel(*jobs, **kw):
     _queue_all(zc.async.local.getJob())



More information about the Checkins mailing list