[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt put processor back

Godefroid Chapelle gotcha at bubblenet.be
Tue Mar 30 09:50:42 EDT 2010


Log message for revision 110296:
  put processor back
  

Changed:
  A   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt

-=-
Copied: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt (from rev 108713, lovely.remotetask/trunk/src/lovely/remotetask/processor.txt)
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt	                        (rev 0)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt	2010-03-30 13:50:42 UTC (rev 110296)
@@ -0,0 +1,296 @@
+==============
+Job Processors
+==============
+
+The actual processing of the jobs in a queue is handled by a separate
+component, known as the job processor. This component usually runs in its own
+thread and provides its own main loop.
+
+  >>> from lovely.remotetask import processor
+
+The ``processor`` module provides several implementations of the processor
+API; a sketch of their shared surface follows the setup below. Let's create
+the necessary components to test the processor:
+
+1. Create the task service and add it to the root site:
+
+  >>> from lovely import remotetask
+  >>> tasks = remotetask.TaskService()
+
+  >>> root['tasks'] = tasks
+
+2. Register the service as a utility:
+
+  >>> from lovely.remotetask import interfaces
+  >>> sm = root.getSiteManager()
+  >>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
+
+3. Register a task that simply sleeps and writes a message:
+
+  >>> import logging
+  >>> log = logging.getLogger('lovely.remotetask')
+  >>> import time
+  >>> def sleep((sleepTime, id)):
+  ...     time.sleep(sleepTime)
+  ...     log.info('Job: %i' % id)
+
+  >>> import lovely.remotetask.task
+  >>> sleepTask = remotetask.task.SimpleTask(sleep)
+
+  >>> import zope.component
+  >>> zope.component.provideUtility(sleepTask, name='sleep')
+
+4. Setup a database:
+
+  >>> from ZODB.tests import util
+  >>> import transaction
+  >>> db = util.DB()
+
+  >>> from zope.app.publication.zopepublication import ZopePublication
+  >>> conn = db.open()
+  >>> conn.root()[ZopePublication.root_name] = root
+  >>> transaction.commit()
+
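+Before looking at the individual implementations, here is a sketch of the
+surface both processors share. The method names are taken from the calls
+exercised later in this document; the signatures and docstrings are
+assumptions for illustration, not the actual source::
+
+    class ProcessorSketch(object):
+        """Illustrative only: the processor API as used in this document."""
+
+        def __init__(self, db, servicePath, waitTime=1.0):
+            # The database and the traversal stack to the task service,
+            # as passed in the instantiations below.
+            pass
+
+        def hasJobsWaiting(self):
+            """Tell whether any jobs are waiting to be executed."""
+
+        def claimNextJob(self):
+            """Claim the next job and return its id (multi-threaded only)."""
+
+        def processNext(self, jobid=None):
+            """Process one job; return True if a job was processed."""
+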
+
+The Simple Processor
+--------------------
+
+This processor executes one job at a time. It was designed for jobs that would
+take a long time and use up most of the processing power of a computer. It is
+also the default processor for the task service.
+
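+Conceptually, its main loop runs one job per iteration and backs off when the
+queue is empty. A minimal sketch, assuming ``processNext()`` returns a false
+value when there was nothing to do::
+
+    while running:                 # cleared by stopProcessing() (assumed)
+        if not processNext():      # one job at a time, in this thread
+            time.sleep(waitTime)   # queue empty: wait before polling again
+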
+Let's first register a few tasks:
+
+  >>> jobid = tasks.add(u'sleep', (0.04, 1))
+  >>> jobid = tasks.add(u'sleep', (0.1,  2))
+  >>> jobid = tasks.add(u'sleep', (0,    3))
+  >>> jobid = tasks.add(u'sleep', (0.08, 4))
+  >>> transaction.commit()
+
+Let's start by executing a job directly. The first argument to the simple
+processor constructor is the database and the second the traversal stack to
+the task service. All other arguments are optional:
+
+  >>> proc = processor.SimpleProcessor(
+  ...     tasks._p_jar.db(), ['tasks'], waitTime=0.0)
+
+Let's now process the first job. We first clear the log; we also have to end
+any existing interaction in order to process the job in this thread:
+
+  >>> log_info.clear()
+
+  >>> from zope.security import management
+  >>> management.endInteraction()
+  >>> proc.processNext()
+  True
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+
+Let's now use the processor from within the task service. The service stores
+both the processor factory and the additional constructor arguments:
+
+  >>> tasks.processorFactory
+  <class 'lovely.remotetask.processor.SimpleProcessor'>
+  >>> tasks.processorArguments
+  {'waitTime': 0.0}
+
+The wait time has been set to zero for testing purposes only; it defaults to
+1 second. Let's now start processing tasks, wait a little bit for all the
+jobs to complete, and then stop processing again:
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+
+  >>> time.sleep(0.5)
+
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+
+The log shows that all jobs have been processed. But more importantly, they
+were all completed in the order they were defined.
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+  lovely.remotetask INFO
+    Job: 2
+  lovely.remotetask INFO
+    Job: 3
+  lovely.remotetask INFO
+    Job: 4
+
+Transactions in jobs
+--------------------
+
+With the SimpleProcessor, jobs *should* not change the transaction status:
+both the administration of the jobs by the TaskService and the job itself run
+in the same transaction, so aborting it from inside the job could wreak havoc
+with the administrative part.
+
+This is a regression test checking that aborting the transaction inside the
+job does not lead to an infinite loop (the SimpleProcessor pulls the job
+inside the transaction, so if the transaction is aborted, the job remains on
+the queue):
+
+  >>> counter = 0
+  >>> def count(arg):
+  ...     global counter
+  ...     counter += 1
+  ...     transaction.abort()
+  >>> countTask = remotetask.task.SimpleTask(count)
+  >>> zope.component.provideUtility(countTask, name='count')
+
+  >>> jobid = tasks.add(u'count', ())
+  >>> transaction.commit()
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+  >>> time.sleep(0.5)
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+  >>> time.sleep(0.5)
+  >>> transaction.abort() # prevent spurious conflict errors
+  >>> counter
+  1
+
+
+The Multi-threaded Processor
+----------------------------
+
+The multi-threaded processor executes several jobs at once. It was designed
+for jobs that would take a long time but use very little processing power.
+
+Let's add a few new tasks to execute:
+
+  >>> jobid = tasks.add(u'sleep', (0.04,  1))
+  >>> jobid = tasks.add(u'sleep', (0.18,  2))
+  >>> jobid = tasks.add(u'sleep', (0,     3))
+  >>> jobid = tasks.add(u'sleep', (0.02,  4))
+  >>> transaction.commit()
+
+Before testing the processor in the task service, let's have a look at each
+method by itself. First we instantiate the processor:
+
+  >>> proc = processor.MultiProcessor(
+  ...     tasks._p_jar.db(), ['tasks'], waitTime=0)
+
+The maximum number of threads can be configured as well:
+
+  >>> proc.maxThreads
+  5
+
+All working threads can be reviewed at any time:
+
+  >>> proc.threads
+  []
+
+At any time you can ask the processor whether any jobs are waiting to be
+executed:
+
+  >>> from zope.security import management
+  >>> management.endInteraction()
+
+  >>> proc.hasJobsWaiting()
+  True
+
+Once you know that jobs are available, you can claim a job:
+
+  >>> jobid = proc.claimNextJob()
+  >>> jobid
+  1392637180
+
+We need to claim a job before executing it, so that the database marks the
+job as claimed and no other thread picks it up. Once we have claimed a
+particular job, we can process it:
+
+  >>> log_info.clear()
+
+  >>> proc.processNext(jobid)
+  True
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
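+
+Taken together, these calls are what each pass of the multi-threaded main
+loop performs. A hedged sketch of how they might be combined; the thread
+bookkeeping and error handling of the real implementation are omitted, and
+the assumption that ``claimNextJob()`` returns a false value when nothing is
+waiting is ours::
+
+    import threading
+
+    while running:                       # cleared by stopProcessing() (assumed)
+        if len(threads) >= maxThreads:   # finished threads assumed pruned elsewhere
+            time.sleep(waitTime)         # all worker slots busy
+            continue
+        jobid = claimNextJob()           # claimed, so no other thread takes it
+        if not jobid:
+            time.sleep(waitTime)         # nothing waiting: back off
+            continue
+        thread = threading.Thread(target=processNext, args=(jobid,))
+        threads.append(thread)
+        thread.start()
+        time.sleep(THREAD_STARTUP_WAIT)  # let the thread start up (see below)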
+
+Let's now have a look at using the processor in the task service. This
+primarily means setting the processor factory:
+
+  >>> management.newInteraction()
+
+  >>> tasks.processorFactory = processor.MultiProcessor
+  >>> transaction.commit()
+
+  >>> log_info.clear()
+
+Let's now process the remaining jobs:
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+
+  >>> time.sleep(0.5)
+
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+
+As you can see, this time the jobs are not completed in order, because they
+each take a different amount of time to execute:
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 3
+  lovely.remotetask INFO
+    Job: 4
+  lovely.remotetask INFO
+    Job: 2
+
+Let's now set the thread limit to two and construct a new set of tasks that
+demonstrates that no more than two threads run at the same time:
+
+  >>> tasks.processorArguments = {'waitTime': 0.0, 'maxThreads': 2}
+
+  >>> jobid = tasks.add(u'sleep', (0.03, 1))
+  >>> jobid = tasks.add(u'sleep', (0.05, 2))
+  >>> jobid = tasks.add(u'sleep', (0.03, 3))
+  >>> jobid = tasks.add(u'sleep', (0.08, 4))
+  >>> transaction.commit()
+
+If all tasks were processed at once, job 3 would be done first; but since it
+has to wait for an available thread, it comes in third. We can now run the
+jobs and see the result:
+
+  >>> log_info.clear()
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+
+  >>> time.sleep(0.5)
+
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+  lovely.remotetask INFO
+    Job: 2
+  lovely.remotetask INFO
+    Job: 3
+  lovely.remotetask INFO
+    Job: 4
+
+Note: Sometimes (about 20% of the time in this test) the log will contain a
+conflict error. As long as the error is *not* produced by ``processNext()``,
+it is harmless, even though it is logged at the error level. To avoid
+conflict errors when calling the ``processNext()`` method, we wait a little
+bit to give the thread a chance to start up. This waiting time is defined by
+a module global:
+
+  >>> processor.THREAD_STARTUP_WAIT
+  0.050000000000000003
+
+On my machine this number seems to work well. On slower machines you might
+want to raise that number.
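+
+For example, on a slow machine one might raise it before starting the
+processors; an illustrative line, not part of the executed test::
+
+    processor.THREAD_STARTUP_WAIT = 0.1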
+
+Let's give Python enough time to stop all the threads:
+
+  >>> time.sleep(0.05)


