[Checkins] SVN: lovely.remotetask/trunk/ - Refactored processor to be a replaceable component. The original processor is

Bernd Dorn bernd.dorn at lovelysystems.com
Mon Dec 3 03:25:23 EST 2007


hi stephan

are these changes backward compatible, i mean in the sense of  
persistent objects etc.? if these changes are incompatible, then the  
major version number needs to be bumped.

we always use multiple remotetask services to have more than one  
thread. have you considered that you might get conflicts when doing  
multithreaded tasks? it is very important not to get conflicts,  
otherwise the task runs again and again. the conflicts usually appear  
in the queue.
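to illustrate what i mean, here is a toy sketch (not code from the  
package, all names are made up): when committing a finished job hits  
a conflict, the job lands back in the queue, so its side effects  
happen again:

```python
# hypothetical sketch, not lovely.remotetask code: a stand-in for
# ZODB's ConflictError and a toy loop showing the retry effect
class ConflictError(Exception):
    """Stand-in for ZODB's ConflictError."""

def process_queue(queue, commit):
    """Pop jobs and run them; a commit conflict re-queues the job."""
    runs = []
    while queue:
        job = queue.pop(0)
        runs.append(job)       # the task's side effect happens here...
        try:
            commit(job)        # ...but the transaction may conflict
        except ConflictError:
            queue.append(job)  # retried -> the side effect repeats
    return runs

seen = set()
def flaky_commit(job):
    # conflicts exactly once, for job 'b'
    if job == 'b' and job not in seen:
        seen.add(job)
        raise ConflictError()

result = process_queue(['a', 'b'], flaky_commit)
print(result)  # ['a', 'b', 'b'] -- job 'b' ran twice
```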

thx for your work, greez bernd


On 03.12.2007, at 01:18, Stephan Richter wrote:

> Log message for revision 82086:
>   - Refactored processor to be a replaceable component. The original  
> processor is
>     now known as the ``SimpleProcessor``. Also implemented the
>     ``MultiProcessor``, which is able to process multiple jobs at  
> once using
>     threads.
>
>
> Changed:
>   U   lovely.remotetask/trunk/buildout.cfg
>   U   lovely.remotetask/trunk/src/lovely/remotetask/README.txt
>   U   lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
>   U   lovely.remotetask/trunk/src/lovely/remotetask/browser/README.txt
>   U   lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml
>   U   lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
>   A   lovely.remotetask/trunk/src/lovely/remotetask/processor.py
>   A   lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
>   U   lovely.remotetask/trunk/src/lovely/remotetask/service.py
>   U   lovely.remotetask/trunk/src/lovely/remotetask/tests.py
>
> -=-
> Modified: lovely.remotetask/trunk/buildout.cfg
> ===================================================================
> --- lovely.remotetask/trunk/buildout.cfg	2007-12-03 00:01:41 UTC  
> (rev 82085)
> +++ lovely.remotetask/trunk/buildout.cfg	2007-12-03 00:18:13 UTC  
> (rev 82086)
> @@ -1,9 +1,14 @@
>  [buildout]
>  develop = .
> -parts = test
> +parts = py test
>  index = http://download.zope.org/zope3.4
>
>  [test]
>  recipe = zc.recipe.testrunner
>  defaults = ['--tests-pattern', '^f?tests$']
>  eggs = lovely.remotetask [test]
> +
> +[py]
> +recipe = zc.recipe.egg
> +interpreter = python
> +eggs = lovely.remotetask
>
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/README.txt	 
> 2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -34,6 +34,8 @@
>  Usage
>  _____
>
> +  >>> STOP_SLEEP_TIME = 0.02
> +
>  Let's now start by creating a single service:
>
>>>> from lovely import remotetask
> @@ -126,9 +128,9 @@
>>>> service.isProcessing()
>    False
>
> -The TaskService is being started automatically - if specified in  
> zope.conf -
> -as soon as the `IDatabaseOpenedEvent` is fired. Let's emulate the  
> zope.conf
> -settings:
> +The ``TaskService`` is being started automatically - if specified in
> +``zope.conf`` - as soon as the ``IDatabaseOpenedEvent`` is fired.  
> Let's
> +emulate the ``zope.conf`` settings:
>
>>>> class Config(object):
>    ...     mapping = {}
> @@ -151,8 +153,8 @@
>
>>>> log_info.clear()
>
> -On Zope startup the IDatabaseOpenedEvent is fired, and will call
> -the bootStrap method:
> +On Zope startup the ``IDatabaseOpenedEvent`` is fired, and will call
> +the ``bootStrap()`` method:
>
>>>> from ZODB.tests import util
>>>> import transaction
> @@ -203,6 +205,8 @@
>>>> root_service.isProcessing()
>    False
>
> +  >>> import time; time.sleep(STOP_SLEEP_TIME)
> +
>  And reset the logger::
>
>>>> log_info.clear()
> @@ -240,6 +244,8 @@
>>>> service.isProcessing()
>    False
>
> +  >>> import time; time.sleep(STOP_SLEEP_TIME)
> +
>  Reset the product configuration with the asterisked service names::
>
>>>> config.mapping['autostart'] = '*@*'
> @@ -284,6 +290,8 @@
>>>> root_service.isProcessing()
>    False
>
> +  >>> import time; time.sleep(STOP_SLEEP_TIME)
> +
>  Reset the product configuration with the asterisked service names::
>
>>>> config.mapping['autostart'] = '*@TestTaskService1'
> @@ -320,6 +328,8 @@
>>>> service.isProcessing()
>    False
>
> +  >>> import time; time.sleep(STOP_SLEEP_TIME)
> +
>>>> config.mapping['autostart'] = '*@Foo'
>>>> setProductConfigurations([config])
>>>> getAutostartServiceNames()
> @@ -352,6 +362,8 @@
>>>> root_service.isProcessing()
>    False
>
> +  >>> import time; time.sleep(STOP_SLEEP_TIME)
> +
>  Let's now read a job:
>
>>>> jobid = service.add(u'echo', {'foo': 'bar'})
> @@ -654,8 +666,7 @@
>>>> service2.stopProcessing()
>>>> root_service.stopProcessing()
>
> -  >>> import time
> -  >>> time.sleep(0.5)
> +  >>> import time; time.sleep(STOP_SLEEP_TIME)
>
>  The threads have exited now::
>
>
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt	 
> 2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -3,13 +3,13 @@
>  =====================
>
>  This package provides an implementation of a remote task execution  
> Web service
> -that allows to execute pre-defined tasks on another server. See  
> more info
> +that allows to execute pre-defined tasks on another server. See  
> more info
>  about the TaskService in README.txt. This test will test the  
> TaskServiceStub
> -implementation. The only different is, that the TaskServiceStub  
> will handle
> +implementation. The only different is, that the TaskServiceStub  
> will handle
>  task implementation providing ITaskStub interfaces rather then  
> ITask. This way
>  we can register stub tasks for a testing setup. See also another  
> usecase for
> -a task service stub implementation which is working with XML-RPC  
> in the
> -package lovely.transcoding.
> +a task service stub implementation which is working with XML-RPC  
> in the
> +package lovely.transcoding.
>
>  Let's now start by creating a task service stub:
>
> @@ -41,7 +41,7 @@
>  echo utility used in the REAME.txt tests for the ITaskStub interface:
>
>>>> import zope.component
> -  >>> zope.component.provideUtility(echoTask,  
> provides=testing.ITaskStub,
> +  >>> zope.component.provideUtility(echoTask,  
> provides=testing.ITaskStub,
>    ...     name='echo')
>
>  The echo task is now available in the service:
> @@ -110,7 +110,7 @@
>    ...     raise remotetask.task.TaskError('An error occurred.')
>
>>>> zope.component.provideUtility(
> -  ...     remotetask.task.SimpleTask(error),  
> provides=testing.ITaskStub,
> +  ...     remotetask.task.SimpleTask(error),  
> provides=testing.ITaskStub,
>    ...     name='error')
>
>  Now add and execute it:
>
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/browser/ 
> README.txt
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/browser/ 
> README.txt	2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/browser/ 
> README.txt	2007-12-03 00:18:13 UTC (rev 82086)
> @@ -173,8 +173,11 @@
>>>> transaction.commit()
>>>> service.startProcessing()
>>>> transaction.commit()
> -  >>> sleep(1.5)
>
> +  >>> import time
> +  >>> time.sleep(1.5)
> +
> +
>  Note that the processing thread is daemonic, that way it won't  
> keep the process
>  alive unnecessarily.
>
> @@ -187,21 +190,23 @@
>>>> service.stopProcessing()
>>>> transaction.commit()
>
> +
>  We got log entries with the tracebacks of the division error.
>
>>>> logvalue = io.getvalue()
>>>> print logvalue
> -  catched a generic exception, preventing thread from crashing
> +  Caught a generic exception, preventing thread from crashing
>    integer division or modulo by zero
>    Traceback (most recent call last):
>    ...
>    ZeroDivisionError: integer division or modulo by zero
>    <BLANKLINE>
>
> -We had 3 retries.
> +We had 3 retries, but every error is reported twice, once by the  
> processor and
> +once by the task service.
>
>>>> logvalue.count('ZeroDivisionError')
> -  3
> +  6
>
>  The job status is set to 'error'.
>
> @@ -227,17 +232,18 @@
>
>>>> logvalue = io.getvalue()
>>>> print logvalue
> -  catched a generic exception, preventing thread from crashing
> +  Caught a generic exception, preventing thread from crashing
>    integer division or modulo by zero
>    Traceback (most recent call last):
>    ...
>    ZeroDivisionError: integer division or modulo by zero
>    <BLANKLINE>
>
> -We had 3 retries.
> +We had 3 retries, but every error is reported twice, once by the  
> processor and
> +once by the task service.
>
>>>> logvalue.count('ZeroDivisionError')
> -  3
> +  6
>
>  The job status is set to 'error'.
>
>
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml	 
> 2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/ftesting.zcml	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -4,7 +4,7 @@
>             i18n_domain="zope">
>
>    <include package="zope.app.securitypolicy" file="meta.zcml" />
> -
> +
>    <include
>        zcml:condition="installed zope.app.zcmlfiles"
>        package="zope.app.zcmlfiles"
> @@ -37,7 +37,7 @@
>     title="Administrator"
>     login="mgr"
>     password="mgrpw" />
> -
> +
>    <grant
>     role="zope.Manager"
>     principal="zope.manager"
>
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	 
> 2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -45,7 +45,7 @@
>          schema=interface.Interface,
>          required=False)
>
> -    def __call__(self, service, jobid, input):
> +    def __call__(service, jobid, input):
>          """Execute the task.
>
>          The ``service`` argument is the task service object. It  
> allows access to
> @@ -74,6 +74,12 @@
>              default = ITask,
>              )
>
> +    processor = schema.Field(
> +            title = u'Processor',
> +            description = u'A callable that processes queued jobs  
> using '
> +                          u'an infinite loop.',
> +            )
> +
>      def getAvailableTasks():
>          """Return a mapping of task name to the task."""
>
> @@ -114,6 +120,12 @@
>      def getError(jobid):
>          """Get the error of the job."""
>
> +    def hasJobsWaiting():
> +        """Determine whether there are jobs that need to be  
> processed.
> +
> +        Returns a simple boolean.
> +        """
> +
>      def processNext():
>          """Process the next job in the queue."""
>
> @@ -260,3 +272,30 @@
>              required = False,
>              value_type = schema.TextLine()
>              )
> +
> +
> +class IProcessor(interface.Interface):
> +    """Job Processor
> +
> +    Process the jobs that are waiting in the queue. A processor is  
> meant to
> +    be run in a separate thread. To complete a job, it simply  
> calls back into
> +    the task server. This works, since it does not use up any Web  
> server
> +    threads.
> +
> +    Processing a job can take a long time. However, we do not have  
> to worry
> +    about transaction conflicts, since no other request is  
> touching the job
> +    object.
> +    """
> +
> +    running = schema.Bool(
> +        title=u"Running Flag",
> +        description=u"Tells whether the processor is currently  
> running.",
> +        readonly=True)
> +
> +    def __call__(db, servicePath):
> +        """Run the processor.
> +
> +        The ``db`` is a ZODB instance that is used to call back  
> into the task
> +        service. The ``servicePath`` specifies how to traverse to  
> the task
> +        service itself.
> +        """
>
> Added: lovely.remotetask/trunk/src/lovely/remotetask/processor.py
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/ 
> processor.py	                        (rev 0)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/processor.py	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -0,0 +1,152 @@
> +##################################################################### 
> #########
> +#
> +# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
> +# All Rights Reserved.
> +#
> +# This software is subject to the provisions of the Zope Public  
> License,
> +# Version 2.1 (ZPL).  A copy of the ZPL should accompany this  
> distribution.
> +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR  
> IMPLIED
> +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE  
> IMPLIED
> +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND  
> FITNESS
> +# FOR A PARTICULAR PURPOSE.
> +#
> +##################################################################### 
> #########
> +"""Processor Implementations
> +
> +$Id$
> +"""
> +__docformat__ = 'restructuredtext'
> +import logging
> +import threading
> +import time
> +import transaction
> +import zope.interface
> +import zope.publisher.base
> +import zope.publisher.publish
> +from zope.app.publication.zopepublication import ZopePublication
> +from zope.security import management
> +from zope.security.proxy import removeSecurityProxy
> +from zope.traversing.api import traverse
> +
> +from lovely.remotetask import interfaces
> +
> +THREAD_STARTUP_WAIT = 0.05
> +
> +ERROR_MARKER = object()
> +
> +log = logging.getLogger('lovely.remotetask')
> +
> +
> +class ProcessorPublication(ZopePublication):
> +    """A custom publication to process the next job."""
> +
> +    def beforeTraversal(self, request):
> +        # Overwrite this method, so that the publication does not  
> attempt to
> +        # authenticate; we assume that the processor is allowed to do
> +        # everything. Note that the DB is not exposed to the job  
> callable.
> +        management.newInteraction(request)
> +        transaction.begin()
> +
> +    def traverseName(self, request, ob, name):
> +        # Provide a very simple traversal mechanism.
> +        return traverse(removeSecurityProxy(ob), name, None)
> +
> +class ProcessorRequest(zope.publisher.base.BaseRequest):
> +    """A custome publisher request for the processor."""
> +
> +    def __init__(self, *args):
> +        super(ProcessorRequest, self).__init__(None, {},  
> positional=args)
> +
> +
> +class SimpleProcessor(object):
> +    """Simple Job Processor
> +
> +    This processor only processes one job at a time.
> +    """
> +    zope.interface.implements(interfaces.IProcessor)
> +
> +    @property
> +    def running(self):
> +        return threading.currentThread().running
> +
> +    def __init__(self, db, servicePath, waitTime=1.0):
> +        self.db = db
> +        self.servicePath = servicePath
> +        self.waitTime = waitTime
> +
> +    def call(self, method, args=(), errorValue=ERROR_MARKER):
> +        # Create the path to the method.
> +        path = self.servicePath[:] + [method]
> +        path.reverse()
> +        # Produce a special processor event to be sent to the  
> publisher.
> +        request = ProcessorRequest(*args)
> +        request.setPublication(ProcessorPublication(self.db))
> +        request.setTraversalStack(path)
> +        # Publish the request, making sure that *all* exceptions are
> +        # handled. The processor should *never* crash.
> +        try:
> +            zope.publisher.publish.publish(request, False)
> +            return request.response._result
> +        except Exception, error:
> +            # This thread should never crash, thus a blank except
> +            log.error('Processor: ``%s()`` caused an error!' %method)
> +            log.exception(error)
> +            return errorValue is ERROR_MARKER and error or errorValue
> +
> +    def processNext(self, jobid=None):
> +        return self.call('processNext', args=(None, jobid))
> +
> +    def __call__(self):
> +        while self.running:
> +            result = self.processNext()
> +            # If there are no jobs available, sleep a little bit  
> and then
> +            # check again.
> +            if not result:
> +                time.sleep(self.waitTime)
> +
> +
> +class MultiProcessor(SimpleProcessor):
> +    """Multi-threaded Job Processor
> +
> +    This processor can work on multiple jobs at the same time.
> +    """
> +    zope.interface.implements(interfaces.IProcessor)
> +
> +    def __init__(self, *args, **kwargs):
> +        self.maxThreads = kwargs.pop('maxThreads', 5)
> +        super(MultiProcessor, self).__init__(*args, **kwargs)
> +        self.threads = []
> +
> +    def hasJobsWaiting(self):
> +        return self.call('hasJobsWaiting', errorValue=False)
> +
> +    def claimNextJob(self):
> +        return self.call('claimNextJob', errorValue=None)
> +
> +    def __call__(self):
> +        # Start the processing loop
> +        while self.running:
> +            # Remove all dead threads
> +            for thread in self.threads:
> +                if not thread.isAlive():
> +                    self.threads.remove(thread)
> +            # If the number of threads equals the number of  
> maximum threads,
> +            # wait a little bit and then start over
> +            if len(self.threads) == self.maxThreads:
> +                time.sleep(self.waitTime)
> +                continue
> +            # Let's wait for jobs to become available
> +            while not self.hasJobsWaiting() and self.running:
> +                time.sleep(self.waitTime)
> +            # Okay, we have to do some work, so let's do that now.  
> Since we
> +            # are working with threads, we first have to claim a  
> job and then
> +            # execute it.
> +            jobid = self.claimNextJob()
> +            # If we got a job, let's work on it in a new thread.
> +            if jobid is not None:
> +                thread = threading.Thread(
> +                    target=self.processNext, args=(jobid,))
> +                self.threads.append(thread)
> +                thread.start()
> +                # Give the thread some time to start up:
> +                time.sleep(THREAD_STARTUP_WAIT)
>
>
> Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/ 
> processor.py
> ___________________________________________________________________
> Name: svn:keywords
>    + Id
>
> Added: lovely.remotetask/trunk/src/lovely/remotetask/processor.txt
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/ 
> processor.txt	                        (rev 0)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/processor.txt	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -0,0 +1,263 @@
> +==============
> +Job Processors
> +==============
> +
> +The actual processing of the jobs in a queue is handled by a separate
> +component, known as the job processor. This component usually runs  
> in its own
> +thread and provides its own main loop.
> +
> +  >>> from lovely.remotetask import processor
> +
> +The ``processor`` module provides several implementations of the  
> processor
> +API. Let's create the necessary components to test the processor:
> +
> +1. Create the task service and add it to the root site:
> +
> +  >>> from lovely import remotetask
> +  >>> tasks = remotetask.TaskService()
> +
> +  >>> sm = root['tasks'] = tasks
> +
> +2. Register the service as a utility:
> +
> +  >>> from lovely.remotetask import interfaces
> +  >>> sm = root.getSiteManager()
> +  >>> sm.registerUtility(tasks, interfaces.ITaskService,  
> name='tasks')
> +
> +3. Register a task that simply sleeps and writes a message:
> +
> +  >>> import logging
> +  >>> log = logging.getLogger('lovely.remotetask')
> +  >>> import time
> +  >>> def sleep((sleepTime, id)):
> +  ...     time.sleep(sleepTime)
> +  ...     log.info('Job: %i' %id)
> +
> +  >>> import lovely.remotetask.task
> +  >>> sleepTask = remotetask.task.SimpleTask(sleep)
> +
> +  >>> import zope.component
> +  >>> zope.component.provideUtility(sleepTask, name='sleep')
> +
> +4. Setup a database:
> +
> +  >>> from ZODB.tests import util
> +  >>> import transaction
> +  >>> db = util.DB()
> +
> +  >>> from zope.app.publication.zopepublication import  
> ZopePublication
> +  >>> conn = db.open()
> +  >>> conn.root()[ZopePublication.root_name] = root
> +  >>> transaction.commit()
> +
> +
> +The Simple Processor
> +--------------------
> +
> +This processor executes one job at a time. It was designed for  
> jobs that would
> +take a long time and use up most of the processing power of a  
> computer. It is
> +also the default processor for the task service.
> +
> +Let's first register a few tasks:
> +
> +  >>> jobid = tasks.add(u'sleep', (0.04, 1))
> +  >>> jobid = tasks.add(u'sleep', (0.1,  2))
> +  >>> jobid = tasks.add(u'sleep', (0,    3))
> +  >>> jobid = tasks.add(u'sleep', (0.08, 4))
> +  >>> transaction.commit()
> +
> +Let's start by executing a job directly. The first argument to the  
> simple
> +processor constructor is the database and the second the traversal  
> stack to
> +the task service. All other arguments are optional:
> +
> +  >>> proc = processor.SimpleProcessor(
> +  ...     tasks._p_jar.db(), ['tasks'], waitTime=0.0)
> +
> +Let's now process the first job. We clear the log and we also have  
> to end any
> +existing interactions in order to process the job in this thread:
> +
> +  >>> log_info.clear()
> +
> +  >>> from zope.security import management
> +  >>> management.endInteraction()
> +  >>> proc.processNext()
> +  True
> +
> +  >>> print log_info
> +  lovely.remotetask INFO
> +    Job: 1
> +
> +Let's now use the processor from within the task service. Since  
> the processor
> +constructors also accept additional arguments, they are specified  
> as well:
> +
> +  >>> tasks.processorFactory
> +  <class 'lovely.remotetask.processor.SimpleProcessor'>
> +  >>> tasks.processorArguments
> +  {'waitTime': 0.0}
> +
> +The wait time has been set to zero for testing purposes only. It  
> is really set
> +to 1 second by default. Let's now start processing tasks, wait a  
> little bit
> +for all the jobs to complete and then stop processing again:
> +
> +  >>> tasks.startProcessing()
> +  >>> transaction.commit()
> +
> +  >>> time.sleep(0.5)
> +
> +  >>> tasks.stopProcessing()
> +  >>> transaction.commit()
> +
> +The log shows that all jobs have been processed. But more  
> importantly, they
> +were all completed in the order they were defined.
> +
> +  >>> print log_info
> +  lovely.remotetask INFO
> +    Job: 1
> +  lovely.remotetask INFO
> +    Job: 2
> +  lovely.remotetask INFO
> +    Job: 3
> +  lovely.remotetask INFO
> +    Job: 4
> +
> +
> +The Multi-thread Processor
> +--------------------------
> +
> +The multi-threaded processor executes several jobs at once. It was  
> designed
> +for jobs that would take a long time but use very little  
> processing power.
> +
> +Let's add a few new tasks to execute:
> +
> +  >>> jobid = tasks.add(u'sleep', (0.04,  1))
> +  >>> jobid = tasks.add(u'sleep', (0.18,  2))
> +  >>> jobid = tasks.add(u'sleep', (0,     3))
> +  >>> jobid = tasks.add(u'sleep', (0.02,  4))
> +  >>> transaction.commit()
> +
> +Before testing the processor in the task service, let's have a  
> look at every
> +method by itself. So we instantiate the processor:
> +
> +  >>> proc = processor.MultiProcessor(
> +  ...     tasks._p_jar.db(), ['tasks'], waitTime=0)
> +
> +The maximum amount of threads can be set as well:
> +
> +  >>> proc.maxThreads
> +  5
> +
> +All working threads can be reviewed at any time:
> +
> +  >>> proc.threads
> +  []
> +
> +At any time you can ask the service whether any jobs are waiting  
> to be
> +executed:
> +
> +  >>> from zope.security import management
> +  >>> management.endInteraction()
> +
> +  >>> proc.hasJobsWaiting()
> +  True
> +
> +Once you know that jobs are available, you can claim a job:
> +
> +  >>> jobid = proc.claimNextJob()
> +  >>> jobid
> +  5
> +
> +We need to claim a job before executing it, so that the database  
> marks the job
> +as claimed and no new thread picks up the job. Once we claimed a  
> particular
> +job, we can process it:
> +
> +  >>> log_info.clear()
> +
> +  >>> proc.processNext(jobid)
> +  True
> +
> +  >>> print log_info
> +  lovely.remotetask INFO
> +    Job: 1
> +
> +Let's now have a look at using the processor in the task service.  
> This
> +primarily means setting the processor factory:
> +
> +  >>> management.newInteraction()
> +
> +  >>> tasks.processorFactory = processor.MultiProcessor
> +  >>> transaction.commit()
> +
> +  >>> log_info.clear()
> +
> +Let's now process the remaining jobs:
> +
> +  >>> tasks.startProcessing()
> +  >>> transaction.commit()
> +
> +  >>> time.sleep(0.5)
> +
> +  >>> tasks.stopProcessing()
> +  >>> transaction.commit()
> +
> +As you can see, this time the jobs are not completed in order,  
> because they
> +all need different time to execute:
> +
> +  >>> print log_info
> +  lovely.remotetask INFO
> +    Job: 3
> +  lovely.remotetask INFO
> +    Job: 4
> +  lovely.remotetask INFO
> +    Job: 2
> +
> +Let's now set the thread limit to two and construct a new set of  
> tasks that
> +demonstrate that not more than two threads run at the same time:
> +
> +  >>> tasks.processorArguments = {'waitTime': 0.0, 'maxThreads': 2}
> +
> +  >>> jobid = tasks.add(u'sleep', (0.03, 1))
> +  >>> jobid = tasks.add(u'sleep', (0.05, 2))
> +  >>> jobid = tasks.add(u'sleep', (0.03, 3))
> +  >>> jobid = tasks.add(u'sleep', (0.08, 4))
> +  >>> transaction.commit()
> +
> +If all tasks are processed at once, job 3 should be done first,  
> but since the
> +job has to wait for an available thread, it will come in third. We  
> can now run
> +the jobs and see the result:
> +
> +  >>> log_info.clear()
> +
> +  >>> tasks.startProcessing()
> +  >>> transaction.commit()
> +
> +  >>> time.sleep(0.5)
> +
> +  >>> tasks.stopProcessing()
> +  >>> transaction.commit()
> +
> +  >>> print log_info
> +  lovely.remotetask INFO
> +    Job: 1
> +  lovely.remotetask INFO
> +    Job: 2
> +  lovely.remotetask INFO
> +    Job: 3
> +  lovely.remotetask INFO
> +    Job: 4
> +
> +Note: Sometimes (about 20% of the time in this test) the log will  
> contain a
> +conflict error. As long as the error is *not* produced by  
> ``processNext()``,
> +they are harmless, even though they are logged on the error level.  
> To avoid
> +conflict errors when calling the ``processNext()`` method, we wait  
> a little
> +bit to give the thread a chance to start up. This waiting time is  
> defined by a
> +module global:
> +
> +  >>> processor.THREAD_STARTUP_WAIT
> +  0.050000000000000003
> +
> +On my machine this number seems to work well. On slower machines  
> you might
> +want to raise that number.
> +
> +Let's give Python enough time to stop all the threads.
> +
> +  >>> time.sleep(0.05)
>
>
> Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/ 
> processor.txt
> ___________________________________________________________________
> Name: svn:eol-style
>    + native
>
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/service.py	 
> 2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/service.py	 
> 2007-12-03 00:18:13 UTC (rev 82086)
> @@ -1,6 +1,6 @@
>   
> ###################################################################### 
> ########
>  #
> -# Copyright (c) 2006 Lovely Systems and Contributors.
> +# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
>  # All Rights Reserved.
>  #
>  # This software is subject to the provisions of the Zope Public  
> License,
> @@ -24,8 +24,6 @@
>  import time
>  import zc.queue
>  import zope.interface
> -import zope.publisher.base
> -import zope.publisher.publish
>  from BTrees.IOBTree import IOBTree
>  from zope import component
>  from zope.app import zapi
> @@ -33,17 +31,13 @@
>  from zope.app.container import contained
>  from zope.app.component.interfaces import ISite
>  from zope.app.publication.zopepublication import ZopePublication
> -from zope.security.proxy import removeSecurityProxy
> -from zope.traversing.api import traverse
>  from zope.component.interfaces import ComponentLookupError
> -from lovely.remotetask import interfaces, job, task
> +from lovely.remotetask import interfaces, job, task, processor
>
>  log = logging.getLogger('lovely.remotetask')
>
>  storage = threading.local()
>
> -SLEEP_TIME = 1
> -
>  class TaskService(contained.Contained, persistent.Persistent):
>      """A persistent task service.
>
> @@ -52,6 +46,8 @@
>      zope.interface.implements(interfaces.ITaskService)
>
>      taskInterface = interfaces.ITask
> +    processorFactory = processor.SimpleProcessor
> +    processorArguments = {'waitTime': 1.0}
>
>      _scheduledJobs  = None
>      _scheduledQueue = None
> @@ -150,15 +146,15 @@
>              self._scheduledJobs = IOBTree()
>          if self._scheduledQueue == None:
>              self._scheduledQueue = zc.queue.PersistentQueue()
> -        path = [parent.__name__ for parent in zapi.getParents(self)
> -                if parent.__name__]
> -        path.reverse()
> -        path.append(self.__name__)
> -        path.append('processNext')
> -
> -        thread = threading.Thread(
> -            target=processor, args=(self._p_jar.db(), path),
> -            name=self._threadName())
> +        # Create the path to the service within the DB.
> +        servicePath = [parent.__name__ for parent in  
> zapi.getParents(self)
> +                       if parent.__name__]
> +        servicePath.reverse()
> +        servicePath.append(self.__name__)
> +        # Start the thread running the processor inside.
> +        processor = self.processorFactory(
> +            self._p_jar.db(), servicePath, **self.processorArguments)
> +        thread = threading.Thread(target=processor,  
> name=self._threadName())
>          thread.setDaemon(True)
>          thread.running = True
>          thread.start()
> @@ -195,8 +191,38 @@
>          path.append(self.__name__)
>          return '.'.join(path)
>
> -    def processNext(self, now=None):
> +    def hasJobsWaiting(self, now=None):
> +        # If there are any simple jobs in the queue, we have  
> work to do.
> +        if self._queue:
> +            return True
> +        # First, move new cron jobs from the scheduled queue into  
> the cronjob
> +        # list.
> +        if now is None:
> +            now = int(time.time())
> +        while len(self._scheduledQueue) > 0:
> +            job = self._scheduledQueue.pull()
> +            if job.status is not interfaces.CANCELLED:
> +                self._insertCronJob(job, now)
> +        # Now get all jobs that should be done now or earlier; if  
> there are
> +        # any that do not have errors or are cancelled, then we  
> have jobs to
> +        # do.
> +        for key in self._scheduledJobs.keys(max=now):
> +            jobs = [job for job in self._scheduledJobs[key]
> +                    if job.status not in (interfaces.CANCELLED,
> +                                          interfaces.ERROR)]
> +            if jobs:
> +                return True
> +        return False
> +
> +    def claimNextJob(self, now=None):
>          job = self._pullJob(now)
> +        return job and job.id or None
> +
> +    def processNext(self, now=None, jobid=None):
> +        if jobid is None:
> +            job = self._pullJob(now)
> +        else:
> +            job = self.jobs[jobid]
>          if job is None:
>              return False
>          try:
> @@ -222,8 +248,8 @@
>                  job.status = interfaces.ERROR
>          except Exception, error:
>              if storage.runCount <= 3:
> -                log.error(
> -                    'catched a generic exception, preventing thread from crashing')
> +                log.error('Caught a generic exception, preventing thread '
> +                          'from crashing')
>                  log.exception(error)
>                  raise
>              else:
> @@ -244,7 +270,7 @@
>          # list
>          if now is None:
>              now = int(time.time())
> -        while len(self._scheduledQueue)>0:
> +        while len(self._scheduledQueue) > 0:
>              job = self._scheduledQueue.pull()
>              if job.status is not interfaces.CANCELLED:
>                  self._insertCronJob(job, now)
> @@ -292,33 +318,6 @@
>          self._scheduledJobs[nextCallTime] = jobs + (job,)
>
>
> -class ProcessorPublication(ZopePublication):
> -    """A custom publication to process the next job."""
> -
> -    def traverseName(self, request, ob, name):
> -        return traverse(removeSecurityProxy(ob), name, None)
> -
> -
> -def processor(db, path):
> -    """Job Processor
> -
> -    Process the jobs that are waiting in the queue. This processor is meant to
> -    be run in a separate process; however, it simply goes back to the task
> -    service to actually do the processing.
> -    path.reverse()
> -    while threading.currentThread().running:
> -        request = zope.publisher.base.BaseRequest(None, {})
> -        request.setPublication(ProcessorPublication(db))
> -        request.setTraversalStack(path)
> -        try:
> -            zope.publisher.publish.publish(request, False)
> -            if not request.response._result:
> -                time.sleep(SLEEP_TIME)
> -        except:
> -            # This thread should never crash, thus a blank except
> -            pass
> -
>  def getAutostartServiceNames():
>      """get a list of services to start"""
>
>
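For reference, the removed module-level `processor()` loop amounts to the pattern below, which is presumably what the new ``SimpleProcessor`` now encapsulates (a sketch under that assumption, not the actual implementation; `process_next` stands in for the publish round-trip to the task service):

```python
import threading
import time

def simple_processor(process_next, wait_time=1.0):
    # Keep processing jobs while this thread's `running` flag is set;
    # sleep only when no job was available, like the old loop did via
    # SLEEP_TIME. `running` is set on the Thread object by the service.
    thread = threading.current_thread()
    while getattr(thread, 'running', False):
        try:
            worked = process_next()
        except Exception:
            # The processor thread must never crash, so swallow errors
            # (the real code logs them before continuing).
            worked = False
        if not worked:
            time.sleep(wait_time)
```

Making this a factory-supplied component is what allows swapping in the ``MultiProcessor`` without touching the service itself.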
> Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
> ===================================================================
> --- lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2007-12-03 00:01:41 UTC (rev 82085)
> +++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2007-12-03 00:18:13 UTC (rev 82086)
> @@ -35,13 +35,15 @@
>
>      log_info = InstalledHandler('lovely.remotetask')
>      test.globs['log_info'] = log_info
> -    service.SLEEP_TIME = 0
> +    test.origArgs = service.TaskService.processorArguments
> +    service.TaskService.processorArguments = {'waitTime': 0.0}
>
>  def tearDown(test):
>      placefulTearDown()
>      log_info = test.globs['log_info']
> +    log_info.clear()
>      log_info.uninstall()
> -    service.SLEEP_TIME = 1
> +    service.TaskService.processorArguments = test.origArgs
>
>  def test_suite():
>      return unittest.TestSuite((
> @@ -52,6 +54,12 @@
>                       |doctest.ELLIPSIS
>                       |INTERPRET_FOOTNOTES
>                       ),
> +        DocFileSuite('processor.txt',
> +                     setUp=setUp,
> +                     tearDown=tearDown,
> +                     optionflags=doctest.NORMALIZE_WHITESPACE
> +                     |doctest.ELLIPSIS
> +                     ),
>          DocFileSuite('TESTING.txt',
>                       setUp=placelesssetup.setUp,
>                       tearDown=placelesssetup.tearDown,
>
> _______________________________________________
> Checkins mailing list
> Checkins at zope.org
> http://mail.zope.org/mailman/listinfo/checkins

-- 
Lovely Systems, senior developer

phone: +43 5572 908060, fax: +43 5572 908060-77
Schmelzhütterstraße 26a, 6850 Dornbirn, Austria
skype: bernd.dorn


