[Checkins] SVN: lovely.remotetask/trunk/src/lovely/remotetask/
Added cron jobs to the remote task.
Jürgen Kartnaller
juergen at kartnaller.at
Mon Apr 16 08:14:57 EDT 2007
Log message for revision 74174:
Added cron jobs to the remote task.
Changed:
U lovely.remotetask/trunk/src/lovely/remotetask/README.txt
U lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
U lovely.remotetask/trunk/src/lovely/remotetask/job.py
U lovely.remotetask/trunk/src/lovely/remotetask/service.py
-=-
Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/README.txt 2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt 2007-04-16 12:14:56 UTC (rev 74174)
@@ -3,8 +3,9 @@
=====================
This package provides an implementation of a remote task execution Web service
-that allows to execute pre-defined tasks on another server. Those services are
-useful in two ways:
+that allows to execute pre-defined tasks on another server. It is also
+possible to run cron jobs at specific times. Those services are useful in two
+ways:
1. They enable us to complete tasks that are not natively available on a
particular machine. For example, it is not possible to convert an AVI file
@@ -129,3 +130,131 @@
>>> sorted([job.status for job in service.jobs.values()])
['queued']
+
+
+Cron jobs
+---------
+
+Cron jobs execute on specific times.
+
+ >>> import time
+ >>> from lovely.remotetask.job import CronJob
+ >>> now = 0
+ >>> time.localtime(now)
+ (1970, 1, 1, 1, 0, 0, 3, 1, 0)
+
+We set up a job to be executed once an hour at the current minute. The next
+call time is the one our from now.
+
+Minutes
+
+ >>> cronJob = CronJob(-1, u'echo', (), minute=(0, 10))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 1, 1, 1, 10, 0, 3, 1, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(10*60))
+ (1970, 1, 1, 2, 0, 0, 3, 1, 0)
+
+Hour
+
+ >>> cronJob = CronJob(-1, u'echo', (), hour=(2, 13))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 1, 1, 2, 0, 0, 3, 1, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(2*60*60))
+ (1970, 1, 1, 13, 0, 0, 3, 1, 0)
+
+Month
+
+ >>> cronJob = CronJob(-1, u'echo', (), month=(1, 5, 12))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 5, 1, 1, 0, 0, 4, 121, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(cronJob.timeOfNextCall(0)))
+ (1970, 12, 1, 1, 0, 0, 1, 335, 0)
+
+Day of week [0..6], jan 1 1970 is a wednesday.
+
+ >>> cronJob = CronJob(-1, u'echo', (), dayOfWeek=(0, 2, 4, 5))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 1, 2, 1, 0, 0, 4, 2, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(60*60*24))
+ (1970, 1, 3, 1, 0, 0, 5, 3, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(2*60*60*24))
+ (1970, 1, 5, 1, 0, 0, 0, 5, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(4*60*60*24))
+ (1970, 1, 7, 1, 0, 0, 2, 7, 0)
+
+DayOfMonth [1..31]
+
+ >>> cronJob = CronJob(-1, u'echo', (), dayOfMonth=(1, 12, 21, 30))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 1, 12, 1, 0, 0, 0, 12, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(12*24*60*60))
+ (1970, 1, 21, 1, 0, 0, 2, 21, 0)
+
+Combined
+
+ >>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
+ ... dayOfMonth=(1, 12, 21, 30))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 1, 1, 1, 10, 0, 3, 1, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(10*60))
+ (1970, 1, 1, 2, 10, 0, 3, 1, 0)
+
+ >>> cronJob = CronJob(-1, u'echo', (), minute=(10,),
+ ... hour=(4,),
+ ... dayOfMonth=(1, 12, 21, 30))
+ >>> time.localtime(cronJob.timeOfNextCall(0))
+ (1970, 1, 1, 4, 10, 0, 3, 1, 0)
+ >>> time.localtime(cronJob.timeOfNextCall(10*60))
+ (1970, 1, 1, 4, 10, 0, 3, 1, 0)
+
+
+Creating Cron Jobs
+------------------
+
+
+ >>> count = 0
+ >>> def counting(input):
+ ... global count
+ ... count += 1
+ ... return count
+ >>> countingTask = remotetask.task.SimpleTask(counting)
+ >>> zope.component.provideUtility(countingTask, name='counter')
+
+here we create a cron job which runs 10 minutes and 13 minutes past the hour.
+
+ >>> jobid = service.addCronJob(u'counter',
+ ... {'foo': 'bar'},
+ ... minute = (10, 13),
+ ... )
+ >>> service.getStatus(jobid)
+ 'cronjob'
+
+We process the remote task but our cron job is not executed because we are too
+early in time.
+
+ >>> service.process(0)
+ >>> service.getStatus(jobid)
+ 'cronjob'
+ >>> service.getResult(jobid) is None
+ True
+
+Now we run the remote task 10 minutes later and get a result.
+
+ >>> service.process(10*60)
+ >>> service.getStatus(jobid)
+ 'cronjob'
+ >>> service.getResult(jobid)
+ 1
+
+And 1 minutes later it is not called.
+
+ >>> service.process(11*60)
+ >>> service.getResult(jobid)
+ 1
+
+But 3 minutes later it is called again.
+
+ >>> service.process(13*60)
+ >>> service.getResult(jobid)
+ 2
+
Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py 2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py 2007-04-16 12:14:56 UTC (rev 74174)
@@ -26,6 +26,7 @@
CANCELLED = 'cancelled'
ERROR = 'error'
COMPLETED = 'completed'
+CRONJOB = 'cronjob'
class ITaskService(IContained):
"""A service for managing and executing long-running, remote tasks."""
@@ -45,9 +46,18 @@
arguments for the task.
"""
+ def addCronJob(task, input,
+ minute=(),
+ hour=(),
+ dayOfMonth=(),
+ month=(),
+ dayOfWeek=(),
+ ):
+ """Add a new cron job."""
+
def clean(stati=[CANCELLED, ERROR, COMPLETED]):
"""removes all jobs which are completed or canceled or have errors."""
-
+
def cancel(jobid):
"""Cancel a particular job."""
@@ -131,7 +141,7 @@
status = zope.schema.Choice(
title=u'Status',
description=u'The current status of the job.',
- values=[QUEUED, PROCESSING, CANCELLED, ERROR, COMPLETED],
+ values=[QUEUED, PROCESSING, CANCELLED, ERROR, COMPLETED, CRONJOB],
required=True)
input = zope.schema.Object(
@@ -166,3 +176,38 @@
completed = zope.schema.Datetime(
title=u'Completion Date',
description=u'The date/time at which the job was completed.')
+
+
+class ICron(zope.interface.Interface):
+ """Parameters for cron jobs"""
+
+ minute = zope.schema.Tuple(
+ title=u'minute(s)',
+ default=(),
+ required=False
+ )
+
+ hour = zope.schema.Tuple(
+ title=u'hour(s)',
+ default=(),
+ required=False
+ )
+
+ dayOfMonth = zope.schema.Tuple(
+ title=u'day of month',
+ default=(),
+ required=False
+ )
+
+ month = zope.schema.Tuple(
+ title=u'month(s)',
+ default=(),
+ required=False
+ )
+
+ dayOfWeek = zope.schema.Tuple(
+ title=u'day of week',
+ default=(),
+ required=False
+ )
+
Modified: lovely.remotetask/trunk/src/lovely/remotetask/job.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/job.py 2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/job.py 2007-04-16 12:14:56 UTC (rev 74174)
@@ -17,6 +17,7 @@
"""
__docformat__ = 'restructuredtext'
+import time
import datetime
import persistent
import zope.interface
@@ -46,3 +47,82 @@
def __repr__(self):
return '<%s %r>' %(self.__class__.__name__, self.id)
+
+
+class CronJob(Job):
+ """A job for reocuring tasks"""
+ zope.interface.implements(interfaces.ICron)
+
+ minute = FieldProperty(interfaces.ICron['minute'])
+ hour = FieldProperty(interfaces.ICron['hour'])
+ dayOfMonth = FieldProperty(interfaces.ICron['dayOfMonth'])
+ month = FieldProperty(interfaces.ICron['month'])
+ dayOfWeek = FieldProperty(interfaces.ICron['dayOfWeek'])
+
+ _lastExecutionTime = None
+
+ def __init__(self, id, task, input,
+ minute=(),
+ hour=(),
+ dayOfMonth=(),
+ month=(),
+ dayOfWeek=(),
+ ):
+ super(CronJob, self).__init__(id, task, input)
+ self.minute = minute
+ self.hour = hour
+ self.dayOfMonth = dayOfMonth
+ self.month = month
+ self.dayOfWeek = dayOfWeek
+
+ def timeOfNextCall(self, now=None):
+ if now is None:
+ now = time.time()
+ next = now
+ inc = lambda t: 60
+ lnow = list(time.localtime(now)[:5])
+ if self.minute:
+ pass
+ elif self.hour:
+ inc = lambda t: 60*60
+ lnow = lnow[:4]
+ elif self.dayOfMonth:
+ inc = lambda t: 24*60*60
+ lnow = lnow[:3]
+ elif self.dayOfWeek:
+ inc = lambda t: 24*60*60
+ lnow = lnow[:3]
+ elif self.month:
+ mlen = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
+ def minc(t):
+ m = time.localtime(t)[1] - 1
+ if m == 1:
+ # see if we have a leap year
+ y = time.localtime(t)[0]
+ if y % 4 != 0:
+ d = 28
+ elif y % 400 == 0:
+ d = 29
+ elif y % 100 == 0:
+ d = 28
+ else:
+ d = 29
+ return d*24*60*60
+ return mlen[m]*24*60*60
+ inc = minc
+ lnow = lnow[:3]
+ lnow[2] = 1
+ while len(lnow)<9:
+ lnow.append(0)
+ while next <= now+365*24*60*60:
+ next += inc(next)
+ fields = time.localtime(next)
+ if ((self.month and fields[1] not in self.month) or
+ (self.dayOfMonth and fields[2] not in self.dayOfMonth) or
+ (self.dayOfWeek and fields[6] % 7 not in self.dayOfWeek) or
+ (self.hour and fields[3] not in self.hour) or
+ (self.minute and fields[4] not in self.minute)):
+ continue
+ return int(next)
+ return None
+
Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py 2007-04-16 12:04:56 UTC (rev 74173)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py 2007-04-16 12:14:56 UTC (rev 74174)
@@ -27,6 +27,7 @@
import zope.publisher.base
import zope.publisher.publish
from BTrees.IOBTree import IOBTree
+from BTrees.IFBTree import IFTreeSet
from zope.security.proxy import removeSecurityProxy
from zope.traversing.api import traverse
from zope.app import zapi
@@ -49,6 +50,8 @@
self._counter = 1
self.jobs = IOBTree()
self._queue = zc.queue.PersistentQueue()
+ self._scheduledJobs = IOBTree()
+ self._scheduledQueue = zc.queue.PersistentQueue()
def getAvailableTasks(self):
"""See interfaces.ITaskService"""
@@ -66,11 +69,27 @@
newjob.status = interfaces.QUEUED
return jobid
- def clean(self, stati=[interfaces.CANCELLED, interfaces.ERROR,
- interfaces.COMPLETED]):
+ def addCronJob(self, task, input=None,
+ minute=(),
+ hour=(),
+ dayOfMonth=(),
+ month=(),
+ dayOfWeek=(),
+ ):
+ jobid = self._counter
+ self._counter += 1
+ newjob = job.CronJob(jobid, task, input,
+ minute, hour, dayOfMonth, month, dayOfWeek)
+ self.jobs[jobid] = newjob
+ self._scheduledQueue.put(newjob)
+ newjob.status = interfaces.CRONJOB
+ return jobid
+
+ def clean(self, stati=[interfaces.CANCELLED, interfaces.ERROR,
+ interfaces.COMPLETED]):
"""See interfaces.ITaskService"""
- allowed = [interfaces.CANCELLED, interfaces.ERROR,
- interfaces.COMPLETED]
+ allowed = [interfaces.CANCELLED, interfaces.ERROR,
+ interfaces.COMPLETED]
for key in list(self.jobs.keys()):
job = self.jobs[key]
if job.status in stati:
@@ -130,25 +149,67 @@
return True
return False
- def processNext(self):
- job = self._queue.pull()
+ def processNext(self, now=None):
+ job = self._pullJob(now)
+ if job is None:
+ return False
jobtask = zope.component.getUtility(
- self.taskInterface, name=job.task)
+ self.taskInterface, name=job.task)
job.started = datetime.datetime.now()
try:
job.output = jobtask(self, job.id, job.input)
- job.status = interfaces.COMPLETED
+ if job.status != interfaces.CRONJOB:
+ job.status = interfaces.COMPLETED
except task.TaskError, error:
job.error = error
- job.status = interfaces.ERROR
+ if job.status != interfaces.CRONJOB:
+ job.status = interfaces.ERROR
job.completed = datetime.datetime.now()
+ return True
- def process(self):
+ def process(self, now=None):
"""See interfaces.ITaskService"""
- while self._queue:
- self.processNext()
+ while self.processNext(now):
+ pass
+ def _pullJob(self, now=None):
+ # first move new cron jobs from the scheduled queue into the cronjob
+ # list
+ if now is None:
+ now = time.time()
+ while len(self._scheduledQueue)>0:
+ job = self._scheduledQueue.pull()
+ if job.status is not interfaces.CANCELLED:
+ self._insertCronJob(job, now)
+ while True:
+ try:
+ first = self._scheduledJobs.minKey()
+ except ValueError:
+ break
+ else:
+ if first > now:
+ break
+ jobs = self._scheduledJobs[first]
+ job = jobs[0]
+ self._scheduledJobs[first] = jobs[1:]
+ if len(self._scheduledJobs[first]) == 0:
+ del self._scheduledJobs[first]
+ self._insertCronJob(job, now)
+ if job.status != interfaces.CANCELLED:
+ return job
+ if self._queue:
+ return self._queue.pull()
+ return None
+ def _insertCronJob(self, job, now):
+ nextCallTime = job.timeOfNextCall(now)
+ set = self._scheduledJobs.get(nextCallTime)
+ if set is None:
+ self._scheduledJobs[nextCallTime] = ()
+ jobs = self._scheduledJobs[nextCallTime]
+ self._scheduledJobs[nextCallTime] = jobs + (job,)
+
+
class ProcessorPublication(ZopePublication):
"""A custom publication to process the next job."""
More information about the Checkins
mailing list