[Checkins] SVN: lovely.remotetask/trunk/src/lovely/remotetask/ Add
stub testing component
Roger Ineichen
roger at projekt01.ch
Thu Sep 7 17:07:30 EDT 2006
Log message for revision 70035:
Add stub testing component
Improve implementation during transcoding implementation
Changed:
A lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
U lovely.remotetask/trunk/src/lovely/remotetask/job.py
U lovely.remotetask/trunk/src/lovely/remotetask/service.py
A lovely.remotetask/trunk/src/lovely/remotetask/testing.py
U lovely.remotetask/trunk/src/lovely/remotetask/tests.py
-=-
Added: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt 2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt 2006-09-07 21:07:29 UTC (rev 70035)
@@ -0,0 +1,131 @@
+=====================
+Remote Task Execution
+=====================
+
+This package provides an implementation of a remote task execution Web service
+that allows to execute pre-defined tasks on another server. See more info
+about the TaskService in README.txt. This test will test the TaskServiceStub
+implementation. The only different is, that the TaskServiceStub will handle
+task implementation providing ITaskStub interfaces rather then ITask. This way
+we can register stub tasks for a testing setup. See also another usecase for
+a task service stub implementation which is working with XML-RPC in the
+package lovely.transcoding.
+
+Let's now start by creating a task service stub:
+
+ >>> from lovely import remotetask
+ >>> from lovely.remotetask import testing
+ >>> service = testing.TaskServiceStub()
+
+We can discover the available tasks:
+
+ >>> service.getAvailableTasks()
+ {}
+
+This list is initially empty, because we have not registered any tasks. Let's
+now define a task that simply echos an input string:
+
+ >>> def echo(input):
+ ... return input
+
+ >>> import lovely.remotetask.task
+ >>> echoTask = remotetask.task.SimpleTask(echo)
+
+The only API requirement on the converter is to be callable. Now we make sure
+that the task works:
+
+ >>> echoTask(service, 1, input={'foo': 'blah'})
+ {'foo': 'blah'}
+
+Let's now register the task as a utility. Note that we need to register the
+echo utility used in the REAME.txt tests for the ITaskStub interface:
+
+ >>> import zope.component
+ >>> zope.component.provideUtility(echoTask, provides=testing.ITaskStub,
+ ... name='echo')
+
+The echo task is now available in the service:
+
+ >>> service.getAvailableTasks()
+ {u'echo': <SimpleTask <function echo ...>>}
+
+
+Since the service cannot instantaneously complete a task, incoming jobs are
+managed by a queue. First we request the echo task to be executed:
+
+ >>> jobid = service.add(u'echo', {'foo': 'bar'})
+ >>> jobid
+ 1
+
+Let's also see wat's happen if we add a non existent task:
+
+ >>> service.add(u'undefined', {'foo': 'bar'})
+ Traceback (most recent call last):
+ ...
+ ValueError: Task does not exist
+
+The ``add()`` function schedules the task called "echo" to be executed with
+the specified arguments. The method returns a job id with which we can inquire
+about the job.
+
+ >>> service.getStatus(jobid)
+ 'queued'
+
+Since the job has not been processed, the status is set to "queued". Further,
+there is no result available yet:
+
+ >>> service.getResult(jobid) is None
+ True
+
+As long as the job is not being processed, it can be cancelled:
+
+ >>> service.cancel(jobid)
+ >>> service.getStatus(jobid)
+ 'cancelled'
+
+Let's also see wat's happen if we cancel a non existent task:
+
+ >>> service.cancel(u'undefined')
+
+Let's now readd a job:
+
+ >>> jobid = service.add(u'echo', {'foo': 'bar'})
+
+The jobs in the queue are processed by calling the service's ``process()``
+method:
+
+ >>> service.process()
+
+This method is usually called by other application logic, but we have to call
+it manually here, since none of the other infrastructure is setup.
+
+ >>> service.getStatus(jobid)
+ 'completed'
+ >>> service.getResult(jobid)
+ {'foo': 'bar'}
+
+Now, let's define a new task that causes an error:
+
+ >>> def error(input):
+ ... raise remotetask.task.TaskError('An error occurred.')
+
+ >>> zope.component.provideUtility(
+ ... remotetask.task.SimpleTask(error), provides=testing.ITaskStub,
+ ... name='error')
+
+Now add and execute it:
+
+ >>> jobid = service.add(u'error')
+ >>> service.process()
+
+Let's now see what happened:
+
+ >>> service.getStatus(jobid)
+ 'error'
+ >>> service.getError(jobid)
+ 'An error occurred.'
+
+For management purposes, the service also allows you to inspect all jobs:
+
+ >>> dict(service.jobs)
+ {1: <Job 1>, 2: <Job 2>, 3: <Job 3>}
Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/TESTING.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: lovely.remotetask/trunk/src/lovely/remotetask/job.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/job.py 2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/job.py 2006-09-07 21:07:29 UTC (rev 70035)
@@ -30,6 +30,7 @@
id = FieldProperty(interfaces.IJob['id'])
task = FieldProperty(interfaces.IJob['task'])
+ status = FieldProperty(interfaces.IJob['status'])
input = FieldProperty(interfaces.IJob['input'])
output = FieldProperty(interfaces.IJob['output'])
error = FieldProperty(interfaces.IJob['error'])
Modified: lovely.remotetask/trunk/src/lovely/remotetask/service.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/service.py 2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/service.py 2006-09-07 21:07:29 UTC (rev 70035)
@@ -28,6 +28,8 @@
import zope.publisher.base
import zope.publisher.publish
from BTrees.IOBTree import IOBTree
+from zope.security.proxy import removeSecurityProxy
+from zope.traversing.api import traverse
from zope.app import zapi
from zope.app.container import contained
from zope.app.publication.zopepublication import ZopePublication
@@ -89,6 +91,7 @@
"""See interfaces.ITaskService"""
path = [parent.__name__ for parent in zapi.getParents(self)
if parent.__name__]
+ path.reverse()
path.append(self.__name__)
path.append('processNext')
@@ -138,11 +141,7 @@
"""A custom publication to process the next job."""
def traverseName(self, request, ob, name):
- if hasattr(ob, '__contains__') and name in ob:
- ob = ob[name]
- else:
- ob = getattr(ob, name)
- return ob
+ return traverse(removeSecurityProxy(ob), name, None)
def processor(db, path):
@@ -161,6 +160,6 @@
zope.publisher.publish.publish(request, False)
except IndexError:
time.sleep(1)
- except:
- # This thread should never crash, thus a blank except
- pass
+# except:
+# # This thread should never crash, thus a blank except
+# pass
Added: lovely.remotetask/trunk/src/lovely/remotetask/testing.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/testing.py 2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/testing.py 2006-09-07 21:07:29 UTC (rev 70035)
@@ -0,0 +1,161 @@
+##############################################################################
+#
+# Copyright (c) 2006 Lovely Systems and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Task Service testing tools
+
+$Id$
+"""
+__docformat__ = "reStructuredText"
+
+import datetime
+import threading
+import zc.queue
+import zope.component
+import zope.interface
+from BTrees.IOBTree import IOBTree
+from zope.app import zapi
+from zope.app.container import contained
+from lovely.remotetask import interfaces, job, task
+
+
+###############################################################################
+#
+# Stub implementations (Note: use the ITaskStub interface for this service)
+#
+###############################################################################
+
+class QueueStub(object):
+
+ zope.interface.implements(zc.queue.interfaces.IQueue)
+
+ def __init__(self):
+ self._data = ()
+
+ def pull(self, index=0):
+ if index < 0:
+ len_self = len(self._data)
+ index += len_self
+ if index < 0:
+ raise IndexError(index-len_self)
+ res = self._data[index]
+ self._data = self._data[:index] + self._data[index+1:]
+ return res
+
+ def put(self, item):
+ self._data += (item,)
+
+ def __len__(self):
+ return len(self._data)
+
+ def __iter__(self):
+ return iter(self._data)
+
+ def __getitem__(self, index):
+ return self._data[index] # works with passing a slice too
+
+ def __nonzero__(self):
+ return bool(self._data)
+
+
+class ITaskStub(interfaces.ITask):
+ """Task stub interface for stub tasks."""
+
+
+class TaskServiceStub(contained.Contained):
+ """A task service stub.
+
+ The available tasks for this service are managed as stub utilities.
+ This task service stub could be helpful if you need to use a different
+ testing setup. If so, register your own testing ITaskStub in ftesting.zcml.
+ """
+ zope.interface.implements(interfaces.ITaskService)
+
+ # NOTE: we use ITaskStub instead of ITask
+ taskInterface = ITaskStub
+
+ def __init__(self):
+ super(TaskServiceStub, self).__init__()
+ self._counter = 1
+ self.jobs = IOBTree()
+ self._queue = QueueStub()
+
+ def getAvailableTasks(self):
+ """See interfaces.ITaskService"""
+ return dict(zope.component.getUtilitiesFor(self.taskInterface))
+
+ def add(self, task, input=None):
+ """See interfaces.ITaskService"""
+ if task not in self.getAvailableTasks():
+ raise ValueError('Task does not exist')
+ jobid = self._counter
+ self._counter += 1
+ newjob = job.Job(jobid, task, input)
+ self.jobs[jobid] = newjob
+ self._queue.put(newjob)
+ newjob.status = interfaces.QUEUED
+ return jobid
+
+ def cancel(self, jobid):
+ """See interfaces.ITaskService"""
+ for idx, job in enumerate(self._queue):
+ if job.id == jobid:
+ job.status = interfaces.CANCELLED
+ self._queue.pull(idx)
+ break
+
+ def getStatus(self, jobid):
+ """See interfaces.ITaskService"""
+ return self.jobs[jobid].status
+
+ def getResult(self, jobid):
+ """See interfaces.ITaskService"""
+ return self.jobs[jobid].output
+
+ def getError(self, jobid):
+ """See interfaces.ITaskService"""
+ return str(self.jobs[jobid].error)
+
+ def startProcessing(self):
+ """See interfaces.ITaskService"""
+ raise NotImplementedError("Stub doesn't startProcessing")
+
+ def stopProcessing(self):
+ """See interfaces.ITaskService"""
+ raise NotImplementedError("Stub doesn't stopProcessing")
+
+ def isProcessing(self):
+ """See interfaces.ITaskService"""
+ name = 'remotetasks.' + self.__name__
+ for thread in threading.enumerate():
+ if thread.getName() == name:
+ if thread.running:
+ return True
+ return False
+
+ def processNext(self):
+ job = self._queue.pull()
+ jobtask = zope.component.getUtility(
+ self.taskInterface, name=job.task)
+ job.started = datetime.datetime.now()
+ try:
+ job.output = jobtask(self, job.id, job.input)
+ job.status = interfaces.COMPLETED
+ except task.TaskError, error:
+ job.error = error
+ job.status = interfaces.ERROR
+ job.completed = datetime.datetime.now()
+
+ def process(self):
+ """See interfaces.ITaskService"""
+ while self._queue:
+ self.processNext()
Property changes on: lovely.remotetask/trunk/src/lovely/remotetask/testing.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Modified: lovely.remotetask/trunk/src/lovely/remotetask/tests.py
===================================================================
--- lovely.remotetask/trunk/src/lovely/remotetask/tests.py 2006-09-07 15:52:25 UTC (rev 70034)
+++ lovely.remotetask/trunk/src/lovely/remotetask/tests.py 2006-09-07 21:07:29 UTC (rev 70035)
@@ -29,6 +29,11 @@
tearDown=placelesssetup.tearDown,
optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
),
+ DocFileSuite('TESTING.txt',
+ setUp=placelesssetup.setUp,
+ tearDown=placelesssetup.tearDown,
+ optionflags=doctest.NORMALIZE_WHITESPACE|doctest.ELLIPSIS,
+ ),
))
if __name__ == '__main__':
More information about the Checkins
mailing list