[Checkins] SVN: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt Sketching out rest of this quickstart. Need to verify new stuff, complete XXX.

Gary Poster gary at modernsongs.com
Wed Aug 20 22:11:22 EDT 2008


Log message for revision 90047:
  Sketching out rest of this quickstart.  Need to verify new stuff, complete XXX.
  
  

Changed:
  U   zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt

-=-
Modified: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt	2008-08-21 02:00:19 UTC (rev 90046)
+++ zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt	2008-08-21 02:11:22 UTC (rev 90047)
@@ -142,11 +142,11 @@
 
 ::
 
-    >>  import ZEO.ClientStorage
-    >>  import ZODB
-    >>  storage = ZEO.ClientStorage.ClientStorage(
-    ...     ('127.0.0.1', 9999))
-    >>  db = ZODB.DB(storage)
+    import ZEO.ClientStorage
+    import ZODB
+    storage = ZEO.ClientStorage.ClientStorage(
+        ('127.0.0.1', 9999))
+    db = ZODB.DB(storage)
 
 .. When run as a doctest, this uses a simple FileStorage, rather than a
    ClientStorage.
@@ -254,37 +254,52 @@
     >>> j.status
     u'completed-status'
 
------------
-Another Job
------------
+--------------------------
+Another Job (And Closures)
+--------------------------
 
-You can also make closures by passing in the job class explicitly.  Generating
-RSA keys is actually a reasonable real-world use case for something like this.
+You can also make closures.  The Job class accepts arguments similarly to
+the Python 2.5 :func:`functools.partial`: ``Job(func, *args,
+**keywords)``. This instantiates a new callable (a Job instance) with
+partial application of the given arguments and keywords.  You can then
+pass the job instance to the
+:meth:`~zc.async.interfaces.IQueue.put` method.
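The partial-application behavior can be sketched directly with the standard library's :func:`functools.partial`; the ``generate_sample`` function here is a toy stand-in that just echoes its argument, not the real sampling code::

```python
import functools

def generate_sample(size=100000):
    # toy stand-in for the real sampling function: just echo the size
    return size

# Job(generate_sample, size=500000) binds arguments the same way this
# stdlib partial does: the call happens later, with the bound arguments.
deferred = functools.partial(generate_sample, size=500000)
print(deferred())  # -> 500000
```

The job instance behaves the same way: nothing runs when it is built, only when a worker invokes it.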
 
+Generating RSA keys is actually a reasonable real-world use case for
+something like this.
+
 ::
 
-    >>  import subprocess
-    >>  j = q.put(zc.async.job.Job(
-    ...     subprocess.call,
-    ...     ['openssl', 'genrsa', '-out',
-    ...      'key.pem', '1024']))
-    >>  transaction.commit()
+    import subprocess
+    j = q.put(zc.async.job.Job(
+        subprocess.call,
+        ['openssl', 'genrsa', '-out',
+         'key.pem', '1024']))
+    transaction.commit()
 
-We need to begin the transaction to see the result--which in this case is
-simply ``0``, indicating a successful UNIX process.
+We need to begin the transaction to see the result...
 
 ::
 
-    >>  j.result
-    >>  _ = transaction.begin()
-    >>  j.result
+    j.result
+    _ = transaction.begin()
+    j.result
+
+...which in this case is simply ``0``, indicating a successful UNIX
+process.
+
+::
+
     0
 
 We can open the file to show the result.
 
 ::
 
-    >>  subprocess.call(['cat', 'key.pem'])
+    subprocess.call(['cat', 'key.pem'])
+
+This will show you the key, which should look something like this::
+
     -----BEGIN RSA PRIVATE KEY-----
     MIICXgIBAAKBgQCYAZW+HjDGJhRHnUlZZWqhrGOxU2K/RhssmcMs0JLnWI2cWmZ+
     ...
@@ -445,14 +460,15 @@
     |async|.
     
     Once the samples are all done, we'll reduce the results with
-    ``process_samples``.  It will return an approximation of pi, with accuracy
-    determined by the total size of the aggregated samples, assuming even
-    distribution of the random numbers.  As you'll see soon, we'll be using a
-    |async| convenience function for parallel jobs that gives all of the
-    completed jobs that have been running in parallel to this, the
-    postprocessing call. Therefore, ``process_samples`` gets the ``result`` of
-    ``generate_sample`` off of each job. Other than that, this function is
-    ignorant of |async|.
+    ``process_samples``.  It will return an approximation of pi, with
+    accuracy determined by the total size of the aggregated samples,
+    assuming even distribution of the random numbers.  As you'll see
+    soon, we'll be using a |async| convenience function for parallel
+    jobs, :func:`~zc.async.job.parallel`, which passes all of the
+    completed jobs that have been running in parallel to the
+    postprocessing call. Therefore, ``process_samples`` gets the
+    ``result`` of ``generate_sample`` off of each job. Other than that,
+    this function is ignorant of |async|.
     
     The last code block should look similar to our previous example of starting
     up a dispatcher, except this one uses the main, installed Twisted reactor,
@@ -498,6 +514,8 @@
 
     >>> import random
     >>> import math
+    >>> import types
+    >>> import sys
 
     >>> def generate_sample(size=100000):
     ...     count = 0
@@ -513,13 +531,11 @@
     ...         count += j.result[0]
     ...         size += j.result[1]
     ...     return 4.0 * count / size
-    ... 
-    >>> class StubModule(object):
-    ...     pass
     ...
-    >>> pi = StubModule()
-    >>> pi.generate_sample = generate_sample
-    >>> pi.process_samples = process_samples
+    >>> _pi = types.ModuleType('pi')
+    >>> sys.modules['pi'] = _pi
+    >>> _pi.generate_sample = generate_sample
+    >>> _pi.process_samples = process_samples
 
 .. _`this one`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html
 
@@ -550,14 +566,14 @@
 
 ::
 
-    >>  import ZEO.ClientStorage
-    >>  import ZODB
-    >>  storage = ZEO.ClientStorage.ClientStorage(
-    ...     ('127.0.0.1', 9999))
-    >>  db = ZODB.DB(storage)
-    >>  conn = db.open()
-    >>  import zc.async.configure
-    >>  zc.async.configure.base()
+    import ZEO.ClientStorage
+    import ZODB
+    storage = ZEO.ClientStorage.ClientStorage(
+        ('127.0.0.1', 9999))
+    db = ZODB.DB(storage)
+    conn = db.open()
+    import zc.async.configure
+    zc.async.configure.base()
 
 We don't have any adapters installed, so ``zc.async.interfaces.IQueue(conn)``
 won't work.  This will though, and still looks pretty good::
@@ -565,11 +581,11 @@
     >>> import zc.async.queue
     >>> q = zc.async.queue.getDefaultQueue(conn)
 
-Now we can start some jobs in parallel.
+Now we can start some jobs in parallel, with :func:`~zc.async.job.parallel`.
 
 ::
 
-    >>  import pi
+    >>> import pi
     >>> import zc.async.job
     >>> j = q.put(zc.async.job.parallel(
     ...     pi.generate_sample, pi.generate_sample, pi.generate_sample,
@@ -579,7 +595,7 @@
 
 Wait a few seconds.  If the result is empty (None), begin the transaction again
 and check the result again.  Eventually, these next two lines should give you a
-similar result--an approximation of pi.
+result: an approximation of pi.
 
 .. This lets us "wait a second".
 
@@ -590,24 +606,18 @@
 
 ::
 
-    >>  _ = transaction.begin()
-    >>  j.result
-    3.1386666666666665
+    _ = transaction.begin()
+    j.result
 
-Cool.
+For one run, I got ``3.1386666666666665``.  Cool.
 
 --------
 Closures
 --------
 
-Sometimes, you want to pass arguments to your functions.  In this case, what if
-you want to pass a different ``size`` argument to ``generate_sample``?
+We've already seen Jobs used as closures in the openssl example.  You can also
+use them to pass a different ``size`` argument to ``generate_sample``.
 
-You can always pass a Job directly to the queue (or to the ``parallel`` helper
-function, in this case).  The job accepts arguments similarly to the Python 2.5
-``functools.partial``: Job(func, \*args, \*\*keywords).  This instantiates a new
-callable (a Job) with partial application of the given arguments and keywords.
-
 Let's try it.
 
     >>> j = q.put(zc.async.job.parallel(
@@ -628,11 +638,10 @@
 
 ::
 
-    >>  _ = transaction.begin()
-    >>  j.result
-    3.1434359999999999
+    _ = transaction.begin()
+    j.result
 
-Cool.
+My run got ``3.1434359999999999``.  Cool.
 
 -------------
 Configuration
@@ -654,31 +663,282 @@
 Agents
 ------
 
-A worker process has a central polling activity, called a ``dispatcher``.
-Dispatchers look in the database to ask their ``agent`` (or agents; think of it
-as a "`talent agent`_" or a "booking agent") to determine what they should get
-their threads to do.
+A worker process regularly polls the database for new jobs.  The software
+component that polls is called a ``dispatcher``. Dispatchers look in the
+database to ask their personal ``agent`` (or agents) to determine what
+they should get their threads to do.  Think of the ``agent`` as a
+"`talent agent`_" or a "booking agent" for the process.
 
-By default using the zc.async.configure helpers, each dispatcher is given a
+By default, using the zc.async.configure helpers, each dispatcher is given a
 single agent that will choose the first job in the queue, and that wants to run
 no more than three jobs at a time.
 
+For our Monte Carlo job, we'll give each process an additional agent. The
+new agent will only accept up to one instance of a ``generate_sample``
+job at a time. 
+
+We will also reconfigure the existing agent to accept up to three of
+anything *except* ``generate_sample``.
+
+We'll do that in our file.  Here's the revised version.  The only change is
+in the imports and in the ``if __name__ == '__main__':`` block.
+
+.. sidebar:: Code Walkthrough for Changes
+
+   We import three new modules from |async|, :mod:zc.async.queue,
+   :mod:zc.async.instanceuuid, and :mod:zc.async.agent.  The ``agent``
+   implementation uses a function called a
+   :attr:`~zc.async.agent.Agent.chooser` to determine its
+   policy for choosing agents.  We define two chooser functions, one for
+   each agent: ``choose_generate_sample`` and ``choose_another``.  Then
+   we set up the old agent to use ``choose_another``, and create a new
+   agent of ``size=1`` with the ``choose_generate_sample`` chooser. 
+   That's it.
+   
+   It's important to note that the references to these functions are
+   persistent, and by name.  If you change the location or name of these
+   functions, you will need to keep the old names and locations around,
+   at least for long enough to switch your agents to use the new names. 
+   This is a general pattern for module globals--functions and
+   classes--to which the ZODB has direct references or instances.
+   
+   We could have accomplished the same basic policy changes with several
+   different agent configurations. For instance, we could have written a
+   chooser that looked through its agent's current jobs to verify that
+   it did not have another ``generate_sample``.
+   
+   The only trick to this kind of agent configuration is that you always
+   want to have at least some catch-all agents that are willing to do
+   just about any job (which is what our ``choose_another`` chooser
+   accomplishes).
+   This supports jobs that the |async| system sometimes needs to run for
+   exceptional circumstances.
+
+::
+
+    import random
+    import math
+
+    import ZEO.ClientStorage
+    import ZODB
+    import transaction
+    import twisted.internet.reactor
+
+    import zc.async.configure
+    import zc.async.queue
+    import zc.async.instanceuuid
+    import zc.async.agent
+
+    def generate_sample(size=100000):
+        count = 0
+        for i in range(size):
+            if math.hypot(random.random(), random.random()) < 1:
+                count += 1
+        return count, size
+
+    def process_samples(*sample_jobs):
+        count = 0 
+        size = 0
+        for j in sample_jobs:
+            count += j.result[0]
+            size += j.result[1]
+        return 4.0 * count / size
+
+    def choose_generate_sample(agent):
+        return agent.queue.claim(
+            lambda j: j.callable == generate_sample)
+
+    def choose_another(agent):
+        return agent.queue.claim(
+            lambda j: j.callable != generate_sample)
+
+    if __name__ == '__main__':
+        storage = ZEO.ClientStorage.ClientStorage(
+            ('127.0.0.1', 9999))
+        db = ZODB.DB(storage)
+        zc.async.configure.base()
+        zc.async.configure.start(
+            db, poll_interval=1, twisted=True)
+        conn = db.open()
+        q = zc.async.queue.getDefaultQueue(conn)
+        dispatcher = q.dispatchers[zc.async.instanceuuid.UUID]
+        if 'generate_sample' not in dispatcher:
+            agent = dispatcher['main']
+            agent.chooser = choose_another
+            dispatcher['generate_sample'] = zc.async.agent.Agent(
+                choose_generate_sample, 1)
+            transaction.commit()
+        conn.close()
+        twisted.internet.reactor.run()
+
 .. _`talent agent`: http://en.wikipedia.org/wiki/Talent_agent
 
-XXX
-===
+-------------
+Demonstration
+-------------
 
-Talk about callbacks, and how that lets you respond to results.
+Now let's start up our workers, and see how they work.  We're going to
+have two workers now, and they will each need separate UUIDs.  A really
+simple approach will be to make two separate working directories for the
+two worker processes.  (We could also use the environment variable,
+``ZC_ASYNC_UUID``, described in the `Process UUIDs`_ section above.)
 
-Talk briefly about failures, show the exceptions, and briefly mention logging
-and debugging.
+::
 
-Start up multiple processes with dispatchers.
+    $ mkdir worker1
+    $ mv uuid.txt worker1
+    $ cd worker1
+    $ ../bin/python ../lib/python2.5/site-packages/pi.py &
+    $ cd ..
+    $ mkdir worker2
+    $ cd worker2
+    $ ../bin/python ../lib/python2.5/site-packages/pi.py &
 
-Close by referring to production instances needing something like zdaemon
-or supervisor; and to preferring the more declarative zc.buildout style for
-production...which we'll show in our next quickstart! ;-)
+Now we'll start the Python process in which we will test our code.  We'll move
+to the main directory, but as long as we don't start another worker, it doesn't
+really matter.
 
+::
+
+    $ cd ..
+    $ ./bin/python
+
+And now, our test.
+
+::
+
+    import ZEO.ClientStorage
+    import ZODB
+    storage = ZEO.ClientStorage.ClientStorage(
+        ('127.0.0.1', 9999))
+    db = ZODB.DB(storage)
+    conn = db.open()
+    import zc.async.configure
+    zc.async.configure.base()
+    import zc.async.queue
+    q = zc.async.queue.getDefaultQueue(conn)
+    import pi
+    import zc.async.job
+    j = q.put(zc.async.job.parallel(
+        zc.async.job.Job(pi.generate_sample, 500000),
+        zc.async.job.Job(pi.generate_sample, size=500000),
+        postprocess=pi.process_samples))
+    import transaction
+    transaction.commit()
+
+Wait a few seconds.  If the result is empty (None), begin the transaction again
+and check the result again.  Eventually, these next two lines should give you a
+result: an approximation of pi.
+
+::
+
+    _ = transaction.begin()
+    j.result
+
+Just to prove to ourselves that we saved some time, let's do a comparison test:
+the same number of samples, but not in parallel.
+
+::
+
+    j2 = q.put(zc.async.job.parallel(
+        zc.async.job.Job(pi.generate_sample, 1000000),
+        postprocess=pi.process_samples))
+    transaction.commit()
+    _ = transaction.begin()
+    j2.result
+
+Once both jobs are complete, compare their run-time.
+
+::
+
+    j.active_end - j.active_start
+    j2.active_end - j2.active_start
+
+Even in this simple, short example, we ran XXX faster.
+
+-------------------
+Other Configuration
+-------------------
+
+We're at the end of this quickstart.  To close, here's a quick survey of some
+other configuration opportunities available that we haven't seen here.
+
+- Callbacks are a very important per-job configuration.  You can add them to
+  a job, to be run unconditionally, conditionally if the result is an
+  instance of a ``twisted.python.failure.Failure``, or conditionally if
+  the result is not a ``Failure``.  See
+  :meth:`~zc.async.interfaces.IJob.addCallback`,
+  :meth:`~zc.async.interfaces.IJob.addCallbacks`, and
+  :attr:`~zc.async.interfaces.IJob.callbacks`.
+
+.. note::
+
+   Unlike Twisted callbacks, all callbacks for the same job get the same
+   result; if you would like to chain results, the callbacks themselves
+   are Jobs, so attach a callback to your callback.
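The fan-out versus chaining distinction can be sketched in pure Python; ``ToyJob`` below is an illustrative stand-in, not the real zc.async Job class::

```python
class ToyJob(object):
    """Toy stand-in for a Job, only to illustrate callback flow."""
    def __init__(self, func):
        self.func = func
        self.callbacks = []

    def addCallback(self, func):
        cb = ToyJob(func)
        self.callbacks.append(cb)
        return cb  # the callback is itself a job, so it can be chained

    def run(self, *args):
        result = self.func(*args)
        # fan-out: every directly attached callback sees the *same* result
        for cb in self.callbacks:
            cb.run(result)
        return result

log = []
j = ToyJob(lambda: 10)
j.addCallback(lambda r: log.append(('first', r)))
second = j.addCallback(lambda r: log.append(('second', r)) or r * 2)
# chaining: attach to the callback itself to see the callback's result
second.addCallback(lambda r: log.append(('chained', r)))
j.run()
print(log)  # -> [('first', 10), ('second', 10), ('chained', 20)]
```

Both direct callbacks receive the original ``10``; only the callback attached to ``second`` sees ``second``'s own result.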
+
+- You can request that a job :attr:`~zc.async.interfaces.IJob.begin_after`
+  a given timezone-aware datetime.  If not given, this defaults to now, for the
+  purposes of calculating the effective
+  :attr:`~zc.async.interfaces.IJob.begin_by` datetime, described below.
+
+- You can specify that a job should :attr:`~zc.async.interfaces.IJob.begin_by`
+  a given duration (datetime.timedelta) *after* the job's
+  :attr:`~zc.async.interfaces.IJob.begin_after` value.  When the queue
+  gets ready to offer the job for an agent to choose, if the effective
+  ``begin_by`` value has passed, the queue will instead offer a call to
+  the job's :meth:`~zc.async.interfaces.IJob.fail` method.
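The effective deadline is simple datetime arithmetic.  A sketch with hypothetical values (timezone-naive here for brevity; real jobs use timezone-aware datetimes)::

```python
import datetime

# hypothetical scheduling values for illustration
begin_after = datetime.datetime(2008, 8, 21, 2, 11, 22)
begin_by = datetime.timedelta(hours=1)

# past this point the queue offers the job's fail() call instead
deadline = begin_after + begin_by
print(deadline)  # -> 2008-08-21 03:11:22
```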
+
+.. note::
+
+   There is no built-in way to stop a running job, short of stopping the
+   process.  This can be approximated by use in your job of
+   :func:`~zc.async.local.getLiveAnnotation` to poll for stop requests; or the
+   brave can write some C to use PyThreadState_SetAsyncExc_.
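The polling approach looks something like this sketch, with a ``threading.Event`` standing in for the live annotation the real job would check::

```python
import threading

stop_requested = threading.Event()  # stand-in for a live annotation

def long_job(steps=1000):
    done = 0
    for i in range(steps):
        # poll for a stop request between units of work, the same
        # pattern the live-annotation approach uses
        if stop_requested.is_set():
            break
        done += 1
    return done

stop_requested.set()      # request a stop before any work happens
print(long_job())         # -> 0
stop_requested.clear()
print(long_job(10))       # -> 10
```

The job decides for itself how often to poll; more frequent checks mean quicker stops but slightly more overhead per unit of work.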
+
+- You can set up quotas for certain arbitrary quota names that you define.
+  This is a limit: no more than the given quota can run at once, total,
+  across all workers.  This can let you decrease the chance of conflict
+  errors for long-running jobs that write to the same data structures.  See
+  :attr:`~zc.async.interfaces.IQueue.quotas`,
+  :class:`~zc.async.interfaces.IQuotas`, and
+  :attr:`~zc.async.interfaces.IJob.quota_names`.
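Conceptually, a quota of two behaves like a counting semaphore shared by all workers; this is only an analogy (the real quotas are ZODB-based, not ``threading`` objects)::

```python
import threading

# a quota of 2: at most two holders at once, across everything sharing it
quota = threading.BoundedSemaphore(2)

first = quota.acquire(False)   # True: a slot was available
second = quota.acquire(False)  # True: the second slot
third = quota.acquire(False)   # False: quota exhausted, job must wait
print(first, second, third)    # -> True True False

quota.release()
quota.release()
```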
+
+- Retry policies determine how jobs should be retried if they raise an
+  uncaught exception while running, if their commit raises an error, or if
+  they are interrupted.  They can be configured per job, or as defaults for
+  callback and non-callback jobs.  See
+  :class:`~zc.async.interfaces.IRetryPolicy`,
+  :attr:`~zc.async.interfaces.IJob.retry_policy_factory`,
+  :meth:`~zc.async.interfaces.IJob.getRetryPolicy`; the default retry policies
+  :class:`~zc.async.job.RetryCommonFourTimes`,
+  :class:`~zc.async.job.RetryCommonForever` and
+  :class:`~zc.async.job.NeverRetry`; and the ``retry_policy_factory`` argument
+  to :meth:`~zc.async.interfaces.IJob.addCallback`,
+  :meth:`~zc.async.interfaces.IJob.addCallbacks`, and
+  :meth:`~zc.async.interfaces.IQueue.put`.
+
+- The :attr:`~zc.async.interfaces.IJob.failure_log_level` determines at what
+  level Failure results for a given job should be logged.  This is usually
+  ``logging.ERROR``, and ``logging.CRITICAL`` for callbacks.  This can be set directly
+  on the job, or applied via the ``failure_log_level`` argument to
+  :meth:`~zc.async.interfaces.IJob.addCallback`,
+  :meth:`~zc.async.interfaces.IJob.addCallbacks`, and
+  :meth:`~zc.async.interfaces.IQueue.put`.
+
+- Custom job subclasses can take advantage of
+  :meth:`~zc.async.interfaces.IJob.setUp` and
+  :meth:`~zc.async.interfaces.IJob.tearDown` hooks to set up and tear down
+  state.  An example is the Zope 3-specific :class:`~zc.async.z3.Job`.
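The general shape of the hook pattern can be sketched in pure Python.  This is a toy illustration only; the names ``ToyJob`` and ``work`` are invented here, and the real IJob hook signatures may differ::

```python
class ToyJob(object):
    """Toy base class showing only the setUp/tearDown hook pattern."""
    def setUp(self):
        pass
    def tearDown(self, setup_result):
        pass
    def __call__(self):
        setup_result = self.setUp()
        try:
            return self.work()
        finally:
            # tearDown runs even if work() raises
            self.tearDown(setup_result)

class LoggingJob(ToyJob):
    def __init__(self):
        self.events = []
    def setUp(self):
        self.events.append('setUp')
        return 'context'
    def tearDown(self, setup_result):
        self.events.append('tearDown:' + setup_result)
    def work(self):
        self.events.append('work')
        return 42

j = LoggingJob()
print(j())        # -> 42
print(j.events)   # -> ['setUp', 'work', 'tearDown:context']
```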
+
+- A custom queue might support broadcasting jobs across dispatchers, or
+  targeting specific dispatchers.  These features may be developed for |async|
+  itself at a later date.
+
+There are many other topics to discuss--logging, testing, debugging, Zope 3
+integration, and monitoring, for instance--but this is a quick start, so we'll
+end here.
+
+.. _PyThreadState_SetAsyncExc: http://docs.python.org/api/threads.html
+
 .. Now we are going to stop the reactor.
 
     >>> import zc.async.dispatcher


