[Checkins] SVN: lovely.remotetask/trunk/src/lovely/remotetask/ changed the way remotetask is beeing started. it seems that under special circumstances (zdaemon restarts a killed zope) remotetask might be started twice. here's the new way to start it.

Jodok Batlogg jodok.batlogg at lovelysystems.com
Fri May 25 08:08:19 EDT 2007


Log message for revision 75959:
  changed the way remotetask is beeing started. it seems that under special circumstances (zdaemon restarts a killed zope) remotetask might be started twice. here's the new way to start it.
  

Changed:
  U   lovely.remotetask/trunk/src/lovely/remotetask/README.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml
  U   lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/service.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/tests.py

-=-
Modified: lovely.remotetask/trunk/src/lovely/remotetask/README.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/README.txt	2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/README.txt	2007-05-25 12:08:18 UTC (rev 75959)
@@ -15,11 +15,42 @@
 2. They also allow to move expensive operations to other servers. This is
    valuable, for example, when converting videos on high-traffic sites.
 
+Installation
+------------
+
+Define the remotetasks that should be started on startup in zope.conf like 
+this::
+
+  <product-config lovely.remotetask>
+    autostart site1.TestTaskService1, site2.TestTaskService2 
+  </product-config>
+
+This causes the Remotetasks beeing started upon zope startup.
+
+Usage
+_____
+
 Let's now start by creating a single service:
 
   >>> from lovely import remotetask
   >>> service = remotetask.TaskService()
+  
+Let's register it under the name `TestTaskService`:
 
+  >>> from zope import component
+  >>> from lovely.remotetask import interfaces
+  >>> component.provideUtility(service, interfaces.ITaskService, 
+  ...                          name='TestTaskService1')
+  
+The object should be located, so it get's a name:
+
+  >>> root['testTaskService1'] = service
+  >>> service = root['testTaskService1'] # caution! proxy
+  >>> service.__name__
+  u'testTaskService1'
+  >>> service.__parent__ is root
+  True
+    
 We can discover the available tasks:
 
   >>> service.getAvailableTasks()
@@ -77,18 +108,65 @@
   >>> service.getStatus(jobid)
   'cancelled'
 
+The service isn't beeing started by default:
+
+  >>> service.isProcessing()
+  False
+
+The TaskService is beeing started automatically - if specified in zope.conf -
+as soon as the `IDatabaseOpenedEvent` is fired. Let's emulate the zope.conf 
+settings:
+
+  >>> class Config(object):
+  ...     mapping = {}
+  ...     def getSectionName(self):
+  ...         return 'lovely.remotetask'
+  >>> config = Config()
+  >>> servicenames = 'site1.TestTaskService1, site2.TestTaskService2'
+  >>> config.mapping['autostart'] = servicenames
+  >>> from zope.app.appsetup.product import setProductConfigurations
+  >>> setProductConfigurations([config])
+  >>> from lovely.remotetask.service import getAutostartServiceNames
+  >>> getAutostartServiceNames()
+  ['site1.TestTaskService1', 'site2.TestTaskService2']
+  
+On Zope startup the IDatabaseOpenedEvent is beeing fired, and will call
+the bootStrap method:
+
+  >>> from ZODB.tests import util
+  >>> import transaction
+  >>> db = util.DB()
+  >>> from zope.app.publication.zopepublication import ZopePublication
+  >>> conn = db.open()
+  >>> conn.root()[ZopePublication.root_name] = root
+  >>> from zope.app.folder import Folder
+  >>> root['site1'] = Folder()
+  >>> transaction.commit()
+
+Fire the event::
+
+  >>> from zope.app.appsetup.interfaces import DatabaseOpenedWithRoot
+  >>> from lovely.remotetask.service import bootStrapSubscriber
+  >>> event = DatabaseOpenedWithRoot(db)
+  >>> bootStrapSubscriber(event)
+
+and voila - the service is processing:
+
+  >>> service.isProcessing()
+  True
+
+Finally stop processing and kill the thread. We'll call service.process() 
+manually as we don't have the right environment in the tests.
+
+  >>> service.stopProcessing()
+  >>> service.isProcessing()
+  False
+
 Let's now readd a job:
 
   >>> jobid = service.add(u'echo', {'foo': 'bar'})
-
-The jobs in the queue are processed by calling the service's ``process()``
-method:
-
   >>> service.process()
 
-This method is usually called by other application logic, but we have to call
-it manually here, since none of the other infrastructure is setup.
-
   >>> service.getStatus(jobid)
   'completed'
   >>> service.getResult(jobid)

Modified: lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py	2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/browser/service.py	2007-05-25 12:08:18 UTC (rev 75959)
@@ -21,9 +21,9 @@
 
 import zope.interface
 import zope.component
-from zope.publisher.browser import BrowserPage
 from zope.app.pagetemplate import ViewPageTemplateFile
 from zope.app.session.interfaces import ISession
+from zope.publisher.browser import BrowserPage
 from zc.table import column, table
 from zc.table.interfaces import ISortableColumn
 from lovely.remotetask import interfaces

Modified: lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml	2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/configure.zcml	2007-05-25 12:08:18 UTC (rev 75959)
@@ -28,6 +28,11 @@
         set_schema=".interfaces.ICronJob" />
   </class>
 
+  <subscriber
+      for="zope.app.appsetup.IDatabaseOpenedEvent"
+      handler=".service.bootStrapSubscriber"
+   />
+   
   <!-- Demo: Echo Task -->
   <utility
       factory=".task.EchoTask"

Modified: lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/interfaces.py	2007-05-25 12:08:18 UTC (rev 75959)
@@ -16,9 +16,8 @@
 $Id$
 """
 __docformat__ = 'restructuredtext'
-import zope.interface
-import zope.interface.common.mapping
-import zope.schema
+from zope import interface
+from zope import schema
 from zope.app.container.interfaces import IContained
 
 QUEUED = 'queued'
@@ -31,10 +30,10 @@
 class ITaskService(IContained):
     """A service for managing and executing long-running, remote tasks."""
 
-    jobs = zope.schema.Object(
+    jobs = schema.Object(
         title=u'Jobs',
         description=u'A mapping of all jobs by job id.',
-        schema=zope.interface.common.mapping.IMapping)
+        schema=interface.common.mapping.IMapping)
 
     def getAvailableTasks():
         """Return a mapping of task name to the task."""
@@ -95,19 +94,19 @@
         """
 
 
-class ITask(zope.interface.Interface):
+class ITask(interface.Interface):
     """A task available in the task service"""
 
-    inputSchema = zope.schema.Object(
+    inputSchema = schema.Object(
         title=u'Input Schema',
         description=u'A schema describing the task input signature.',
-        schema=zope.interface.Interface,
+        schema=interface.Interface,
         required=False)
 
-    outputSchema = zope.schema.Object(
+    outputSchema = schema.Object(
         title=u'Output Schema',
         description=u'A schema describing the task output signature.',
-        schema=zope.interface.Interface,
+        schema=interface.Interface,
         required=False)
 
     def __call__(self, service, jobid, input):
@@ -125,55 +124,55 @@
         """
 
 
-class IJob(zope.interface.Interface):
+class IJob(interface.Interface):
     """An internal job object."""
 
-    id = zope.schema.Int(
+    id = schema.Int(
         title=u'Id',
         description=u'The job id.',
         required=True)
 
-    task = zope.schema.TextLine(
+    task = schema.TextLine(
         title=u'Task',
         description=u'The task to be completed.',
         required=True)
 
-    status = zope.schema.Choice(
+    status = schema.Choice(
         title=u'Status',
         description=u'The current status of the job.',
         values=[QUEUED, PROCESSING, CANCELLED, ERROR, COMPLETED, CRONJOB],
         required=True)
 
-    input = zope.schema.Object(
+    input = schema.Object(
         title=u'Input',
         description=u'The input for the task.',
-        schema=zope.interface.Interface,
+        schema=interface.Interface,
         required=False)
 
-    output = zope.schema.Object(
+    output = schema.Object(
         title=u'Output',
         description=u'The output of the task.',
-        schema=zope.interface.Interface,
+        schema=interface.Interface,
         required=False,
         default=None)
 
-    error = zope.schema.Object(
+    error = schema.Object(
         title=u'Error',
         description=u'The error object when the task failed.',
-        schema=zope.interface.Interface,
+        schema=interface.Interface,
         required=False,
         default=None)
 
-    created = zope.schema.Datetime(
+    created = schema.Datetime(
         title=u'Creation Date',
         description=u'The date/time at which the job was created.',
         required=True)
 
-    started = zope.schema.Datetime(
+    started = schema.Datetime(
         title=u'Start Date',
         description=u'The date/time at which the job was started.')
 
-    completed = zope.schema.Datetime(
+    completed = schema.Datetime(
         title=u'Completion Date',
         description=u'The date/time at which the job was completed.')
 
@@ -181,33 +180,43 @@
 class ICronJob(IJob):
     """Parameters for cron jobs"""
 
-    minute = zope.schema.Tuple(
+    minute = schema.Tuple(
             title=u'minute(s)',
             default=(),
             required=False
             )
 
-    hour = zope.schema.Tuple(
+    hour = schema.Tuple(
             title=u'hour(s)',
             default=(),
             required=False
             )
 
-    dayOfMonth = zope.schema.Tuple(
+    dayOfMonth = schema.Tuple(
             title=u'day of month',
             default=(),
             required=False
             )
 
-    month = zope.schema.Tuple(
+    month = schema.Tuple(
             title=u'month(s)',
             default=(),
             required=False
             )
 
-    dayOfWeek = zope.schema.Tuple(
+    dayOfWeek = schema.Tuple(
             title=u'day of week',
             default=(),
             required=False
             )
 
+
+class IStartRemoteTasksEvent(interface.Interface):
+    """Event to start the Remote Tasks"""
+
+    serviceNames = schema.List(
+            title = u'Services to start',
+            default = [],
+            required = False,
+            value_type = schema.TextLine()
+            )
\ No newline at end of file

Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py	2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py	2007-05-25 12:08:18 UTC (rev 75959)
@@ -23,17 +23,18 @@
 import threading
 import time
 import zc.queue
-import zope.component
 import zope.interface
 import zope.publisher.base
 import zope.publisher.publish
 from BTrees.IOBTree import IOBTree
 from BTrees.IFBTree import IFTreeSet
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import traverse
+from zope import component
 from zope.app import zapi
+from zope.app.appsetup.product import getProductConfiguration
 from zope.app.container import contained
 from zope.app.publication.zopepublication import ZopePublication
+from zope.security.proxy import removeSecurityProxy
+from zope.traversing.api import traverse
 from lovely.remotetask import interfaces, job, task
 
 log = logging.getLogger('lovely.remotetask')
@@ -49,7 +50,7 @@
 
     _scheduledJobs  = None
     _scheduledQueue = None
-
+        
     def __init__(self):
         super(TaskService, self).__init__()
         self._counter = 1
@@ -60,7 +61,7 @@
 
     def getAvailableTasks(self):
         """See interfaces.ITaskService"""
-        return dict(zope.component.getUtilitiesFor(self.taskInterface))
+        return dict(component.getUtilitiesFor(self.taskInterface))
 
     def add(self, task, input=None):
         """See interfaces.ITaskService"""
@@ -166,8 +167,7 @@
         job = self._pullJob(now)
         if job is None:
             raise IndexError
-        jobtask = zope.component.getUtility(
-                        self.taskInterface, name=job.task)
+        jobtask = component.getUtility(self.taskInterface, name=job.task)
         job.started = datetime.datetime.now()
         try:
             job.output = jobtask(self, job.id, job.input)
@@ -257,3 +257,46 @@
             log.error('catched a generic exception, preventing thread from \
                        crashing: %s'% e)
             pass
+
+def getAutostartServiceNames():
+    """get a list of services to start"""
+
+    serviceNames = []
+    config = getProductConfiguration('lovely.remotetask')
+    if config is not None:
+        serviceNames = [name.strip() 
+                        for name in config.get('autostart', '').split(',')]
+    return serviceNames
+
+
+def bootStrapSubscriber(event):
+    """Start the queue processing services based on the 
+       settings in zope.conf"""
+    
+    serviceNames = getAutostartServiceNames()
+    
+    db = event.database
+    connection = db.open()
+    root = connection.root()
+    root_folder = root.get(ZopePublication.root_name, None)
+    # we assume that portals can only added at site root level
+
+    log.info('handling event IStartRemoteTasksEvent')
+
+    for siteName, serviceName in [name.split('.')
+                                  for name in serviceNames]:
+        site = root_folder.get(siteName)
+        if site is not None:
+            service = component.queryUtility(interfaces.ITaskService, 
+                                           context=site, 
+                                           name=serviceName)
+            if service is not None and not service.isProcessing():            
+                service.startProcessing()
+                log.info('service %s on site %s started' % (serviceName,
+                                                            siteName))
+            else:
+                log.error('service %s on site %s not found' % (serviceName, 
+                                                               siteName))
+            
+        else:
+            log.error('site %s not found' % siteName)

Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2007-05-25 11:40:57 UTC (rev 75958)
+++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2007-05-25 12:08:18 UTC (rev 75959)
@@ -20,13 +20,22 @@
 import doctest
 import unittest
 from zope.app.testing import placelesssetup
+from zope.app.testing.setup import (placefulSetUp,
+                                    placefulTearDown)
 from zope.testing.doctestunit import DocFileSuite
 
+def setUp(test):
+    root = placefulSetUp(site=True)
+    test.globs['root'] = root
+
+def tearDown(test):
+    placefulTearDown()
+
 def test_suite():
     return unittest.TestSuite((
         DocFileSuite('README.txt',
-                     setUp=placelesssetup.setUp,
-                     tearDown=placelesssetup.tearDown,
+                     setUp=setUp,
+                     tearDown=tearDown,
                      optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
                      ),
         DocFileSuite('TESTING.txt',



More information about the Checkins mailing list