[Zope-CVS] CVS: Packages/zasync/client/zasync - client.py:1.6 plugins.py:1.5 zasync.conf:1.4

Gary Poster gary at zope.com
Thu Oct 28 18:22:21 EDT 2004


Update of /cvs-repository/Packages/zasync/client/zasync
In directory cvs.zope.org:/tmp/cvs-serv21659/client/zasync

Modified Files:
	client.py plugins.py zasync.conf 
Log Message:
Add new "aggregate" plugin.  Include exception info in Conflict Error logs.  



=== Packages/zasync/client/zasync/client.py 1.5 => 1.6 ===
--- Packages/zasync/client/zasync/client.py:1.5	Tue Oct 26 10:35:34 2004
+++ Packages/zasync/client/zasync/client.py	Thu Oct 28 18:22:20 2004
@@ -186,7 +186,8 @@
             get_transaction().commit()
         except ConflictError:
             logging.getLogger('zasync').info(
-                'Received ConflictError while trying to retry tool; retrying.')
+                'Received ConflictError while trying to retry tool; retrying.', 
+                exc_info=True)
             get_transaction().abort()
             reactor.callLater(delay, retryTool, path, delay)
         except ClientDisconnected:
@@ -233,7 +234,7 @@
 
 def timeoutErrback(deferred):
     if not deferred.called:
-        deferred.errback(defer.TimeoutError())
+        deferred.errback(defer.TimeoutError('Timed out.'))
 
 def cancelDelayedCall(value, call):
     if call.active():
@@ -298,7 +299,8 @@
                 return
             except ConflictError:
                 get_transaction().abort()
-                log.warning('ZODB ConflictError in pollZope.  Never give up.')
+                log.warning('ZODB ConflictError in pollZope.  Never give up.', 
+                            exc_info=True)
                 reactor.callLater(1, pollZope, path) # never give up
                 return
             except (KeyboardInterrupt, SystemExit):
@@ -314,7 +316,72 @@
         if application is not None:
             application._p_jar.close()
 
-def makeCall(path, zopeDeferredId, name, args, kwargs, count=0):
+def returnResult(value, path, zopeDeferredId, error=False, count=0):
+    global active, app
+    if isinstance(value, failure.Failure):
+        value.cleanFailure()
+    log = logging.getLogger('zasync')
+    log.debug('returnResult got value for %s (%s):\n\n%r', 
+              zopeDeferredId, path, value)
+    try:
+        del active[zopeDeferredId]
+    except KeyError:
+        pass
+    application = None
+    try:
+        if is_connected():
+            try:
+                application = app()
+                try:
+                    tool = getRequestApp(application).unrestrictedTraverse(path)
+                except (AttributeError, LookupError):
+                    scheduleToolRetry(
+                        path, returnResult, value, path, zopeDeferredId, 
+                        error, count)
+                    return value
+                zopeDeferred = tool.getDeferred(zopeDeferredId)
+                if zopeDeferred is None:
+                    return value
+                if error:
+                    call = zopeDeferred.errback
+                else:
+                    call = zopeDeferred.callback
+                res = call(value)
+                get_transaction().commit()
+            except ConflictError:
+                log.warning('ZODB ConflictError in returnResult', exc_info=True)
+                get_transaction().abort()
+                if count < max_conflict_resolution_attempts:
+                    reactor.callLater(
+                        count, returnResult, value, path, zopeDeferredId, 
+                        error, count+1)
+                else:
+                    res = failure.Failure()
+                    out = StringIO.StringIO()
+                    res.printDetailedTraceback(out)
+                    log.error(
+                        'Too many ConflictErrors in returnResult: '
+                        'giving up.\n\n%s', out.getvalue())
+            except ClientDisconnected:
+                scheduleServerRetry(
+                    returnResult, value, path, zopeDeferredId, 
+                    error, count)
+                res = value
+            except (KeyboardInterrupt, SystemExit):
+                raise
+            except: # give up.  Looks like a bug.  Log should help fix it.
+                res = logException('Exception from Zope.', log=log)
+            return res
+        else:
+            scheduleServerRetry(
+                returnResult, value, path, zopeDeferredId, error, count)
+    finally:
+        if application is not None:
+            application._p_jar.close()
+    return res
+
+def makeCall(path, zopeDeferredId, name, args, kwargs, 
+             returnResult=returnResult, count=0):
     # zopeDeferredId is an id of a deferred in the asynchronous tool
     global plugins
     log = logging.getLogger('zasync')
@@ -336,12 +403,12 @@
                     res = call(*args, **kwargs)
                 get_transaction().commit() # just in case plugin touched Zope
             except ConflictError:
-                log.warning('ZODB ConflictError in makeCall')
+                log.warning('ZODB ConflictError in makeCall', exc_info=True)
                 get_transaction().abort()
                 if count < max_conflict_resolution_attempts:
                     reactor.callLater(
                         count, makeCall, path, zopeDeferredId, 
-                        name, args, kwargs, count+1)
+                        name, args, kwargs, returnResult, count+1)
                     return
                 else:
                     res = failure.Failure()
@@ -353,7 +420,8 @@
                     # now we use that res (Failure)
             except ClientDisconnected:
                 scheduleServerRetry(
-                    makeCall, path, zopeDeferredId, name, args, kwargs, count)
+                    makeCall, path, zopeDeferredId, name, args, kwargs, 
+                    returnResult, count)
                 return
             except (KeyboardInterrupt, SystemExit):
                 raise
@@ -362,17 +430,20 @@
                 res = failure.Failure()
         if isinstance(res, defer.Deferred):
             reactor.callLater(
-                0, prepareDeferredCall, path, zopeDeferredId, res, call_info)
+                0, prepareDeferredCall, path, zopeDeferredId, res, call_info, 
+                returnResult)
         elif isinstance(res, failure.Failure):
             res.cleanFailure()
-            reactor.callLater(1, returnResult, res, path, zopeDeferredId, True)
+            reactor.callLater(0, returnResult, res, path, zopeDeferredId, True)
         else:
-            reactor.callLater(1, returnResult, res, path, zopeDeferredId)
+            reactor.callLater(0, returnResult, res, path, zopeDeferredId)
     else:
         scheduleServerRetry(
-            makeCall, path, zopeDeferredId, name, args, kwargs, count)
+            makeCall, path, zopeDeferredId, name, args, kwargs, 
+            returnResult, count)
 
-def prepareDeferredCall(path, zopeDeferredId, deferred, call_info, count=0):
+def prepareDeferredCall(path, zopeDeferredId, deferred, call_info, 
+                        returnResult, count=0):
     log = logging.getLogger('zasync')
     log.debug('prepareDeferredCall called for %s (%s)',
         zopeDeferredId, path)
@@ -386,7 +457,7 @@
                 except (AttributeError, LookupError):
                     scheduleToolRetry(
                         path, prepareDeferredCall, path, zopeDeferredId, 
-                        deferred, call_info, count)
+                        deferred, call_info, returnResult, count)
                     return
                 zopeDeferred = tool.getDeferred(zopeDeferredId)
                 if zopeDeferred is None:
@@ -398,12 +469,13 @@
                 remainingSeconds = zopeDeferred.remainingSeconds()
                 get_transaction().commit()
             except ConflictError:
-                log.warning('ZODB ConflictError in prepareDeferredCall')
+                log.warning('ZODB ConflictError in prepareDeferredCall', 
+                            exc_info=True)
                 get_transaction().abort()
                 if count < max_conflict_resolution_attempts:
                     reactor.callLater(
                         count, prepareDeferredCall, zopeDeferredId, deferred, 
-                        call_info, count+1)
+                        call_info, returnResult, count+1)
                 else:
                     res = failure.Failure()
                     out = StringIO.StringIO()
@@ -414,7 +486,7 @@
             except ClientDisconnected:
                 scheduleServerRetry(
                     prepareDeferredCall, path, zopeDeferredId, deferred, 
-                    call_info, count)
+                    call_info, returnResult, count)
             except (KeyboardInterrupt, SystemExit):
                 raise
             except: # give up.  Looks like a bug.  Log should help fix it.
@@ -434,74 +506,11 @@
                     errbackArgs=(path, zopeDeferredId, True))
         else:
             scheduleServerRetry(
-                makeCall, path, zopeDeferredId, name, args, kwargs, count)
+                prepareDeferredCall, path, zopeDeferredId, deferred, call_info,
+                returnResult, count)
     finally:
         if application is not None:
             application._p_jar.close()
-
-def returnResult(value, path, zopeDeferredId, error=False, count=0):
-    global active, app
-    if isinstance(value, failure.Failure):
-        value.cleanFailure()
-    log = logging.getLogger('zasync')
-    log.debug('returnResult got value for %s (%s):\n\n%r', 
-              zopeDeferredId, path, value)
-    try:
-        del active[zopeDeferredId]
-    except KeyError:
-        pass
-    application = None
-    try:
-        if is_connected():
-            try:
-                application = app()
-                try:
-                    tool = getRequestApp(application).unrestrictedTraverse(path)
-                except (AttributeError, LookupError):
-                    scheduleToolRetry(
-                        path, returnResult, value, path, zopeDeferredId, 
-                        error, count)
-                    return value
-                zopeDeferred = tool.getDeferred(zopeDeferredId)
-                if zopeDeferred is None:
-                    return value
-                if error:
-                    call = zopeDeferred.errback
-                else:
-                    call = zopeDeferred.callback
-                res = call(value)
-                get_transaction().commit()
-            except ConflictError:
-                log.warning('ZODB ConflictError in returnResult')
-                get_transaction().abort()
-                if count < max_conflict_resolution_attempts:
-                    reactor.callLater(
-                        count, returnResult, value, path, zopeDeferredId, 
-                        error, count+1)
-                else:
-                    res = failure.Failure()
-                    out = StringIO.StringIO()
-                    res.printDetailedTraceback(out)
-                    log.error(
-                        'Too many ConflictErrors in returnResult: '
-                        'giving up.\n\n%s', out.getvalue())
-            except ClientDisconnected:
-                scheduleServerRetry(
-                    returnResult, value, path, zopeDeferredId, 
-                    error, count)
-                res = value
-            except (KeyboardInterrupt, SystemExit):
-                raise
-            except: # give up.  Looks like a bug.  Log should help fix it.
-                res = logException('Exception from Zope.', log=log)
-            return res
-        else:
-            scheduleServerRetry(
-                returnResult, value, path, zopeDeferredId, error, count)
-    finally:
-        if application is not None:
-            application._p_jar.close()
-    return res
 
 def run(path=None):
     # to be called after config.initialize


=== Packages/zasync/client/zasync/plugins.py 1.4 => 1.5 ===
--- Packages/zasync/client/zasync/plugins.py:1.4	Tue Oct 26 10:35:34 2004
+++ Packages/zasync/client/zasync/plugins.py	Thu Oct 28 18:22:20 2004
@@ -16,20 +16,86 @@
 """
 
 import logging
+from twisted.python import failure
 from twisted.internet import reactor, defer
+from zasync import client
 
 #### simple schedule
 
 def schedule(seconds):
     """proof of concept and "Hello World"; use to fire your callbacks after 
-    seconds (approximately).  zope_exec (below) is better for potentially 
+    seconds (approximately).  zope_exec is better for potentially 
     expensive tasks because it is cancellable (can time out) and you can guage
     better if the expensive task has started.  You might schedule a zope_exec 
     within a schedule callback, if you needed a scheduled expensive task."""
     d = defer.Deferred()
-    reactor.callLater(seconds, d, seconds)
+    reactor.callLater(seconds, d.callback, seconds)
     return d
 
+#### aggegate plugins
+
+def aggregatePlugins(zopeDeferredTuple, *calls):
+    """aggregate calls to other plugins.  each call may either be a number
+    (integer or float) of seconds to pause, or a tuple of (plugin name, args 
+    tuple, kwargs dict).  For example, 
+    
+    aggregatePlugins(
+        ('zope_exec', ('/my_site', 'home/my_script'), {}),
+        5,
+        ('zope_exec', ('/my_site', 'home/my_other_script'), {}))
+    
+    would ask zope_exec to call my_script in one transaction, wait 5 seconds,
+    and then call my_other_script.
+    
+    If any of the plugins fail, the failure is returned without proceeding 
+    further down the remaining calls.  The failure is annotated with a list of
+    the completed calls ('completed_calls'), the call active during the failure
+    ('active_call'), and remaining calls('remaining_calls').
+    """
+    thunkmaker = AggregateThunkMaker(zopeDeferredTuple, *calls)
+    reactor.callLater(0, thunkmaker.makeCall)
+    return thunkmaker.deferred
+
+class AggregateThunkMaker(object):
+    
+    def __init__(self, zopeDeferredTuple, *calls):
+        self.deferred = defer.Deferred()
+        self.pending = list(calls)
+        self.completed = []
+        self.begun = None
+        self.results = []
+        self.path, self.zopeDeferredId = zopeDeferredTuple
+
+    def makeCall(self):
+        if not self.pending:
+            self.deferred.callback(self.results)
+        else:
+            if self.begun is not None:
+                self.completed.append(self.begun)
+            task = self.pending.pop(0)
+            self.begun = task
+            if isinstance(task, (int, float)):
+                reactor.callLater(task, self.makeCall)
+            else:
+                try:
+                    plugin, args, kwargs = task
+                except (ValueError, TypeError):
+                    self.deferred.errback(failure.Failure())
+                else:
+                    client.makeCall(
+                        self.path, self.zopeDeferredId, plugin, args, kwargs, 
+                        self.returnResult)
+            
+    def returnResult(self, value, path, zopeDeferredId, error=False):
+        if error:
+            value.completed_calls = tuple(self.completed)
+            value.active_call = self.begun
+            value.remaining_calls = tuple(self.pending)
+            self.deferred.errback(value)
+        else:
+            self.results.append(value)
+            reactor.callLater(0, self.makeCall)
+
 #### LDAP, protected (with SSL) and unprotected
 
 try:
@@ -130,7 +196,6 @@
 #### Zope Exec
 
 import Queue, thread, time, sys, StringIO
-from twisted.python import failure
 from ZODB.POSException import ConflictError
 from ZEO.Exceptions import ClientDisconnected
 from Acquisition import aq_parent, aq_inner
@@ -139,7 +204,6 @@
 
 from Products.zasync.manager import Expression
 from Products.zasync.bucketqueue import BucketQueue
-from zasync import client
 
 MAXTHREADPOOL = 5
 
@@ -515,7 +579,7 @@
                     except ConflictError:
                         logger.debug(
                             'zope_exec: worker %s got conflict error', 
-                            thread_id)
+                            thread_id, exc_info=True)
                         get_transaction().abort() 
                         if attempt < max_conflict_resolution_attempts:
                             time.sleep(attempt + 1) # XXX better idea?


=== Packages/zasync/client/zasync/zasync.conf 1.3 => 1.4 ===
--- Packages/zasync/client/zasync/zasync.conf:1.3	Wed Oct 27 21:31:27 2004
+++ Packages/zasync/client/zasync/zasync.conf	Thu Oct 28 18:22:20 2004
@@ -274,6 +274,15 @@
   # retry yes
 </plugin>
 
+<plugin aggregate>
+  handler zasync.plugins.aggregatePlugins
+  # 14400 seconds is four hours
+  timeout 14400
+  zope-aware yes
+  # description Aggregate calls to other plugins
+  retry no
+</plugin>
+
 ##############################################################################
 # configure the loggers
 ##############################################################################



More information about the Zope-CVS mailing list