[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/ remove processors, they are z3 specific

Godefroid Chapelle gotcha at bubblenet.be
Mon Mar 8 05:25:09 EST 2010


Log message for revision 109801:
  remove processors, they are z3 specific

Changed:
  D   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
  D   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py

-=-
Deleted: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py	2010-03-08 10:23:30 UTC (rev 109800)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.py	2010-03-08 10:25:09 UTC (rev 109801)
@@ -1,156 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Processor Implementations
-
-$Id$
-"""
-__docformat__ = 'restructuredtext'
-import logging
-import threading
-import time
-import transaction
-import zope.interface
-import zope.publisher.base
-import zope.publisher.publish
-from zope.app.publication.zopepublication import ZopePublication
-from zope.security import management
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
-
-from lovely.remotetask import interfaces
-
-THREAD_STARTUP_WAIT = 0.05
-
-ERROR_MARKER = object()
-
-log = logging.getLogger('lovely.remotetask')
-
-
-class ProcessorPublication(ZopePublication):
-    """A custom publication to process the next job."""
-
-    def beforeTraversal(self, request):
-        # Overwrite this method, so that the publication does not attempt to
-        # authenticate; we assume that the processor is allowed to do
-        # everything. Note that the DB is not exposed to the job callable.
-        management.newInteraction(request)
-        transaction.begin()
-
-    def traverseName(self, request, ob, name):
-        # Provide a very simple traversal mechanism.
-        return traverse(removeSecurityProxy(ob), name, None)
-
-class ProcessorRequest(zope.publisher.base.BaseRequest):
-    """A custome publisher request for the processor."""
-
-    def __init__(self, *args):
-        super(ProcessorRequest, self).__init__(None, {}, positional=args)
-
-
-class SimpleProcessor(object):
-    """Simple Job Processor
-
-    This processor only processes one job at a time.
-    """
-    zope.interface.implements(interfaces.IProcessor)
-
-    @property
-    def running(self):
-        thread = threading.currentThread()
-        if thread is not None:
-            return thread.running
-        log.error('SimpleProcessor: no currentThread')
-        return False
-
-    def __init__(self, db, servicePath, waitTime=1.0):
-        self.db = db
-        self.servicePath = servicePath
-        self.waitTime = waitTime
-
-    def call(self, method, args=(), errorValue=ERROR_MARKER):
-        # Create the path to the method.
-        path = self.servicePath[:] + [method]
-        path.reverse()
-        # Produce a special processor event to be sent to the publisher.
-        request = ProcessorRequest(*args)
-        request.setPublication(ProcessorPublication(self.db))
-        request.setTraversalStack(path)
-        # Publish the request, making sure that *all* exceptions are
-        # handled. The processor should *never* crash.
-        try:
-            zope.publisher.publish.publish(request, False)
-            return request.response._result
-        except Exception, error:
-            # This thread should never crash, thus a blank except
-            log.error('Processor: ``%s()`` caused an error!' %method)
-            log.exception(error)
-            return errorValue is ERROR_MARKER and error or errorValue
-
-    def processNext(self, jobid=None):
-        return self.call('processNext', args=(None, jobid))
-
-    def __call__(self):
-        while self.running:
-            result = self.processNext()
-            # If there are no jobs available, sleep a little bit and then
-            # check again.
-            if not result:
-                time.sleep(self.waitTime)
-
-
-class MultiProcessor(SimpleProcessor):
-    """Multi-threaded Job Processor
-
-    This processor can work on multiple jobs at the same time.
-    """
-    zope.interface.implements(interfaces.IProcessor)
-
-    def __init__(self, *args, **kwargs):
-        self.maxThreads = kwargs.pop('maxThreads', 5)
-        super(MultiProcessor, self).__init__(*args, **kwargs)
-        self.threads = []
-
-    def hasJobsWaiting(self):
-        return self.call('hasJobsWaiting', errorValue=False)
-
-    def claimNextJob(self):
-        return self.call('claimNextJob', errorValue=None)
-
-    def __call__(self):
-        # Start the processing loop
-        while self.running:
-            # Remove all dead threads
-            for thread in self.threads:
-                if not thread.isAlive():
-                    self.threads.remove(thread)
-            # If the number of threads equals the number of maximum threads,
-            # wait a little bit and then start over
-            if len(self.threads) == self.maxThreads:
-                time.sleep(self.waitTime)
-                continue
-            # Let's wait for jobs to become available
-            while not self.hasJobsWaiting() and self.running:
-                time.sleep(self.waitTime)
-            # Okay, we have to do some work, so let's do that now. Since we
-            # are working with threads, we first have to claim a job and then
-            # execute it.
-            jobid = self.claimNextJob()
-            # If we got a job, let's work on it in a new thread.
-            if jobid is not None:
-                thread = threading.Thread(
-                    target=self.processNext, args=(jobid,))
-                self.threads.append(thread)
-                thread.start()
-                # Give the thread some time to start up:
-                time.sleep(THREAD_STARTUP_WAIT)

Deleted: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt	2010-03-08 10:23:30 UTC (rev 109800)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/processor.txt	2010-03-08 10:25:09 UTC (rev 109801)
@@ -1,296 +0,0 @@
-==============
-Job Processors
-==============
-
-The actual processing of the jobs in a queue is handled by a spearate
-component, known as the job processor. This component usually runs in its own
-thread and provides its own main loop.
-
-  >>> from lovely.remotetask import processor
-
-The ``processor`` module provides several implementations of the processor
-API. Let's create the necessary components to test the processor:
-
-1. Create the task service and add it to the root site:
-
-  >>> from lovely import remotetask
-  >>> tasks = remotetask.TaskService()
-
-  >>> sm = root['tasks'] = tasks
-
-2. Register the service as a utility:
-
-  >>> from lovely.remotetask import interfaces
-  >>> sm = root.getSiteManager()
-  >>> sm.registerUtility(tasks, interfaces.ITaskService, name='tasks')
-
-3. Register a task that simply sleeps and writes a message:
-
-  >>> import logging
-  >>> log = logging.getLogger('lovely.remotetask')
-  >>> import time
-  >>> def sleep((sleepTime, id)):
-  ...     time.sleep(sleepTime)
-  ...     log.info('Job: %i' %id)
-
-  >>> import lovely.remotetask.task
-  >>> sleepTask = remotetask.task.SimpleTask(sleep)
-
-  >>> import zope.component
-  >>> zope.component.provideUtility(sleepTask, name='sleep')
-
-4. Setup a database:
-
-  >>> from ZODB.tests import util
-  >>> import transaction
-  >>> db = util.DB()
-
-  >>> from zope.app.publication.zopepublication import ZopePublication
-  >>> conn = db.open()
-  >>> conn.root()[ZopePublication.root_name] = root
-  >>> transaction.commit()
-
-
-The Simple Processor
---------------------
-
-This processor executes one job at a time. It was designed for jobs that would
-take a long time and use up most of the processing power of a computer. It is
-also the default processor for the task service.
-
-Let's first register a few tasks:
-
-  >>> jobid = tasks.add(u'sleep', (0.04, 1))
-  >>> jobid = tasks.add(u'sleep', (0.1,  2))
-  >>> jobid = tasks.add(u'sleep', (0,    3))
-  >>> jobid = tasks.add(u'sleep', (0.08, 4))
-  >>> transaction.commit()
-
-Let's start by executing a job directly. The first argument to the simple
-processor constructor is the database and the second the traversal stack to
-the task service. All other arguments are optional:
-
-  >>> proc = processor.SimpleProcessor(
-  ...     tasks._p_jar.db(), ['tasks'], waitTime=0.0)
-
-Let's now process the first job. We clear the log and we also have to end any
-existing interactions in order to process the job in this thread:
-
-  >>> log_info.clear()
-
-  >>> from zope.security import management
-  >>> management.endInteraction()
-  >>> proc.processNext()
-  True
-
-  >>> print log_info
-  lovely.remotetask INFO
-    Job: 1
-
-Let's now use the processor from within the task service. Since the processor
-constructors also accept additional arguments, they are specified as well:
-
-  >>> tasks.processorFactory
-  <class 'lovely.remotetask.processor.SimpleProcessor'>
-  >>> tasks.processorArguments
-  {'waitTime': 0.0}
-
-The wait time has been set to zero for testing purposes only. It is really set
-to 1 second by default. Let's now start processing tasks, wait a little bit
-for all the jobs to complete and then stop processing again:
-
-  >>> tasks.startProcessing()
-  >>> transaction.commit()
-
-  >>> time.sleep(0.5)
-
-  >>> tasks.stopProcessing()
-  >>> transaction.commit()
-
-The log shows that all jobs have been processed. But more importantly, they
-were all completed in the order they were defined.
-
-  >>> print log_info
-  lovely.remotetask INFO
-    Job: 1
-  lovely.remotetask INFO
-    Job: 2
-  lovely.remotetask INFO
-    Job: 3
-  lovely.remotetask INFO
-    Job: 4
-
-Transactions in jobs
---------------------
-
-With the SimpleProcessor, jobs _should_ not change the transaction status, since
-both the administration of the jobs by the TaskService and the job itself run in
-the same transaction, so aborting it from inside the job could wreak havoc with
-the administrative part.
-
-This is a regression test that aborting the transaction inside the job does not
-lead to an infinite loop (because SimpleProcessor pulls the job inside the
-transaction, so if it is aborted, the job remains on the queue):
-
-  >>> counter = 0
-  >>> def count(arg):
-  ...     global counter
-  ...     counter += 1
-  ...     transaction.abort()
-  >>> countTask = remotetask.task.SimpleTask(count)
-  >>> zope.component.provideUtility(countTask, name='count')
-
-  >>> jobid = tasks.add(u'count', ())
-  >>> transaction.commit()
-
-  >>> tasks.startProcessing()
-  >>> transaction.commit()
-  >>> time.sleep(0.5)
-  >>> tasks.stopProcessing()
-  >>> transaction.commit()
-  >>> time.sleep(0.5)
-  >>> transaction.abort() # prevent spurious conflict errors
-  >>> counter
-  1
-
-
-The Multi-thread Processor
---------------------------
-
-The multi-threaded processor executes several jobs at once. It was designed
-for jobs that would take a long time but use very little processing power.
-
-Let's add a few new tasks to execute:
-
-  >>> jobid = tasks.add(u'sleep', (0.04,  1))
-  >>> jobid = tasks.add(u'sleep', (0.18,  2))
-  >>> jobid = tasks.add(u'sleep', (0,     3))
-  >>> jobid = tasks.add(u'sleep', (0.02,  4))
-  >>> transaction.commit()
-
-Before testing the processor in the task service, let's have a look at every
-method by itself. So we instantiate the processor:
-
-  >>> proc = processor.MultiProcessor(
-  ...     tasks._p_jar.db(), ['tasks'], waitTime=0)
-
-The maximum amount of threads can be set as well:
-
-  >>> proc.maxThreads
-  5
-
-All working threads can be reviewed at any time:
-
-  >>> proc.threads
-  []
-
-At any time you can ask the service whether any jobs are waiting to be
-executed:
-
-  >>> from zope.security import management
-  >>> management.endInteraction()
-
-  >>> proc.hasJobsWaiting()
-  True
-
-Once you know that jobs are available, you can claim a job:
-
-  >>> jobid = proc.claimNextJob()
-  >>> jobid
-  1392637180
-
-We need to claim a job before executing it, so that the database marks the job
-as claimed and no new thread picks up the job. Once we claimed a particular
-job, we can process it:
-
-  >>> log_info.clear()
-
-  >>> proc.processNext(jobid)
-  True
-
-  >>> print log_info
-  lovely.remotetask INFO
-    Job: 1
-
-Let's now have a look at using the processor in the task service. This
-primarily means setting the processor factory:
-
-  >>> management.newInteraction()
-
-  >>> tasks.processorFactory = processor.MultiProcessor
-  >>> transaction.commit()
-
-  >>> log_info.clear()
-
-Let's now process the remaining jobs:
-
-  >>> tasks.startProcessing()
-  >>> transaction.commit()
-
-  >>> time.sleep(0.5)
-
-  >>> tasks.stopProcessing()
-  >>> transaction.commit()
-
-As you can see, this time the jobs are not completed in order, because they
-all need different time to execute:
-
-  >>> print log_info
-  lovely.remotetask INFO
-    Job: 3
-  lovely.remotetask INFO
-    Job: 4
-  lovely.remotetask INFO
-    Job: 2
-
-Let's now set the thread limit to two and construct a new set of tasks that
-demonstrate that not more than two threads run at the same time:
-
-  >>> tasks.processorArguments = {'waitTime': 0.0, 'maxThreads': 2}
-
-  >>> jobid = tasks.add(u'sleep', (0.03, 1))
-  >>> jobid = tasks.add(u'sleep', (0.05, 2))
-  >>> jobid = tasks.add(u'sleep', (0.03, 3))
-  >>> jobid = tasks.add(u'sleep', (0.08, 4))
-  >>> transaction.commit()
-
-If all tasks are processed at once, job 3 should be done first, but since the
-job has to wait for an available thread, it will come in third. We can now run
-the jobs and see the result:
-
-  >>> log_info.clear()
-
-  >>> tasks.startProcessing()
-  >>> transaction.commit()
-
-  >>> time.sleep(0.5)
-
-  >>> tasks.stopProcessing()
-  >>> transaction.commit()
-
-  >>> print log_info
-  lovely.remotetask INFO
-    Job: 1
-  lovely.remotetask INFO
-    Job: 2
-  lovely.remotetask INFO
-    Job: 3
-  lovely.remotetask INFO
-    Job: 4
-
-Note: Sometimes (about 20% of the time in this test) the log will contain a
-conflict error. As long as the error is *not* produced by ``processNext()``,
-they are harmless, even though they are logged on the error level. To avoid
-conflict errors when calling the ``processNext()`` method, we wait a little
-bit to give the thread a chance to start up. This waiting time is defined by a
-module global:
-
-  >>> processor.THREAD_STARTUP_WAIT
-  0.050000000000000003
-
-On my machine this number seems to work well. On slower machines you might
-want to raise that number.
-
-Let's give Python enough time to stop all the threads.
-
-  >>> time.sleep(0.05)

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py	2010-03-08 10:23:30 UTC (rev 109800)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/tests.py	2010-03-08 10:25:09 UTC (rev 109801)
@@ -79,7 +79,6 @@
         unittest.makeSuite(TestIdGenerator),
         DocFileSuite('README.txt',
                      'startlater.txt',
-                     'processor.txt',
                      'TESTING.txt',
                      setUp=setUp,
                      tearDown=tearDown,



More information about the checkins mailing list