[Checkins] SVN: lovely.remotetask/trunk/ - Refactored processor to be a replaceable component. The original processor is

Stephan Richter srichter at cosmos.phy.tufts.edu
Sun Dec 2 19:18:14 EST 2007


Log message for revision 82086:
  - Refactored processor to be a replaceable component. The original processor is
    now known as the ``SimpleProcessor``. Also implemented the
    ``MultiProcessor``, which is able to process multiple jobs at once using
    threads.
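The difference between the two processing strategies can be sketched with
standard-library Python alone. This is an illustration of the idea only, not
the package's code: ``jobs``, ``handle``, and ``running`` are hypothetical
stand-ins for the ZODB-backed task service, its job callables, and the
thread's running flag.

```python
import queue
import threading
import time

def simple_processor(jobs, handle, running, wait_time=0.01):
    """One job at a time, in order (the ``SimpleProcessor`` strategy)."""
    while running.is_set():
        try:
            job = jobs.get_nowait()
        except queue.Empty:
            time.sleep(wait_time)  # no job waiting; poll again shortly
            continue
        handle(job)

def multi_processor(jobs, handle, running, wait_time=0.01, max_threads=5):
    """Claim each job, then hand it to a worker thread (the ``MultiProcessor`` strategy)."""
    workers = []
    while running.is_set():
        # Drop threads that have finished their job.
        workers = [w for w in workers if w.is_alive()]
        if len(workers) >= max_threads:
            time.sleep(wait_time)
            continue
        try:
            # Removing ("claiming") the job first ensures that no other
            # worker can pick up the same job.
            job = jobs.get_nowait()
        except queue.Empty:
            time.sleep(wait_time)
            continue
        worker = threading.Thread(target=handle, args=(job,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
```

Either function is meant to run inside a daemon thread, just like the real
processors; only the multi-threaded variant needs the claim step, since the
simple one never competes with other workers.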
  

Changed:
  U   lovely.remotetask/trunk/buildout.cfg
  U   lovely.remotetask/trunk/src/lovely/remotetask/README.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml
  U   lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
  A   lovely.remotetask/trunk/src/lovely/remotetask/processor.py
  A   lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/service.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/tests.py

-=-
Modified: lovely.remotetask/trunk/buildout.cfg
===================================================================
--- lovely.remotetask/trunk/buildout.cfg	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/buildout.cfg	2007-12-03 00:18:13 UTC (rev 82086)
@@ -1,9 +1,14 @@
 [buildout]
 develop = .
-parts = test
+parts = py test
 index = http://download.zope.org/zope3.4
 
 [test]
 recipe = zc.recipe.testrunner
 defaults = ['--tests-pattern', '^f?tests$']
 eggs = lovely.remotetask [test]
+
+[py]
+recipe = zc.recipe.egg
+interpreter = python
+eggs = lovely.remotetask

Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/README.txt	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt	2007-12-03 00:18:13 UTC (rev 82086)
@@ -34,6 +34,8 @@
 Usage
 _____
 
+  >>> STOP_SLEEP_TIME = 0.02
+
 Let's now start by creating a single service:
 
   >>> from lovely import remotetask
@@ -126,9 +128,9 @@
   >>> service.isProcessing()
   False
 
-The TaskService is being started automatically - if specified in zope.conf -
-as soon as the `IDatabaseOpenedEvent` is fired. Let's emulate the zope.conf
-settings:
+The ``TaskService`` is being started automatically - if specified in
+``zope.conf`` - as soon as the ``IDatabaseOpenedEvent`` is fired. Let's
+emulate the ``zope.conf`` settings:
 
   >>> class Config(object):
   ...     mapping = {}
@@ -151,8 +153,8 @@
 
   >>> log_info.clear()
 
-On Zope startup the IDatabaseOpenedEvent is fired, and will call
-the bootStrap method:
+On Zope startup the ``IDatabaseOpenedEvent`` is fired, and will call
+the ``bootStrap()`` method:
 
   >>> from ZODB.tests import util
   >>> import transaction
@@ -203,6 +205,8 @@
   >>> root_service.isProcessing()
   False
 
+  >>> import time; time.sleep(STOP_SLEEP_TIME)
+
 And reset the logger::
 
   >>> log_info.clear()
@@ -240,6 +244,8 @@
   >>> service.isProcessing()
   False
 
+  >>> import time; time.sleep(STOP_SLEEP_TIME)
+
 Reset the product configuration with the asterisked service names::
 
   >>> config.mapping['autostart'] = '*@*'
@@ -284,6 +290,8 @@
   >>> root_service.isProcessing()
   False
 
+  >>> import time; time.sleep(STOP_SLEEP_TIME)
+
 Reset the product configuration with the asterisked service names::
 
   >>> config.mapping['autostart'] = '*@TestTaskService1'
@@ -320,6 +328,8 @@
   >>> service.isProcessing()
   False
 
+  >>> import time; time.sleep(STOP_SLEEP_TIME)
+
   >>> config.mapping['autostart'] = '*@Foo'
   >>> setProductConfigurations([config])
   >>> getAutostartServiceNames()
@@ -352,6 +362,8 @@
   >>> root_service.isProcessing()
   False
 
+  >>> import time; time.sleep(STOP_SLEEP_TIME)
+
 Let's now read a job:
 
   >>> jobid = service.add(u'echo', {'foo': 'bar'})
@@ -654,8 +666,7 @@
   >>> service2.stopProcessing()
   >>> root_service.stopProcessing()
 
-  >>> import time
-  >>> time.sleep(0.5)
+  >>> import time; time.sleep(STOP_SLEEP_TIME)
 
 The threads have exited now::
 

Modified: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt	2007-12-03 00:18:13 UTC (rev 82086)
@@ -3,13 +3,13 @@
 =====================
 
 This package provides an implementation of a remote task execution Web service
-that allows to execute pre-defined tasks on another server. See more info 
+that allows one to execute pre-defined tasks on another server. See more info
 about the TaskService in README.txt. This test will test the TaskServiceStub
-implementation. The only different is, that the TaskServiceStub will handle 
+implementation. The only difference is that the TaskServiceStub will handle
 task implementations providing ITaskStub interfaces rather than ITask. This way
 we can register stub tasks for a testing setup. See also another usecase for
-a task service stub implementation which is working with XML-RPC in the 
-package lovely.transcoding. 
+a task service stub implementation which is working with XML-RPC in the
+package lovely.transcoding.
 
 Let's now start by creating a task service stub:
 
@@ -41,7 +41,7 @@
 echo utility used in the REAME.txt tests for the ITaskStub interface:
 
   >>> import zope.component
-  >>> zope.component.provideUtility(echoTask, provides=testing.ITaskStub, 
+  >>> zope.component.provideUtility(echoTask, provides=testing.ITaskStub,
   ...     name='echo')
 
 The echo task is now available in the service:
@@ -110,7 +110,7 @@
   ...     raise remotetask.task.TaskError('An error occurred.')
 
   >>> zope.component.provideUtility(
-  ...     remotetask.task.SimpleTask(error), provides=testing.ITaskStub, 
+  ...     remotetask.task.SimpleTask(error), provides=testing.ITaskStub,
   ...     name='error')
 
 Now add and execute it:

Modified: lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt	2007-12-03 00:18:13 UTC (rev 82086)
@@ -173,8 +173,11 @@
   >>> transaction.commit()
   >>> service.startProcessing()
   >>> transaction.commit()
-  >>> sleep(1.5)
 
+  >>> import time
+  >>> time.sleep(1.5)
+
+
 Note that the processing thread is daemonic, that way it won't keep the process
 alive unnecessarily.
 
@@ -187,21 +190,23 @@
   >>> service.stopProcessing()
   >>> transaction.commit()
 
+
 We got log entries with the tracebacks of the division error.
 
   >>> logvalue = io.getvalue()
   >>> print logvalue
-  catched a generic exception, preventing thread from crashing
+  Caught a generic exception, preventing thread from crashing
   integer division or modulo by zero
   Traceback (most recent call last):
   ...
   ZeroDivisionError: integer division or modulo by zero
   <BLANKLINE>
 
-We had 3 retries.
+We had 3 retries, but every error is reported twice, once by the processor and
+once by the task service.
 
   >>> logvalue.count('ZeroDivisionError')
-  3
+  6
 
 The job status is set to 'error'.
 
@@ -227,17 +232,18 @@
 
   >>> logvalue = io.getvalue()
   >>> print logvalue
-  catched a generic exception, preventing thread from crashing
+  Caught a generic exception, preventing thread from crashing
   integer division or modulo by zero
   Traceback (most recent call last):
   ...
   ZeroDivisionError: integer division or modulo by zero
   <BLANKLINE>
 
-We had 3 retries.
+We had 3 retries, but every error is reported twice, once by the processor and
+once by the task service.
 
   >>> logvalue.count('ZeroDivisionError')
-  3
+  6
 
 The job status is set to 'error'.
 

Modified: lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml	2007-12-03 00:18:13 UTC (rev 82086)
@@ -4,7 +4,7 @@
            i18n_domain="zope">
 
   <include package="zope.app.securitypolicy" file="meta.zcml" />
-  
+
   <include
       zcml:condition="installed zope.app.zcmlfiles"
       package="zope.app.zcmlfiles"
@@ -37,7 +37,7 @@
    title="Administrator"
    login="mgr"
    password="mgrpw" />
-  
+
   <grant
    role="zope.Manager"
    principal="zope.manager"

Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	2007-12-03 00:18:13 UTC (rev 82086)
@@ -45,7 +45,7 @@
         schema=interface.Interface,
         required=False)
 
-    def __call__(self, service, jobid, input):
+    def __call__(service, jobid, input):
         """Execute the task.
 
         The ``service`` argument is the task service object. It allows access to
@@ -74,6 +74,12 @@
             default = ITask,
             )
 
+    processor = schema.Field(
+            title = u'Processor',
+            description = u'A callable that processes queued jobs using '
+                          u'an infinite loop.',
+            )
+
     def getAvailableTasks():
         """Return a mapping of task name to the task."""
 
@@ -114,6 +120,12 @@
     def getError(jobid):
         """Get the error of the job."""
 
+    def hasJobsWaiting():
+        """Determine whether there are jobs that need to be processed.
+
+        Returns a simple boolean.
+        """
+
     def processNext():
         """Process the next job in the queue."""
 
@@ -260,3 +272,30 @@
             required = False,
             value_type = schema.TextLine()
             )
+
+
+class IProcessor(interface.Interface):
+    """Job Processor
+
+    Process the jobs that are waiting in the queue. A processor is meant to
+    be run in a separate thread. To complete a job, it simply calls back into
+    the task server. This works, since it does not use up any Web server
+    threads.
+
+    Processing a job can take a long time. However, we do not have to worry
+    about transaction conflicts, since no other request is touching the job
+    object.
+    """
+
+    running = schema.Bool(
+        title=u"Running Flag",
+        description=u"Tells whether the processor is currently running.",
+        readonly=True)
+
+    def __call__(db, servicePath):
+        """Run the processor.
+
+        The ``db`` is a ZODB instance that is used to call back into the task
+        service. The ``servicePath`` specifies how to traverse to the task
+        service itself.
+        """

Added: lovely.remotetask/trunk/src/lovely/remotetask/processor.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/processor.py	                        (rev 0)
+++ lovely.remotetask/trunk/src/lovely/remotetask/processor.py	2007-12-03 00:18:13 UTC (rev 82086)
@@ -0,0 +1,152 @@
+##############################################################################
+#
+# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Processor Implementations
+
+$Id$
+"""
+__docformat__ = 'restructuredtext'
+import logging
+import threading
+import time
+import transaction
+import zope.interface
+import zope.publisher.base
+import zope.publisher.publish
+from zope.app.publication.zopepublication import ZopePublication
+from zope.security import management
+from zope.security.proxy import removeSecurityProxy
+from zope.traversing.api import traverse
+
+from lovely.remotetask import interfaces
+
+THREAD_STARTUP_WAIT = 0.05
+
+ERROR_MARKER = object()
+
+log = logging.getLogger('lovely.remotetask')
+
+
+class ProcessorPublication(ZopePublication):
+    """A custom publication to process the next job."""
+
+    def beforeTraversal(self, request):
+        # Overwrite this method, so that the publication does not attempt to
+        # authenticate; we assume that the processor is allowed to do
+        # everything. Note that the DB is not exposed to the job callable.
+        management.newInteraction(request)
+        transaction.begin()
+
+    def traverseName(self, request, ob, name):
+        # Provide a very simple traversal mechanism.
+        return traverse(removeSecurityProxy(ob), name, None)
+
+class ProcessorRequest(zope.publisher.base.BaseRequest):
+    """A custome publisher request for the processor."""
+
+    def __init__(self, *args):
+        super(ProcessorRequest, self).__init__(None, {}, positional=args)
+
+
+class SimpleProcessor(object):
+    """Simple Job Processor
+
+    This processor only processes one job at a time.
+    """
+    zope.interface.implements(interfaces.IProcessor)
+
+    @property
+    def running(self):
+        return threading.currentThread().running
+
+    def __init__(self, db, servicePath, waitTime=1.0):
+        self.db = db
+        self.servicePath = servicePath
+        self.waitTime = waitTime
+
+    def call(self, method, args=(), errorValue=ERROR_MARKER):
+        # Create the path to the method.
+        path = self.servicePath[:] + [method]
+        path.reverse()
+        # Produce a special processor request to be sent to the publisher.
+        request = ProcessorRequest(*args)
+        request.setPublication(ProcessorPublication(self.db))
+        request.setTraversalStack(path)
+        # Publish the request, making sure that *all* exceptions are
+        # handled. The processor should *never* crash.
+        try:
+            zope.publisher.publish.publish(request, False)
+            return request.response._result
+        except Exception, error:
+            # This thread should never crash, thus a blank except
+            log.error('Processor: ``%s()`` caused an error!' %method)
+            log.exception(error)
+            return errorValue is ERROR_MARKER and error or errorValue
+
+    def processNext(self, jobid=None):
+        return self.call('processNext', args=(None, jobid))
+
+    def __call__(self):
+        while self.running:
+            result = self.processNext()
+            # If there are no jobs available, sleep a little bit and then
+            # check again.
+            if not result:
+                time.sleep(self.waitTime)
+
+
+class MultiProcessor(SimpleProcessor):
+    """Multi-threaded Job Processor
+
+    This processor can work on multiple jobs at the same time.
+    """
+    zope.interface.implements(interfaces.IProcessor)
+
+    def __init__(self, *args, **kwargs):
+        self.maxThreads = kwargs.pop('maxThreads', 5)
+        super(MultiProcessor, self).__init__(*args, **kwargs)
+        self.threads = []
+
+    def hasJobsWaiting(self):
+        return self.call('hasJobsWaiting', errorValue=False)
+
+    def claimNextJob(self):
+        return self.call('claimNextJob', errorValue=None)
+
+    def __call__(self):
+        # Start the processing loop
+        while self.running:
+            # Remove all dead threads
+            for thread in self.threads:
+                if not thread.isAlive():
+                    self.threads.remove(thread)
+            # If the number of threads equals the number of maximum threads,
+            # wait a little bit and then start over
+            if len(self.threads) == self.maxThreads:
+                time.sleep(self.waitTime)
+                continue
+            # Let's wait for jobs to become available
+            while not self.hasJobsWaiting() and self.running:
+                time.sleep(self.waitTime)
+            # Okay, we have to do some work, so let's do that now. Since we
+            # are working with threads, we first have to claim a job and then
+            # execute it.
+            jobid = self.claimNextJob()
+            # If we got a job, let's work on it in a new thread.
+            if jobid is not None:
+                thread = threading.Thread(
+                    target=self.processNext, args=(jobid,))
+                self.threads.append(thread)
+                thread.start()
+                # Give the thread some time to start up:
+                time.sleep(THREAD_STARTUP_WAIT)


Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/processor.py
___________________________________________________________________
Name: svn:keywords
   + Id

Added: lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/processor.txt	                        (rev 0)
+++ lovely.remotetask/trunk/src/lovely/remotetask/processor.txt	2007-12-03 00:18:13 UTC (rev 82086)
@@ -0,0 +1,263 @@
+==============
+Job Processors
+==============
+
+The actual processing of the jobs in a queue is handled by a separate
+component, known as the job processor. This component usually runs in its own
+thread and provides its own main loop.
+
+  >>> from lovely.remotetask import processor
+
+The ``processor`` module provides several implementations of the processor
+API. Let's create the necessary components to test the processor:
+
+1. Create the task service and add it to the root site:
+
+  >>> from lovely import remotetask
+  >>> tasks = remotetask.TaskService()
+
+  >>> sm = root['tasks'] = tasks
+
+2. Register the service as a utility:
+
+  >>> from lovely.remotetask import interfaces
+  >>> sm = root.getSiteManager()
+  >>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
+
+3. Register a task that simply sleeps and writes a message:
+
+  >>> import logging
+  >>> log = logging.getLogger('lovely.remotetask')
+  >>> import time
+  >>> def sleep((sleepTime, id)):
+  ...     time.sleep(sleepTime)
+  ...     log.info('Job: %i' %id)
+
+  >>> import lovely.remotetask.task
+  >>> sleepTask = remotetask.task.SimpleTask(sleep)
+
+  >>> import zope.component
+  >>> zope.component.provideUtility(sleepTask, name='sleep')
+
+4. Setup a database:
+
+  >>> from ZODB.tests import util
+  >>> import transaction
+  >>> db = util.DB()
+
+  >>> from zope.app.publication.zopepublication import ZopePublication
+  >>> conn = db.open()
+  >>> conn.root()[ZopePublication.root_name] = root
+  >>> transaction.commit()
+
+
+The Simple Processor
+--------------------
+
+This processor executes one job at a time. It was designed for jobs that would
+take a long time and use up most of the processing power of a computer. It is
+also the default processor for the task service.
+
+Let's first register a few tasks:
+
+  >>> jobid = tasks.add(u'sleep', (0.04, 1))
+  >>> jobid = tasks.add(u'sleep', (0.1,  2))
+  >>> jobid = tasks.add(u'sleep', (0,    3))
+  >>> jobid = tasks.add(u'sleep', (0.08, 4))
+  >>> transaction.commit()
+
+Let's start by executing a job directly. The first argument to the simple
+processor constructor is the database and the second the traversal stack to
+the task service. All other arguments are optional:
+
+  >>> proc = processor.SimpleProcessor(
+  ...     tasks._p_jar.db(), ['tasks'], waitTime=0.0)
+
+Let's now process the first job. We clear the log and we also have to end any
+existing interactions in order to process the job in this thread:
+
+  >>> log_info.clear()
+
+  >>> from zope.security import management
+  >>> management.endInteraction()
+  >>> proc.processNext()
+  True
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+
+Let's now use the processor from within the task service. Since the processor
+constructors also accept additional arguments, they are specified as well:
+
+  >>> tasks.processorFactory
+  <class 'lovely.remotetask.processor.SimpleProcessor'>
+  >>> tasks.processorArguments
+  {'waitTime': 0.0}
+
+The wait time has been set to zero for testing purposes only. It is really set
+to 1 second by default. Let's now start processing tasks, wait a little bit
+for all the jobs to complete and then stop processing again:
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+
+  >>> time.sleep(0.5)
+
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+
+The log shows that all jobs have been processed. But more importantly, they
+were all completed in the order they were defined.
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+  lovely.remotetask INFO
+    Job: 2
+  lovely.remotetask INFO
+    Job: 3
+  lovely.remotetask INFO
+    Job: 4
+
+
+The Multi-thread Processor
+--------------------------
+
+The multi-threaded processor executes several jobs at once. It was designed
+for jobs that would take a long time but use very little processing power.
+
+Let's add a few new tasks to execute:
+
+  >>> jobid = tasks.add(u'sleep', (0.04,  1))
+  >>> jobid = tasks.add(u'sleep', (0.18,  2))
+  >>> jobid = tasks.add(u'sleep', (0,     3))
+  >>> jobid = tasks.add(u'sleep', (0.02,  4))
+  >>> transaction.commit()
+
+Before testing the processor in the task service, let's have a look at every
+method by itself. So we instantiate the processor:
+
+  >>> proc = processor.MultiProcessor(
+  ...     tasks._p_jar.db(), ['tasks'], waitTime=0)
+
+The maximum amount of threads can be set as well:
+
+  >>> proc.maxThreads
+  5
+
+All working threads can be reviewed at any time:
+
+  >>> proc.threads
+  []
+
+At any time you can ask the service whether any jobs are waiting to be
+executed:
+
+  >>> from zope.security import management
+  >>> management.endInteraction()
+
+  >>> proc.hasJobsWaiting()
+  True
+
+Once you know that jobs are available, you can claim a job:
+
+  >>> jobid = proc.claimNextJob()
+  >>> jobid
+  5
+
+We need to claim a job before executing it, so that the database marks the job
+as claimed and no new thread picks up the job. Once we claimed a particular
+job, we can process it:
+
+  >>> log_info.clear()
+
+  >>> proc.processNext(jobid)
+  True
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+
+Let's now have a look at using the processor in the task service. This
+primarily means setting the processor factory:
+
+  >>> management.newInteraction()
+
+  >>> tasks.processorFactory = processor.MultiProcessor
+  >>> transaction.commit()
+
+  >>> log_info.clear()
+
+Let's now process the remaining jobs:
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+
+  >>> time.sleep(0.5)
+
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+
+As you can see, this time the jobs are not completed in order, because they
+all need different time to execute:
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 3
+  lovely.remotetask INFO
+    Job: 4
+  lovely.remotetask INFO
+    Job: 2
+
+Let's now set the thread limit to two and construct a new set of tasks that
+demonstrate that not more than two threads run at the same time:
+
+  >>> tasks.processorArguments = {'waitTime': 0.0, 'maxThreads': 2}
+
+  >>> jobid = tasks.add(u'sleep', (0.03, 1))
+  >>> jobid = tasks.add(u'sleep', (0.05, 2))
+  >>> jobid = tasks.add(u'sleep', (0.03, 3))
+  >>> jobid = tasks.add(u'sleep', (0.08, 4))
+  >>> transaction.commit()
+
+If all tasks are processed at once, job 3 should be done first, but since the
+job has to wait for an available thread, it will come in third. We can now run
+the jobs and see the result:
+
+  >>> log_info.clear()
+
+  >>> tasks.startProcessing()
+  >>> transaction.commit()
+
+  >>> time.sleep(0.5)
+
+  >>> tasks.stopProcessing()
+  >>> transaction.commit()
+
+  >>> print log_info
+  lovely.remotetask INFO
+    Job: 1
+  lovely.remotetask INFO
+    Job: 2
+  lovely.remotetask INFO
+    Job: 3
+  lovely.remotetask INFO
+    Job: 4
+
+Note: Sometimes (about 20% of the time in this test) the log will contain a
+conflict error. As long as the error is *not* produced by ``processNext()``,
+it is harmless, even though it is logged at the error level. To avoid
+conflict errors when calling the ``processNext()`` method, we wait a little
+bit to give the thread a chance to start up. This waiting time is defined by a
+module global:
+
+  >>> processor.THREAD_STARTUP_WAIT
+  0.050000000000000003
+
+On my machine this number seems to work well. On slower machines you might
+want to raise that number.
+
+Let's give Python enough time to stop all the threads.
+
+  >>> time.sleep(0.05)


Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py	2007-12-03 00:18:13 UTC (rev 82086)
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2006 Lovely Systems and Contributors.
+# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -24,8 +24,6 @@
 import time
 import zc.queue
 import zope.interface
-import zope.publisher.base
-import zope.publisher.publish
 from BTrees.IOBTree import IOBTree
 from zope import component
 from zope.app import zapi
@@ -33,17 +31,13 @@
 from zope.app.container import contained
 from zope.app.component.interfaces import ISite
 from zope.app.publication.zopepublication import ZopePublication
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
 from zope.component.interfaces import ComponentLookupError
-from lovely.remotetask import interfaces, job, task
+from lovely.remotetask import interfaces, job, task, processor
 
 log = logging.getLogger('lovely.remotetask')
 
 storage = threading.local()
 
-SLEEP_TIME = 1
-
 class TaskService(contained.Contained, persistent.Persistent):
     """A persistent task service.
 
@@ -52,6 +46,8 @@
     zope.interface.implements(interfaces.ITaskService)
 
     taskInterface = interfaces.ITask
+    processorFactory = processor.SimpleProcessor
+    processorArguments = {'waitTime': 1.0}
 
     _scheduledJobs  = None
     _scheduledQueue = None
@@ -150,15 +146,15 @@
             self._scheduledJobs = IOBTree()
         if self._scheduledQueue == None:
             self._scheduledQueue = zc.queue.PersistentQueue()
-        path = [parent.__name__ for parent in zapi.getParents(self)
-                if parent.__name__]
-        path.reverse()
-        path.append(self.__name__)
-        path.append('processNext')
-
-        thread = threading.Thread(
-            target=processor, args=(self._p_jar.db(), path),
-            name=self._threadName())
+        # Create the path to the service within the DB.
+        servicePath = [parent.__name__ for parent in zapi.getParents(self)
+                       if parent.__name__]
+        servicePath.reverse()
+        servicePath.append(self.__name__)
+        # Start the thread running the processor inside.
+        processor = self.processorFactory(
+            self._p_jar.db(), servicePath, **self.processorArguments)
+        thread = threading.Thread(target=processor, name=self._threadName())
         thread.setDaemon(True)
         thread.running = True
         thread.start()
@@ -195,8 +191,38 @@
         path.append(self.__name__)
         return '.'.join(path)
 
-    def processNext(self, now=None):
+    def hasJobsWaiting(self, now=None):
+        # If there are any simple jobs in the queue, we have work to do.
+        if self._queue:
+            return True
+        # First, move new cron jobs from the scheduled queue into the cronjob
+        # list.
+        if now is None:
+            now = int(time.time())
+        while len(self._scheduledQueue) > 0:
+            job = self._scheduledQueue.pull()
+            if job.status is not interfaces.CANCELLED:
+                self._insertCronJob(job, now)
+        # Now get all jobs that should be done now or earlier; if any of
+        # them are neither cancelled nor in the error state, we have jobs
+        # to do.
+        for key in self._scheduledJobs.keys(max=now):
+            jobs = [job for job in self._scheduledJobs[key]
+                    if job.status not in (interfaces.CANCELLED,
+                                          interfaces.ERROR)]
+            if jobs:
+                return True
+        return False
+
+    def claimNextJob(self, now=None):
         job = self._pullJob(now)
+        return job and job.id or None
+
+    def processNext(self, now=None, jobid=None):
+        if jobid is None:
+            job = self._pullJob(now)
+        else:
+            job = self.jobs[jobid]
         if job is None:
             return False
         try:
@@ -222,8 +248,8 @@
                 job.status = interfaces.ERROR
         except Exception, error:
             if storage.runCount <= 3:
-                log.error(
-                    'catched a generic exception, preventing thread from crashing')
+                log.error('Caught a generic exception, preventing thread '
+                          'from crashing')
                 log.exception(error)
                 raise
             else:
@@ -244,7 +270,7 @@
         # list
         if now is None:
             now = int(time.time())
-        while len(self._scheduledQueue)>0:
+        while len(self._scheduledQueue) > 0:
             job = self._scheduledQueue.pull()
             if job.status is not interfaces.CANCELLED:
                 self._insertCronJob(job, now)
@@ -292,33 +318,6 @@
         self._scheduledJobs[nextCallTime] = jobs + (job,)
 
 
-class ProcessorPublication(ZopePublication):
-    """A custom publication to process the next job."""
-
-    def traverseName(self, request, ob, name):
-        return traverse(removeSecurityProxy(ob), name, None)
-
-
-def processor(db, path):
-    """Job Processor
-
-    Process the jobs that are waiting in the queue. This processor is meant to
-    be run in a separate process; however, it simply goes back to the task
-    service to actually do the processing.
-    """
-    path.reverse()
-    while threading.currentThread().running:
-        request = zope.publisher.base.BaseRequest(None, {})
-        request.setPublication(ProcessorPublication(db))
-        request.setTraversalStack(path)
-        try:
-            zope.publisher.publish.publish(request, False)
-            if not request.response._result:
-                time.sleep(SLEEP_TIME)
-        except:
-            # This thread should never crash, thus a blank except
-            pass
-
 def getAutostartServiceNames():
     """get a list of services to start"""
 

Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2007-12-03 00:18:13 UTC (rev 82086)
@@ -35,13 +35,15 @@
 
     log_info = InstalledHandler('lovely.remotetask')
     test.globs['log_info'] = log_info
-    service.SLEEP_TIME = 0
+    test.origArgs = service.TaskService.processorArguments
+    service.TaskService.processorArguments = {'waitTime': 0.0}
 
 def tearDown(test):
     placefulTearDown()
     log_info = test.globs['log_info']
+    log_info.clear()
     log_info.uninstall()
-    service.SLEEP_TIME = 1
+    service.TaskService.processorArguments = test.origArgs
 
 def test_suite():
     return unittest.TestSuite((
@@ -52,6 +54,12 @@
                      |doctest.ELLIPSIS
                      |INTERPRET_FOOTNOTES
                      ),
+        DocFileSuite('processor.txt',
+                     setUp=setUp,
+                     tearDown=tearDown,
+                     optionflags=doctest.NORMALIZE_WHITESPACE
+                     |doctest.ELLIPSIS
+                     ),
         DocFileSuite('TESTING.txt',
                      setUp=placelesssetup.setUp,
                      tearDown=placelesssetup.tearDown,


