[Checkins] SVN: zc.async/trunk/s - More doc additions (primarily work in the virtualenv quickstart)
Gary Poster
gary at modernsongs.com
Sun Aug 17 00:08:20 EDT 2008
Log message for revision 89917:
- More doc additions (primarily work in the virtualenv quickstart)
- Fix support for the installed Twisted reactor XXX NEEDS TEST
- Fix retry behavior for parallel and serial jobs XXX NEEDS TEST
- Tweaked uuid.txt to mention zdaemon/supervisor rather than Zope 3.
Changed:
U zc.async/trunk/setup.py
U zc.async/trunk/sphinx/.static/default.css
U zc.async/trunk/src/zc/async/CHANGES.txt
U zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/dispatcher.py
U zc.async/trunk/src/zc/async/instanceuuid.py
U zc.async/trunk/src/zc/async/job.py
-=-
Modified: zc.async/trunk/setup.py
===================================================================
--- zc.async/trunk/setup.py 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/setup.py 2008-08-17 04:08:16 UTC (rev 89917)
@@ -72,7 +72,7 @@
setup(
name='zc.async',
- version='1.4.2a2',
+ version='1.4.2a3',
namespace_packages=['zc'],
packages=find_packages('src'),
package_dir={'':'src'},
Modified: zc.async/trunk/sphinx/.static/default.css
===================================================================
--- zc.async/trunk/sphinx/.static/default.css 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/sphinx/.static/default.css 2008-08-17 04:08:16 UTC (rev 89917)
@@ -868,4 +868,9 @@
div.sidebar p.sidebar-title tt {
background: transparent;
+}
+
+div#a-job div.highlight,
+div#make-a-file div.highlight {
+ background-color: #fff;
}
\ No newline at end of file
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-08-17 04:08:16 UTC (rev 89917)
@@ -18,6 +18,12 @@
- Added zc.async.partial.Partial for backward compatibility purposes.
+- Fix support for the installed Twisted reactor XXX NEEDS TEST
+
+- Fix retry behavior for parallel and serial jobs XXX NEEDS TEST
+
+- Tweaked uuid.txt to mention zdaemon/supervisor rather than Zope 3.
+
1.4.1 (2008-07-30)
==================
Modified: zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt
===================================================================
--- zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/QUICKSTART_1_VIRTUALENV.txt 2008-08-17 04:08:16 UTC (rev 89917)
@@ -203,6 +203,17 @@
A Job
-----
+.. sidebar:: A Silly Example
+
+ This is a silly example. Imagine instead that this was some really
+ long-running job. Maybe you have lots of these jobs coming in, and you need
+ to have many machines to claim jobs and perform them, so that you can
+ scale. Maybe this job divides itself up into parallel or serial jobs, and
+ this parent job isn't done until all the children jobs run to completion.
+
+ Or maybe this is a silly example.
+..
+
Let's put a job in our queue. This silly example will return the current time.
>>> import time
@@ -214,17 +225,6 @@
>>> j.status
u'pending-status'
-.. sidebar:: A Silly Example
-
- This is a silly example. Imagine instead that this was some really
- long-running job. Maybe you have lots of these jobs coming in, and you need
- to have many machines to claim jobs and perform them, so that you can
- scale. Maybe this job divides itself up into parallel or serial jobs, and
- this parent job isn't done until all the children jobs run to completion.
-
- Or maybe this is a silly example.
-..
-
-------------
A Transaction
-------------
@@ -306,7 +306,7 @@
means`_.
We've now seen some simple examples from the standard library. But how do you
-get your own work done? How can you debug it?
+get your own work done?
Let's say we want to write an approach a chestnut of a problem: use a `Monte
Carlo simulation`_ (read "throwing darts and analyzing the results") to
@@ -323,7 +323,7 @@
.. _`Hadoop`: http://hadoop.apache.org/core/
-.. _`calculate pi`: http://www.eveandersson.com/pi/monte-carlo-circle
+.. _`calculate pi`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html
.. _`far more sophisticated means`: http://en.wikipedia.org/wiki/Computing_π
@@ -387,45 +387,287 @@
.. _MVCC: http://en.wikipedia.org/wiki/Multiversion_concurrency_control
+-------------
+Process UUIDs
+-------------
+
+Exit the Python interpreter (control-D). Look around in the directory
+(``ls``): you should see a few database files, the key.pem you created, and a
+new file: ``uuid.txt``. It should look something like this::
+
+ $ cat uuid.txt
+ afd1e0d0-52e1-11dd-879b-0017f2c49bdd
+ ------------------------------------------------------------------------
+ The value above (and this file) is created and used by the zc.async
+ package. It is intended to uniquely identify this software instance when
+ it is used to start a zc.async dispatcher. This allows multiple
+ dispatchers, each in its own software instance, to connect to a single
+ database to do work.
+
+ In order to decide where to look for this file (or to create it, if
+ necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a
+ file name.
+
+ If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to
+ daemonize your process, you can set this in a zdaemon environment section
+ of your zdaemon.conf. Supervisor (http://supervisord.org/) also provides
+ this functionality. Other similar tools probably do as well.
+
+ If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
+ ``os.path.join(os.getcwd(), 'uuid.txt')`` as the file name.
+
+ To get a new identifier for this software instance, delete this file,
+ restart Python, and import zc.async.instanceuuid. This file will be
+ recreated with a new value.
+
+That text is intended to be self-explanatory, so hopefully it made sense to
+you. We'll handle these UUIDs more explicitly in a moment.
+
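The lookup order quoted in ``uuid.txt`` above can be sketched in plain
Python. Both helper names below (``resolve_uuid_path`` and
``read_or_create_uuid``) are hypothetical stand-ins, not part of zc.async;
``zc.async.instanceuuid`` does the real work::

```python
import os
import uuid

def resolve_uuid_path(environ=os.environ):
    # ZC_ASYNC_UUID wins if set; otherwise fall back to uuid.txt in the
    # current working directory, as the text above describes.
    if 'ZC_ASYNC_UUID' in environ:
        return environ['ZC_ASYNC_UUID']
    return os.path.join(os.getcwd(), 'uuid.txt')

def read_or_create_uuid(path):
    # Reuse the stored identifier if the file already exists; otherwise
    # generate a fresh one and write it out, so deleting the file yields
    # a new identity on the next import.
    if os.path.exists(path):
        with open(path) as f:
            return uuid.UUID(f.readline().strip())
    instance_uuid = uuid.uuid1()
    with open(path, 'w') as f:
        f.write(str(instance_uuid))
    return instance_uuid
```

Calling ``read_or_create_uuid`` twice with the same path returns the same
identifier, which is why deleting the file is the documented way to get a
fresh one.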
-----------
Make a File
-----------
Make a new Python file. Let's call it ``pi.py``.
+Save this file in ``lib/python2.5/site-packages/``.
-At the top, include the same code we used in the interpreter above.
+Use the following for the file content.
+.. sidebar:: Code Walkthrough
+
+ This code walkthrough focuses on the elements needed to use |async|,
+ rather than the calculation of pi. Numerous explanations of this Monte
+ Carlo simulation to calculate pi are on the internet. I liked `this one`_.
+
+ We begin with some imports.
+
+ The ``generate_sample`` code will be run on multiple processes to produce
+ samples for our Monte Carlo function simulation. It is ignorant of
+ |async|.
+
+ Once the samples are all done, we'll reduce the results with
+ ``process_samples``. It will return an approximation of pi, with accuracy
+ determined by the total size of the aggregated samples, assuming even
+ distribution of the random numbers. As you'll see soon, we'll be using a
+ |async| convenience function for parallel jobs that gives all of the
+ completed jobs that have been running in parallel to this, the
+ postprocessing call. Therefore, ``process_samples`` gets the ``result`` of
+ ``generate_sample`` off of each job. Other than that, this function is
+ ignorant of |async|.
+
+ The last code block should look similar to our previous example of starting
+ up a dispatcher, except this one uses the main, installed Twisted reactor,
+ rather than a threaded instance. It creates the database, configures
+ zc.async, and starts the reactor.
+
::
+ import random
+ import math
+
import ZEO.ClientStorage
import ZODB
import twisted.internet.reactor
- twisted.internet.reactor.start()
+ import zc.async.configure
- def generateSample():
- pass XXX
+ def generate_sample(size=100000):
+ count = 0
+ for i in range(size):
+ if math.hypot(random.random(), random.random()) < 1:
+ count += 1
+ return count, size
- def processSamples(*sample_jobs):
- pass XXX
+ def process_samples(*sample_jobs):
+ count = 0
+ size = 0
+ for j in sample_jobs:
+ count += j.result[0]
+ size += j.result[1]
+ return 4.0 * count / size
if __name__ == '__main__':
storage = ZEO.ClientStorage.ClientStorage(
('127.0.0.1', 9999))
db = ZODB.DB(storage)
-
- import zc.async.configure
zc.async.configure.base()
-
zc.async.configure.start(
db, poll_interval=1, twisted=True)
+ twisted.internet.reactor.run()
+.. We'll need these defined when we run this as a test.
+
+ >>> import random
+ >>> import math
+
+ >>> def generate_sample(size=100000):
+ ... count = 0
+ ... for i in range(size):
+ ... if math.hypot(random.random(), random.random()) < 1:
+ ... count += 1
+ ... return count, size
+ ...
+ >>> def process_samples(*sample_jobs):
+ ... count = 0
+ ... size = 0
+ ... for j in sample_jobs:
+ ... count += j.result[0]
+ ... size += j.result[1]
+ ... return 4.0 * count / size
+ ...
+ >>> class StubModule(object):
+ ... pass
+ ...
+ >>> pi = StubModule()
+ >>> pi.generate_sample = generate_sample
+ >>> pi.process_samples = process_samples
+
+.. _`this one`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html
+
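As a quick sanity check, independent of zc.async, the two functions in
``pi.py`` can be exercised directly. ``StubJob`` below is a hypothetical
stand-in for a completed job whose ``result`` attribute holds the
``(count, size)`` tuple returned by ``generate_sample``::

```python
import math
import random

def generate_sample(size=100000):
    # Count darts landing inside the quarter circle of radius 1.
    count = 0
    for i in range(size):
        if math.hypot(random.random(), random.random()) < 1:
            count += 1
    return count, size

class StubJob(object):
    # Stands in for a completed zc.async job: .result holds the
    # (count, size) tuple that generate_sample returned.
    def __init__(self, result):
        self.result = result

def process_samples(*sample_jobs):
    count = 0
    size = 0
    for j in sample_jobs:
        count += j.result[0]
        size += j.result[1]
    return 4.0 * count / size

# Aggregate three samples, as the parallel job will do for us later.
jobs = [StubJob(generate_sample()) for _ in range(3)]
estimate = process_samples(*jobs)
```

With 300,000 points the estimate should land close to pi, which is the
same behavior the dispatcher-run version produces.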
+--------------------------------
+Our First Monte Carlo Experiment
+--------------------------------
+
+We'll need the ZEO server running. If you've gone through this file from the
+start, it should still be running. If not, use this::
+
+ $ ./bin/runzeo -a 9999 -f test.fs &
+
+Now, for our first experiment with the Monte Carlo simulation, we'll start a
+single worker process.
+
+Enter this in the terminal::
+
+ $ ./bin/python lib/python2.5/site-packages/pi.py &
+
+That will start our worker process. Now let's start an interpreter.
+
+::
+
+ $ ./bin/python
+
+Now get a database and a connection, as we've seen before. We'll also set up
+the base zc.async configuration.
+
+::
+
+ >> import ZEO.ClientStorage
+ >> import ZODB
+ >> storage = ZEO.ClientStorage.ClientStorage(
+ ... ('127.0.0.1', 9999))
+ >> db = ZODB.DB(storage)
+ >> conn = db.open()
+ >> import zc.async.configure
+ >> zc.async.configure.base()
+
+We don't have any adapters installed, so ``zc.async.interfaces.IQueue(conn)``
+won't work. This will though, and still looks pretty good::
+
+ >>> import zc.async.queue
+ >>> q = zc.async.queue.getDefaultQueue(conn)
+
+Now we can start some jobs in parallel.
+
+::
+
+ >> import pi
+ >>> import zc.async.job
+ >>> j = q.put(zc.async.job.parallel(
+ ... pi.generate_sample, pi.generate_sample, pi.generate_sample,
+ ... postprocess=pi.process_samples))
+ >>> import transaction
+ >>> transaction.commit()
+
+Wait a few seconds. If the result is empty (None), begin the transaction again
+and check the result again. Eventually, these next two lines should give you a
+similar result--an approximation of pi.
+
+.. This lets us "wait a second".
+
+ >>> zc.async.testing.wait_for_result(j) # doctest: +ELLIPSIS
+ 3.1...
+
+..
+
+::
+
+ >> _ = transaction.begin()
+ >> j.result
+ 3.1386666666666665
+
+Cool.
+
+--------
+Closures
+--------
+
+Sometimes, you want to pass arguments to your functions. In this case, what if
+you want to pass a different ``size`` argument to ``generate_sample``?
+
+You can always pass a Job directly to the queue (or to the ``parallel`` helper
+function, in this case). The job accepts arguments similarly to the Python 2.5
+``functools.partial``: Job(func, \*args, \*\*keywords). This instantiates a new
+callable (a Job) with partial application of the given arguments and keywords.
+
+Let's try it.
+
+ >>> j = q.put(zc.async.job.parallel(
+ ... zc.async.job.Job(pi.generate_sample, 1000000),
+ ... zc.async.job.Job(pi.generate_sample, size=1000000),
+ ... postprocess=pi.process_samples))
+ >>> transaction.commit()
+
+Wait a bit, again. Retry these two lines until you get a result. It should
+be well under a minute on most machines.
+
+.. This lets us "wait a second".
+
+ >>> zc.async.testing.wait_for_result(j, seconds=20) # doctest: +ELLIPSIS
+ 3.1...
+
+..
+
+::
+
+ >> _ = transaction.begin()
+ >> j.result
+ 3.1434359999999999
+
+Cool.
+
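The partial application that ``zc.async.job.Job`` performs mirrors the
standard library's ``functools.partial``; a minimal illustration, using a
placeholder function rather than the real sampler::

```python
import functools

def generate_sample(size=100000):
    # Placeholder for the real sampling function; it returns its
    # argument so the bound value is easy to observe.
    return size

# Bind arguments positionally or by keyword, just as with
# zc.async.job.Job(func, *args, **keywords).
by_position = functools.partial(generate_sample, 1000000)
by_keyword = functools.partial(generate_sample, size=1000000)
```

Calling either partial with no further arguments runs the function with
the bound ``size``, which is exactly what the queued ``Job`` objects do.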
+-------------
+Configuration
+-------------
+
+Unfortunately, the parallel runs we've done so far actually make the
+calculation slower than a single function call would be! That's because we ran
+them in a single Python worker process, where these CPU-bound jobs cannot
+truly run in parallel (the interpreter serializes them) and thread overhead
+makes them a bit less efficient. Threads help for some uses of |async|, but
+not this one.
+
+Let's assume you are running these experiments on a machine with two processor
+cores. We should actually start two worker processes then, one for each core.
+We'll need to make sure each worker process has its own UUID.
+
+Moreover, we need to configure each process to only take one
+``generate_sample`` job at a time. Let's adjust our code to do that.
+
+Agents
+------
+
+A worker process has a central polling component called a ``dispatcher``.
+Each dispatcher polls the database and asks its ``agent`` (or agents; think
+of a "`talent agent`_" or a "booking agent") which jobs its threads should
+work on.
+
+By default, the zc.async.configure helpers give each dispatcher a single
+agent that chooses the first available job in the queue and runs no more than
+three jobs at a time.
+
+.. _`talent agent`: http://en.wikipedia.org/wiki/Talent_agent
+
XXX
===
-Next need to discuss that callables must be picklable, so we need to switch
-from the interpreter to the filesystem.
-
Talk about callbacks, and how that lets you respond to results.
Talk briefly about failures, show the exceptions, and briefly mention logging
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-08-17 04:08:16 UTC (rev 89917)
@@ -2,6 +2,8 @@
- finish quickstart
- write a zc.buildout quickstart
- write a grok quickstart
+- write test for using installed twisted reactor
+- write test for interrupt retry of parallel/serial
Improvements
Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/dispatcher.py 2008-08-17 04:08:16 UTC (rev 89917)
@@ -438,19 +438,21 @@
self.reactor.callLater(self.poll_interval, self.directPoll)
def _inThreadPoll(self, deferred):
+ self.conn = self.db.open()
try:
self.poll()
finally:
+ self.conn.close()
self.reactor.callFromThread(deferred.callback, None)
def threadedPoll(self):
if not self.activated:
return
deferred = twisted.internet.defer.Deferred()
- self.reactor.callInThread(self._inThreadPoll, deferred)
deferred.addCallback(
lambda result: self.reactor.callLater(
self.poll_interval, self.threadedPoll))
+ self.reactor.callInThread(self._inThreadPoll, deferred)
def activate(self, threaded=False):
if self.activated:
@@ -463,8 +465,9 @@
self.jobs.clear()
# increase pool size to account for the dispatcher poll
self.db.setPoolSize(self.db.getPoolSize() + 1)
- self.conn = self.db.open() # we keep the same connection for all
- # polls as an optimization
+ if not threaded:
+ self.conn = self.db.open() # we keep the same connection for all
+ # polls as an optimization
if threaded:
self.reactor.callWhenRunning(self.threadedPoll)
else:
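The dispatcher change above moves the connection open into the polling
thread, because a ZODB connection should only be used by the thread that
opened it. The shape of the fix, with hypothetical stub objects standing in
for the real database and poll:

```python
class StubConnection(object):
    # Minimal stand-in for a ZODB connection.
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

class StubDB(object):
    # Minimal stand-in for a ZODB database; records every open().
    def __init__(self):
        self.opened = []

    def open(self):
        conn = StubConnection()
        self.opened.append(conn)
        return conn

def in_thread_poll(db, poll):
    # Open the connection inside the polling thread itself (as the patch
    # does) and guarantee it is closed even if poll() raises.
    conn = db.open()
    try:
        poll(conn)
    finally:
        conn.close()

db = StubDB()
in_thread_poll(db, lambda conn: None)
```

The ``try``/``finally`` mirrors the patched ``_inThreadPoll``: the
connection is closed before the deferred's callback is scheduled back on
the reactor thread.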
Modified: zc.async/trunk/src/zc/async/instanceuuid.py
===================================================================
--- zc.async/trunk/src/zc/async/instanceuuid.py 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/instanceuuid.py 2008-08-17 04:08:16 UTC (rev 89917)
@@ -27,10 +27,14 @@
database to do work.
In order to decide where to look for this file (or to create it, if
-necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a file
-name. If you are using Zope 3, you can set this in a zdaemon environment
-section of your zdaemon.conf.
+necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a
+file name.
+If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to
+daemonize your process, you can set this in a zdaemon environment section
+of your zdaemon.conf. Supervisor (http://supervisord.org/) also provides
+this functionality. Other similar tools probably do as well.
+
If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use
``os.path.join(os.getcwd(), 'uuid.txt')`` as the file name.
Modified: zc.async/trunk/src/zc/async/job.py
===================================================================
--- zc.async/trunk/src/zc/async/job.py 2008-08-16 18:29:19 UTC (rev 89916)
+++ zc.async/trunk/src/zc/async/job.py 2008-08-17 04:08:16 UTC (rev 89917)
@@ -985,8 +985,10 @@
next.addCallback(Job(_queue_next, main_job, ix+1))
else:
postprocess = main_job.kwargs['postprocess']
- postprocess.args.extend(jobs)
- queue.put(postprocess)
+ if postprocess.status == zc.async.interfaces.NEW:
+ # will not be NEW if this is a retry
+ postprocess.args.extend(jobs)
+ queue.put(postprocess)
def _schedule_serial(*jobs, **kw):
_queue_next(zc.async.local.getJob())
@@ -1010,8 +1012,10 @@
complete = False
if complete:
postprocess = main_job.kwargs['postprocess']
- postprocess.args.extend(jobs)
- queue.put(postprocess)
+ if postprocess.status == zc.async.interfaces.NEW:
+ # will not be NEW if this is a retry
+ postprocess.args.extend(jobs)
+ queue.put(postprocess)
def _schedule_parallel(*jobs, **kw):
_queue_all(zc.async.local.getJob())
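The guard added in both hunks above makes the postprocess enqueue idempotent
across retries: if the composite job is interrupted and retried after the
postprocess was already queued, its ``args`` are not extended a second time.
The pattern, reduced to plain Python with stand-in objects rather than the
real zc.async classes:

```python
NEW = 'new-status'
PENDING = 'pending-status'

class StubJob(object):
    # Stand-in for a zc.async job: just a status and partial args.
    def __init__(self):
        self.status = NEW
        self.args = []

queue = []

def enqueue_postprocess(postprocess, jobs):
    # On a retry the postprocess may already have been queued (status is
    # no longer NEW); re-extending args would hand it duplicate jobs.
    if postprocess.status == NEW:
        postprocess.args.extend(jobs)
        queue.append(postprocess)
        postprocess.status = PENDING

post = StubJob()
enqueue_postprocess(post, ['j1', 'j2'])
enqueue_postprocess(post, ['j1', 'j2'])  # a retry: no duplication
```

Without the status check, a retried ``_queue_next`` or parallel completion
would append the completed jobs to ``postprocess.args`` twice and put the
postprocess in the queue twice.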