[Checkins] SVN: lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/processor.py import extracted code
Godefroid Chapelle
gotcha at bubblenet.be
Thu Apr 22 07:21:26 EDT 2010
Log message for revision 111259:
import extracted code
Changed:
U lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/processor.py
-=-
Modified: lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/processor.py
===================================================================
--- lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/processor.py 2010-04-22 11:19:39 UTC (rev 111258)
+++ lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/processor.py 2010-04-22 11:21:26 UTC (rev 111259)
@@ -11,146 +11,4 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Processor Implementations
-
-$Id$
-"""
-__docformat__ = 'restructuredtext'
-import logging
-import threading
-import time
-import transaction
-import zope.interface
-import zope.publisher.base
-import zope.publisher.publish
-from zope.app.publication.zopepublication import ZopePublication
-from zope.security import management
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
-
-from lovely.remotetask import interfaces
-
-THREAD_STARTUP_WAIT = 0.05
-
-ERROR_MARKER = object()
-
-log = logging.getLogger('lovely.remotetask')
-
-
-class ProcessorPublication(ZopePublication):
- """A custom publication to process the next job."""
-
- def beforeTraversal(self, request):
- # Overwrite this method, so that the publication does not attempt to
- # authenticate; we assume that the processor is allowed to do
- # everything. Note that the DB is not exposed to the job callable.
- management.newInteraction(request)
- transaction.begin()
-
- def traverseName(self, request, ob, name):
- # Provide a very simple traversal mechanism.
- return traverse(removeSecurityProxy(ob), name, None)
-
-class ProcessorRequest(zope.publisher.base.BaseRequest):
-    """A custom publisher request for the processor."""
-
- def __init__(self, *args):
- super(ProcessorRequest, self).__init__(None, {}, positional=args)
-
-
-class SimpleProcessor(object):
- """Simple Job Processor
-
- This processor only processes one job at a time.
- """
- zope.interface.implements(interfaces.IProcessor)
-
- @property
- def running(self):
- thread = threading.currentThread()
- if thread is not None:
- return thread.running
- log.error('SimpleProcessor: no currentThread')
- return False
-
- def __init__(self, db, servicePath, waitTime=1.0):
- self.db = db
- self.servicePath = servicePath
- self.waitTime = waitTime
-
- def call(self, method, args=(), errorValue=ERROR_MARKER):
- # Create the path to the method.
- path = self.servicePath[:] + [method]
- path.reverse()
- # Produce a special processor event to be sent to the publisher.
- request = ProcessorRequest(*args)
- request.setPublication(ProcessorPublication(self.db))
- request.setTraversalStack(path)
- # Publish the request, making sure that *all* exceptions are
- # handled. The processor should *never* crash.
- try:
- zope.publisher.publish.publish(request, False)
- return request.response._result
- except Exception, error:
- # This thread should never crash, thus a blank except
- log.error('Processor: ``%s()`` caused an error!' %method)
- log.exception(str(error))
- return errorValue is ERROR_MARKER and error or errorValue
-
- def processNext(self, jobid=None):
- return self.call('processNext', args=(None, jobid))
-
- def __call__(self):
- while self.running:
- result = self.processNext()
- # If there are no jobs available, sleep a little bit and then
- # check again.
- if not result:
- time.sleep(self.waitTime)
-
-
-class MultiProcessor(SimpleProcessor):
- """Multi-threaded Job Processor
-
- This processor can work on multiple jobs at the same time.
- """
- zope.interface.implements(interfaces.IProcessor)
-
- def __init__(self, *args, **kwargs):
- self.maxThreads = kwargs.pop('maxThreads', 5)
- super(MultiProcessor, self).__init__(*args, **kwargs)
- self.threads = []
-
- def hasJobsWaiting(self):
- return self.call('hasJobsWaiting', errorValue=False)
-
- def claimNextJob(self):
- return self.call('claimNextJob', errorValue=None)
-
- def __call__(self):
- # Start the processing loop
- while self.running:
- # Remove all dead threads
- for thread in self.threads:
- if not thread.isAlive():
- self.threads.remove(thread)
- # If the number of threads equals the number of maximum threads,
- # wait a little bit and then start over
- if len(self.threads) == self.maxThreads:
- time.sleep(self.waitTime)
- continue
- # Let's wait for jobs to become available
- while not self.hasJobsWaiting() and self.running:
- time.sleep(self.waitTime)
- # Okay, we have to do some work, so let's do that now. Since we
- # are working with threads, we first have to claim a job and then
- # execute it.
- jobid = self.claimNextJob()
- # If we got a job, let's work on it in a new thread.
- if jobid is not None:
- thread = threading.Thread(
- target=self.processNext, args=(jobid,))
- self.threads.append(thread)
- thread.start()
- # Give the thread some time to start up:
- time.sleep(THREAD_STARTUP_WAIT)
+from z3c.taskqueue.processor import *
More information about the checkins
mailing list