[Checkins] SVN: lovely.remotetask/trunk/src/lovely/remotetask/
changed the way remotetask is being started. it seems that
under special circumstances (zdaemon restarts a killed zope)
remotetask might be started twice. here's the new way to start it.
Jodok Batlogg
jodok.batlogg at lovelysystems.com
Fri May 25 08:08:19 EDT 2007
Log message for revision 75959:
changed the way remotetask is being started. it seems that under special circumstances (zdaemon restarts a killed zope) remotetask might be started twice. here's the new way to start it.
Changed:
U lovely.remotetask/trunk/src/lovely/remotetask/README.txt
U lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py
U lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml
U lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
U lovely.remotetask/trunk/src/lovely/remotetask/service.py
U lovely.remotetask/trunk/src/lovely/remotetask/tests.py
-=-
Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/README.txt 2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt 2007-05-25 12:08:18 UTC (rev 75959)
@@ -15,11 +15,42 @@
2. They also allow to move expensive operations to other servers. This is
valuable, for example, when converting videos on high-traffic sites.
+Installation
+------------
+
+Define the remotetasks that should be started on startup in zope.conf like
+this::
+
+ <product-config lovely.remotetask>
+ autostart site1.TestTaskService1, site2.TestTaskService2
+ </product-config>
+
+This causes the Remotetasks to be started upon zope startup.
+
+Usage
+-----
+
Let's now start by creating a single service:
>>> from lovely import remotetask
>>> service = remotetask.TaskService()
+
+Let's register it under the name `TestTaskService`:
+ >>> from zope import component
+ >>> from lovely.remotetask import interfaces
+ >>> component.provideUtility(service, interfaces.ITaskService,
+ ... name='TestTaskService1')
+
+The object should be located, so it gets a name:
+
+ >>> root['testTaskService1'] = service
+ >>> service = root['testTaskService1'] # caution! proxy
+ >>> service.__name__
+ u'testTaskService1'
+ >>> service.__parent__ is root
+ True
+
We can discover the available tasks:
>>> service.getAvailableTasks()
@@ -77,18 +108,65 @@
>>> service.getStatus(jobid)
'cancelled'
+The service isn't being started by default:
+
+ >>> service.isProcessing()
+ False
+
+The TaskService is being started automatically - if specified in zope.conf -
+as soon as the `IDatabaseOpenedEvent` is fired. Let's emulate the zope.conf
+settings:
+
+ >>> class Config(object):
+ ... mapping = {}
+ ... def getSectionName(self):
+ ... return 'lovely.remotetask'
+ >>> config = Config()
+ >>> servicenames = 'site1.TestTaskService1, site2.TestTaskService2'
+ >>> config.mapping['autostart'] = servicenames
+ >>> from zope.app.appsetup.product import setProductConfigurations
+ >>> setProductConfigurations([config])
+ >>> from lovely.remotetask.service import getAutostartServiceNames
+ >>> getAutostartServiceNames()
+ ['site1.TestTaskService1', 'site2.TestTaskService2']
+
+On Zope startup the IDatabaseOpenedEvent is being fired, and will call
+the bootStrap method:
+
+ >>> from ZODB.tests import util
+ >>> import transaction
+ >>> db = util.DB()
+ >>> from zope.app.publication.zopepublication import ZopePublication
+ >>> conn = db.open()
+ >>> conn.root()[ZopePublication.root_name] = root
+ >>> from zope.app.folder import Folder
+ >>> root['site1'] = Folder()
+ >>> transaction.commit()
+
+Fire the event::
+
+ >>> from zope.app.appsetup.interfaces import DatabaseOpenedWithRoot
+ >>> from lovely.remotetask.service import bootStrapSubscriber
+ >>> event = DatabaseOpenedWithRoot(db)
+ >>> bootStrapSubscriber(event)
+
+and voila - the service is processing:
+
+ >>> service.isProcessing()
+ True
+
+Finally stop processing and kill the thread. We'll call service.process()
+manually as we don't have the right environment in the tests.
+
+ >>> service.stopProcessing()
+ >>> service.isProcessing()
+ False
+
Let's now readd a job:
>>> jobid = service.add(u'echo', {'foo': 'bar'})
-
-The jobs in the queue are processed by calling the service's ``process()``
-method:
-
>>> service.process()
-This method is usually called by other application logic, but we have to call
-it manually here, since none of the other infrastructure is setup.
-
>>> service.getStatus(jobid)
'completed'
>>> service.getResult(jobid)
Modified: lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py 2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py 2007-05-25 12:08:18 UTC (rev 75959)
@@ -21,9 +21,9 @@
import zope.interface
import zope.component
-from zope.publisher.browser import BrowserPage
from zope.app.pagetemplate import ViewPageTemplateFile
from zope.app.session.interfaces import ISession
+from zope.publisher.browser import BrowserPage
from zc.table import column, table
from zc.table.interfaces import ISortableColumn
from lovely.remotetask import interfaces
Modified: lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml 2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml 2007-05-25 12:08:18 UTC (rev 75959)
@@ -28,6 +28,11 @@
set_schema=".interfaces.ICronJob" />
</class>
+ <subscriber
+ for="zope.app.appsetup.IDatabaseOpenedEvent"
+ handler=".service.bootStrapSubscriber"
+ />
+
<!-- Demo: Echo Task -->
<utility
factory=".task.EchoTask"
Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py 2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py 2007-05-25 12:08:18 UTC (rev 75959)
@@ -16,9 +16,8 @@
$Id$
"""
__docformat__ = 'restructuredtext'
-import zope.interface
-import zope.interface.common.mapping
-import zope.schema
+from zope import interface
+from zope import schema
from zope.app.container.interfaces import IContained
QUEUED = 'queued'
@@ -31,10 +30,10 @@
class ITaskService(IContained):
"""A service for managing and executing long-running, remote tasks."""
- jobs = zope.schema.Object(
+ jobs = schema.Object(
title=u'Jobs',
description=u'A mapping of all jobs by job id.',
- schema=zope.interface.common.mapping.IMapping)
+ schema=interface.common.mapping.IMapping)
def getAvailableTasks():
"""Return a mapping of task name to the task."""
@@ -95,19 +94,19 @@
"""
-class ITask(zope.interface.Interface):
+class ITask(interface.Interface):
"""A task available in the task service"""
- inputSchema = zope.schema.Object(
+ inputSchema = schema.Object(
title=u'Input Schema',
description=u'A schema describing the task input signature.',
- schema=zope.interface.Interface,
+ schema=interface.Interface,
required=False)
- outputSchema = zope.schema.Object(
+ outputSchema = schema.Object(
title=u'Output Schema',
description=u'A schema describing the task output signature.',
- schema=zope.interface.Interface,
+ schema=interface.Interface,
required=False)
def __call__(self, service, jobid, input):
@@ -125,55 +124,55 @@
"""
-class IJob(zope.interface.Interface):
+class IJob(interface.Interface):
"""An internal job object."""
- id = zope.schema.Int(
+ id = schema.Int(
title=u'Id',
description=u'The job id.',
required=True)
- task = zope.schema.TextLine(
+ task = schema.TextLine(
title=u'Task',
description=u'The task to be completed.',
required=True)
- status = zope.schema.Choice(
+ status = schema.Choice(
title=u'Status',
description=u'The current status of the job.',
values=[QUEUED, PROCESSING, CANCELLED, ERROR, COMPLETED, CRONJOB],
required=True)
- input = zope.schema.Object(
+ input = schema.Object(
title=u'Input',
description=u'The input for the task.',
- schema=zope.interface.Interface,
+ schema=interface.Interface,
required=False)
- output = zope.schema.Object(
+ output = schema.Object(
title=u'Output',
description=u'The output of the task.',
- schema=zope.interface.Interface,
+ schema=interface.Interface,
required=False,
default=None)
- error = zope.schema.Object(
+ error = schema.Object(
title=u'Error',
description=u'The error object when the task failed.',
- schema=zope.interface.Interface,
+ schema=interface.Interface,
required=False,
default=None)
- created = zope.schema.Datetime(
+ created = schema.Datetime(
title=u'Creation Date',
description=u'The date/time at which the job was created.',
required=True)
- started = zope.schema.Datetime(
+ started = schema.Datetime(
title=u'Start Date',
description=u'The date/time at which the job was started.')
- completed = zope.schema.Datetime(
+ completed = schema.Datetime(
title=u'Completion Date',
description=u'The date/time at which the job was completed.')
@@ -181,33 +180,43 @@
class ICronJob(IJob):
"""Parameters for cron jobs"""
- minute = zope.schema.Tuple(
+ minute = schema.Tuple(
title=u'minute(s)',
default=(),
required=False
)
- hour = zope.schema.Tuple(
+ hour = schema.Tuple(
title=u'hour(s)',
default=(),
required=False
)
- dayOfMonth = zope.schema.Tuple(
+ dayOfMonth = schema.Tuple(
title=u'day of month',
default=(),
required=False
)
- month = zope.schema.Tuple(
+ month = schema.Tuple(
title=u'month(s)',
default=(),
required=False
)
- dayOfWeek = zope.schema.Tuple(
+ dayOfWeek = schema.Tuple(
title=u'day of week',
default=(),
required=False
)
+
+class IStartRemoteTasksEvent(interface.Interface):
+ """Event to start the Remote Tasks"""
+
+ serviceNames = schema.List(
+ title = u'Services to start',
+ default = [],
+ required = False,
+ value_type = schema.TextLine()
+ )
\ No newline at end of file
Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py 2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py 2007-05-25 12:08:18 UTC (rev 75959)
@@ -23,17 +23,18 @@
import threading
import time
import zc.queue
-import zope.component
import zope.interface
import zope.publisher.base
import zope.publisher.publish
from BTrees.IOBTree import IOBTree
from BTrees.IFBTree import IFTreeSet
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
+from zope import component
from zope.app import zapi
+from zope.app.appsetup.product import getProductConfiguration
from zope.app.container import contained
from zope.app.publication.zopepublication import ZopePublication
+from zope.security.proxy import removeSecurityProxy
+from zope.traversing.api import traverse
from lovely.remotetask import interfaces, job, task
log = logging.getLogger('lovely.remotetask')
@@ -49,7 +50,7 @@
_scheduledJobs = None
_scheduledQueue = None
-
+
def __init__(self):
super(TaskService, self).__init__()
self._counter = 1
@@ -60,7 +61,7 @@
def getAvailableTasks(self):
"""See interfaces.ITaskService"""
- return dict(zope.component.getUtilitiesFor(self.taskInterface))
+ return dict(component.getUtilitiesFor(self.taskInterface))
def add(self, task, input=None):
"""See interfaces.ITaskService"""
@@ -166,8 +167,7 @@
job = self._pullJob(now)
if job is None:
raise IndexError
- jobtask = zope.component.getUtility(
- self.taskInterface, name=job.task)
+ jobtask = component.getUtility(self.taskInterface, name=job.task)
job.started = datetime.datetime.now()
try:
job.output = jobtask(self, job.id, job.input)
@@ -257,3 +257,46 @@
log.error('catched a generic exception, preventing thread from \
crashing: %s'% e)
pass
+
+def getAutostartServiceNames():
+ """get a list of services to start"""
+
+ serviceNames = []
+ config = getProductConfiguration('lovely.remotetask')
+ if config is not None:
+ serviceNames = [name.strip()
+ for name in config.get('autostart', '').split(',')]
+ return serviceNames
+
+
+def bootStrapSubscriber(event):
+ """Start the queue processing services based on the
+ settings in zope.conf"""
+
+ serviceNames = getAutostartServiceNames()
+
+ db = event.database
+ connection = db.open()
+ root = connection.root()
+ root_folder = root.get(ZopePublication.root_name, None)
+ # we assume that portals can only be added at site root level
+
+ log.info('handling event IStartRemoteTasksEvent')
+
+ for siteName, serviceName in [name.split('.')
+ for name in serviceNames]:
+ site = root_folder.get(siteName)
+ if site is not None:
+ service = component.queryUtility(interfaces.ITaskService,
+ context=site,
+ name=serviceName)
+ if service is not None and not service.isProcessing():
+ service.startProcessing()
+ log.info('service %s on site %s started' % (serviceName,
+ siteName))
+ else:
+ log.error('service %s on site %s not found' % (serviceName,
+ siteName))
+
+ else:
+ log.error('site %s not found' % siteName)
Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/tests.py 2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py 2007-05-25 12:08:18 UTC (rev 75959)
@@ -20,13 +20,22 @@
import doctest
import unittest
from zope.app.testing import placelesssetup
+from zope.app.testing.setup import (placefulSetUp,
+ placefulTearDown)
from zope.testing.doctestunit import DocFileSuite
+def setUp(test):
+ root = placefulSetUp(site=True)
+ test.globs['root'] = root
+
+def tearDown(test):
+ placefulTearDown()
+
def test_suite():
return unittest.TestSuite((
DocFileSuite('README.txt',
- setUp=placelesssetup.setUp,
- tearDown=placelesssetup.tearDown,
+ setUp=setUp,
+ tearDown=tearDown,
optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
),
DocFileSuite('TESTING.txt',
More information about the Checkins
mailing list