[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/ make tests pass

Godefroid Chapelle gotcha at bubblenet.be
Wed Mar 31 08:21:07 EDT 2010


Log message for revision 110359:
  make tests pass
  
  remove ZOPE2 code
  

Changed:
  U   Sandbox/gotcha/z3c.taskqueue/buildout.cfg
  U   Sandbox/gotcha/z3c.taskqueue/setup.py
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py

-=-
Modified: Sandbox/gotcha/z3c.taskqueue/buildout.cfg
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/buildout.cfg	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/buildout.cfg	2010-03-31 12:21:07 UTC (rev 110359)
@@ -7,6 +7,7 @@
 recipe = zc.recipe.testrunner
 defaults = ['--tests-pattern', '^f?tests$']
 eggs = z3c.taskqueue
+       z3c.taskqueue [test]
 
 [py]
 recipe = zc.recipe.egg

Modified: Sandbox/gotcha/z3c.taskqueue/setup.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/setup.py	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/setup.py	2010-03-31 12:21:07 UTC (rev 110359)
@@ -30,7 +30,10 @@
           'zope.configuration',
           'zope.container',
           'zc.queue',
+          'zope.app.publication',
       ],
+      extras_require=dict(test=['zope.app.testing',
+        ]),
       entry_points="""
       # -*- Entry points: -*-
       """,

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/README.txt	2010-03-31 12:21:07 UTC (rev 110359)
@@ -75,7 +75,7 @@
   >>> service.getStatus(jobid)
   'cancelled'
 
-Let's now read a job:
+Let's add another job:
 
   >>> jobid = service.add(u'echo', {'foo': 'bar'})
   >>> service.processNext()

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/interfaces.py	2010-03-31 12:21:07 UTC (rev 110359)
@@ -242,3 +242,30 @@
 
         now is a convenience parameter for testing.
         """
+
+
+class IProcessor(interface.Interface):
+    """Job Processor
+
+    Process the jobs that are waiting in the queue. A processor is meant to
+    be run in a separate thread. To complete a job, it simply calls back into
+    the task server. This works, since it does not use up any Web server
+    threads.
+
+    Processing a job can take a long time. However, we do not have to worry
+    about transaction conflicts, since no other request is touching the job
+    object.
+    """
+
+    running = schema.Bool(
+        title=u"Running Flag",
+        description=u"Tells whether the processor is currently running.",
+        readonly=True)
+
+    def __call__(db, servicePath):
+        """Run the processor.
+
+        The ``db`` is a ZODB instance that is used to call back into the task
+        service. The ``servicePath`` specifies how to traverse to the task
+        service itself.
+        """

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py	2010-03-31 12:21:07 UTC (rev 110359)
@@ -23,27 +23,19 @@
 import zope.interface
 import zope.publisher.base
 import zope.publisher.publish
+import types
 from zope.app.publication.zopepublication import ZopePublication
 from zope.security import management
 from zope.security.proxy import removeSecurityProxy
 from zope.traversing.api import traverse
 
-try:
-    from ZPublisher.HTTPRequest import HTTPRequest
-    from ZPublisher.HTTPResponse import HTTPResponse
-    import ZPublisher
-    from types import StringType
-    ZOPE2 = True
-except ImportError:
-    ZOPE2 = False
+from z3c.taskqueue import interfaces
 
-from lovely.remotetask import interfaces
-
 THREAD_STARTUP_WAIT = 0.05
 
 ERROR_MARKER = object()
 
-log = logging.getLogger('lovely.remotetask')
+log = logging.getLogger('z3c.taskqueue')
 
 
 class ProcessorPublication(ZopePublication):
@@ -102,62 +94,29 @@
             if not result:
                 time.sleep(self.waitTime)
 
-if not ZOPE2:
 
-    class SimpleProcessor(BaseSimpleProcessor):
+class SimpleProcessor(BaseSimpleProcessor):
 
-        def call(self, method, args=(), errorValue=ERROR_MARKER):
-            # Create the path to the method.
-            path = self.servicePath[:] + [method]
-            path.reverse()
-            # Produce a special processor event to be sent to the publisher.
-            request = ProcessorRequest(*args)
-            request.setPublication(ProcessorPublication(self.db))
-            request.setTraversalStack(path)
-            # Publish the request, making sure that *all* exceptions are
-            # handled. The processor should *never* crash.
-            try:
-                zope.publisher.publish.publish(request, False)
-                return request.response._result
-            except Exception, error:
-                # This thread should never crash, thus a blank except
-                log.error('Processor: ``%s()`` caused an error!' % method)
-                log.exception(error)
-                return errorValue is ERROR_MARKER and error or errorValue
+    def call(self, method, args=(), errorValue=ERROR_MARKER):
+        # Create the path to the method.
+        path = self.servicePath[:] + [method]
+        path.reverse()
+        # Produce a special processor event to be sent to the publisher.
+        request = ProcessorRequest(*args)
+        request.setPublication(ProcessorPublication(self.db))
+        request.setTraversalStack(path)
+        # Publish the request, making sure that *all* exceptions are
+        # handled. The processor should *never* crash.
+        try:
+            zope.publisher.publish.publish(request, False)
+            return request.response._result
+        except Exception, error:
+            # This thread should never crash, thus a blank except
+            log.error('Processor: ``%s()`` caused an error!' % method)
+            log.exception(error)
+            return errorValue is ERROR_MARKER and error or errorValue
 
-else:
 
-    class SimpleProcessor(BaseSimpleProcessor):
-        """ SimpleProcessor for Zope2 """
-
-        def call(self, method, args=(), errorValue=ERROR_MARKER):
-            path = self.servicePath[:] + [method]
-            response = HTTPResponse()
-            env = {'SERVER_NAME': 'dummy',
-                    'SERVER_PORT': '8080',
-                    'PATH_INFO': '/' + '/'.join(path)}
-            log.info(env['PATH_INFO'])
-            request = HTTPRequest(None, env, response)
-            conn = self.db.open()
-            root = conn.root()
-            request['PARENTS'] = [root[ZopePublication.root_name]]
-            try:
-                try:
-                    ZPublisher.Publish.publish(request, 'Zope2', [None])
-                except Exception, error:
-                    # This thread should never crash, thus a blank except
-                    log.error('Processor: ``%s()`` caused an error!' % method)
-                    log.exception(error)
-                    return errorValue is ERROR_MARKER and error or errorValue
-            finally:
-                request.close()
-                conn.close()
-                if not request.response.body:
-                    time.sleep(1)
-                else:
-                    return request.response.body
-
-
 class BaseMultiProcessor(SimpleProcessor):
     """Multi-threaded Job Processor
 
@@ -205,40 +164,35 @@
                 time.sleep(THREAD_STARTUP_WAIT)
 
 
-if not ZOPE2:
+class MultiProcessor(BaseMultiProcessor):
+    """Multi-threaded Job Processor
 
-    class MultiProcessor(BaseMultiProcessor):
-        pass
+    This processor can work on multiple jobs at the same time.
 
-else:
+    WARNING: This still does not work correctly in Zope2
+    """
+    zope.interface.implements(interfaces.IProcessor)
 
-    class MultiProcessor(BaseMultiProcessor):
-        """Multi-threaded Job Processor
+    def __init__(self, *args, **kwargs):
+        self.maxThreads = kwargs.pop('maxThreads', 5)
+        super(MultiProcessor, self).__init__(*args, **kwargs)
+        self.threads = []
 
-        This processor can work on multiple jobs at the same time.
+    def hasJobsWaiting(self):
+        value = self.call('hasJobsWaiting', errorValue=False)
+        if isinstance(value, types.StringType):
+            if value == 'True':
+                return True
+            else:
+                return False
+        return value
 
-        WARNING: This still does not work correctly in Zope2
-        """
-        zope.interface.implements(interfaces.IProcessor)
-
-        def __init__(self, *args, **kwargs):
-            self.maxThreads = kwargs.pop('maxThreads', 5)
-            super(MultiProcessor, self).__init__(*args, **kwargs)
-            self.threads = []
-
-        def hasJobsWaiting(self):
-            value = self.call('hasJobsWaiting', errorValue=False)
-            if isinstance(value, StringType):
-                if value == 'True':
-                    return True
-                else:
-                    return False
-            return value
-
-        def claimNextJob(self):
-            value = self.call('claimNextJob', errorValue=None)
-            try:
-                value = int(value)
-            except ValueError:
-                pass
-            return value
+    def claimNextJob(self):
+        value = self.call('claimNextJob', errorValue=None)
+        try:
+            value = int(value)
+        except TypeError:
+            log.debug(value)
+        except ValueError:
+            pass
+        return value

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt	2010-03-31 12:21:07 UTC (rev 110359)
@@ -6,35 +6,35 @@
 component, known as the job processor. This component usually runs in its own
 thread and provides its own main loop.
 
-  >>> from lovely.remotetask import processor
+  >>> from z3c.taskqueue import processor
 
 The ``processor`` module provides several implementations of the processor
 API. Let's create the necessary components to test the processor:
 
 1. Create the task service and add it to the root site:
 
-  >>> from lovely import remotetask
-  >>> tasks = remotetask.TaskService()
+  >>> from z3c.taskqueue import TaskService
+  >>> tasks = TaskService()
 
   >>> sm = root['tasks'] = tasks
 
 2. Register the service as a utility:
 
-  >>> from lovely.remotetask import interfaces
+  >>> from z3c.taskqueue import interfaces
   >>> sm = root.getSiteManager()
   >>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
 
 3. Register a task that simply sleeps and writes a message:
 
   >>> import logging
-  >>> log = logging.getLogger('lovely.remotetask')
+  >>> log = logging.getLogger('z3c.taskqueue')
   >>> import time
   >>> def sleep((sleepTime, id)):
   ...     time.sleep(sleepTime)
   ...     log.info('Job: %i' %id)
 
-  >>> import lovely.remotetask.task
-  >>> sleepTask = remotetask.task.SimpleTask(sleep)
+  >>> from z3c.taskqueue import task
+  >>> sleepTask = task.SimpleTask(sleep)
 
   >>> import zope.component
   >>> zope.component.provideUtility(sleepTask, name='sleep')
@@ -84,14 +84,14 @@
   True
 
   >>> print log_info
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 1
 
 Let's now use the processor from within the task service. Since the processor
 constructors also accept additional arguments, they are specified as well:
 
   >>> tasks.processorFactory
-  <class 'lovely.remotetask.processor.SimpleProcessor'>
+  <class 'z3c.taskqueue.processor.SimpleProcessor'>
   >>> tasks.processorArguments
   {'waitTime': 0.0}
 
@@ -111,14 +111,18 @@
 were all completed in the order they were defined.
 
   >>> print log_info
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 1
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
+    starting service remotetasks.tasks.tasks
+  z3c.taskqueue INFO
     Job: 2
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 3
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 4
+  z3c.taskqueue INFO
+    stopping service remotetasks.tasks.tasks
 
 Transactions in jobs
 --------------------
@@ -137,7 +141,7 @@
   ...     global counter
   ...     counter += 1
   ...     transaction.abort()
-  >>> countTask = remotetask.task.SimpleTask(count)
+  >>> countTask = task.SimpleTask(count)
   >>> zope.component.provideUtility(countTask, name='count')
 
   >>> jobid = tasks.add(u'count', ())
@@ -209,7 +213,7 @@
   True
 
   >>> print log_info
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 1
 
 Let's now have a look at using the processor in the task service. This
@@ -236,13 +240,18 @@
 all need different time to execute:
 
   >>> print log_info
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
+    starting service remotetasks.tasks.tasks
+  z3c.taskqueue INFO
     Job: 3
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 4
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 2
+  z3c.taskqueue INFO
+    stopping service remotetasks.tasks.tasks
 
+
 Let's now set the thread limit to two and construct a new set of tasks that
 demonstrate that not more than two threads run at the same time:
 
@@ -269,14 +278,18 @@
   >>> transaction.commit()
 
   >>> print log_info
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
+    starting service remotetasks.tasks.tasks
+  z3c.taskqueue INFO
     Job: 1
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 2
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 3
-  lovely.remotetask INFO
+  z3c.taskqueue INFO
     Job: 4
+  z3c.taskqueue INFO
+    stopping service remotetasks.tasks.tasks
 
 Note: Sometimes (about 20% of the time in this test) the log will contain a
 conflict error. As long as the error is *not* produced by ``processNext()``,

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py	2010-03-31 12:21:07 UTC (rev 110359)
@@ -16,10 +16,11 @@
 """
 __docformat__ = 'restructuredtext'
 
-from z3c.taskqueue import interfaces, job, task
 from zope import component
 from zope.container import contained
 from zope.component.interfaces import ComponentLookupError
+from zope.traversing.api import getParents
+import threading
 import datetime
 import logging
 import persistent
@@ -28,14 +29,11 @@
 import zc.queue
 import zope.interface
 
-try:
-    from Products import Five
-    ZOPE2 = True
-    del Five
-    import sys
-except ImportError:
-    ZOPE2 = False
+from BTrees import family32
 
+from z3c.taskqueue import interfaces, job, task
+from z3c.taskqueue import processor
+
 log = logging.getLogger('z3c.taskqueue')
 
 
@@ -48,10 +46,10 @@
 
     taskInterface = interfaces.ITask
 
-    _scheduledJobs = None
-    _scheduledQueue = None
     _v_nextid = None
     containerClass = None
+    processorFactory = processor.SimpleProcessor
+    processorArguments = {'waitTime': 1.0}
 
     def __init__(self):
         super(BaseTaskService, self).__init__()
@@ -178,7 +176,6 @@
         """
         process next job in the queue
         """
-        log.debug('processNext')
         if jobid is None:
             job = self._pullJob(now)
         else:
@@ -279,17 +276,68 @@
                 return uid
             self._v_nextid = None
 
-if not ZOPE2:
+    def startProcessing(self):
+        """See interfaces.ITaskService"""
+        if self.__parent__ is None:
+            return
+        if self._scheduledJobs is None:
+            self._scheduledJobs = self.containerClass()
+        if self._scheduledQueue is None:
+            self._scheduledQueue = zc.queue.PersistentQueue()
+        # Create the path to the service within the DB.
+        servicePath = self.getServicePath()
+        log.info('starting service %s' % self._threadName())
+        # Start the thread running the processor inside.
+        processor = self.processorFactory(
+            self._p_jar.db(), servicePath, **self.processorArguments)
+        thread = threading.Thread(target=processor, name=self._threadName())
+        thread.setDaemon(True)
+        thread.running = True
+        thread.start()
 
-    class TaskService(BaseTaskService):
-        from BTrees import family32
-        containerClass = family32.IO.BTree
-        maxint = family32.maxint
+    def stopProcessing(self):
+        """See interfaces.ITaskService"""
+        if self.__name__ is None:
+            return
+        name = self._threadName()
+        log.info('stopping service %s' % name)
+        for thread in threading.enumerate():
+            if thread.getName() == name:
+                thread.running = False
+                break
 
-else:
-    from OFS.SimpleItem import SimpleItem
+    def isProcessing(self):
+        """See interfaces.ITaskService"""
+        if self.__name__ is not None:
+            name = self._threadName()
+            for thread in threading.enumerate():
+                if thread.getName() == name:
+                    if thread.running:
+                        return True
+                    break
+        return False
 
-    class TaskService(BaseTaskService, SimpleItem):
-        from BTrees.IOBTree import IOBTree
-        containerClass = IOBTree
-        maxint = sys.maxint
+    def getServicePath(self):
+        raise NotImplemented
+
+    def _threadName(self):
+        """Return name of the processing thread."""
+        # This name isn't unique based on the path to self, but this doesn't
+        # change the name that's been used in past versions.
+        path = self.getServicePath()
+        path.append('remotetasks')
+        path.reverse()
+        path.append(self.__name__)
+        return '.'.join(path)
+
+
+class TaskService(BaseTaskService):
+    containerClass = family32.IO.BTree
+    maxint = family32.maxint
+
+    def getServicePath(self):
+        path = [parent.__name__ for parent in getParents(self)
+                       if parent.__name__]
+        path.reverse()
+        path.append(self.__name__)
+        return path

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py	2010-03-31 08:50:35 UTC (rev 110358)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py	2010-03-31 12:21:07 UTC (rev 110359)
@@ -19,18 +19,28 @@
 from z3c.taskqueue import service
 from zope.testing.doctest import INTERPRET_FOOTNOTES
 from zope.testing.doctestunit import DocFileSuite
+from zope.testing.loggingsupport import InstalledHandler
+from zope.app.testing.setup import (placefulSetUp, placefulTearDown)
 import doctest
 import random
 import unittest
 
 
 def setUp(test):
+    root = placefulSetUp(site=True)
+    test.globs['root'] = root
+    log_info = InstalledHandler('z3c.taskqueue')
+    test.globs['log_info'] = log_info
+    test.origArgs = service.TaskService.processorArguments
+    service.TaskService.processorArguments = {'waitTime': 0.0}
     # Make tests predictable
     random.seed(27)
 
 
 def tearDown(test):
+    placefulTearDown()
     random.seed()
+    service.TaskService.processorArguments = test.origArgs
 
 
 class TestIdGenerator(unittest.TestCase):
@@ -62,6 +72,7 @@
         unittest.makeSuite(TestIdGenerator),
         DocFileSuite('README.txt',
                      'startlater.txt',
+                     'processor.txt',
                      setUp=setUp,
                      tearDown=tearDown,
                      optionflags=doctest.NORMALIZE_WHITESPACE



More information about the checkins mailing list