[Checkins] SVN: lovely.remotetask/trunk/src/lovely/remotetask/ Added cron jobs to the remote task.

Jürgen Kartnaller juergen at kartnaller.at
Mon Apr 16 08:14:57 EDT 2007


Log message for revision 74174:
  Added cron jobs to the remote task.
  

Changed:
  U   lovely.remotetask/trunk/src/lovely/remotetask/README.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/job.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/service.py

-=-
Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/README.txt	2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt	2007-04-16 12:14:56 UTC (rev 74174)
@@ -3,8 +3,9 @@
 =====================
 
 This package provides an implementation of a remote task execution Web service
-that allows to execute pre-defined tasks on another server. Those services are
-useful in two ways:
+that allows to execute pre-defined tasks on another server. It is also
+possible to run cron jobs at specific times. Those services are useful in two
+ways:
 
 1. They enable us to complete tasks that are not natively available on a
    particular machine. For example, it is not possible to convert an AVI file
@@ -129,3 +130,131 @@
 
   >>> sorted([job.status for job in service.jobs.values()])
   ['queued']
+
+
+Cron jobs
+---------
+
+Cron jobs execute at specific times.
+
+  >>> import time
+  >>> from lovely.remotetask.job import CronJob
+  >>> now = 0
+  >>> time.localtime(now)
+  (1970, 1, 1, 1, 0, 0, 3, 1, 0)
+
+We set up a job to be executed once an hour at the current minute. The next
+call time is one hour from now.
+
+Minutes
+
+  >>> cronJob = CronJob(-1, u'echo', (), minute=(0, 10))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 1, 1, 1, 10, 0, 3, 1, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(10*60))
+  (1970, 1, 1, 2, 0, 0, 3, 1, 0)
+
+Hour
+
+  >>> cronJob = CronJob(-1, u'echo', (), hour=(2, 13))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 1, 1, 2, 0, 0, 3, 1, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(2*60*60))
+  (1970, 1, 1, 13, 0, 0, 3, 1, 0)
+
+Month
+
+  >>> cronJob = CronJob(-1, u'echo', (), month=(1, 5, 12))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 5, 1, 1, 0, 0, 4, 121, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(cronJob.timeOfNextCall(0)))
+  (1970, 12, 1, 1, 0, 0, 1, 335, 0)
+
+Day of week [0..6] (0 is Monday); Jan 1 1970 is a Thursday.
+
+  >>> cronJob = CronJob(-1, u'echo', (), dayOfWeek=(0, 2, 4, 5))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 1, 2, 1, 0, 0, 4, 2, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(60*60*24))
+  (1970, 1, 3, 1, 0, 0, 5, 3, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(2*60*60*24))
+  (1970, 1, 5, 1, 0, 0, 0, 5, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(4*60*60*24))
+  (1970, 1, 7, 1, 0, 0, 2, 7, 0)
+
+DayOfMonth [1..31]
+
+  >>> cronJob = CronJob(-1, u'echo', (), dayOfMonth=(1, 12, 21, 30))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 1, 12, 1, 0, 0, 0, 12, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(12*24*60*60))
+  (1970, 1, 21, 1, 0, 0, 2, 21, 0)
+
+Combined
+
+  >>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
+  ...                                 dayOfMonth=(1, 12, 21, 30))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 1, 1, 1, 10, 0, 3, 1, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(10*60))
+  (1970, 1, 1, 2, 10, 0, 3, 1, 0)
+
+  >>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
+  ...                                 hour=(4,),
+  ...                                 dayOfMonth=(1, 12, 21, 30))
+  >>> time.localtime(cronJob.timeOfNextCall(0))
+  (1970, 1, 1, 4, 10, 0, 3, 1, 0)
+  >>> time.localtime(cronJob.timeOfNextCall(10*60))
+  (1970, 1, 1, 4, 10, 0, 3, 1, 0)
+
+
+Creating Cron Jobs
+------------------
+
+
+  >>> count = 0
+  >>> def counting(input):
+  ...     global count
+  ...     count += 1
+  ...     return count
+  >>> countingTask = remotetask.task.SimpleTask(counting)
+  >>> zope.component.provideUtility(countingTask, name='counter')
+
+Here we create a cron job which runs at 10 and 13 minutes past the hour.
+
+  >>> jobid = service.addCronJob(u'counter',
+  ...                            {'foo': 'bar'},
+  ...                            minute = (10, 13),
+  ...                           )
+  >>> service.getStatus(jobid)
+  'cronjob'
+
+We process the remote task but our cron job is not executed because we are too
+early in time.
+
+  >>> service.process(0)
+  >>> service.getStatus(jobid)
+  'cronjob'
+  >>> service.getResult(jobid) is None
+  True
+
+Now we run the remote task 10 minutes later and get a result.
+
+  >>> service.process(10*60)
+  >>> service.getStatus(jobid)
+  'cronjob'
+  >>> service.getResult(jobid)
+  1
+
+And 1 minute later it is not called.
+
+  >>> service.process(11*60)
+  >>> service.getResult(jobid)
+  1
+
+But 3 minutes later it is called again.
+
+  >>> service.process(13*60)
+  >>> service.getResult(jobid)
+  2
+

Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	2007-04-16 12:14:56 UTC (rev 74174)
@@ -26,6 +26,7 @@
 CANCELLED = 'cancelled'
 ERROR = 'error'
 COMPLETED = 'completed'
+CRONJOB = 'cronjob'
 
 class ITaskService(IContained):
     """A service for managing and executing long-running, remote tasks."""
@@ -45,9 +46,18 @@
         arguments for the task.
         """
 
+    def addCronJob(task, input,
+                   minute=(),
+                   hour=(),
+                   dayOfMonth=(),
+                   month=(),
+                   dayOfWeek=(),
+                  ):
+        """Add a new cron job."""
+
     def clean(stati=[CANCELLED, ERROR, COMPLETED]):
         """removes all jobs which are completed or canceled or have errors."""
-        
+
     def cancel(jobid):
         """Cancel a particular job."""
 
@@ -131,7 +141,7 @@
     status = zope.schema.Choice(
         title=u'Status',
         description=u'The current status of the job.',
-        values=[QUEUED, PROCESSING, CANCELLED, ERROR, COMPLETED],
+        values=[QUEUED, PROCESSING, CANCELLED, ERROR, COMPLETED, CRONJOB],
         required=True)
 
     input = zope.schema.Object(
@@ -166,3 +176,38 @@
     completed = zope.schema.Datetime(
         title=u'Completion Date',
         description=u'The date/time at which the job was completed.')
+
+
+class ICron(zope.interface.Interface):
+    """Parameters for cron jobs"""
+
+    minute = zope.schema.Tuple(
+            title=u'minute(s)',
+            default=(),
+            required=False
+            )
+
+    hour = zope.schema.Tuple(
+            title=u'hour(s)',
+            default=(),
+            required=False
+            )
+
+    dayOfMonth = zope.schema.Tuple(
+            title=u'day of month',
+            default=(),
+            required=False
+            )
+
+    month = zope.schema.Tuple(
+            title=u'month(s)',
+            default=(),
+            required=False
+            )
+
+    dayOfWeek = zope.schema.Tuple(
+            title=u'day of week',
+            default=(),
+            required=False
+            )
+

Modified: lovely.remotetask/trunk/src/lovely/remotetask/job.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/job.py	2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/job.py	2007-04-16 12:14:56 UTC (rev 74174)
@@ -17,6 +17,7 @@
 """
 __docformat__ = 'restructuredtext'
 
+import time
 import datetime
 import persistent
 import zope.interface
@@ -46,3 +47,82 @@
 
     def __repr__(self):
         return '<%s %r>' %(self.__class__.__name__, self.id)
+
+
+class CronJob(Job):
+    """A job for reocuring tasks"""
+    zope.interface.implements(interfaces.ICron)
+
+    minute = FieldProperty(interfaces.ICron['minute'])
+    hour = FieldProperty(interfaces.ICron['hour'])
+    dayOfMonth = FieldProperty(interfaces.ICron['dayOfMonth'])
+    month = FieldProperty(interfaces.ICron['month'])
+    dayOfWeek = FieldProperty(interfaces.ICron['dayOfWeek'])
+
+    _lastExecutionTime = None
+
+    def __init__(self, id, task, input,
+                 minute=(),
+                 hour=(),
+                 dayOfMonth=(),
+                 month=(),
+                 dayOfWeek=(),
+                ):
+        super(CronJob, self).__init__(id, task, input)
+        self.minute = minute
+        self.hour = hour
+        self.dayOfMonth = dayOfMonth
+        self.month = month
+        self.dayOfWeek = dayOfWeek
+
+    def timeOfNextCall(self, now=None):
+        if now is None:
+            now = time.time()
+        next = now
+        inc = lambda t: 60
+        lnow = list(time.localtime(now)[:5])
+        if self.minute:
+            pass
+        elif self.hour:
+            inc = lambda t: 60*60
+            lnow = lnow[:4]
+        elif self.dayOfMonth:
+            inc = lambda t: 24*60*60
+            lnow = lnow[:3]
+        elif self.dayOfWeek:
+            inc = lambda t: 24*60*60
+            lnow = lnow[:3]
+        elif self.month:
+            mlen = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
+            def minc(t):
+                m = time.localtime(t)[1] - 1
+                if m == 1:
+                    # see if we have a leap year
+                    y = time.localtime(t)[0]
+                    if y % 4 != 0:
+                        d = 28
+                    elif y % 400 == 0:
+                        d = 29
+                    elif y % 100 == 0:
+                        d = 28
+                    else:
+                        d = 29
+                    return d*24*60*60
+                return mlen[m]*24*60*60
+            inc = minc
+            lnow = lnow[:3]
+            lnow[2] = 1
+        while len(lnow)<9:
+            lnow.append(0)
+        while next <= now+365*24*60*60:
+            next += inc(next)
+            fields = time.localtime(next)
+            if ((self.month and fields[1] not in self.month) or
+                (self.dayOfMonth and fields[2] not in self.dayOfMonth) or
+                (self.dayOfWeek and fields[6] % 7 not in self.dayOfWeek) or
+                (self.hour and fields[3] not in self.hour) or
+                (self.minute and fields[4] not in self.minute)):
+                continue
+            return int(next)
+        return None
+

Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py	2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py	2007-04-16 12:14:56 UTC (rev 74174)
@@ -27,6 +27,7 @@
 import zope.publisher.base
 import zope.publisher.publish
 from BTrees.IOBTree import IOBTree
+from BTrees.IFBTree import IFTreeSet
 from zope.security.proxy import removeSecurityProxy
 from zope.traversing.api import traverse
 from zope.app import zapi
@@ -49,6 +50,8 @@
         self._counter = 1
         self.jobs = IOBTree()
         self._queue = zc.queue.PersistentQueue()
+        self._scheduledJobs = IOBTree()
+        self._scheduledQueue = zc.queue.PersistentQueue()
 
     def getAvailableTasks(self):
         """See interfaces.ITaskService"""
@@ -66,11 +69,27 @@
         newjob.status = interfaces.QUEUED
         return jobid
 
-    def clean(self, stati=[interfaces.CANCELLED, interfaces.ERROR, 
-        interfaces.COMPLETED]):
+    def addCronJob(self, task, input=None,
+                   minute=(),
+                   hour=(),
+                   dayOfMonth=(),
+                   month=(),
+                   dayOfWeek=(),
+                  ):
+        jobid = self._counter
+        self._counter += 1
+        newjob = job.CronJob(jobid, task, input,
+                minute, hour, dayOfMonth, month, dayOfWeek)
+        self.jobs[jobid] = newjob
+        self._scheduledQueue.put(newjob)
+        newjob.status = interfaces.CRONJOB
+        return jobid
+
+    def clean(self, stati=[interfaces.CANCELLED, interfaces.ERROR,
+                           interfaces.COMPLETED]):
         """See interfaces.ITaskService"""
-        allowed = [interfaces.CANCELLED, interfaces.ERROR, 
-            interfaces.COMPLETED]
+        allowed = [interfaces.CANCELLED, interfaces.ERROR,
+                   interfaces.COMPLETED]
         for key in list(self.jobs.keys()):
             job = self.jobs[key]
             if job.status in stati:
@@ -130,25 +149,67 @@
                     return True
         return False
 
-    def processNext(self):
-        job = self._queue.pull()
+    def processNext(self, now=None):
+        job = self._pullJob(now)
+        if job is None:
+            return False
         jobtask = zope.component.getUtility(
-            self.taskInterface, name=job.task)
+                        self.taskInterface, name=job.task)
         job.started = datetime.datetime.now()
         try:
             job.output = jobtask(self, job.id, job.input)
-            job.status = interfaces.COMPLETED
+            if job.status != interfaces.CRONJOB:
+                job.status = interfaces.COMPLETED
         except task.TaskError, error:
             job.error = error
-            job.status = interfaces.ERROR
+            if job.status != interfaces.CRONJOB:
+                job.status = interfaces.ERROR
         job.completed = datetime.datetime.now()
+        return True
 
-    def process(self):
+    def process(self, now=None):
         """See interfaces.ITaskService"""
-        while self._queue:
-            self.processNext()
+        while self.processNext(now):
+            pass
 
+    def _pullJob(self, now=None):
+        # first move new cron jobs from the scheduled queue into the cronjob
+        # list
+        if now is None:
+            now = time.time()
+        while len(self._scheduledQueue)>0:
+            job = self._scheduledQueue.pull()
+            if job.status is not interfaces.CANCELLED:
+                self._insertCronJob(job, now)
+        while True:
+            try:
+                first = self._scheduledJobs.minKey()
+            except ValueError:
+                break
+            else:
+                if first > now:
+                    break
+                jobs = self._scheduledJobs[first]
+                job = jobs[0]
+                self._scheduledJobs[first] = jobs[1:]
+                if len(self._scheduledJobs[first]) == 0:
+                    del self._scheduledJobs[first]
+                self._insertCronJob(job, now)
+                if job.status != interfaces.CANCELLED:
+                    return job
+        if self._queue:
+            return self._queue.pull()
+        return None
 
+    def _insertCronJob(self, job, now):
+        nextCallTime = job.timeOfNextCall(now)
+        set = self._scheduledJobs.get(nextCallTime)
+        if set is None:
+            self._scheduledJobs[nextCallTime] = ()
+        jobs = self._scheduledJobs[nextCallTime]
+        self._scheduledJobs[nextCallTime] = jobs + (job,)
+
+
 class ProcessorPublication(ZopePublication):
     """A custom publication to process the next job."""
 



More information about the Checkins mailing list