[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/ remove processors, they are z3 specific
Godefroid Chapelle
gotcha at bubblenet.be
Mon Mar 8 05:25:09 EST 2010
Log message for revision 109801:
remove processors, they are z3 specific
Changed:
D Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
D Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py
-=-
Deleted: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py 2010-03-08 10:23:30 UTC (rev 109800)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py 2010-03-08 10:25:09 UTC (rev 109801)
@@ -1,156 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Processor Implementations
-
-$Id$
-"""
-__docformat__ = 'restructuredtext'
-import logging
-import threading
-import time
-import transaction
-import zope.interface
-import zope.publisher.base
-import zope.publisher.publish
-from zope.app.publication.zopepublication import ZopePublication
-from zope.security import management
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
-
-from lovely.remotetask import interfaces
-
-THREAD_STARTUP_WAIT = 0.05
-
-ERROR_MARKER = object()
-
-log = logging.getLogger('lovely.remotetask')
-
-
-class ProcessorPublication(ZopePublication):
- """A custom publication to process the next job."""
-
- def beforeTraversal(self, request):
- # Overwrite this method, so that the publication does not attempt to
- # authenticate; we assume that the processor is allowed to do
- # everything. Note that the DB is not exposed to the job callable.
- management.newInteraction(request)
- transaction.begin()
-
- def traverseName(self, request, ob, name):
- # Provide a very simple traversal mechanism.
- return traverse(removeSecurityProxy(ob), name, None)
-
-class ProcessorRequest(zope.publisher.base.BaseRequest):
- """A custome publisher request for the processor."""
-
- def __init__(self, *args):
- super(ProcessorRequest, self).__init__(None, {}, positional=args)
-
-
-class SimpleProcessor(object):
- """Simple Job Processor
-
- This processor only processes one job at a time.
- """
- zope.interface.implements(interfaces.IProcessor)
-
- @property
- def running(self):
- thread = threading.currentThread()
- if thread is not None:
- return thread.running
- log.error('SimpleProcessor: no currentThread')
- return False
-
- def __init__(self, db, servicePath, waitTime=1.0):
- self.db = db
- self.servicePath = servicePath
- self.waitTime = waitTime
-
- def call(self, method, args=(), errorValue=ERROR_MARKER):
- # Create the path to the method.
- path = self.servicePath[:] + [method]
- path.reverse()
- # Produce a special processor event to be sent to the publisher.
- request = ProcessorRequest(*args)
- request.setPublication(ProcessorPublication(self.db))
- request.setTraversalStack(path)
- # Publish the request, making sure that *all* exceptions are
- # handled. The processor should *never* crash.
- try:
- zope.publisher.publish.publish(request, False)
- return request.response._result
- except Exception, error:
- # This thread should never crash, thus a blank except
- log.error('Processor: ``%s()`` caused an error!' %method)
- log.exception(error)
- return errorValue is ERROR_MARKER and error or errorValue
-
- def processNext(self, jobid=None):
- return self.call('processNext', args=(None, jobid))
-
- def __call__(self):
- while self.running:
- result = self.processNext()
- # If there are no jobs available, sleep a little bit and then
- # check again.
- if not result:
- time.sleep(self.waitTime)
-
-
-class MultiProcessor(SimpleProcessor):
- """Multi-threaded Job Processor
-
- This processor can work on multiple jobs at the same time.
- """
- zope.interface.implements(interfaces.IProcessor)
-
- def __init__(self, *args, **kwargs):
- self.maxThreads = kwargs.pop('maxThreads', 5)
- super(MultiProcessor, self).__init__(*args, **kwargs)
- self.threads = []
-
- def hasJobsWaiting(self):
- return self.call('hasJobsWaiting', errorValue=False)
-
- def claimNextJob(self):
- return self.call('claimNextJob', errorValue=None)
-
- def __call__(self):
- # Start the processing loop
- while self.running:
- # Remove all dead threads
- for thread in self.threads:
- if not thread.isAlive():
- self.threads.remove(thread)
- # If the number of threads equals the number of maximum threads,
- # wait a little bit and then start over
- if len(self.threads) == self.maxThreads:
- time.sleep(self.waitTime)
- continue
- # Let's wait for jobs to become available
- while not self.hasJobsWaiting() and self.running:
- time.sleep(self.waitTime)
- # Okay, we have to do some work, so let's do that now. Since we
- # are working with threads, we first have to claim a job and then
- # execute it.
- jobid = self.claimNextJob()
- # If we got a job, let's work on it in a new thread.
- if jobid is not None:
- thread = threading.Thread(
- target=self.processNext, args=(jobid,))
- self.threads.append(thread)
- thread.start()
- # Give the thread some time to start up:
- time.sleep(THREAD_STARTUP_WAIT)
Deleted: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt 2010-03-08 10:23:30 UTC (rev 109800)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt 2010-03-08 10:25:09 UTC (rev 109801)
@@ -1,296 +0,0 @@
-==============
-Job Processors
-==============
-
-The actual processing of the jobs in a queue is handled by a separate
-component, known as the job processor. This component usually runs in its own
-thread and provides its own main loop.
-
- >>> from lovely.remotetask import processor
-
-The ``processor`` module provides several implementations of the processor
-API. Let's create the necessary components to test the processor:
-
-1. Create the task service and add it to the root site:
-
- >>> from lovely import remotetask
- >>> tasks = remotetask.TaskService()
-
- >>> sm = root['tasks'] = tasks
-
-2. Register the service as a utility:
-
- >>> from lovely.remotetask import interfaces
- >>> sm = root.getSiteManager()
- >>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
-
-3. Register a task that simply sleeps and writes a message:
-
- >>> import logging
- >>> log = logging.getLogger('lovely.remotetask')
- >>> import time
- >>> def sleep((sleepTime, id)):
- ... time.sleep(sleepTime)
- ... log.info('Job: %i' %id)
-
- >>> import lovely.remotetask.task
- >>> sleepTask = remotetask.task.SimpleTask(sleep)
-
- >>> import zope.component
- >>> zope.component.provideUtility(sleepTask, name='sleep')
-
-4. Setup a database:
-
- >>> from ZODB.tests import util
- >>> import transaction
- >>> db = util.DB()
-
- >>> from zope.app.publication.zopepublication import ZopePublication
- >>> conn = db.open()
- >>> conn.root()[ZopePublication.root_name] = root
- >>> transaction.commit()
-
-
-The Simple Processor
---------------------
-
-This processor executes one job at a time. It was designed for jobs that would
-take a long time and use up most of the processing power of a computer. It is
-also the default processor for the task service.
-
-Let's first register a few tasks:
-
- >>> jobid = tasks.add(u'sleep', (0.04, 1))
- >>> jobid = tasks.add(u'sleep', (0.1, 2))
- >>> jobid = tasks.add(u'sleep', (0, 3))
- >>> jobid = tasks.add(u'sleep', (0.08, 4))
- >>> transaction.commit()
-
-Let's start by executing a job directly. The first argument to the simple
-processor constructor is the database and the second the traversal stack to
-the task service. All other arguments are optional:
-
- >>> proc = processor.SimpleProcessor(
- ... tasks._p_jar.db(), ['tasks'], waitTime=0.0)
-
-Let's now process the first job. We clear the log and we also have to end any
-existing interactions in order to process the job in this thread:
-
- >>> log_info.clear()
-
- >>> from zope.security import management
- >>> management.endInteraction()
- >>> proc.processNext()
- True
-
- >>> print log_info
- lovely.remotetask INFO
- Job: 1
-
-Let's now use the processor from within the task service. Since the processor
-constructors also accept additional arguments, they are specified as well:
-
- >>> tasks.processorFactory
- <class 'lovely.remotetask.processor.SimpleProcessor'>
- >>> tasks.processorArguments
- {'waitTime': 0.0}
-
-The wait time has been set to zero for testing purposes only. It is really set
-to 1 second by default. Let's now start processing tasks, wait a little bit
-for all the jobs to complete and then stop processing again:
-
- >>> tasks.startProcessing()
- >>> transaction.commit()
-
- >>> time.sleep(0.5)
-
- >>> tasks.stopProcessing()
- >>> transaction.commit()
-
-The log shows that all jobs have been processed. But more importantly, they
-were all completed in the order they were defined.
-
- >>> print log_info
- lovely.remotetask INFO
- Job: 1
- lovely.remotetask INFO
- Job: 2
- lovely.remotetask INFO
- Job: 3
- lovely.remotetask INFO
- Job: 4
-
-Transactions in jobs
---------------------
-
-With the SimpleProcessor, jobs _should_ not change the transaction status, since
-both the administration of the jobs by the TaskService and the job itself run in
-the same transaction, so aborting it from inside the job could wreak havoc with
-the administrative part.
-
-This is a regression test that aborting the transaction inside the job does not
-lead to an infinite loop (because SimpleProcessor pulls the job inside the
-transaction, so if it is aborted, the job remains on the queue):
-
- >>> counter = 0
- >>> def count(arg):
- ... global counter
- ... counter += 1
- ... transaction.abort()
- >>> countTask = remotetask.task.SimpleTask(count)
- >>> zope.component.provideUtility(countTask, name='count')
-
- >>> jobid = tasks.add(u'count', ())
- >>> transaction.commit()
-
- >>> tasks.startProcessing()
- >>> transaction.commit()
- >>> time.sleep(0.5)
- >>> tasks.stopProcessing()
- >>> transaction.commit()
- >>> time.sleep(0.5)
- >>> transaction.abort() # prevent spurious conflict errors
- >>> counter
- 1
-
-
-The Multi-thread Processor
---------------------------
-
-The multi-threaded processor executes several jobs at once. It was designed
-for jobs that would take a long time but use very little processing power.
-
-Let's add a few new tasks to execute:
-
- >>> jobid = tasks.add(u'sleep', (0.04, 1))
- >>> jobid = tasks.add(u'sleep', (0.18, 2))
- >>> jobid = tasks.add(u'sleep', (0, 3))
- >>> jobid = tasks.add(u'sleep', (0.02, 4))
- >>> transaction.commit()
-
-Before testing the processor in the task service, let's have a look at every
-method by itself. So we instantiate the processor:
-
- >>> proc = processor.MultiProcessor(
- ... tasks._p_jar.db(), ['tasks'], waitTime=0)
-
-The maximum number of threads can be set as well:
-
- >>> proc.maxThreads
- 5
-
-All working threads can be reviewed at any time:
-
- >>> proc.threads
- []
-
-At any time you can ask the service whether any jobs are waiting to be
-executed:
-
- >>> from zope.security import management
- >>> management.endInteraction()
-
- >>> proc.hasJobsWaiting()
- True
-
-Once you know that jobs are available, you can claim a job:
-
- >>> jobid = proc.claimNextJob()
- >>> jobid
- 1392637180
-
-We need to claim a job before executing it, so that the database marks the job
-as claimed and no new thread picks up the job. Once we claimed a particular
-job, we can process it:
-
- >>> log_info.clear()
-
- >>> proc.processNext(jobid)
- True
-
- >>> print log_info
- lovely.remotetask INFO
- Job: 1
-
-Let's now have a look at using the processor in the task service. This
-primarily means setting the processor factory:
-
- >>> management.newInteraction()
-
- >>> tasks.processorFactory = processor.MultiProcessor
- >>> transaction.commit()
-
- >>> log_info.clear()
-
-Let's now process the remaining jobs:
-
- >>> tasks.startProcessing()
- >>> transaction.commit()
-
- >>> time.sleep(0.5)
-
- >>> tasks.stopProcessing()
- >>> transaction.commit()
-
-As you can see, this time the jobs are not completed in order, because they
-all need different amounts of time to execute:
-
- >>> print log_info
- lovely.remotetask INFO
- Job: 3
- lovely.remotetask INFO
- Job: 4
- lovely.remotetask INFO
- Job: 2
-
-Let's now set the thread limit to two and construct a new set of tasks that
-demonstrate that not more than two threads run at the same time:
-
- >>> tasks.processorArguments = {'waitTime': 0.0, 'maxThreads': 2}
-
- >>> jobid = tasks.add(u'sleep', (0.03, 1))
- >>> jobid = tasks.add(u'sleep', (0.05, 2))
- >>> jobid = tasks.add(u'sleep', (0.03, 3))
- >>> jobid = tasks.add(u'sleep', (0.08, 4))
- >>> transaction.commit()
-
-If all tasks are processed at once, job 3 should be done first, but since the
-job has to wait for an available thread, it will come in third. We can now run
-the jobs and see the result:
-
- >>> log_info.clear()
-
- >>> tasks.startProcessing()
- >>> transaction.commit()
-
- >>> time.sleep(0.5)
-
- >>> tasks.stopProcessing()
- >>> transaction.commit()
-
- >>> print log_info
- lovely.remotetask INFO
- Job: 1
- lovely.remotetask INFO
- Job: 2
- lovely.remotetask INFO
- Job: 3
- lovely.remotetask INFO
- Job: 4
-
-Note: Sometimes (about 20% of the time in this test) the log will contain a
-conflict error. As long as such an error is *not* produced by ``processNext()``,
-it is harmless, even though it is logged at the error level. To avoid
-conflict errors when calling the ``processNext()`` method, we wait a little
-bit to give the thread a chance to start up. This waiting time is defined by a
-module global:
-
- >>> processor.THREAD_STARTUP_WAIT
- 0.050000000000000003
-
-On my machine this number seems to work well. On slower machines you might
-want to raise that number.
-
-Let's give Python enough time to stop all the threads.
-
- >>> time.sleep(0.05)
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py 2010-03-08 10:23:30 UTC (rev 109800)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py 2010-03-08 10:25:09 UTC (rev 109801)
@@ -79,7 +79,6 @@
unittest.makeSuite(TestIdGenerator),
DocFileSuite('README.txt',
'startlater.txt',
- 'processor.txt',
'TESTING.txt',
setUp=setUp,
tearDown=tearDown,
More information about the checkins
mailing list