[Checkins] SVN: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt Sketching out rest of this quickstart. Need to verify new stuff, complete XXX.
Gary Poster
gary at modernsongs.com
Wed Aug 20 22:11:22 EDT 2008
Log message for revision 90047:
Sketching out rest of this quickstart. Need to verify new stuff, complete XXX.
Changed:
U zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
-=-
Modified: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt 2008-08-21 02:00:19 UTC (rev 90046)
+++ zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt 2008-08-21 02:11:22 UTC (rev 90047)
@@ -142,11 +142,11 @@
::
- >> import ZEO.ClientStorage
- >> import ZODB
- >> storage = ZEO.ClientStorage.ClientStorage(
- ... ('127.0.0.1', 9999))
- >> db = ZODB.DB(storage)
+ import ZEO.ClientStorage
+ import ZODB
+ storage = ZEO.ClientStorage.ClientStorage(
+ ('127.0.0.1', 9999))
+ db = ZODB.DB(storage)
.. When run as a doctest, this uses a simple FileStorage, rather than a
ClientStorage.
@@ -254,37 +254,52 @@
>>> j.status
u'completed-status'
------------
-Another Job
------------
+--------------------------
+Another Job (And Closures)
+--------------------------
-You can also make closures by passing in the job class explicitly. Generating
-RSA keys is actually a reasonable real-world use case for something like this.
+You can also make closures. The Job class accepts arguments similarly to
+the Python 2.5 :func:`functools.partial`: ``Job(func, *args,
+**keywords)``. This instantiates a new callable (a Job instance) with
+partial application of the given arguments and keywords. You can then
+pass the job instance to the
+:meth:`~zc.async.interfaces.IQueue.put` method.
+Generating RSA keys is actually a reasonable real-world use case for
+something like this.
+
::
- >> import subprocess
- >> j = q.put(zc.async.job.Job(
- ... subprocess.call,
- ... ['openssl', 'genrsa', '-out',
- ... 'key.pem', '1024']))
- >> transaction.commit()
+ import subprocess
+ j = q.put(zc.async.job.Job(
+ subprocess.call,
+ ['openssl', 'genrsa', '-out',
+ 'key.pem', '1024']))
+ transaction.commit()
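The partial-application behavior can be pictured with the standard library's ``functools.partial`` (an analogy only; ``zc.async.job.Job`` additionally persists the callable and its arguments in the ZODB so a worker process can run them later):

```python
from functools import partial

def call(args):
    # Stand-in for subprocess.call in the example above.
    return ' '.join(args)

# Like Job(func, *args, **keywords), partial stores the callable and
# its arguments now, and invokes them later with no further input.
deferred = partial(call, ['openssl', 'genrsa', '-out', 'key.pem', '1024'])
print(deferred())  # -> openssl genrsa -out key.pem 1024
```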
-We need to begin the transaction to see the result--which in this case is
-simply ``0``, indicating a successful UNIX process.
+We need to begin the transaction to see the result...
::
- >> j.result
- >> _ = transaction.begin()
- >> j.result
+ j.result
+ _ = transaction.begin()
+ j.result
+
+...which in this case is simply ``0``, indicating a successful UNIX
+process.
+
+::
+
0
We can open the file to show the result.
::
- >> subprocess.call(['cat', 'key.pem'])
+ subprocess.call(['cat', 'key.pem'])
+
+This will show you the key, which should look something like this::
+
-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQCYAZW+HjDGJhRHnUlZZWqhrGOxU2K/RhssmcMs0JLnWI2cWmZ+
...
@@ -445,14 +460,15 @@
|async|.
Once the samples are all done, we'll reduce the results with
- ``process_samples``. It will return an approximation of pi, with accuracy
- determined by the total size of the aggregated samples, assuming even
- distribution of the random numbers. As you'll see soon, we'll be using a
- |async| convenience function for parallel jobs that gives all of the
- completed jobs that have been running in parallel to this, the
- postprocessing call. Therefore, ``process_samples`` gets the ``result`` of
- ``generate_sample`` off of each job. Other than that, this function is
- ignorant of |async|.
+ ``process_samples``. It will return an approximation of pi, with
+ accuracy determined by the total size of the aggregated samples,
+ assuming even distribution of the random numbers. As you'll see
+ soon, we'll be using a |async| convenience function,
+ :func:`~zc.async.job.parallel`, which gives all of the completed
+ jobs that have been running in parallel to this, the postprocessing
+ call. Therefore, ``process_samples`` gets the ``result`` of
+ ``generate_sample`` off of each job. Other than that, this function
+ is ignorant of |async|.
The last code block should look similar to our previous example of starting
up a dispatcher, except this one uses the main, installed Twisted reactor,
@@ -498,6 +514,8 @@
>>> import random
>>> import math
+ >>> import types
+ >>> import sys
>>> def generate_sample(size=100000):
... count = 0
@@ -513,13 +531,11 @@
... count += j.result[0]
... size += j.result[1]
... return 4.0 * count / size
- ...
- >>> class StubModule(object):
- ... pass
...
- >>> pi = StubModule()
- >>> pi.generate_sample = generate_sample
- >>> pi.process_samples = process_samples
+ >>> _pi = types.ModuleType('pi')
+ >>> sys.modules['pi'] = _pi
+ >>> _pi.generate_sample = generate_sample
+ >>> _pi.process_samples = process_samples
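The ``types.ModuleType`` trick above matters because, as noted later in this document, references to job callables are persistent and by name: a callable must be importable for a worker to resolve it. A minimal sketch of the same registration, using a hypothetical module name:

```python
import sys
import types

def double(x):
    return 2 * x

# Create a real (if synthetic) module and register it in sys.modules
# so the function can be found again by its dotted name.
_mod = types.ModuleType('mymod')
_mod.double = double
sys.modules['mymod'] = _mod

import mymod
print(mymod.double(21))  # -> 42
```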
.. _`this one`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html
@@ -550,14 +566,14 @@
::
- >> import ZEO.ClientStorage
- >> import ZODB
- >> storage = ZEO.ClientStorage.ClientStorage(
- ... ('127.0.0.1', 9999))
- >> db = ZODB.DB(storage)
- >> conn = db.open()
- >> import zc.async.configure
- >> zc.async.configure.base()
+ import ZEO.ClientStorage
+ import ZODB
+ storage = ZEO.ClientStorage.ClientStorage(
+ ('127.0.0.1', 9999))
+ db = ZODB.DB(storage)
+ conn = db.open()
+ import zc.async.configure
+ zc.async.configure.base()
We don't have any adapters installed, so ``zc.async.interfaces.IQueue(conn)``
won't work. This will though, and still looks pretty good::
@@ -565,11 +581,11 @@
>>> import zc.async.queue
>>> q = zc.async.queue.getDefaultQueue(conn)
-Now we can start some jobs in parallel.
+Now we can start some jobs in parallel, using :func:`~zc.async.job.parallel`.
::
- >> import pi
+ >>> import pi
>>> import zc.async.job
>>> j = q.put(zc.async.job.parallel(
... pi.generate_sample, pi.generate_sample, pi.generate_sample,
@@ -579,7 +595,7 @@
Wait a few seconds. If the result is empty (None), begin the transaction again
and check the result again. Eventually, these next two lines should give you a
-similar result--an approximation of pi.
+result: an approximation of pi.
.. This lets us "wait a second".
@@ -590,24 +606,18 @@
::
- >> _ = transaction.begin()
- >> j.result
- 3.1386666666666665
+ _ = transaction.begin()
+ j.result
-Cool.
+For one run, I got ``3.1386666666666665``. Cool.
--------
Closures
--------
-Sometimes, you want to pass arguments to your functions. In this case, what if
-you want to pass a different ``size`` argument to ``generate_sample``?
+We've already seen Jobs used as closures in the openssl example. You can also
+use them to pass a different ``size`` argument to ``generate_sample``.
-You can always pass a Job directly to the queue (or to the ``parallel`` helper
-function, in this case). The job accepts arguments similarly to the Python 2.5
-``functools.partial``: Job(func, \*args, \*\*keywords). This instantiates a new
-callable (a Job) with partial application of the given arguments and keywords.
-
Let's try it.
>>> j = q.put(zc.async.job.parallel(
@@ -628,11 +638,10 @@
::
- >> _ = transaction.begin()
- >> j.result
- 3.1434359999999999
+ _ = transaction.begin()
+ j.result
-Cool.
+My run got ``3.1434359999999999``. Cool.
-------------
Configuration
@@ -654,31 +663,282 @@
Agents
------
-A worker process has a central polling activity, called a ``dispatcher``.
-Dispatchers look in the database to ask their ``agent`` (or agents; think of it
-as a "`talent agent`_" or a "booking agent") to determine what they should get
-their threads to do.
+A worker process regularly polls the database for new jobs. The software
+component that polls is called a ``dispatcher``. Dispatchers look in the
+database to ask their personal ``agent`` (or agents) to determine what
+they should get their threads to do. Think of the ``agent`` as a
+"`talent agent`_" or a "booking agent" for the process.
-By default using the zc.async.configure helpers, each dispatcher is given a
+By default, using the zc.async.configure helpers, each dispatcher is given a
single agent that will choose the first job in the queue, and that wants to run
no more than three jobs at a time.
+For our Monte Carlo job, we'll give each process an additional agent. The
+new agent will only accept up to one instance of a ``generate_sample``
+job at a time.
+
+We will also reconfigure the existing agent to accept up to three of
+anything *except* ``generate_sample``.
+
+We'll do that in our file. Here's the revised version. The only change is
+in the imports and in the ``if __name__ == '__main__':`` block.
+
+.. sidebar:: Code Walkthrough for Changes
+
+ We import three new modules from |async|: :mod:`zc.async.queue`,
+ :mod:`zc.async.instanceuuid`, and :mod:`zc.async.agent`. The ``agent``
+ implementation uses a function called a
+ :attr:`~zc.async.agent.Agent.chooser` to determine its
+ policy for choosing agents. We define two chooser functions, one for
+ each agent: ``choose_generate_sample`` and ``choose_another``. Then
+ we set up the old agent to use ``choose_another``, and create a new
+ agent of ``size=1`` with the ``choose_generate_sample`` chooser.
+ That's it.
+
+ It's important to note that the references to these functions are
+ persistent, and by name. If you change the location or name of these
+ functions, you will need to keep the old names and locations around,
+ at least for long enough to switch your agents to use the new names.
+ This is a general pattern for module globals--functions and
+ classes--to which the ZODB has direct references or instances.
+
+ We could have accomplished the same basic policy changes with several
+ different agent configurations. For instance, we could have written a
+ chooser that looked through its agent's current jobs to verify that
+ it did not have another ``generate_sample``.
+
+ The only trick to this kind of agent configuration is that you always
+ want to have at least some catch-all agents that are willing to do
+ just about any job (which is what our ``choose_another`` chooser accomplishes).
+ This supports jobs that the |async| system sometimes needs to run for
+ exceptional circumstances.
+
+::
+
+ import random
+ import math
+
+ import ZEO.ClientStorage
+ import ZODB
+ import transaction
+ import twisted.internet.reactor
+
+ import zc.async.configure
+ import zc.async.queue
+ import zc.async.instanceuuid
+ import zc.async.agent
+
+ def generate_sample(size=100000):
+ count = 0
+ for i in range(size):
+ if math.hypot(random.random(), random.random()) < 1:
+ count += 1
+ return count, size
+
+ def process_samples(*sample_jobs):
+ count = 0
+ size = 0
+ for j in sample_jobs:
+ count += j.result[0]
+ size += j.result[1]
+ return 4.0 * count / size
+
+ def choose_generate_sample(agent):
+ return agent.queue.claim(
+ lambda j: j.callable == generate_sample)
+
+ def choose_another(agent):
+ return agent.queue.claim(
+ lambda j: j.callable != generate_sample)
+
+ if __name__ == '__main__':
+ storage = ZEO.ClientStorage.ClientStorage(
+ ('127.0.0.1', 9999))
+ db = ZODB.DB(storage)
+ zc.async.configure.base()
+ zc.async.configure.start(
+ db, poll_interval=1, twisted=True)
+ conn = db.open()
+ q = zc.async.queue.getDefaultQueue(conn)
+ dispatcher = q.dispatchers[zc.async.instanceuuid.UUID]
+ if 'generate_sample' not in dispatcher:
+ agent = dispatcher['main']
+ agent.chooser = choose_another
+ dispatcher['generate_sample'] = zc.async.agent.Agent(
+ choose_generate_sample, 1)
+ transaction.commit()
+ conn.close()
+ twisted.internet.reactor.run()
+
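The chooser mechanics used above can be sketched with a toy queue. This is plain Python, not the real |async| agent API; it only mimics the ``agent.queue.claim(condition)`` calls in the two choosers:

```python
class ToyQueue:
    """Minimal stand-in for the queue the choosers filter."""
    def __init__(self, jobs):
        self.jobs = list(jobs)

    def claim(self, condition):
        # Hand out the first job matching the condition, like the
        # agent.queue.claim calls in the choosers above.
        for job in self.jobs:
            if condition(job):
                self.jobs.remove(job)
                return job
        return None

queue = ToyQueue(['generate_sample', 'send_mail', 'generate_sample'])
# The dedicated agent only claims generate_sample jobs...
special = queue.claim(lambda j: j == 'generate_sample')
# ...while the catch-all agent takes anything else.
other = queue.claim(lambda j: j != 'generate_sample')
print(special, other)  # -> generate_sample send_mail
```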
.. _`talent agent`: http://en.wikipedia.org/wiki/Talent_agent
-XXX
-===
+-------------
+Demonstration
+-------------
-Talk about callbacks, and how that lets you respond to results.
+Now let's start up our workers, and see how they work. We're going to
+have two workers now, and they will each need separate UUIDs. A really
+simple approach will be to make two separate working directories for the
+two worker processes. (We also could use the environment variable
+``ZC_ASYNC_UUID``, described in the `Process UUIDs`_ section above.)
-Talk briefly about failures, show the exceptions, and briefly mention logging
-and debugging.
+::
-Start up multiple processes with dispatchers.
+ $ mkdir worker1
+ $ mv uuid.txt worker1
+ $ cd worker1
+ $ ../bin/python ../lib/python2.5/site-packages/pi.py &
+ $ cd ..
+ $ mkdir worker2
+ $ cd worker2
+ $ ../bin/python ../lib/python2.5/site-packages/pi.py &
-Close by referring to production instances needing something like zdaemon
-or supervisor; and to preferring the more declarative zc.buildout style for
-production...which we'll show in our next quickstart! ;-)
+Now we'll start the Python process in which we will test our code. We'll move
+to the main directory, but as long as we don't start another worker, it doesn't
+really matter.
+::
+
+ $ cd ..
+ $ ./bin/python
+
+And now, our test.
+
+::
+
+ import ZEO.ClientStorage
+ import ZODB
+ storage = ZEO.ClientStorage.ClientStorage(
+ ('127.0.0.1', 9999))
+ db = ZODB.DB(storage)
+ conn = db.open()
+ import zc.async.configure
+ zc.async.configure.base()
+ import zc.async.queue
+ q = zc.async.queue.getDefaultQueue(conn)
+ import pi
+ import zc.async.job
+ j = q.put(zc.async.job.parallel(
+ zc.async.job.Job(pi.generate_sample, 500000),
+ zc.async.job.Job(pi.generate_sample, size=500000),
+ postprocess=pi.process_samples))
+ import transaction
+ transaction.commit()
+
+Wait a few seconds. If the result is empty (None), begin the transaction again
+and check the result again. Eventually, these next two lines should give you a
+result: an approximation of pi.
+
+::
+
+ _ = transaction.begin()
+ j.result
+
+Just to prove to ourselves that we saved some time, let's do a comparison test:
+the same number of samples, but not in parallel.
+
+::
+
+ j2 = q.put(zc.async.job.parallel(
+ zc.async.job.Job(pi.generate_sample, 1000000),
+ postprocess=pi.process_samples))
+ transaction.commit()
+ _ = transaction.begin()
+ j2.result
+
+Once both jobs are complete, compare their run-time.
+
+::
+
+ j.active_end - j.active_start
+ j2.active_end - j2.active_start
+
+Even in this simple, short example, we ran XXX faster.
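``active_start`` and ``active_end`` are datetimes, so the subtractions above yield ``datetime.timedelta`` objects that can be compared directly. An illustration with made-up durations (``total_seconds`` needs Python 2.7+; under 2.5 you would compute it from the ``days``, ``seconds``, and ``microseconds`` attributes):

```python
import datetime

# Illustrative numbers only -- real values come from the job attributes.
parallel_elapsed = datetime.timedelta(seconds=7, microseconds=500000)
serial_elapsed = datetime.timedelta(seconds=14)

assert parallel_elapsed < serial_elapsed
speedup = serial_elapsed.total_seconds() / parallel_elapsed.total_seconds()
print(round(speedup, 2))  # -> 1.87
```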
+
+Other Configuration
+-------------------
+
+We're at the end of this quickstart. To close, here's a quick survey of some
+other configuration opportunities available that we haven't seen here.
+
+- Callbacks are a very important per-job configuration. You can add them to
+ a job, to be run unconditionally, conditionally if the result is an
+ instance of a ``twisted.python.failure.Failure``, or conditionally if
+ the result is not a ``Failure``. See
+ :meth:`~zc.async.interfaces.IJob.addCallback`,
+ :meth:`~zc.async.interfaces.IJob.addCallbacks`, and
+ :attr:`~zc.async.interfaces.IJob.callbacks`.
+
+.. note::
+
+ Unlike Twisted callbacks, all callbacks for the same job get the same
+ result; if you would like to chain results, the callbacks themselves
+ are Jobs, so attach a callback to your callback.
+
+- You can request that a job :attr:`~zc.async.interfaces.IJob.begin_after`
+ a given timezone-aware datetime. If not given, this defaults to now, for the
+ purposes of calculating the effective
+ :attr:`~zc.async.interfaces.IJob.begin_by` datetime, described below.
+
+- You can specify that a job should :attr:`~zc.async.interfaces.IJob.begin_by`
+ a given duration (``datetime.timedelta``) *after* the job's
+ :attr:`~zc.async.interfaces.IJob.begin_after` value. When the queue
+ gets
+ ready to offer the job for an agent to choose, if the effective
+ ``begin_by`` value has passed, the queue will instead offer a call to
+ the job's
+ :meth:`~zc.async.interfaces.IJob.fail` method.
+
+.. note::
+
+ There is no built-in way to stop a running job, short of stopping the
+ process. This can be approximated by use in your job of
+ :func:`~zc.async.local.getLiveAnnotation` to poll for stop requests; or the
+ brave can write some C to use PyThreadState_SetAsyncExc_.
+
+- You can set up quotas for certain arbitrary quota names that you define.
+ This is a limit: no more than the given quota can run at once, total,
+ across all workers. This can let you decrease the chance of conflict
+ errors for long-running jobs that write to the same data structures. See
+ :attr:`~zc.async.interfaces.IQueue.quotas`,
+ :class:`~zc.async.interfaces.IQuotas`, and
+ :attr:`~zc.async.interfaces.IJob.quota_names`.
+
+- Retry policies determine how jobs should be retried if they raise an
+ uncaught exception while running, if their commit raises an error, or if
+ they are interrupted. They can be configured per job, or as defaults for
+ callback and non-callback jobs. See
+ :class:`~zc.async.interfaces.IRetryPolicy`,
+ :attr:`~zc.async.interfaces.IJob.retry_policy_factory`,
+ :meth:`~zc.async.interfaces.IJob.getRetryPolicy`; the default retry policies
+ :class:`~zc.async.job.RetryCommonFourTimes`,
+ :class:`~zc.async.job.RetryCommonForever` and
+ :class:`~zc.async.job.NeverRetry`; and the ``retry_policy_factory`` argument
+ to :meth:`~zc.async.interfaces.IJob.addCallback`,
+ :meth:`~zc.async.interfaces.IJob.addCallbacks`, and
+ :meth:`~zc.async.interfaces.IQueue.put`.
+
+- The :attr:`~zc.async.interfaces.IJob.failure_log_level` determines at what
+ level Failure results for a given job should be logged. This is usually
+ ``logging.ERROR``, and ``logging.CRITICAL`` for callbacks. This can be set directly
+ on the job, or applied via the ``failure_log_level`` argument to
+ :meth:`~zc.async.interfaces.IJob.addCallback`,
+ :meth:`~zc.async.interfaces.IJob.addCallbacks`, and
+ :meth:`~zc.async.interfaces.IQueue.put`.
+
+- Custom job subclasses can take advantage of
+ :meth:`~zc.async.interfaces.IJob.setUp` and
+ :meth:`~zc.async.interfaces.IJob.tearDown` hooks to set up and tear down
+ state. An example is the Zope 3-specific :class:`~zc.async.z3.Job`.
+
+- A custom queue might support broadcasting jobs across dispatchers, or
+ targeting specific dispatchers. These features may be developed for |async|
+ itself at a later date.
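The quotas described above behave like a counted limit shared across all workers. A local analogy with a bounded semaphore (|async| enforces the count through the database across processes, not through threading; the numbers here are illustrative):

```python
import threading

# A quota of 2: no more than two jobs may hold it at once.
quota = threading.BoundedSemaphore(2)

# Non-blocking acquires stand in for jobs trying to start under the quota.
held = [quota.acquire(False) for _ in range(3)]
print(held)  # the third acquire is refused: the quota is exhausted
```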
+
+There are many other topics to discuss--logging, testing, debugging, Zope 3
+integration, and monitoring, for instance--but this is a quick start, so we'll
+end here.
+
+.. _PyThreadState_SetAsyncExc: http://docs.python.org/api/threads.html
+
.. Now we are going to stop the reactor.
>>> import zc.async.dispatcher