[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/ make tests pass
Godefroid Chapelle
gotcha at bubblenet.be
Wed Mar 31 08:21:07 EDT 2010
Log message for revision 110359:
make tests pass
remove ZOPE2 code
Changed:
U Sandbox/gotcha/z3c.taskqueue/buildout.cfg
U Sandbox/gotcha/z3c.taskqueue/setup.py
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
U Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py
-=-
Modified: Sandbox/gotcha/z3c.taskqueue/buildout.cfg
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/buildout.cfg 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/buildout.cfg 2010-03-31 12:21:07 UTC (rev 110359)
@@ -7,6 +7,7 @@
recipe = zc.recipe.testrunner
defaults = ['--tests-pattern', '^f?tests$']
eggs = z3c.taskqueue
+ z3c.taskqueue [test]
[py]
recipe = zc.recipe.egg
Modified: Sandbox/gotcha/z3c.taskqueue/setup.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/setup.py 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/setup.py 2010-03-31 12:21:07 UTC (rev 110359)
@@ -30,7 +30,10 @@
'zope.configuration',
'zope.container',
'zc.queue',
+ 'zope.app.publication',
],
+ extras_require=dict(test=['zope.app.testing',
+ ]),
entry_points="""
# -*- Entry points: -*-
""",
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt 2010-03-31 12:21:07 UTC (rev 110359)
@@ -75,7 +75,7 @@
>>> service.getStatus(jobid)
'cancelled'
-Let's now read a job:
+Let's add another job:
>>> jobid = service.add(u'echo', {'foo': 'bar'})
>>> service.processNext()
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py 2010-03-31 12:21:07 UTC (rev 110359)
@@ -242,3 +242,30 @@
now is a convenience parameter for testing.
"""
+
+
+class IProcessor(interface.Interface):
+ """Job Processor
+
+ Process the jobs that are waiting in the queue. A processor is meant to
+ be run in a separate thread. To complete a job, it simply calls back into
+ the task server. This works, since it does not use up any Web server
+ threads.
+
+ Processing a job can take a long time. However, we do not have to worry
+ about transaction conflicts, since no other request is touching the job
+ object.
+ """
+
+ running = schema.Bool(
+ title=u"Running Flag",
+ description=u"Tells whether the processor is currently running.",
+ readonly=True)
+
+ def __call__(db, servicePath):
+ """Run the processor.
+
+ The ``db`` is a ZODB instance that is used to call back into the task
+ service. The ``servicePath`` specifies how to traverse to the task
+ service itself.
+ """
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py 2010-03-31 12:21:07 UTC (rev 110359)
@@ -23,27 +23,19 @@
import zope.interface
import zope.publisher.base
import zope.publisher.publish
+import types
from zope.app.publication.zopepublication import ZopePublication
from zope.security import management
from zope.security.proxy import removeSecurityProxy
from zope.traversing.api import traverse
-try:
- from ZPublisher.HTTPRequest import HTTPRequest
- from ZPublisher.HTTPResponse import HTTPResponse
- import ZPublisher
- from types import StringType
- ZOPE2 = True
-except ImportError:
- ZOPE2 = False
+from z3c.taskqueue import interfaces
-from lovely.remotetask import interfaces
-
THREAD_STARTUP_WAIT = 0.05
ERROR_MARKER = object()
-log = logging.getLogger('lovely.remotetask')
+log = logging.getLogger('z3c.taskqueue')
class ProcessorPublication(ZopePublication):
@@ -102,62 +94,29 @@
if not result:
time.sleep(self.waitTime)
-if not ZOPE2:
- class SimpleProcessor(BaseSimpleProcessor):
+class SimpleProcessor(BaseSimpleProcessor):
- def call(self, method, args=(), errorValue=ERROR_MARKER):
- # Create the path to the method.
- path = self.servicePath[:] + [method]
- path.reverse()
- # Produce a special processor event to be sent to the publisher.
- request = ProcessorRequest(*args)
- request.setPublication(ProcessorPublication(self.db))
- request.setTraversalStack(path)
- # Publish the request, making sure that *all* exceptions are
- # handled. The processor should *never* crash.
- try:
- zope.publisher.publish.publish(request, False)
- return request.response._result
- except Exception, error:
- # This thread should never crash, thus a blank except
- log.error('Processor: ``%s()`` caused an error!' % method)
- log.exception(error)
- return errorValue is ERROR_MARKER and error or errorValue
+ def call(self, method, args=(), errorValue=ERROR_MARKER):
+ # Create the path to the method.
+ path = self.servicePath[:] + [method]
+ path.reverse()
+ # Produce a special processor event to be sent to the publisher.
+ request = ProcessorRequest(*args)
+ request.setPublication(ProcessorPublication(self.db))
+ request.setTraversalStack(path)
+ # Publish the request, making sure that *all* exceptions are
+ # handled. The processor should *never* crash.
+ try:
+ zope.publisher.publish.publish(request, False)
+ return request.response._result
+ except Exception, error:
+ # This thread should never crash, thus a blank except
+ log.error('Processor: ``%s()`` caused an error!' % method)
+ log.exception(error)
+ return errorValue is ERROR_MARKER and error or errorValue
-else:
- class SimpleProcessor(BaseSimpleProcessor):
- """ SimpleProcessor for Zope2 """
-
- def call(self, method, args=(), errorValue=ERROR_MARKER):
- path = self.servicePath[:] + [method]
- response = HTTPResponse()
- env = {'SERVER_NAME': 'dummy',
- 'SERVER_PORT': '8080',
- 'PATH_INFO': '/' + '/'.join(path)}
- log.info(env['PATH_INFO'])
- request = HTTPRequest(None, env, response)
- conn = self.db.open()
- root = conn.root()
- request['PARENTS'] = [root[ZopePublication.root_name]]
- try:
- try:
- ZPublisher.Publish.publish(request, 'Zope2', [None])
- except Exception, error:
- # This thread should never crash, thus a blank except
- log.error('Processor: ``%s()`` caused an error!' % method)
- log.exception(error)
- return errorValue is ERROR_MARKER and error or errorValue
- finally:
- request.close()
- conn.close()
- if not request.response.body:
- time.sleep(1)
- else:
- return request.response.body
-
-
class BaseMultiProcessor(SimpleProcessor):
"""Multi-threaded Job Processor
@@ -205,40 +164,35 @@
time.sleep(THREAD_STARTUP_WAIT)
-if not ZOPE2:
+class MultiProcessor(BaseMultiProcessor):
+ """Multi-threaded Job Processor
- class MultiProcessor(BaseMultiProcessor):
- pass
+ This processor can work on multiple jobs at the same time.
-else:
+ WARNING: This still does not work correctly in Zope2
+ """
+ zope.interface.implements(interfaces.IProcessor)
- class MultiProcessor(BaseMultiProcessor):
- """Multi-threaded Job Processor
+ def __init__(self, *args, **kwargs):
+ self.maxThreads = kwargs.pop('maxThreads', 5)
+ super(MultiProcessor, self).__init__(*args, **kwargs)
+ self.threads = []
- This processor can work on multiple jobs at the same time.
+ def hasJobsWaiting(self):
+ value = self.call('hasJobsWaiting', errorValue=False)
+ if isinstance(value, types.StringType):
+ if value == 'True':
+ return True
+ else:
+ return False
+ return value
- WARNING: This still does not work correctly in Zope2
- """
- zope.interface.implements(interfaces.IProcessor)
-
- def __init__(self, *args, **kwargs):
- self.maxThreads = kwargs.pop('maxThreads', 5)
- super(MultiProcessor, self).__init__(*args, **kwargs)
- self.threads = []
-
- def hasJobsWaiting(self):
- value = self.call('hasJobsWaiting', errorValue=False)
- if isinstance(value, StringType):
- if value == 'True':
- return True
- else:
- return False
- return value
-
- def claimNextJob(self):
- value = self.call('claimNextJob', errorValue=None)
- try:
- value = int(value)
- except ValueError:
- pass
- return value
+ def claimNextJob(self):
+ value = self.call('claimNextJob', errorValue=None)
+ try:
+ value = int(value)
+ except TypeError:
+ log.debug(value)
+ except ValueError:
+ pass
+ return value
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt 2010-03-31 12:21:07 UTC (rev 110359)
@@ -6,35 +6,35 @@
component, known as the job processor. This component usually runs in its own
thread and provides its own main loop.
- >>> from lovely.remotetask import processor
+ >>> from z3c.taskqueue import processor
The ``processor`` module provides several implementations of the processor
API. Let's create the necessary components to test the processor:
1. Create the task service and add it to the root site:
- >>> from lovely import remotetask
- >>> tasks = remotetask.TaskService()
+ >>> from z3c.taskqueue import TaskService
+ >>> tasks = TaskService()
>>> sm = root['tasks'] = tasks
2. Register the service as a utility:
- >>> from lovely.remotetask import interfaces
+ >>> from z3c.taskqueue import interfaces
>>> sm = root.getSiteManager()
>>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
3. Register a task that simply sleeps and writes a message:
>>> import logging
- >>> log = logging.getLogger('lovely.remotetask')
+ >>> log = logging.getLogger('z3c.taskqueue')
>>> import time
>>> def sleep((sleepTime, id)):
... time.sleep(sleepTime)
... log.info('Job: %i' %id)
- >>> import lovely.remotetask.task
- >>> sleepTask = remotetask.task.SimpleTask(sleep)
+ >>> from z3c.taskqueue import task
+ >>> sleepTask = task.SimpleTask(sleep)
>>> import zope.component
>>> zope.component.provideUtility(sleepTask, name='sleep')
@@ -84,14 +84,14 @@
True
>>> print log_info
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 1
Let's now use the processor from within the task service. Since the processor
constructors also accept additional arguments, they are specified as well:
>>> tasks.processorFactory
- <class 'lovely.remotetask.processor.SimpleProcessor'>
+ <class 'z3c.taskqueue.processor.SimpleProcessor'>
>>> tasks.processorArguments
{'waitTime': 0.0}
@@ -111,14 +111,18 @@
were all completed in the order they were defined.
>>> print log_info
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 1
- lovely.remotetask INFO
+ z3c.taskqueue INFO
+ starting service remotetasks.tasks.tasks
+ z3c.taskqueue INFO
Job: 2
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 3
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 4
+ z3c.taskqueue INFO
+ stopping service remotetasks.tasks.tasks
Transactions in jobs
--------------------
@@ -137,7 +141,7 @@
... global counter
... counter += 1
... transaction.abort()
- >>> countTask = remotetask.task.SimpleTask(count)
+ >>> countTask = task.SimpleTask(count)
>>> zope.component.provideUtility(countTask, name='count')
>>> jobid = tasks.add(u'count', ())
@@ -209,7 +213,7 @@
True
>>> print log_info
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 1
Let's now have a look at using the processor in the task service. This
@@ -236,13 +240,18 @@
all need different time to execute:
>>> print log_info
- lovely.remotetask INFO
+ z3c.taskqueue INFO
+ starting service remotetasks.tasks.tasks
+ z3c.taskqueue INFO
Job: 3
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 4
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 2
+ z3c.taskqueue INFO
+ stopping service remotetasks.tasks.tasks
+
Let's now set the thread limit to two and construct a new set of tasks that
demonstrate that not more than two threads run at the same time:
@@ -269,14 +278,18 @@
>>> transaction.commit()
>>> print log_info
- lovely.remotetask INFO
+ z3c.taskqueue INFO
+ starting service remotetasks.tasks.tasks
+ z3c.taskqueue INFO
Job: 1
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 2
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 3
- lovely.remotetask INFO
+ z3c.taskqueue INFO
Job: 4
+ z3c.taskqueue INFO
+ stopping service remotetasks.tasks.tasks
Note: Sometimes (about 20% of the time in this test) the log will contain a
conflict error. As long as the error is *not* produced by ``processNext()``,
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py 2010-03-31 12:21:07 UTC (rev 110359)
@@ -16,10 +16,11 @@
"""
__docformat__ = 'restructuredtext'
-from z3c.taskqueue import interfaces, job, task
from zope import component
from zope.container import contained
from zope.component.interfaces import ComponentLookupError
+from zope.traversing.api import getParents
+import threading
import datetime
import logging
import persistent
@@ -28,14 +29,11 @@
import zc.queue
import zope.interface
-try:
- from Products import Five
- ZOPE2 = True
- del Five
- import sys
-except ImportError:
- ZOPE2 = False
+from BTrees import family32
+from z3c.taskqueue import interfaces, job, task
+from z3c.taskqueue import processor
+
log = logging.getLogger('z3c.taskqueue')
@@ -48,10 +46,10 @@
taskInterface = interfaces.ITask
- _scheduledJobs = None
- _scheduledQueue = None
_v_nextid = None
containerClass = None
+ processorFactory = processor.SimpleProcessor
+ processorArguments = {'waitTime': 1.0}
def __init__(self):
super(BaseTaskService, self).__init__()
@@ -178,7 +176,6 @@
"""
process next job in the queue
"""
- log.debug('processNext')
if jobid is None:
job = self._pullJob(now)
else:
@@ -279,17 +276,68 @@
return uid
self._v_nextid = None
-if not ZOPE2:
+ def startProcessing(self):
+ """See interfaces.ITaskService"""
+ if self.__parent__ is None:
+ return
+ if self._scheduledJobs is None:
+ self._scheduledJobs = self.containerClass()
+ if self._scheduledQueue is None:
+ self._scheduledQueue = zc.queue.PersistentQueue()
+ # Create the path to the service within the DB.
+ servicePath = self.getServicePath()
+ log.info('starting service %s' % self._threadName())
+ # Start the thread running the processor inside.
+ processor = self.processorFactory(
+ self._p_jar.db(), servicePath, **self.processorArguments)
+ thread = threading.Thread(target=processor, name=self._threadName())
+ thread.setDaemon(True)
+ thread.running = True
+ thread.start()
- class TaskService(BaseTaskService):
- from BTrees import family32
- containerClass = family32.IO.BTree
- maxint = family32.maxint
+ def stopProcessing(self):
+ """See interfaces.ITaskService"""
+ if self.__name__ is None:
+ return
+ name = self._threadName()
+ log.info('stopping service %s' % name)
+ for thread in threading.enumerate():
+ if thread.getName() == name:
+ thread.running = False
+ break
-else:
- from OFS.SimpleItem import SimpleItem
+ def isProcessing(self):
+ """See interfaces.ITaskService"""
+ if self.__name__ is not None:
+ name = self._threadName()
+ for thread in threading.enumerate():
+ if thread.getName() == name:
+ if thread.running:
+ return True
+ break
+ return False
- class TaskService(BaseTaskService, SimpleItem):
- from BTrees.IOBTree import IOBTree
- containerClass = IOBTree
- maxint = sys.maxint
+ def getServicePath(self):
+ raise NotImplemented
+
+ def _threadName(self):
+ """Return name of the processing thread."""
+ # This name isn't unique based on the path to self, but this doesn't
+ # change the name that's been used in past versions.
+ path = self.getServicePath()
+ path.append('remotetasks')
+ path.reverse()
+ path.append(self.__name__)
+ return '.'.join(path)
+
+
+class TaskService(BaseTaskService):
+ containerClass = family32.IO.BTree
+ maxint = family32.maxint
+
+ def getServicePath(self):
+ path = [parent.__name__ for parent in getParents(self)
+ if parent.__name__]
+ path.reverse()
+ path.append(self.__name__)
+ return path
Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py 2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py 2010-03-31 12:21:07 UTC (rev 110359)
@@ -19,18 +19,28 @@
from z3c.taskqueue import service
from zope.testing.doctest import INTERPRET_FOOTNOTES
from zope.testing.doctestunit import DocFileSuite
+from zope.testing.loggingsupport import InstalledHandler
+from zope.app.testing.setup import (placefulSetUp, placefulTearDown)
import doctest
import random
import unittest
def setUp(test):
+ root = placefulSetUp(site=True)
+ test.globs['root'] = root
+ log_info = InstalledHandler('z3c.taskqueue')
+ test.globs['log_info'] = log_info
+ test.origArgs = service.TaskService.processorArguments
+ service.TaskService.processorArguments = {'waitTime': 0.0}
# Make tests predictable
random.seed(27)
def tearDown(test):
+ placefulTearDown()
random.seed()
+ service.TaskService.processorArguments = test.origArgs
class TestIdGenerator(unittest.TestCase):
@@ -62,6 +72,7 @@
unittest.makeSuite(TestIdGenerator),
DocFileSuite('README.txt',
'startlater.txt',
+ 'processor.txt',
setUp=setUp,
tearDown=tearDown,
optionflags=doctest.NORMALIZE_WHITESPACE
More information about the checkins mailing list