[Checkins] SVN: lovely.remotetask/trunk/src/lovely/remotetask/ Add stub testing component

Roger Ineichen roger at projekt01.ch
Thu Sep 7 17:07:30 EDT 2006


Log message for revision 70035:
  Add stub testing component
  Improve implementation during transcoding implementation

Changed:
  A   lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
  U   lovely.remotetask/trunk/src/lovely/remotetask/job.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/service.py
  A   lovely.remotetask/trunk/src/lovely/remotetask/testing.py
  U   lovely.remotetask/trunk/src/lovely/remotetask/tests.py

-=-
Added: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt	2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt	2006-09-07 21:07:29 UTC (rev 70035)
@@ -0,0 +1,131 @@
+=====================
+Remote Task Execution
+=====================
+
+This package provides an implementation of a remote task execution Web service
+that allows you to execute pre-defined tasks on another server. See README.txt
+for more information about the TaskService. This document tests the
+TaskServiceStub implementation. The only difference is that the
+TaskServiceStub handles task implementations providing the ITaskStub interface
+rather than ITask. This way we can register stub tasks for a testing setup.
+For another use case, see the package lovely.transcoding, which uses a task
+service stub implementation that works with XML-RPC.
+
+Let's now start by creating a task service stub:
+
+  >>> from lovely import remotetask
+  >>> from lovely.remotetask import testing
+  >>> service = testing.TaskServiceStub()
+
+We can discover the available tasks:
+
+  >>> service.getAvailableTasks()
+  {}
+
+This list is initially empty, because we have not registered any tasks. Let's
+now define a task that simply echoes an input string:
+
+  >>> def echo(input):
+  ...     return input
+
+  >>> import lovely.remotetask.task
+  >>> echoTask = remotetask.task.SimpleTask(echo)
+
+The only API requirement on the task function is that it be callable. Now we
+make sure that the task works:
+
+  >>> echoTask(service, 1, input={'foo': 'blah'})
+  {'foo': 'blah'}
+
+Let's now register the task as a utility. Note that we need to register the
+echo utility used in the README.txt tests for the ITaskStub interface:
+
+  >>> import zope.component
+  >>> zope.component.provideUtility(echoTask, provides=testing.ITaskStub, 
+  ...     name='echo')
+
+The echo task is now available in the service:
+
+  >>> service.getAvailableTasks()
+  {u'echo': <SimpleTask <function echo ...>>}
+
+
+Since the service cannot instantaneously complete a task, incoming jobs are
+managed by a queue. First we request the echo task to be executed:
+
+  >>> jobid = service.add(u'echo', {'foo': 'bar'})
+  >>> jobid
+  1
+
+Let's also see what happens if we add a nonexistent task:
+
+  >>> service.add(u'undefined', {'foo': 'bar'})
+  Traceback (most recent call last):
+  ...
+  ValueError: Task does not exist
+
+The ``add()`` function schedules the task called "echo" to be executed with
+the specified arguments. The method returns a job id with which we can inquire
+about the job.
+
+  >>> service.getStatus(jobid)
+  'queued'
+
+Since the job has not been processed, the status is set to "queued". Further,
+there is no result available yet:
+
+  >>> service.getResult(jobid) is None
+  True
+
+As long as the job is not being processed, it can be cancelled:
+
+  >>> service.cancel(jobid)
+  >>> service.getStatus(jobid)
+  'cancelled'
+
+Let's also see what happens if we cancel a nonexistent job:
+
+  >>> service.cancel(u'undefined')
+
+Let's now re-add a job:
+
+  >>> jobid = service.add(u'echo', {'foo': 'bar'})
+
+The jobs in the queue are processed by calling the service's ``process()``
+method:
+
+  >>> service.process()
+
+This method is usually called by other application logic, but we have to call
+it manually here, since none of the other infrastructure is set up.
+
+  >>> service.getStatus(jobid)
+  'completed'
+  >>> service.getResult(jobid)
+  {'foo': 'bar'}
+
+Now, let's define a new task that causes an error:
+
+  >>> def error(input):
+  ...     raise remotetask.task.TaskError('An error occurred.')
+
+  >>> zope.component.provideUtility(
+  ...     remotetask.task.SimpleTask(error), provides=testing.ITaskStub, 
+  ...     name='error')
+
+Now add and execute it:
+
+  >>> jobid = service.add(u'error')
+  >>> service.process()
+
+Let's now see what happened:
+
+  >>> service.getStatus(jobid)
+  'error'
+  >>> service.getError(jobid)
+  'An error occurred.'
+
+For management purposes, the service also allows you to inspect all jobs:
+
+  >>> dict(service.jobs)
+  {1: <Job 1>, 2: <Job 2>, 3: <Job 3>}

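The job lifecycle exercised in TESTING.txt (queued, then cancelled, completed
or error) can be sketched without any Zope machinery. This is only an
illustration in modern Python: MiniTaskService and its dict-based job records
are hypothetical names, not part of lovely.remotetask, and tasks are looked up
in a plain dict rather than as registered utilities.

```python
class TaskError(Exception):
    """Raised by a task to report a failure (stand-in for task.TaskError)."""


class MiniTaskService:
    """Hypothetical, Zope-free stand-in for the stub service's job handling."""

    def __init__(self):
        self.tasks = {}     # task name -> callable taking the job input
        self.jobs = {}      # job id -> job record (a plain dict)
        self._queue = []    # ids of jobs waiting to be processed
        self._counter = 1

    def add(self, name, input=None):
        if name not in self.tasks:
            raise ValueError('Task does not exist')
        jobid = self._counter
        self._counter += 1
        self.jobs[jobid] = {'task': name, 'input': input,
                            'status': 'queued', 'output': None, 'error': None}
        self._queue.append(jobid)
        return jobid

    def cancel(self, jobid):
        # Only jobs still in the queue can be cancelled; unknown ids are
        # silently ignored, as in the doctest above.
        if jobid in self._queue:
            self._queue.remove(jobid)
            self.jobs[jobid]['status'] = 'cancelled'

    def process(self):
        # Drain the queue in first-in, first-out order.
        while self._queue:
            job = self.jobs[self._queue.pop(0)]
            try:
                job['output'] = self.tasks[job['task']](job['input'])
                job['status'] = 'completed'
            except TaskError as error:
                job['error'] = error
                job['status'] = 'error'
```

A job that was cancelled stays in the jobs mapping with its final status, which
is why the doctest ends with three jobs even though only two were processed.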

Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: lovely.remotetask/trunk/src/lovely/remotetask/job.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/job.py	2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/job.py	2006-09-07 21:07:29 UTC (rev 70035)
@@ -30,6 +30,7 @@
 
     id = FieldProperty(interfaces.IJob['id'])
     task = FieldProperty(interfaces.IJob['task'])
+    status = FieldProperty(interfaces.IJob['status'])
     input = FieldProperty(interfaces.IJob['input'])
     output = FieldProperty(interfaces.IJob['output'])
     error = FieldProperty(interfaces.IJob['error'])

Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py	2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py	2006-09-07 21:07:29 UTC (rev 70035)
@@ -28,6 +28,8 @@
 import zope.publisher.base
 import zope.publisher.publish
 from BTrees.IOBTree import IOBTree
+from zope.security.proxy import removeSecurityProxy
+from zope.traversing.api import traverse
 from zope.app import zapi
 from zope.app.container import contained
 from zope.app.publication.zopepublication import ZopePublication
@@ -89,6 +91,7 @@
         """See interfaces.ITaskService"""
         path = [parent.__name__ for parent in zapi.getParents(self)
                  if parent.__name__]
+        path.reverse()
         path.append(self.__name__)
         path.append('processNext')
 
@@ -138,11 +141,7 @@
     """A custom publication to process the next job."""
 
     def traverseName(self, request, ob, name):
-        if hasattr(ob, '__contains__') and name in ob:
-            ob = ob[name]
-        else:
-            ob = getattr(ob, name)
-        return ob
+        return traverse(removeSecurityProxy(ob), name, None)
 
 
 def processor(db, path):
@@ -161,6 +160,6 @@
             zope.publisher.publish.publish(request, False)
         except IndexError:
             time.sleep(1)
-        except:
-            # This thread should never crash, thus a blank except
-            pass
+#        except:
+#            # This thread should never crash, thus a blank except
+#            pass

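The path.reverse() line added to service.py above matters because the parents
are collected starting from the nearest one, so the unreversed list would run
from the service's container up to the root rather than down from it. A small
sketch of that ordering follows; Node, get_parents and process_next_path are
hypothetical helpers written for this illustration, and it assumes (as the fix
implies) that zapi.getParents returns the nearest parent first.

```python
class Node:
    """Hypothetical located object: only __name__ and __parent__ matter."""

    def __init__(self, name, parent=None):
        self.__name__ = name
        self.__parent__ = parent


def get_parents(obj):
    """Return the ancestors of obj, nearest parent first."""
    parents = []
    current = obj.__parent__
    while current is not None:
        parents.append(current)
        current = current.__parent__
    return parents


def process_next_path(service):
    """Build the root-first traversal path to the service's processNext."""
    path = [parent.__name__ for parent in get_parents(service)
            if parent.__name__]      # the unnamed root is skipped
    path.reverse()                   # nearest-first becomes root-first
    path.append(service.__name__)
    path.append('processNext')
    return path


root = Node('')
site = Node('site', root)
tools = Node('tools', site)
service = Node('tasks', tools)
path = process_next_path(service)
```

For a service nested only one level below the root the reversal changes
nothing, which may be why the ordering problem went unnoticed at first.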
Added: lovely.remotetask/trunk/src/lovely/remotetask/testing.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/testing.py	2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/testing.py	2006-09-07 21:07:29 UTC (rev 70035)
@@ -0,0 +1,161 @@
+##############################################################################
+#
+# Copyright (c) 2006 Lovely Systems and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Task Service testing tools
+
+$Id$
+"""
+__docformat__ = "reStructuredText"
+
+import datetime
+import threading
+import zc.queue
+import zope.component
+import zope.interface
+from BTrees.IOBTree import IOBTree
+from zope.app import zapi
+from zope.app.container import contained
+from lovely.remotetask import interfaces, job, task
+
+
+###############################################################################
+#
+# Stub implementations (Note: use the ITaskStub interface for this service)
+#
+###############################################################################
+
+class QueueStub(object):
+
+    zope.interface.implements(zc.queue.interfaces.IQueue)
+
+    def __init__(self):
+        self._data = ()
+
+    def pull(self, index=0):
+        if index < 0:
+            len_self = len(self._data)
+            index += len_self
+            if index < 0:
+                raise IndexError(index-len_self)
+        res = self._data[index]
+        self._data = self._data[:index] + self._data[index+1:]
+        return res
+
+    def put(self, item):
+        self._data += (item,)
+
+    def __len__(self):
+        return len(self._data)
+
+    def __iter__(self):
+        return iter(self._data)
+
+    def __getitem__(self, index):
+        return self._data[index] # works with passing a slice too
+
+    def __nonzero__(self):
+        return bool(self._data)
+
+
+class ITaskStub(interfaces.ITask):
+    """Task stub interface for stub tasks."""
+
+
+class TaskServiceStub(contained.Contained):
+    """A task service stub.
+
+    The available tasks for this service are managed as stub utilities.
+    This task service stub could be helpful if you need to use a different 
+    testing setup. If so, register your own testing ITaskStub in ftesting.zcml.
+    """
+    zope.interface.implements(interfaces.ITaskService)
+
+    # NOTE: we use ITaskStub instead of ITask
+    taskInterface = ITaskStub
+
+    def __init__(self):
+        super(TaskServiceStub, self).__init__()
+        self._counter = 1
+        self.jobs = IOBTree()
+        self._queue = QueueStub()
+
+    def getAvailableTasks(self):
+        """See interfaces.ITaskService"""
+        return dict(zope.component.getUtilitiesFor(self.taskInterface))
+
+    def add(self, task, input=None):
+        """See interfaces.ITaskService"""
+        if task not in self.getAvailableTasks():
+            raise ValueError('Task does not exist')
+        jobid = self._counter
+        self._counter += 1
+        newjob = job.Job(jobid, task, input)
+        self.jobs[jobid] = newjob
+        self._queue.put(newjob)
+        newjob.status = interfaces.QUEUED
+        return jobid
+
+    def cancel(self, jobid):
+        """See interfaces.ITaskService"""
+        for idx, job in enumerate(self._queue):
+            if job.id == jobid:
+                job.status = interfaces.CANCELLED
+                self._queue.pull(idx)
+                break
+
+    def getStatus(self, jobid):
+        """See interfaces.ITaskService"""
+        return self.jobs[jobid].status
+
+    def getResult(self, jobid):
+        """See interfaces.ITaskService"""
+        return self.jobs[jobid].output
+
+    def getError(self, jobid):
+        """See interfaces.ITaskService"""
+        return str(self.jobs[jobid].error)
+
+    def startProcessing(self):
+        """See interfaces.ITaskService"""
+        raise NotImplementedError("Stub doesn't startProcessing")
+
+    def stopProcessing(self):
+        """See interfaces.ITaskService"""
+        raise NotImplementedError("Stub doesn't stopProcessing")
+
+    def isProcessing(self):
+        """See interfaces.ITaskService"""
+        name = 'remotetasks.' + self.__name__
+        for thread in threading.enumerate():
+            if thread.getName() == name:
+                if thread.running:
+                    return True
+        return False
+
+    def processNext(self):
+        job = self._queue.pull()
+        jobtask = zope.component.getUtility(
+            self.taskInterface, name=job.task)
+        job.started = datetime.datetime.now()
+        try:
+            job.output = jobtask(self, job.id, job.input)
+            job.status = interfaces.COMPLETED
+        except task.TaskError, error:
+            job.error = error
+            job.status = interfaces.ERROR
+        job.completed = datetime.datetime.now()
+
+    def process(self):
+        """See interfaces.ITaskService"""
+        while self._queue:
+            self.processNext()

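QueueStub above keeps its items in an immutable tuple and rebinds it on every
put and pull, so an iterator obtained before a pull keeps walking the old
tuple. A rough modern-Python rendering of the same idea is sketched here;
TupleQueue is a hypothetical name, and __bool__ takes the place of the
Python 2 __nonzero__ hook used in the commit.

```python
class TupleQueue:
    """Hypothetical in-memory FIFO queue backed by an immutable tuple."""

    def __init__(self):
        self._data = ()

    def put(self, item):
        # Build a new tuple instead of mutating in place.
        self._data += (item,)

    def pull(self, index=0):
        # Remove and return the item at index (the oldest item by default).
        if index < 0:
            index += len(self._data)
            if index < 0:
                # Report the index the caller originally passed in.
                raise IndexError(index - len(self._data))
        item = self._data[index]    # raises IndexError when out of range
        self._data = self._data[:index] + self._data[index + 1:]
        return item

    def __len__(self):
        return len(self._data)

    def __iter__(self):
        return iter(self._data)

    def __bool__(self):
        return bool(self._data)
```

Because each pull copies the tuple, this is only reasonable for the handful of
jobs a test creates; it is not meant as a replacement for a persistent queue.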

Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/testing.py
___________________________________________________________________
Name: svn:keywords
   + Id
Name: svn:eol-style
   + native

Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py	2006-09-07 21:07:29 UTC (rev 70035)
@@ -29,6 +29,11 @@
                      tearDown=placelesssetup.tearDown,
                      optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
                      ),
+        DocFileSuite('TESTING.txt',
+                     setUp=placelesssetup.setUp,
+                     tearDown=placelesssetup.tearDown,
+                     optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
+                     ),
         ))
 
 if __name__ == '__main__':


