[Checkins] SVN: lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/ replace with extracted code
Godefroid Chapelle
gotcha at bubblenet.be
Wed Apr 21 15:26:35 EDT 2010
Log message for revision 111222:
replace with extracted code
Changed:
U lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/interfaces.py
U lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/service.py
-=-
Modified: lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/interfaces.py 2010-04-21 19:25:19 UTC (rev 111221)
+++ lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/interfaces.py 2010-04-21 19:26:35 UTC (rev 111222)
@@ -11,299 +11,4 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Task Service Interfaces
-
-$Id$
-"""
-__docformat__ = 'restructuredtext'
-from zope import interface
-from zope import schema
-from zope.configuration import fields
-from zope.app.container.interfaces import IContained
-
-QUEUED = 'queued'
-PROCESSING = 'processing'
-CANCELLED = 'cancelled'
-ERROR = 'error'
-COMPLETED = 'completed'
-DELAYED = 'delayed'
-CRONJOB = 'cronjob'
-DELAYED = 'delayed'
-STARTLATER = 'start later'
-
-
-class ITask(interface.Interface):
- """A task available in the task service"""
-
- inputSchema = schema.Object(
- title=u'Input Schema',
- description=u'A schema describing the task input signature.',
- schema=interface.Interface,
- required=False)
-
- outputSchema = schema.Object(
- title=u'Output Schema',
- description=u'A schema describing the task output signature.',
- schema=interface.Interface,
- required=False)
-
- def __call__(service, jobid, input):
- """Execute the task.
-
- The ``service`` argument is the task service object. It allows access to
- service wide data and the system as a whole.
-
- Tasks do not live in a vacuum, but are tightly coupled to the job
- executing it. The ``jobid`` argument provides the id of the job being
- processed.
-
- The ``input`` object must conform to the input schema (if
- specified). The return value must conform to the output schema.
- """
-
-
-class ITaskService(IContained):
- """A service for managing and executing long-running, remote tasks."""
-
- jobs = schema.Object(
- title=u'Jobs',
- description=u'A mapping of all jobs by job id.',
- schema=interface.common.mapping.IMapping)
-
- taskInterface = fields.GlobalInterface(
- title = u'Task Interface',
- description = u'The interface to lookup task utilities',
- default = ITask,
- )
-
- #processor = schema.Field(
- # title = u'Processor',
- # description = u'A callable that processes queued jobs using '
- # u'an infinite loop.',
- # )
-
- def getAvailableTasks():
- """Return a mapping of task name to the task."""
-
- def add(task, input=None, startLater=False):
- """Add a new job for the specified task.
-
- * task argument is a string specifying the task.
- * input are arguments for the task.
- * startLater, if True job will be added (gets a jobid) but needs
- to be started with startJob later
- """
-
- def addCronJob(task, input,
- minute=(),
- hour=(),
- dayOfMonth=(),
- month=(),
- dayOfWeek=(),
- ):
- """Add a new cron job."""
-
- def startJob(jobid):
- """Start a job previously added job with add(..., startLater=True)
- """
-
- def reschedule(jobid):
- """Rescheudle a cron job.
-
- This is neccessary if the cron jobs parameters are changed.
- """
-
- def clean(status=[CANCELLED, ERROR, COMPLETED]):
- """removes all jobs which are completed or canceled or have errors."""
-
- def cancel(jobid):
- """Cancel a particular job."""
-
- def getStatus(jobid):
- """Get the status of a job."""
-
- def getResult(jobid):
- """Get the result data structure of the job."""
-
- def getError(jobid):
- """Get the error of the job."""
-
- def hasJobsWaiting(now=None):
- """Determine whether there are jobs that need to be processed.
-
- Returns a simple boolean.
- """
-
- def processNext():
- """Process the next job in the queue."""
-
- def process():
- """Process all scheduled jobs.
-
- This call blocks the thread it is running in.
- """
-
- def startProcessing():
- """Start processing jobs.
-
- This method has to be called after every server restart.
- """
-
- def stopProcessing():
- """Stop processing jobs."""
-
- def isProcessing():
- """Check whether the jobs are being processed.
-
- Return a boolean representing the state.
- """
-
-
-class IJob(interface.Interface):
- """An internal job object."""
-
- id = schema.Int(
- title=u'Id',
- description=u'The job id.',
- required=True)
-
- task = schema.TextLine(
- title=u'Task',
- description=u'The task to be completed.',
- required=True)
-
- status = schema.Choice(
- title=u'Status',
- description=u'The current status of the job.',
- values=[QUEUED, PROCESSING, CANCELLED, ERROR,
- COMPLETED, DELAYED, CRONJOB, STARTLATER],
- required=True)
-
- input = schema.Object(
- title=u'Input',
- description=u'The input for the task.',
- schema=interface.Interface,
- required=False)
-
- output = schema.Object(
- title=u'Output',
- description=u'The output of the task.',
- schema=interface.Interface,
- required=False,
- default=None)
-
- error = schema.Object(
- title=u'Error',
- description=u'The error object when the task failed.',
- schema=interface.Interface,
- required=False,
- default=None)
-
- created = schema.Datetime(
- title=u'Creation Date',
- description=u'The date/time at which the job was created.',
- required=True)
-
- started = schema.Datetime(
- title=u'Start Date',
- description=u'The date/time at which the job was started.')
-
- completed = schema.Datetime(
- title=u'Completion Date',
- description=u'The date/time at which the job was completed.')
-
-
-class ICronJob(IJob):
- """Parameters for cron jobs"""
-
- minute = schema.Tuple(
- title=u'minute(s)',
- default=(),
- required=False
- )
-
- hour = schema.Tuple(
- title=u'hour(s)',
- default=(),
- required=False
- )
-
- dayOfMonth = schema.Tuple(
- title=u'day of month',
- default=(),
- required=False
- )
-
- month = schema.Tuple(
- title=u'month(s)',
- default=(),
- required=False
- )
-
- dayOfWeek = schema.Tuple(
- title=u'day of week',
- default=(),
- required=False
- )
-
- delay = schema.Int(
- title=u'delay',
- default=0,
- required=False
- )
-
- scheduledFor = schema.Datetime(
- title=u'scheduled',
- default=None,
- required=False
- )
-
- def update(minute, hour, dayOfMonth, month, dayOfWeek, delay):
- """Update the cron job.
-
- The job must be rescheduled in the containing service.
- """
-
- def timeOfNextCall(now=None):
- """Calculate the time for the next call of the job.
-
- now is a convenience parameter for testing.
- """
-
-
-class IStartRemoteTasksEvent(interface.Interface):
- """Event to start the Remote Tasks"""
-
- serviceNames = schema.List(
- title = u'Services to start',
- default = [],
- required = False,
- value_type = schema.TextLine()
- )
-
-
-class IProcessor(interface.Interface):
- """Job Processor
-
- Process the jobs that are waiting in the queue. A processor is meant to
- be run in a separate thread. To complete a job, it simply calls back into
- the task server. This works, since it does not use up any Web server
- threads.
-
- Processing a job can take a long time. However, we do not have to worry
- about transaction conflicts, since no other request is touching the job
- object.
- """
-
- running = schema.Bool(
- title=u"Running Flag",
- description=u"Tells whether the processor is currently running.",
- readonly=True)
-
- def __call__(db, servicePath):
- """Run the processor.
-
- The ``db`` is a ZODB instance that is used to call back into the task
- service. The ``servicePath`` specifies how to traverse to the task
- service itself.
- """
+from z3c.taskqueue.interfaces import *
Modified: lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/service.py 2010-04-21 19:25:19 UTC (rev 111221)
+++ lovely.remotetask/branches/gotcha-z3c-taskqueue/src/lovely/remotetask/service.py 2010-04-21 19:26:35 UTC (rev 111222)
@@ -11,416 +11,16 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-"""Task Service Implementation
+from z3c.taskqueue.service import TaskService
+from z3c.taskqueue.startup import databaseOpened
+from z3c.taskqueue.startup import getStartSpecifications
-"""
-__docformat__ = 'restructuredtext'
-from lovely.remotetask import interfaces, job, task, processor
-from zope import component
-from zope.app.appsetup.product import getProductConfiguration
-from zope.app.container import contained
-from zope.app.publication.zopepublication import ZopePublication
-from zope.component.interfaces import ComponentLookupError
-from zope.traversing.api import getParents
-import BTrees
-import datetime
-import logging
-import persistent
-import random
-import threading
-import time
-import zc.queue
-import zope.interface
-import zope.location
-
-
-log = logging.getLogger('lovely.remotetask')
-
-storage = threading.local()
-
-
-class TaskService(contained.Contained, persistent.Persistent):
- """A persistent task service.
-
- The available tasks for this service are managed as utilities.
- """
- zope.interface.implements(interfaces.ITaskService)
-
- taskInterface = interfaces.ITask
- processorFactory = processor.SimpleProcessor
- processorArguments = {'waitTime': 1.0}
-
- _scheduledJobs = None
- _scheduledQueue = None
- _v_nextid = None
- family = BTrees.family32
-
- def __init__(self):
- super(TaskService, self).__init__()
- self.jobs = self.family.IO.BTree()
- self._queue = zc.queue.Queue()
- self._scheduledJobs = self.family.IO.BTree()
- self._scheduledQueue = zc.queue.Queue()
-
- def getAvailableTasks(self):
- """See interfaces.ITaskService"""
- return dict(component.getUtilitiesFor(self.taskInterface))
-
- def add(self, task, input=None, startLater=False):
- """See interfaces.ITaskService"""
- if task not in self.getAvailableTasks():
- raise ValueError('Task does not exist')
- jobid = self._generateId()
- newjob = job.Job(jobid, task, input)
- self.jobs[jobid] = newjob
- if startLater:
- newjob.status = interfaces.STARTLATER
- else:
- self._queue.put(newjob)
- newjob.status = interfaces.QUEUED
- return jobid
-
- def addCronJob(self, task, input=None,
- minute=(),
- hour=(),
- dayOfMonth=(),
- month=(),
- dayOfWeek=(),
- delay=None,
- ):
- jobid = self._generateId()
- newjob = job.CronJob(jobid, task, input,
- minute, hour, dayOfMonth, month, dayOfWeek, delay)
- self.jobs[jobid] = newjob
- if newjob.delay is None:
- newjob.status = interfaces.CRONJOB
- else:
- newjob.status = interfaces.DELAYED
- self._scheduledQueue.put(newjob)
- return jobid
-
- def startJob(self, jobid):
- job = self.jobs[jobid]
- if job.status == interfaces.STARTLATER:
- self._queue.put(job)
- job.status = interfaces.QUEUED
- return True
- return False
-
- def reschedule(self, jobid):
- self._scheduledQueue.put(self.jobs[jobid])
-
- def clean(self, status=[interfaces.CANCELLED, interfaces.ERROR,
- interfaces.COMPLETED]):
- """See interfaces.ITaskService"""
- allowed = [interfaces.CANCELLED, interfaces.ERROR,
- interfaces.COMPLETED]
- for key in list(self.jobs.keys()):
- job = self.jobs[key]
- if job.status in status:
- if job.status not in allowed:
- raise ValueError('Not allowed status for removing. %s' %
- job.status)
- del self.jobs[key]
-
- def cancel(self, jobid):
- """See interfaces.ITaskService"""
- for idx, job in enumerate(self._queue):
- if job.id == jobid:
- job.status = interfaces.CANCELLED
- self._queue.pull(idx)
- break
- if jobid in self.jobs:
- job = self.jobs[jobid]
- if ( job.status == interfaces.CRONJOB
- or job.status == interfaces.DELAYED
- or job.status == interfaces.STARTLATER
- ):
- job.status = interfaces.CANCELLED
-
- def getStatus(self, jobid):
- """See interfaces.ITaskService"""
- return self.jobs[jobid].status
-
- def getResult(self, jobid):
- """See interfaces.ITaskService"""
- return self.jobs[jobid].output
-
- def getError(self, jobid):
- """See interfaces.ITaskService"""
- return str(self.jobs[jobid].error)
-
- def startProcessing(self):
- """See interfaces.ITaskService"""
- if self.__parent__ is None:
- return
- if self._scheduledJobs is None:
- self._scheduledJobs = self.family.IOB.Tree()
- if self._scheduledQueue is None:
- self._scheduledQueue = zc.queue.PersistentQueue()
- # Create the path to the service within the DB.
- servicePath = [parent.__name__ for parent in getParents(self)
- if parent.__name__]
- servicePath.reverse()
- servicePath.append(self.__name__)
- # Start the thread running the processor inside.
- processor = self.processorFactory(
- self._p_jar.db(), servicePath, **self.processorArguments)
- thread = threading.Thread(target=processor, name=self._threadName())
- thread.setDaemon(True)
- thread.running = True
- thread.start()
-
- def stopProcessing(self):
- """See interfaces.ITaskService"""
- if self.__name__ is None:
- return
- name = self._threadName()
- for thread in threading.enumerate():
- if thread.getName() == name:
- thread.running = False
- break
-
- def isProcessing(self):
- """See interfaces.ITaskService"""
- if self.__name__ is not None:
- name = self._threadName()
- for thread in threading.enumerate():
- if thread.getName() == name:
- if thread.running:
- return True
- break
- return False
-
- def _threadName(self):
- """Return name of the processing thread."""
- # This name isn't unique based on the path to self, but this doesn't
- # change the name that's been used in past versions.
- path = [parent.__name__ for parent in getParents(self)
- if parent.__name__]
- path.append('remotetasks')
- path.reverse()
- path.append(self.__name__)
- return '.'.join(path)
-
- def hasJobsWaiting(self, now=None):
- # If there is are any simple jobs in the queue, we have work to do.
- if self._queue:
- return True
- # First, move new cron jobs from the scheduled queue into the cronjob
- # list.
- if now is None:
- now = int(time.time())
- while len(self._scheduledQueue) > 0:
- job = self._scheduledQueue.pull()
- if job.status is not interfaces.CANCELLED:
- self._insertCronJob(job, now)
- # Now get all jobs that should be done now or earlier; if there are
- # any that do not have errors or are cancelled, then we have jobs to
- # do.
- for key in self._scheduledJobs.keys(max=now):
- jobs = [job for job in self._scheduledJobs[key]
- if job.status not in (interfaces.CANCELLED,
- interfaces.ERROR)]
- if jobs:
- return True
- return False
-
- def claimNextJob(self, now=None):
- job = self._pullJob(now)
- return job and job.id or None
-
- def processNext(self, now=None, jobid=None):
- if jobid is None:
- job = self._pullJob(now)
- else:
- job = self.jobs[jobid]
- if job is None:
- return False
- if job.status == interfaces.COMPLETED:
- return True
- try:
- jobtask = component.getUtility(self.taskInterface, name=job.task)
- except ComponentLookupError, error:
- log.error('Task "%s" not found!'% job.task)
- log.exception(str(error))
- job.error = error
- if job.status != interfaces.CRONJOB:
- job.status = interfaces.ERROR
- return True
- job.started = datetime.datetime.now()
- if not hasattr(storage, 'runCount'):
- storage.runCount = 0
- storage.runCount += 1
- try:
- job.output = jobtask(self, job.id, job.input)
- if job.status != interfaces.CRONJOB:
- job.status = interfaces.COMPLETED
- except task.TaskError, error:
- job.error = error
- if job.status != interfaces.CRONJOB:
- job.status = interfaces.ERROR
- except Exception, error:
- if storage.runCount <= 3:
- log.error('Caught a generic exception, preventing thread '
- 'from crashing')
- log.exception(str(error))
- raise
- else:
- job.error = error
- if job.status != interfaces.CRONJOB:
- job.status = interfaces.ERROR
- job.completed = datetime.datetime.now()
- storage.runCount = 0
- return True
-
- def process(self, now=None):
- """See interfaces.ITaskService"""
- while self.processNext(now):
- pass
-
- def _pullJob(self, now=None):
- # first move new cron jobs from the scheduled queue into the cronjob
- # list
- if now is None:
- now = int(time.time())
- while len(self._scheduledQueue) > 0:
- job = self._scheduledQueue.pull()
- if job.status is not interfaces.CANCELLED:
- self._insertCronJob(job, now)
- # try to get the next cron job
- while True:
- try:
- first = self._scheduledJobs.minKey()
- except ValueError:
- break
- else:
- if first > now:
- break
- jobs = self._scheduledJobs[first]
- job = jobs[0]
- self._scheduledJobs[first] = jobs[1:]
- if len(self._scheduledJobs[first]) == 0:
- del self._scheduledJobs[first]
- if ( job.status != interfaces.CANCELLED
- and job.status != interfaces.ERROR
- ):
- if job.status != interfaces.DELAYED:
- self._insertCronJob(job, now)
- return job
- # get a job from the input queue
- if self._queue:
- return self._queue.pull()
- return None
-
- def _insertCronJob(self, job, now):
- for callTime, scheduled in list(self._scheduledJobs.items()):
- if job in scheduled:
- scheduled = list(scheduled)
- scheduled.remove(job)
- if len(scheduled) == 0:
- del self._scheduledJobs[callTime]
- else:
- self._scheduledJobs[callTime] = tuple(scheduled)
- break
- nextCallTime = job.timeOfNextCall(now)
- job.scheduledFor = datetime.datetime.fromtimestamp(nextCallTime)
- set = self._scheduledJobs.get(nextCallTime)
- if set is None:
- self._scheduledJobs[nextCallTime] = ()
- jobs = self._scheduledJobs[nextCallTime]
- self._scheduledJobs[nextCallTime] = jobs + (job,)
-
- def _generateId(self):
- """Generate an id which is not yet taken.
-
- This tries to allocate sequential ids so they fall into the
- same BTree bucket, and randomizes if it stumbles upon a
- used one.
- """
- while True:
- if self._v_nextid is None:
- self._v_nextid = random.randrange(0, self.family.maxint)
- uid = self._v_nextid
- self._v_nextid += 1
- if uid not in self.jobs:
- return uid
- self._v_nextid = None
-
-
-
-def getAutostartServiceNames():
- """get a list of services to start"""
-
- serviceNames = []
- config = getProductConfiguration('lovely.remotetask')
- if config is not None:
- serviceNames = [name.strip()
- for name in config.get('autostart', '').split(',')]
- return serviceNames
-
-
def bootStrapSubscriber(event):
- """Start the queue processing services based on the
- settings in zope.conf"""
+ databaseOpened(event, productName='lovely.remotetask')
- serviceNames = getAutostartServiceNames()
- db = event.database
- connection = db.open()
- root = connection.root()
- root_folder = root.get(ZopePublication.root_name, None)
- # we assume that portals can only added at site root level
-
- log.info('handling event IStartRemoteTasksEvent')
-
- for siteName, serviceName in [name.split('@')
- for name in serviceNames if name]:
- if siteName == '':
- sites = [root_folder]
- elif siteName == '*':
- sites = []
- sites.append(root_folder)
- for folder in root_folder.values():
- if zope.location.interfaces.ISite.providedBy(folder):
- sites.append(folder)
- else:
- sites = [root_folder.get(siteName)]
-
- rootSM = root_folder.getSiteManager()
- rootServices = list(rootSM.getUtilitiesFor(interfaces.ITaskService))
-
- for site in sites:
- csName = getattr(site, "__name__", '')
- if csName is None:
- csName = 'root'
- if site is not None:
- sm = site.getSiteManager()
- if serviceName == '*':
- services = list(sm.getUtilitiesFor(interfaces.ITaskService))
- if siteName != "*" and siteName != '':
- services = [s for s in services
- if s not in rootServices]
- else:
- services = [(serviceName,
- component.queryUtility(interfaces.ITaskService,
- context=site,
- name=serviceName))]
- serviceCount = 0
- for srvname, service in services:
- if service is not None and not service.isProcessing():
- service.startProcessing()
- serviceCount += 1
- msg = 'service %s on site %s started'
- log.info(msg % (srvname, csName))
- else:
- if siteName != "*" and serviceName != "*":
- msg = 'service %s on site %s not found'
- log.error(msg % (srvname, csName))
- else:
- log.error('site %s not found' % siteName)
-
- if (siteName == "*" or serviceName == "*") and serviceCount == 0:
- msg = 'no services started by directive %s'
- log.warn(msg % name)
+def getAutostartServiceNames():
+ from zope.app.appsetup.product import getProductConfiguration
+ configuration = getProductConfiguration('lovely.remotetask')
+ return getStartSpecifications(configuration)
More information about the checkins
mailing list