[Checkins] SVN: lovely.remotetask/trunk/ - Refactored processor to
be a replaceable component. The original processor is
Stephan Richter
srichter at cosmos.phy.tufts.edu
Sun Dec 2 19:18:14 EST 2007
Log message for revision 82086:
- Refactored processor to be a replaceable component. The original processor is
now known as the ``SimpleProcessor``. Also implemented the
``MultiProcessor``, which is able to process multiple jobs at once using
threads.
Changed:
U lovely.remotetask/trunk/buildout.cfg
U lovely.remotetask/trunk/src/lovely/remotetask/README.txt
U lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
U lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt
U lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml
U lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
A lovely.remotetask/trunk/src/lovely/remotetask/processor.py
A lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
U lovely.remotetask/trunk/src/lovely/remotetask/service.py
U lovely.remotetask/trunk/src/lovely/remotetask/tests.py
-=-
Modified: lovely.remotetask/trunk/buildout.cfg
===================================================================
--- lovely.remotetask/trunk/buildout.cfg 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/buildout.cfg 2007-12-03 00:18:13 UTC (rev 82086)
@@ -1,9 +1,14 @@
[buildout]
develop = .
-parts = test
+parts = py test
index = http://download.zope.org/zope3.4
[test]
recipe = zc.recipe.testrunner
defaults = ['--tests-pattern', '^f?tests$']
eggs = lovely.remotetask [test]
+
+[py]
+recipe = zc.recipe.egg
+interpreter = python
+eggs = lovely.remotetask
Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/README.txt 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt 2007-12-03 00:18:13 UTC (rev 82086)
@@ -34,6 +34,8 @@
Usage
_____
+ >>> STOP_SLEEP_TIME = 0.02
+
Let's now start by creating a single service:
>>> from lovely import remotetask
@@ -126,9 +128,9 @@
>>> service.isProcessing()
False
-The TaskService is being started automatically - if specified in zope.conf -
-as soon as the `IDatabaseOpenedEvent` is fired. Let's emulate the zope.conf
-settings:
+The ``TaskService`` is being started automatically - if specified in
+``zope.conf`` - as soon as the ``IDatabaseOpenedEvent`` is fired. Let's
+emulate the ``zope.conf`` settings:
>>> class Config(object):
... mapping = {}
@@ -151,8 +153,8 @@
>>> log_info.clear()
-On Zope startup the IDatabaseOpenedEvent is fired, and will call
-the bootStrap method:
+On Zope startup the ``IDatabaseOpenedEvent`` is fired, and will call
+the ``bootStrap()`` method:
>>> from ZODB.tests import util
>>> import transaction
@@ -203,6 +205,8 @@
>>> root_service.isProcessing()
False
+ >>> import time; time.sleep(STOP_SLEEP_TIME)
+
And reset the logger::
>>> log_info.clear()
@@ -240,6 +244,8 @@
>>> service.isProcessing()
False
+ >>> import time; time.sleep(STOP_SLEEP_TIME)
+
Reset the product configuration with the asterisked service names::
>>> config.mapping['autostart'] = '*@*'
@@ -284,6 +290,8 @@
>>> root_service.isProcessing()
False
+ >>> import time; time.sleep(STOP_SLEEP_TIME)
+
Reset the product configuration with the asterisked service names::
>>> config.mapping['autostart'] = '*@TestTaskService1'
@@ -320,6 +328,8 @@
>>> service.isProcessing()
False
+ >>> import time; time.sleep(STOP_SLEEP_TIME)
+
>>> config.mapping['autostart'] = '*@Foo'
>>> setProductConfigurations([config])
>>> getAutostartServiceNames()
@@ -352,6 +362,8 @@
>>> root_service.isProcessing()
False
+ >>> import time; time.sleep(STOP_SLEEP_TIME)
+
Let's now read a job:
>>> jobid = service.add(u'echo', {'foo': 'bar'})
@@ -654,8 +666,7 @@
>>> service2.stopProcessing()
>>> root_service.stopProcessing()
- >>> import time
- >>> time.sleep(0.5)
+ >>> import time; time.sleep(STOP_SLEEP_TIME)
The threads have exited now::
Modified: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt 2007-12-03 00:18:13 UTC (rev 82086)
@@ -3,13 +3,13 @@
=====================
This package provides an implementation of a remote task execution Web service
-that allows to execute pre-defined tasks on another server. See more info
+that allows to execute pre-defined tasks on another server. See more info
about the TaskService in README.txt. This test will test the TaskServiceStub
-implementation. The only different is, that the TaskServiceStub will handle
+implementation. The only different is, that the TaskServiceStub will handle
task implementation providing ITaskStub interfaces rather then ITask. This way
we can register stub tasks for a testing setup. See also another usecase for
-a task service stub implementation which is working with XML-RPC in the
-package lovely.transcoding.
+a task service stub implementation which is working with XML-RPC in the
+package lovely.transcoding.
Let's now start by creating a task service stub:
@@ -41,7 +41,7 @@
echo utility used in the README.txt tests for the ITaskStub interface:
>>> import zope.component
- >>> zope.component.provideUtility(echoTask, provides=testing.ITaskStub,
+ >>> zope.component.provideUtility(echoTask, provides=testing.ITaskStub,
... name='echo')
The echo task is now available in the service:
@@ -110,7 +110,7 @@
... raise remotetask.task.TaskError('An error occurred.')
>>> zope.component.provideUtility(
- ... remotetask.task.SimpleTask(error), provides=testing.ITaskStub,
+ ... remotetask.task.SimpleTask(error), provides=testing.ITaskStub,
... name='error')
Now add and execute it:
Modified: lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt 2007-12-03 00:18:13 UTC (rev 82086)
@@ -173,8 +173,11 @@
>>> transaction.commit()
>>> service.startProcessing()
>>> transaction.commit()
- >>> sleep(1.5)
+ >>> import time
+ >>> time.sleep(1.5)
+
+
Note that the processing thread is daemonic, that way it won't keep the process
alive unnecessarily.
@@ -187,21 +190,23 @@
>>> service.stopProcessing()
>>> transaction.commit()
+
We got log entries with the tracebacks of the division error.
>>> logvalue = io.getvalue()
>>> print logvalue
- catched a generic exception, preventing thread from crashing
+ Caught a generic exception, preventing thread from crashing
integer division or modulo by zero
Traceback (most recent call last):
...
ZeroDivisionError: integer division or modulo by zero
<BLANKLINE>
-We had 3 retries.
+We had 3 retries, but every error is reported twice, once by the processor and
+once by the task service.
>>> logvalue.count('ZeroDivisionError')
- 3
+ 6
The job status is set to 'error'.
@@ -227,17 +232,18 @@
>>> logvalue = io.getvalue()
>>> print logvalue
- catched a generic exception, preventing thread from crashing
+ Caught a generic exception, preventing thread from crashing
integer division or modulo by zero
Traceback (most recent call last):
...
ZeroDivisionError: integer division or modulo by zero
<BLANKLINE>
-We had 3 retries.
+We had 3 retries, but every error is reported twice, once by the processor and
+once by the task service.
>>> logvalue.count('ZeroDivisionError')
- 3
+ 6
The job status is set to 'error'.
Modified: lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml 2007-12-03 00:18:13 UTC (rev 82086)
@@ -4,7 +4,7 @@
i18n_domain="zope">
<include package="zope.app.securitypolicy" file="meta.zcml" />
-
+
<include
zcml:condition="installed zope.app.zcmlfiles"
package="zope.app.zcmlfiles"
@@ -37,7 +37,7 @@
title="Administrator"
login="mgr"
password="mgrpw" />
-
+
<grant
role="zope.Manager"
principal="zope.manager"
Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py 2007-12-03 00:18:13 UTC (rev 82086)
@@ -45,7 +45,7 @@
schema=interface.Interface,
required=False)
- def __call__(self, service, jobid, input):
+ def __call__(service, jobid, input):
"""Execute the task.
The ``service`` argument is the task service object. It allows access to
@@ -74,6 +74,12 @@
default = ITask,
)
+ processor = schema.Field(
+ title = u'Processor',
+ description = u'A callable that processes queued jobs using '
+ u'an infinite loop.',
+ )
+
def getAvailableTasks():
"""Return a mapping of task name to the task."""
@@ -114,6 +120,12 @@
def getError(jobid):
"""Get the error of the job."""
+ def hasJobsWaiting():
+ """Determine whether there are jobs that need to be processed.
+
+ Returns a simple boolean.
+ """
+
def processNext():
"""Process the next job in the queue."""
@@ -260,3 +272,30 @@
required = False,
value_type = schema.TextLine()
)
+
+
+class IProcessor(interface.Interface):
+ """Job Processor
+
+ Process the jobs that are waiting in the queue. A processor is meant to
+ be run in a separate thread. To complete a job, it simply calls back into
+ the task server. This works, since it does not use up any Web server
+ threads.
+
+ Processing a job can take a long time. However, we do not have to worry
+ about transaction conflicts, since no other request is touching the job
+ object.
+ """
+
+ running = schema.Bool(
+ title=u"Running Flag",
+ description=u"Tells whether the processor is currently running.",
+ readonly=True)
+
+ def __call__(db, servicePath):
+ """Run the processor.
+
+ The ``db`` is a ZODB instance that is used to call back into the task
+ service. The ``servicePath`` specifies how to traverse to the task
+ service itself.
+ """
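The processor contract described in ``IProcessor`` (a callable with a ``running`` flag that loops over queued jobs and backs off when the queue is empty) can be illustrated without Zope. The following is a minimal sketch and not code from this revision: ``ToyProcessor``, ``process_next`` and ``max_idle_polls`` are hypothetical names, and the idle limit exists only so that the example terminates.

```python
import time


class ToyProcessor:
    """Hypothetical stand-in for an IProcessor-style callable (no Zope)."""

    def __init__(self, process_next, wait_time=0.01, max_idle_polls=3):
        self.process_next = process_next      # returns True if a job was run
        self.wait_time = wait_time
        self.max_idle_polls = max_idle_polls  # illustration only: ends the loop
        self.running = False

    def __call__(self):
        self.running = True
        idle = 0
        try:
            while self.running and idle < self.max_idle_polls:
                if self.process_next():
                    idle = 0
                else:
                    # Nothing to do: back off briefly, then poll again.
                    idle += 1
                    time.sleep(self.wait_time)
        finally:
            self.running = False


def run_demo():
    queue = ["a", "b", "c"]
    done = []

    def process_next():
        if not queue:
            return False
        done.append(queue.pop(0))
        return True

    proc = ToyProcessor(process_next, wait_time=0.0)
    proc()
    return done, proc.running
```

In the real package the loop is stopped by the task service clearing the flag on the processing thread, not by an idle counter.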
Added: lovely.remotetask/trunk/src/lovely/remotetask/processor.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/processor.py (rev 0)
+++ lovely.remotetask/trunk/src/lovely/remotetask/processor.py 2007-12-03 00:18:13 UTC (rev 82086)
@@ -0,0 +1,152 @@
+##############################################################################
+#
+# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Processor Implementations
+
+$Id$
+"""
+__docformat__ = 'restructuredtext'
+import logging
+import threading
+import time
+import transaction
+import zope.interface
+import zope.publisher.base
+import zope.publisher.publish
+from zope.app.publication.zopepublication import ZopePublication
+from zope.security import management
+from zope.security.proxy import removeSecurityProxy
+from zope.traversing.api import traverse
+
+from lovely.remotetask import interfaces
+
+THREAD_STARTUP_WAIT = 0.05
+
+ERROR_MARKER = object()
+
+log = logging.getLogger('lovely.remotetask')
+
+
+class ProcessorPublication(ZopePublication):
+ """A custom publication to process the next job."""
+
+ def beforeTraversal(self, request):
+ # Override this method, so that the publication does not attempt to
+ # authenticate; we assume that the processor is allowed to do
+ # everything. Note that the DB is not exposed to the job callable.
+ management.newInteraction(request)
+ transaction.begin()
+
+ def traverseName(self, request, ob, name):
+ # Provide a very simple traversal mechanism.
+ return traverse(removeSecurityProxy(ob), name, None)
+
+class ProcessorRequest(zope.publisher.base.BaseRequest):
+ """A custom publisher request for the processor."""
+
+ def __init__(self, *args):
+ super(ProcessorRequest, self).__init__(None, {}, positional=args)
+
+
+class SimpleProcessor(object):
+ """Simple Job Processor
+
+ This processor only processes one job at a time.
+ """
+ zope.interface.implements(interfaces.IProcessor)
+
+ @property
+ def running(self):
+ return threading.currentThread().running
+
+ def __init__(self, db, servicePath, waitTime=1.0):
+ self.db = db
+ self.servicePath = servicePath
+ self.waitTime = waitTime
+
+ def call(self, method, args=(), errorValue=ERROR_MARKER):
+ # Create the path to the method.
+ path = self.servicePath[:] + [method]
+ path.reverse()
+ # Produce a special processor event to be sent to the publisher.
+ request = ProcessorRequest(*args)
+ request.setPublication(ProcessorPublication(self.db))
+ request.setTraversalStack(path)
+ # Publish the request, making sure that *all* exceptions are
+ # handled. The processor should *never* crash.
+ try:
+ zope.publisher.publish.publish(request, False)
+ return request.response._result
+ except Exception, error:
+ # This thread should never crash, thus the broad except
+ log.error('Processor: ``%s()`` caused an error!' %method)
+ log.exception(error)
+ return errorValue is ERROR_MARKER and error or errorValue
+
+ def processNext(self, jobid=None):
+ return self.call('processNext', args=(None, jobid))
+
+ def __call__(self):
+ while self.running:
+ result = self.processNext()
+ # If there are no jobs available, sleep a little bit and then
+ # check again.
+ if not result:
+ time.sleep(self.waitTime)
+
+
+class MultiProcessor(SimpleProcessor):
+ """Multi-threaded Job Processor
+
+ This processor can work on multiple jobs at the same time.
+ """
+ zope.interface.implements(interfaces.IProcessor)
+
+ def __init__(self, *args, **kwargs):
+ self.maxThreads = kwargs.pop('maxThreads', 5)
+ super(MultiProcessor, self).__init__(*args, **kwargs)
+ self.threads = []
+
+ def hasJobsWaiting(self):
+ return self.call('hasJobsWaiting', errorValue=False)
+
+ def claimNextJob(self):
+ return self.call('claimNextJob', errorValue=None)
+
+ def __call__(self):
+ # Start the processing loop
+ while self.running:
+ # Remove all dead threads (iterating over a copy of the list)
+ for thread in self.threads[:]:
+ if not thread.isAlive():
+ self.threads.remove(thread)
+ # If the number of threads equals the number of maximum threads,
+ # wait a little bit and then start over
+ if len(self.threads) == self.maxThreads:
+ time.sleep(self.waitTime)
+ continue
+ # Let's wait for jobs to become available
+ while not self.hasJobsWaiting() and self.running:
+ time.sleep(self.waitTime)
+ # Okay, we have to do some work, so let's do that now. Since we
+ # are working with threads, we first have to claim a job and then
+ # execute it.
+ jobid = self.claimNextJob()
+ # If we got a job, let's work on it in a new thread.
+ if jobid is not None:
+ thread = threading.Thread(
+ target=self.processNext, args=(jobid,))
+ self.threads.append(thread)
+ thread.start()
+ # Give the thread some time to start up:
+ time.sleep(THREAD_STARTUP_WAIT)
Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/processor.py
___________________________________________________________________
Name: svn:keywords
+ Id
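The dispatch loop of ``MultiProcessor`` follows a claim-then-dispatch pattern with a cap on live threads. The sketch below reproduces that pattern in plain Python under stated assumptions: ``run_pool`` and its parameters are hypothetical, jobs are modelled as sleep durations, and an in-memory deque stands in for the task service. It rebuilds the thread list when pruning, which avoids removing entries from a list while iterating over it.

```python
import threading
import time
from collections import deque


def run_pool(durations, max_threads=2, poll=0.001):
    """Claim jobs one at a time and run each in its own thread (toy model)."""
    pending = deque(enumerate(durations, start=1))  # (jobid, seconds)
    lock = threading.Lock()
    finished = []
    active = 0
    peak = 0

    def work(jobid, seconds):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(seconds)
        with lock:
            active -= 1
            finished.append(jobid)

    threads = []
    while pending or threads:
        # Rebuild the list instead of removing entries while iterating.
        threads = [t for t in threads if t.is_alive()]
        if len(threads) >= max_threads or not pending:
            time.sleep(poll)
            continue
        jobid, seconds = pending.popleft()  # "claim" before dispatching
        thread = threading.Thread(target=work, args=(jobid, seconds))
        threads.append(thread)
        thread.start()
    return sorted(finished), peak
```

Calling ``run_pool([0.01, 0.02, 0.0, 0.01], max_threads=2)`` runs all four toy jobs while never having more than two worker threads alive at once.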
Added: lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/processor.txt (rev 0)
+++ lovely.remotetask/trunk/src/lovely/remotetask/processor.txt 2007-12-03 00:18:13 UTC (rev 82086)
@@ -0,0 +1,263 @@
+==============
+Job Processors
+==============
+
+The actual processing of the jobs in a queue is handled by a separate
+component, known as the job processor. This component usually runs in its own
+thread and provides its own main loop.
+
+ >>> from lovely.remotetask import processor
+
+The ``processor`` module provides several implementations of the processor
+API. Let's create the necessary components to test the processor:
+
+1. Create the task service and add it to the root site:
+
+ >>> from lovely import remotetask
+ >>> tasks = remotetask.TaskService()
+
+ >>> sm = root['tasks'] = tasks
+
+2. Register the service as a utility:
+
+ >>> from lovely.remotetask import interfaces
+ >>> sm = root.getSiteManager()
+ >>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
+
+3. Register a task that simply sleeps and writes a message:
+
+ >>> import logging
+ >>> log = logging.getLogger('lovely.remotetask')
+ >>> import time
+ >>> def sleep((sleepTime, id)):
+ ... time.sleep(sleepTime)
+ ... log.info('Job: %i' %id)
+
+ >>> import lovely.remotetask.task
+ >>> sleepTask = remotetask.task.SimpleTask(sleep)
+
+ >>> import zope.component
+ >>> zope.component.provideUtility(sleepTask, name='sleep')
+
+4. Set up a database:
+
+ >>> from ZODB.tests import util
+ >>> import transaction
+ >>> db = util.DB()
+
+ >>> from zope.app.publication.zopepublication import ZopePublication
+ >>> conn = db.open()
+ >>> conn.root()[ZopePublication.root_name] = root
+ >>> transaction.commit()
+
+
+The Simple Processor
+--------------------
+
+This processor executes one job at a time. It was designed for jobs that would
+take a long time and use up most of the processing power of a computer. It is
+also the default processor for the task service.
+
+Let's first register a few tasks:
+
+ >>> jobid = tasks.add(u'sleep', (0.04, 1))
+ >>> jobid = tasks.add(u'sleep', (0.1, 2))
+ >>> jobid = tasks.add(u'sleep', (0, 3))
+ >>> jobid = tasks.add(u'sleep', (0.08, 4))
+ >>> transaction.commit()
+
+Let's start by executing a job directly. The first argument to the simple
+processor constructor is the database and the second the traversal stack to
+the task service. All other arguments are optional:
+
+ >>> proc = processor.SimpleProcessor(
+ ... tasks._p_jar.db(), ['tasks'], waitTime=0.0)
+
+Let's now process the first job. We clear the log and we also have to end any
+existing interactions in order to process the job in this thread:
+
+ >>> log_info.clear()
+
+ >>> from zope.security import management
+ >>> management.endInteraction()
+ >>> proc.processNext()
+ True
+
+ >>> print log_info
+ lovely.remotetask INFO
+ Job: 1
+
+Let's now use the processor from within the task service. Since the processor
+constructors also accept additional arguments, they are specified as well:
+
+ >>> tasks.processorFactory
+ <class 'lovely.remotetask.processor.SimpleProcessor'>
+ >>> tasks.processorArguments
+ {'waitTime': 0.0}
+
+The wait time has been set to zero for testing purposes only. It is really set
+to 1 second by default. Let's now start processing tasks, wait a little bit
+for all the jobs to complete and then stop processing again:
+
+ >>> tasks.startProcessing()
+ >>> transaction.commit()
+
+ >>> time.sleep(0.5)
+
+ >>> tasks.stopProcessing()
+ >>> transaction.commit()
+
+The log shows that all jobs have been processed. But more importantly, they
+were all completed in the order they were defined.
+
+ >>> print log_info
+ lovely.remotetask INFO
+ Job: 1
+ lovely.remotetask INFO
+ Job: 2
+ lovely.remotetask INFO
+ Job: 3
+ lovely.remotetask INFO
+ Job: 4
+
+
+The Multi-thread Processor
+--------------------------
+
+The multi-threaded processor executes several jobs at once. It was designed
+for jobs that would take a long time but use very little processing power.
+
+Let's add a few new tasks to execute:
+
+ >>> jobid = tasks.add(u'sleep', (0.04, 1))
+ >>> jobid = tasks.add(u'sleep', (0.18, 2))
+ >>> jobid = tasks.add(u'sleep', (0, 3))
+ >>> jobid = tasks.add(u'sleep', (0.02, 4))
+ >>> transaction.commit()
+
+Before testing the processor in the task service, let's have a look at every
+method by itself. So we instantiate the processor:
+
+ >>> proc = processor.MultiProcessor(
+ ... tasks._p_jar.db(), ['tasks'], waitTime=0)
+
+The maximum number of threads can be set as well:
+
+ >>> proc.maxThreads
+ 5
+
+All working threads can be reviewed at any time:
+
+ >>> proc.threads
+ []
+
+At any time you can ask the service whether any jobs are waiting to be
+executed:
+
+ >>> from zope.security import management
+ >>> management.endInteraction()
+
+ >>> proc.hasJobsWaiting()
+ True
+
+Once you know that jobs are available, you can claim a job:
+
+ >>> jobid = proc.claimNextJob()
+ >>> jobid
+ 5
+
+We need to claim a job before executing it, so that the database marks the job
+as claimed and no other thread picks it up. Once we have claimed a particular
+job, we can process it:
+
+ >>> log_info.clear()
+
+ >>> proc.processNext(jobid)
+ True
+
+ >>> print log_info
+ lovely.remotetask INFO
+ Job: 1
+
+Let's now have a look at using the processor in the task service. This
+primarily means setting the processor factory:
+
+ >>> management.newInteraction()
+
+ >>> tasks.processorFactory = processor.MultiProcessor
+ >>> transaction.commit()
+
+ >>> log_info.clear()
+
+Let's now process the remaining jobs:
+
+ >>> tasks.startProcessing()
+ >>> transaction.commit()
+
+ >>> time.sleep(0.5)
+
+ >>> tasks.stopProcessing()
+ >>> transaction.commit()
+
+As you can see, this time the jobs are not completed in order, because they
+all take a different amount of time to execute:
+
+ >>> print log_info
+ lovely.remotetask INFO
+ Job: 3
+ lovely.remotetask INFO
+ Job: 4
+ lovely.remotetask INFO
+ Job: 2
+
+Let's now set the thread limit to two and construct a new set of tasks that
+demonstrate that no more than two threads run at the same time:
+
+ >>> tasks.processorArguments = {'waitTime': 0.0, 'maxThreads': 2}
+
+ >>> jobid = tasks.add(u'sleep', (0.03, 1))
+ >>> jobid = tasks.add(u'sleep', (0.05, 2))
+ >>> jobid = tasks.add(u'sleep', (0.03, 3))
+ >>> jobid = tasks.add(u'sleep', (0.08, 4))
+ >>> transaction.commit()
+
+If all tasks were processed at once, job 3 would finish before job 2, but since
+it has to wait for an available thread, it will come in third. We can now run
+the jobs and see the result:
+
+ >>> log_info.clear()
+
+ >>> tasks.startProcessing()
+ >>> transaction.commit()
+
+ >>> time.sleep(0.5)
+
+ >>> tasks.stopProcessing()
+ >>> transaction.commit()
+
+ >>> print log_info
+ lovely.remotetask INFO
+ Job: 1
+ lovely.remotetask INFO
+ Job: 2
+ lovely.remotetask INFO
+ Job: 3
+ lovely.remotetask INFO
+ Job: 4
+
+Note: Sometimes (about 20% of the time in this test) the log will contain a
+conflict error. Such errors are harmless as long as they are *not* produced by
+``processNext()``, even though they are logged on the error level. To avoid
+conflict errors when calling the ``processNext()`` method, we wait a little
+bit to give the thread a chance to start up. This waiting time is defined by a
+module global:
+
+ >>> processor.THREAD_STARTUP_WAIT
+ 0.050000000000000003
+
+On my machine this number seems to work well. On slower machines you might
+want to raise that number.
+
+Let's give Python enough time to stop all the threads.
+
+ >>> time.sleep(0.05)
Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
___________________________________________________________________
Name: svn:eol-style
+ native
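The completion orders shown in ``processor.txt`` can be checked with a small deterministic model that does not sleep or start threads. This is an idealized sketch, not part of the package: ``completion_order`` is a hypothetical helper, and it assumes jobs are started in queue order with no thread start-up delay (it ignores ``THREAD_STARTUP_WAIT``).

```python
import heapq


def completion_order(durations, max_threads=None):
    """Idealized finish order: jobs start in queue order, no startup delay."""
    slots = max_threads or len(durations)
    running = []  # heap of (finish_time, jobid)
    order = []
    clock = 0.0
    for jobid, seconds in enumerate(durations, start=1):
        if len(running) == slots:
            # All threads busy: wait for the earliest job to free its slot.
            clock, done = heapq.heappop(running)
            order.append(done)
        heapq.heappush(running, (clock + seconds, jobid))
    # Drain the jobs that are still running, earliest finisher first.
    while running:
        order.append(heapq.heappop(running)[1])
    return order
```

With the durations from the last test, ``completion_order([0.03, 0.05, 0.03, 0.08], max_threads=2)`` yields jobs 1, 2, 3, 4 in that order, while the same durations without a thread limit let job 3 finish ahead of job 2.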
Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py 2007-12-03 00:18:13 UTC (rev 82086)
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2006 Lovely Systems and Contributors.
+# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -24,8 +24,6 @@
import time
import zc.queue
import zope.interface
-import zope.publisher.base
-import zope.publisher.publish
from BTrees.IOBTree import IOBTree
from zope import component
from zope.app import zapi
@@ -33,17 +31,13 @@
from zope.app.container import contained
from zope.app.component.interfaces import ISite
from zope.app.publication.zopepublication import ZopePublication
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
from zope.component.interfaces import ComponentLookupError
-from lovely.remotetask import interfaces, job, task
+from lovely.remotetask import interfaces, job, task, processor
log = logging.getLogger('lovely.remotetask')
storage = threading.local()
-SLEEP_TIME = 1
-
class TaskService(contained.Contained, persistent.Persistent):
"""A persistent task service.
@@ -52,6 +46,8 @@
zope.interface.implements(interfaces.ITaskService)
taskInterface = interfaces.ITask
+ processorFactory = processor.SimpleProcessor
+ processorArguments = {'waitTime': 1.0}
_scheduledJobs = None
_scheduledQueue = None
@@ -150,15 +146,15 @@
self._scheduledJobs = IOBTree()
if self._scheduledQueue == None:
self._scheduledQueue = zc.queue.PersistentQueue()
- path = [parent.__name__ for parent in zapi.getParents(self)
- if parent.__name__]
- path.reverse()
- path.append(self.__name__)
- path.append('processNext')
-
- thread = threading.Thread(
- target=processor, args=(self._p_jar.db(), path),
- name=self._threadName())
+ # Create the path to the service within the DB.
+ servicePath = [parent.__name__ for parent in zapi.getParents(self)
+ if parent.__name__]
+ servicePath.reverse()
+ servicePath.append(self.__name__)
+ # Start the thread running the processor inside.
+ processor = self.processorFactory(
+ self._p_jar.db(), servicePath, **self.processorArguments)
+ thread = threading.Thread(target=processor, name=self._threadName())
thread.setDaemon(True)
thread.running = True
thread.start()
@@ -195,8 +191,38 @@
path.append(self.__name__)
return '.'.join(path)
- def processNext(self, now=None):
+ def hasJobsWaiting(self, now=None):
+ # If there are any simple jobs in the queue, we have work to do.
+ if self._queue:
+ return True
+ # First, move new cron jobs from the scheduled queue into the cronjob
+ # list.
+ if now is None:
+ now = int(time.time())
+ while len(self._scheduledQueue) > 0:
+ job = self._scheduledQueue.pull()
+ if job.status is not interfaces.CANCELLED:
+ self._insertCronJob(job, now)
+ # Now get all jobs that should be done now or earlier; if there are
+ # any that are neither cancelled nor in an error state, then we have
+ # jobs to do.
+ for key in self._scheduledJobs.keys(max=now):
+ jobs = [job for job in self._scheduledJobs[key]
+ if job.status not in (interfaces.CANCELLED,
+ interfaces.ERROR)]
+ if jobs:
+ return True
+ return False
+
+ def claimNextJob(self, now=None):
job = self._pullJob(now)
+ return job and job.id or None
+
+ def processNext(self, now=None, jobid=None):
+ if jobid is None:
+ job = self._pullJob(now)
+ else:
+ job = self.jobs[jobid]
if job is None:
return False
try:
@@ -222,8 +248,8 @@
job.status = interfaces.ERROR
except Exception, error:
if storage.runCount <= 3:
- log.error(
- 'catched a generic exception, preventing thread from crashing')
+ log.error('Caught a generic exception, preventing thread '
+ 'from crashing')
log.exception(error)
raise
else:
@@ -244,7 +270,7 @@
# list
if now is None:
now = int(time.time())
- while len(self._scheduledQueue)>0:
+ while len(self._scheduledQueue) > 0:
job = self._scheduledQueue.pull()
if job.status is not interfaces.CANCELLED:
self._insertCronJob(job, now)
@@ -292,33 +318,6 @@
self._scheduledJobs[nextCallTime] = jobs + (job,)
-class ProcessorPublication(ZopePublication):
- """A custom publication to process the next job."""
-
- def traverseName(self, request, ob, name):
- return traverse(removeSecurityProxy(ob), name, None)
-
-
-def processor(db, path):
- """Job Processor
-
- Process the jobs that are waiting in the queue. This processor is meant to
- be run in a separate process; however, it simply goes back to the task
- service to actually do the processing.
- """
- path.reverse()
- while threading.currentThread().running:
- request = zope.publisher.base.BaseRequest(None, {})
- request.setPublication(ProcessorPublication(db))
- request.setTraversalStack(path)
- try:
- zope.publisher.publish.publish(request, False)
- if not request.response._result:
- time.sleep(SLEEP_TIME)
- except:
- # This thread should never crash, thus a blank except
- pass
-
def getAutostartServiceNames():
"""get a list of services to start"""
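The service-side half of the protocol added in ``service.py`` (check for waiting jobs, claim one, then process it by id, with ``processNext()`` falling back to pulling the next job when no id is given) can be sketched with an in-memory stand-in. Everything here is hypothetical: ``ToyTaskService`` is not the real ``TaskService``, it has no persistence, cron jobs or error handling, and jobs are plain callables.

```python
class ToyTaskService:
    """Hypothetical in-memory stand-in for the task service."""

    def __init__(self):
        self.jobs = {}     # jobid -> callable
        self.queue = []    # ids of jobs nobody has claimed yet
        self.results = []
        self._next_id = 1

    def add(self, func):
        jobid = self._next_id
        self._next_id += 1
        self.jobs[jobid] = func
        self.queue.append(jobid)
        return jobid

    def has_jobs_waiting(self):
        return bool(self.queue)

    def claim_next_job(self):
        # Taking the id off the queue keeps a second worker from
        # picking up the same job.
        return self.queue.pop(0) if self.queue else None

    def process_next(self, jobid=None):
        # Without an explicit id, fall back to pulling the next queued job.
        if jobid is None:
            jobid = self.claim_next_job()
        if jobid is None:
            return False
        self.results.append(self.jobs[jobid]())
        return True
```

A single-job processor would only ever call ``process_next()``, whereas a multi-threaded one would call ``claim_next_job()`` in its main loop and hand the returned id to ``process_next(jobid)`` in a worker thread.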
Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/tests.py 2007-12-03 00:01:41 UTC (rev 82085)
+++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py 2007-12-03 00:18:13 UTC (rev 82086)
@@ -35,13 +35,15 @@
log_info = InstalledHandler('lovely.remotetask')
test.globs['log_info'] = log_info
- service.SLEEP_TIME = 0
+ test.origArgs = service.TaskService.processorArguments
+ service.TaskService.processorArguments = {'waitTime': 0.0}
def tearDown(test):
placefulTearDown()
log_info = test.globs['log_info']
+ log_info.clear()
log_info.uninstall()
- service.SLEEP_TIME = 1
+ service.TaskService.processorArguments = test.origArgs
def test_suite():
return unittest.TestSuite((
@@ -52,6 +54,12 @@
|doctest.ELLIPSIS
|INTERPRET_FOOTNOTES
),
+ DocFileSuite('processor.txt',
+ setUp=setUp,
+ tearDown=tearDown,
+ optionflags=doctest.NORMALIZE_WHITESPACE
+ |doctest.ELLIPSIS
+ ),
DocFileSuite('TESTING.txt',
setUp=placelesssetup.setUp,
tearDown=placelesssetup.tearDown,
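The ``setUp``/``tearDown`` change in ``tests.py`` swaps a class-level default for the duration of a test and restores it afterwards. A minimal sketch of that save-and-restore pattern, using hypothetical stand-ins (``Service`` for ``TaskService`` and ``FakeTest`` for the doctest object), might look like this:

```python
class Service:
    # Stand-in for the class attribute that holds the default arguments.
    processorArguments = {'waitTime': 1.0}


class FakeTest:
    """Stand-in for the doctest test object passed to setUp/tearDown."""


def setUp(test):
    # Remember the original default, then install a fast one for testing.
    test.origArgs = Service.processorArguments
    Service.processorArguments = {'waitTime': 0.0}


def tearDown(test):
    # Put the original default back so later tests are unaffected.
    Service.processorArguments = test.origArgs
```

Because the original dictionary object is stored and reassigned, not mutated in place, other tests that read the class attribute see the unmodified default again after ``tearDown``.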