[Checkins] SVN: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/ split service into baseservice to support five.taskqueue

Godefroid Chapelle gotcha at bubblenet.be
Wed Mar 31 09:04:25 EDT 2010


Log message for revision 110360:
  split service into baseservice to support five.taskqueue
  

Changed:
  A   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/baseservice.py
  U   Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py

-=-
Copied: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/baseservice.py (from rev 110359, Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py)
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/baseservice.py	                        (rev 0)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/baseservice.py	2010-03-31 13:04:25 UTC (rev 110360)
@@ -0,0 +1,328 @@
+##############################################################################
+#
+# Copyright (c) 2006, 2007 Lovely Systems and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Task Service Implementation
+
+"""
+__docformat__ = 'restructuredtext'
+
+from zope import component
+from zope.container import contained
+from zope.component.interfaces import ComponentLookupError
+import threading
+import datetime
+import logging
+import persistent
+import random
+import time
+import zc.queue
+import zope.interface
+
+from z3c.taskqueue import interfaces, job, task
+from z3c.taskqueue import processor
+
+log = logging.getLogger('z3c.taskqueue')
+
+
+class BaseTaskService(contained.Contained, persistent.Persistent):
+    """A persistent task service.
+
+    The available tasks for this service are managed as utilities.
+    """
+    zope.interface.implements(interfaces.ITaskService)
+
+    taskInterface = interfaces.ITask
+
+    _v_nextid = None
+    containerClass = None
+    processorFactory = processor.SimpleProcessor
+    processorArguments = {'waitTime': 1.0}
+
+    def __init__(self):
+        super(BaseTaskService, self).__init__()
+        self.jobs = self.containerClass()
+        self._scheduledJobs = self.containerClass()
+        self._queue = zc.queue.Queue()
+        self._scheduledQueue = zc.queue.Queue()
+
+    def getAvailableTasks(self):
+        """See interfaces.ITaskService"""
+        return dict(component.getUtilitiesFor(self.taskInterface))
+
+    def add(self, task, input=None, startLater=False):
+        """See interfaces.ITaskService"""
+        if task not in self.getAvailableTasks():
+            raise ValueError('Task does not exist')
+        jobid = self._generateId()
+        newjob = job.Job(jobid, task, input)
+        self.jobs[jobid] = newjob
+        if startLater:
+            newjob.status = interfaces.STARTLATER
+        else:
+            self._queue.put(newjob)
+            newjob.status = interfaces.QUEUED
+        return jobid
+
+    def addCronJob(self, task, input=None,
+                   minute=(),
+                   hour=(),
+                   dayOfMonth=(),
+                   month=(),
+                   dayOfWeek=(),
+                   delay=None,
+                  ):
+        jobid = self._generateId()
+        newjob = job.CronJob(jobid, task, input,
+                minute, hour, dayOfMonth, month, dayOfWeek, delay)
+        self.jobs[jobid] = newjob
+        if newjob.delay is None:
+            newjob.status = interfaces.CRONJOB
+        else:
+            newjob.status = interfaces.DELAYED
+        self._scheduledQueue.put(newjob)
+        return jobid
+
+    def startJob(self, jobid):
+        job = self.jobs[jobid]
+        if job.status == interfaces.STARTLATER:
+            self._queue.put(job)
+            job.status = interfaces.QUEUED
+            return True
+        return False
+
+    def reschedule(self, jobid):
+        self._scheduledQueue.put(self.jobs[jobid])
+
+    def clean(self, status=[interfaces.CANCELLED, interfaces.ERROR,
+                            interfaces.COMPLETED]):
+        """See interfaces.ITaskService"""
+        allowed = [interfaces.CANCELLED, interfaces.ERROR,
+                   interfaces.COMPLETED]
+        for key in list(self.jobs.keys()):
+            job = self.jobs[key]
+            if job.status in status:
+                if job.status not in allowed:
+                    raise ValueError('Not allowed status for removing. %s' %
+                        job.status)
+                del self.jobs[key]
+
+    def cancel(self, jobid):
+        """See interfaces.ITaskService"""
+        for idx, job in enumerate(self._queue):
+            if job.id == jobid:
+                job.status = interfaces.CANCELLED
+                self._queue.pull(idx)
+                break
+        if jobid in self.jobs:
+            job = self.jobs[jobid]
+            if (job.status == interfaces.CRONJOB
+                or job.status == interfaces.DELAYED
+                or job.status == interfaces.STARTLATER):
+                job.status = interfaces.CANCELLED
+
+    def getStatus(self, jobid):
+        """See interfaces.ITaskService"""
+        return self.jobs[jobid].status
+
+    def getResult(self, jobid):
+        """See interfaces.ITaskService"""
+        return self.jobs[jobid].output
+
+    def getError(self, jobid):
+        """See interfaces.ITaskService"""
+        return str(self.jobs[jobid].error)
+
+    def hasJobsWaiting(self, now=None):
+        # If there is are any simple jobs in the queue, we have work to do.
+        if self._queue:
+            return True
+        # First, move new cron jobs from the scheduled queue into the cronjob
+        # list.
+        if now is None:
+            now = int(time.time())
+        while len(self._scheduledQueue) > 0:
+            job = self._scheduledQueue.pull()
+            if job.status is not interfaces.CANCELLED:
+                self._insertCronJob(job, now)
+        # Now get all jobs that should be done now or earlier; if there are
+        # any that do not have errors or are cancelled, then we have jobs to
+        # do.
+        for key in self._scheduledJobs.keys(max=now):
+            jobs = [job for job in self._scheduledJobs[key]
+                    if job.status not in (interfaces.CANCELLED,
+                                          interfaces.ERROR)]
+            if jobs:
+                return True
+        return False
+
+    def claimNextJob(self, now=None):
+        job = self._pullJob(now)
+        return job and job.id or None
+
+    def processNext(self, now=None, jobid=None):
+        """
+        process next job in the queue
+        """
+        if jobid is None:
+            job = self._pullJob(now)
+        else:
+            job = self.jobs[jobid]
+        if job is None:
+            return False
+        if job.status == interfaces.COMPLETED:
+            return True
+        try:
+            jobtask = component.getUtility(self.taskInterface, name=job.task)
+        except ComponentLookupError, error:
+            log.error('Task "%s" not found!' % job.task)
+            log.exception(error)
+            job.error = error
+            if job.status != interfaces.CRONJOB:
+                job.status = interfaces.ERROR
+            return True
+        job.started = datetime.datetime.now()
+        try:
+            job.output = jobtask(self, job.id, job.input)
+            if job.status != interfaces.CRONJOB:
+                job.status = interfaces.COMPLETED
+        except task.TaskError, error:
+            job.error = error
+            if job.status != interfaces.CRONJOB:
+                job.status = interfaces.ERROR
+        except Exception, error:
+            job.error = error
+            if job.status != interfaces.CRONJOB:
+                job.status = interfaces.ERROR
+        job.completed = datetime.datetime.now()
+        return True
+
+    def _pullJob(self, now=None):
+        # first move new cron jobs from the scheduled queue into the cronjob
+        # list
+        if now is None:
+            now = int(time.time())
+        while len(self._scheduledQueue) > 0:
+            job = self._scheduledQueue.pull()
+            if job.status is not interfaces.CANCELLED:
+                self._insertCronJob(job, now)
+        # try to get the next cron job
+        while True:
+            try:
+                first = self._scheduledJobs.minKey()
+            except ValueError:
+                break
+            else:
+                if first > now:
+                    break
+                jobs = self._scheduledJobs[first]
+                job = jobs[0]
+                self._scheduledJobs[first] = jobs[1:]
+                if len(self._scheduledJobs[first]) == 0:
+                    del self._scheduledJobs[first]
+                if (job.status != interfaces.CANCELLED
+                    and job.status != interfaces.ERROR):
+                    if job.status != interfaces.DELAYED:
+                        self._insertCronJob(job, now)
+                    return job
+        # get a job from the input queue
+        if self._queue:
+            return self._queue.pull()
+        return None
+
+    def _insertCronJob(self, job, now):
+        for callTime, scheduled in list(self._scheduledJobs.items()):
+            if job in scheduled:
+                scheduled = list(scheduled)
+                scheduled.remove(job)
+                if len(scheduled) == 0:
+                    del self._scheduledJobs[callTime]
+                else:
+                    self._scheduledJobs[callTime] = tuple(scheduled)
+                break
+        nextCallTime = job.timeOfNextCall(now)
+        job.scheduledFor = datetime.datetime.fromtimestamp(nextCallTime)
+        set = self._scheduledJobs.get(nextCallTime)
+        if set is None:
+            self._scheduledJobs[nextCallTime] = ()
+        jobs = self._scheduledJobs[nextCallTime]
+        self._scheduledJobs[nextCallTime] = jobs + (job,)
+
+    def _generateId(self):
+        """Generate an id which is not yet taken.
+
+        This tries to allocate sequential ids so they fall into the
+        same BTree bucket, and randomizes if it stumbles upon a
+        used one.
+        """
+        while True:
+            if self._v_nextid is None:
+                self._v_nextid = random.randrange(0, self.maxint)
+            uid = self._v_nextid
+            self._v_nextid += 1
+            if uid not in self.jobs:
+                return uid
+            self._v_nextid = None
+
+    def startProcessing(self):
+        """See interfaces.ITaskService"""
+        if self.__parent__ is None:
+            return
+        if self._scheduledJobs is None:
+            self._scheduledJobs = self.containerClass()
+        if self._scheduledQueue is None:
+            self._scheduledQueue = zc.queue.PersistentQueue()
+        # Create the path to the service within the DB.
+        servicePath = self.getServicePath()
+        log.info('starting service %s' % self._threadName())
+        # Start the thread running the processor inside.
+        processor = self.processorFactory(
+            self._p_jar.db(), servicePath, **self.processorArguments)
+        thread = threading.Thread(target=processor, name=self._threadName())
+        thread.setDaemon(True)
+        thread.running = True
+        thread.start()
+
+    def stopProcessing(self):
+        """See interfaces.ITaskService"""
+        if self.__name__ is None:
+            return
+        name = self._threadName()
+        log.info('stopping service %s' % name)
+        for thread in threading.enumerate():
+            if thread.getName() == name:
+                thread.running = False
+                break
+
+    def isProcessing(self):
+        """See interfaces.ITaskService"""
+        if self.__name__ is not None:
+            name = self._threadName()
+            for thread in threading.enumerate():
+                if thread.getName() == name:
+                    if thread.running:
+                        return True
+                    break
+        return False
+
+    def getServicePath(self):
+        raise NotImplemented
+
+    def _threadName(self):
+        """Return name of the processing thread."""
+        # This name isn't unique based on the path to self, but this doesn't
+        # change the name that's been used in past versions.
+        path = self.getServicePath()
+        path.append('remotetasks')
+        path.reverse()
+        path.append(self.__name__)
+        return '.'.join(path)

Modified: Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py
===================================================================
--- Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py	2010-03-31 12:21:07 UTC (rev 110359)
+++ Sandbox/gotcha/z3c.taskqueue/src/z3c/taskqueue/service.py	2010-03-31 13:04:25 UTC (rev 110360)
@@ -16,321 +16,13 @@
 """
 __docformat__ = 'restructuredtext'
 
-from zope import component
-from zope.container import contained
-from zope.component.interfaces import ComponentLookupError
 from zope.traversing.api import getParents
-import threading
-import datetime
-import logging
-import persistent
-import random
-import time
-import zc.queue
-import zope.interface
 
 from BTrees import family32
 
-from z3c.taskqueue import interfaces, job, task
-from z3c.taskqueue import processor
+from z3c.taskqueue.baseservice import BaseTaskService
 
-log = logging.getLogger('z3c.taskqueue')
 
-
-class BaseTaskService(contained.Contained, persistent.Persistent):
-    """A persistent task service.
-
-    The available tasks for this service are managed as utilities.
-    """
-    zope.interface.implements(interfaces.ITaskService)
-
-    taskInterface = interfaces.ITask
-
-    _v_nextid = None
-    containerClass = None
-    processorFactory = processor.SimpleProcessor
-    processorArguments = {'waitTime': 1.0}
-
-    def __init__(self):
-        super(BaseTaskService, self).__init__()
-        self.jobs = self.containerClass()
-        self._scheduledJobs = self.containerClass()
-        self._queue = zc.queue.Queue()
-        self._scheduledQueue = zc.queue.Queue()
-
-    def getAvailableTasks(self):
-        """See interfaces.ITaskService"""
-        return dict(component.getUtilitiesFor(self.taskInterface))
-
-    def add(self, task, input=None, startLater=False):
-        """See interfaces.ITaskService"""
-        if task not in self.getAvailableTasks():
-            raise ValueError('Task does not exist')
-        jobid = self._generateId()
-        newjob = job.Job(jobid, task, input)
-        self.jobs[jobid] = newjob
-        if startLater:
-            newjob.status = interfaces.STARTLATER
-        else:
-            self._queue.put(newjob)
-            newjob.status = interfaces.QUEUED
-        return jobid
-
-    def addCronJob(self, task, input=None,
-                   minute=(),
-                   hour=(),
-                   dayOfMonth=(),
-                   month=(),
-                   dayOfWeek=(),
-                   delay=None,
-                  ):
-        jobid = self._generateId()
-        newjob = job.CronJob(jobid, task, input,
-                minute, hour, dayOfMonth, month, dayOfWeek, delay)
-        self.jobs[jobid] = newjob
-        if newjob.delay is None:
-            newjob.status = interfaces.CRONJOB
-        else:
-            newjob.status = interfaces.DELAYED
-        self._scheduledQueue.put(newjob)
-        return jobid
-
-    def startJob(self, jobid):
-        job = self.jobs[jobid]
-        if job.status == interfaces.STARTLATER:
-            self._queue.put(job)
-            job.status = interfaces.QUEUED
-            return True
-        return False
-
-    def reschedule(self, jobid):
-        self._scheduledQueue.put(self.jobs[jobid])
-
-    def clean(self, status=[interfaces.CANCELLED, interfaces.ERROR,
-                            interfaces.COMPLETED]):
-        """See interfaces.ITaskService"""
-        allowed = [interfaces.CANCELLED, interfaces.ERROR,
-                   interfaces.COMPLETED]
-        for key in list(self.jobs.keys()):
-            job = self.jobs[key]
-            if job.status in status:
-                if job.status not in allowed:
-                    raise ValueError('Not allowed status for removing. %s' %
-                        job.status)
-                del self.jobs[key]
-
-    def cancel(self, jobid):
-        """See interfaces.ITaskService"""
-        for idx, job in enumerate(self._queue):
-            if job.id == jobid:
-                job.status = interfaces.CANCELLED
-                self._queue.pull(idx)
-                break
-        if jobid in self.jobs:
-            job = self.jobs[jobid]
-            if (job.status == interfaces.CRONJOB
-                or job.status == interfaces.DELAYED
-                or job.status == interfaces.STARTLATER):
-                job.status = interfaces.CANCELLED
-
-    def getStatus(self, jobid):
-        """See interfaces.ITaskService"""
-        return self.jobs[jobid].status
-
-    def getResult(self, jobid):
-        """See interfaces.ITaskService"""
-        return self.jobs[jobid].output
-
-    def getError(self, jobid):
-        """See interfaces.ITaskService"""
-        return str(self.jobs[jobid].error)
-
-    def hasJobsWaiting(self, now=None):
-        # If there is are any simple jobs in the queue, we have work to do.
-        if self._queue:
-            return True
-        # First, move new cron jobs from the scheduled queue into the cronjob
-        # list.
-        if now is None:
-            now = int(time.time())
-        while len(self._scheduledQueue) > 0:
-            job = self._scheduledQueue.pull()
-            if job.status is not interfaces.CANCELLED:
-                self._insertCronJob(job, now)
-        # Now get all jobs that should be done now or earlier; if there are
-        # any that do not have errors or are cancelled, then we have jobs to
-        # do.
-        for key in self._scheduledJobs.keys(max=now):
-            jobs = [job for job in self._scheduledJobs[key]
-                    if job.status not in (interfaces.CANCELLED,
-                                          interfaces.ERROR)]
-            if jobs:
-                return True
-        return False
-
-    def claimNextJob(self, now=None):
-        job = self._pullJob(now)
-        return job and job.id or None
-
-    def processNext(self, now=None, jobid=None):
-        """
-        process next job in the queue
-        """
-        if jobid is None:
-            job = self._pullJob(now)
-        else:
-            job = self.jobs[jobid]
-        if job is None:
-            return False
-        if job.status == interfaces.COMPLETED:
-            return True
-        try:
-            jobtask = component.getUtility(self.taskInterface, name=job.task)
-        except ComponentLookupError, error:
-            log.error('Task "%s" not found!' % job.task)
-            log.exception(error)
-            job.error = error
-            if job.status != interfaces.CRONJOB:
-                job.status = interfaces.ERROR
-            return True
-        job.started = datetime.datetime.now()
-        try:
-            job.output = jobtask(self, job.id, job.input)
-            if job.status != interfaces.CRONJOB:
-                job.status = interfaces.COMPLETED
-        except task.TaskError, error:
-            job.error = error
-            if job.status != interfaces.CRONJOB:
-                job.status = interfaces.ERROR
-        except Exception, error:
-            job.error = error
-            if job.status != interfaces.CRONJOB:
-                job.status = interfaces.ERROR
-        job.completed = datetime.datetime.now()
-        return True
-
-    def _pullJob(self, now=None):
-        # first move new cron jobs from the scheduled queue into the cronjob
-        # list
-        if now is None:
-            now = int(time.time())
-        while len(self._scheduledQueue) > 0:
-            job = self._scheduledQueue.pull()
-            if job.status is not interfaces.CANCELLED:
-                self._insertCronJob(job, now)
-        # try to get the next cron job
-        while True:
-            try:
-                first = self._scheduledJobs.minKey()
-            except ValueError:
-                break
-            else:
-                if first > now:
-                    break
-                jobs = self._scheduledJobs[first]
-                job = jobs[0]
-                self._scheduledJobs[first] = jobs[1:]
-                if len(self._scheduledJobs[first]) == 0:
-                    del self._scheduledJobs[first]
-                if (job.status != interfaces.CANCELLED
-                    and job.status != interfaces.ERROR):
-                    if job.status != interfaces.DELAYED:
-                        self._insertCronJob(job, now)
-                    return job
-        # get a job from the input queue
-        if self._queue:
-            return self._queue.pull()
-        return None
-
-    def _insertCronJob(self, job, now):
-        for callTime, scheduled in list(self._scheduledJobs.items()):
-            if job in scheduled:
-                scheduled = list(scheduled)
-                scheduled.remove(job)
-                if len(scheduled) == 0:
-                    del self._scheduledJobs[callTime]
-                else:
-                    self._scheduledJobs[callTime] = tuple(scheduled)
-                break
-        nextCallTime = job.timeOfNextCall(now)
-        job.scheduledFor = datetime.datetime.fromtimestamp(nextCallTime)
-        set = self._scheduledJobs.get(nextCallTime)
-        if set is None:
-            self._scheduledJobs[nextCallTime] = ()
-        jobs = self._scheduledJobs[nextCallTime]
-        self._scheduledJobs[nextCallTime] = jobs + (job,)
-
-    def _generateId(self):
-        """Generate an id which is not yet taken.
-
-        This tries to allocate sequential ids so they fall into the
-        same BTree bucket, and randomizes if it stumbles upon a
-        used one.
-        """
-        while True:
-            if self._v_nextid is None:
-                self._v_nextid = random.randrange(0, self.maxint)
-            uid = self._v_nextid
-            self._v_nextid += 1
-            if uid not in self.jobs:
-                return uid
-            self._v_nextid = None
-
-    def startProcessing(self):
-        """See interfaces.ITaskService"""
-        if self.__parent__ is None:
-            return
-        if self._scheduledJobs is None:
-            self._scheduledJobs = self.containerClass()
-        if self._scheduledQueue is None:
-            self._scheduledQueue = zc.queue.PersistentQueue()
-        # Create the path to the service within the DB.
-        servicePath = self.getServicePath()
-        log.info('starting service %s' % self._threadName())
-        # Start the thread running the processor inside.
-        processor = self.processorFactory(
-            self._p_jar.db(), servicePath, **self.processorArguments)
-        thread = threading.Thread(target=processor, name=self._threadName())
-        thread.setDaemon(True)
-        thread.running = True
-        thread.start()
-
-    def stopProcessing(self):
-        """See interfaces.ITaskService"""
-        if self.__name__ is None:
-            return
-        name = self._threadName()
-        log.info('stopping service %s' % name)
-        for thread in threading.enumerate():
-            if thread.getName() == name:
-                thread.running = False
-                break
-
-    def isProcessing(self):
-        """See interfaces.ITaskService"""
-        if self.__name__ is not None:
-            name = self._threadName()
-            for thread in threading.enumerate():
-                if thread.getName() == name:
-                    if thread.running:
-                        return True
-                    break
-        return False
-
-    def getServicePath(self):
-        raise NotImplemented
-
-    def _threadName(self):
-        """Return name of the processing thread."""
-        # This name isn't unique based on the path to self, but this doesn't
-        # change the name that's been used in past versions.
-        path = self.getServicePath()
-        path.append('remotetasks')
-        path.reverse()
-        path.append(self.__name__)
-        return '.'.join(path)
-
-
 class TaskService(BaseTaskService):
     containerClass = family32.IO.BTree
     maxint = family32.maxint



More information about the checkins mailing list