[Checkins] SVN: zc.twist/trunk/ add ClientDisconnected retries and a configurable number of ConflictError retries; prepare for 1.3

Gary Poster gary at zope.com
Thu Jun 19 17:24:27 EDT 2008


Log message for revision 87566:
  add ClientDisconnected retries and a configurable number of ConflictError retries; prepare for 1.3

Changed:
  U   zc.twist/trunk/CHANGES.txt
  U   zc.twist/trunk/buildout.cfg
  U   zc.twist/trunk/setup.py
  U   zc.twist/trunk/src/zc/twist/README.txt
  U   zc.twist/trunk/src/zc/twist/__init__.py

-=-
Modified: zc.twist/trunk/CHANGES.txt
===================================================================
--- zc.twist/trunk/CHANGES.txt	2008-06-19 21:01:21 UTC (rev 87565)
+++ zc.twist/trunk/CHANGES.txt	2008-06-19 21:24:26 UTC (rev 87566)
@@ -1,14 +1,22 @@
-1.2 (2008-4-9)
---------------
+1.3 (2008-06-19)
+----------------
 
+* Handle ZEO.Exceptions.ClientDisconnected errors: retry forever, with a
+  backoff, defaulting to a max backoff of 60 seconds.
+
+* Make number of times that ConflictErrors are retried configurable.
+
+1.2 (2008-04-09)
+----------------
+
 * New subclass of twisted.python.failure.Failure begins with only reprs,
   and it pickles to exclude the stack, exclude the global vars in the frames,
   and truncate the reprs of the local vars in the frames.  The goal is to
   keep the pickle size of Failures down to a manageable size.  ``sanitize``
   now uses this class.
 
-1.1 (2008-3-27)
----------------
+1.1 (2008-03-27)
+----------------
 
 * Now depends on twisted 8.0.1 or higher, which is newly setuptools
   compatible.  The twisted build is a little frightening, at least with
@@ -20,14 +28,14 @@
 
 * C extension uses older comment style and has less cruft.
 
-1.0.1 (2008-3-14)
------------------
+1.0.1 (2008-03-14)
+------------------
 
 * Bugfix: if you passed a slot method like a BTree.__setitem__, bad things
   would happen.
 
-1.0.0 (2008-3-13)
------------------
+1.0.0 (2008-03-13)
+------------------
 
 * Add ability to specify an alternate reactor
 

Modified: zc.twist/trunk/buildout.cfg
===================================================================
--- zc.twist/trunk/buildout.cfg	2008-06-19 21:01:21 UTC (rev 87565)
+++ zc.twist/trunk/buildout.cfg	2008-06-19 21:24:26 UTC (rev 87566)
@@ -19,4 +19,5 @@
 [interpreter]
 recipe = zc.recipe.egg
 eggs = zc.twist
+       docutils
 interpreter = py

Modified: zc.twist/trunk/setup.py
===================================================================
--- zc.twist/trunk/setup.py	2008-06-19 21:01:21 UTC (rev 87565)
+++ zc.twist/trunk/setup.py	2008-06-19 21:24:26 UTC (rev 87566)
@@ -2,10 +2,61 @@
 from setuptools.extension import Extension
 from setuptools import setup, find_packages
 
-long_description = (open("src/zc/twist/README.txt").read() +
-                    '\n\n=======\nChanges\n=======\n\n' +
-                    open("CHANGES.txt").read())
+# generic helpers primarily for the long_description
+try:
+    import docutils
+except ImportError:
+    def validateReST(text):
+        return ''
+else:
+    import docutils.utils
+    import docutils.parsers.rst
+    import StringIO
+    def validateReST(text):
+        doc = docutils.utils.new_document('validator')
+        # our desired settings
+        doc.reporter.halt_level = 5
+        doc.reporter.report_level = 1
+        stream = doc.reporter.stream = StringIO.StringIO()
+        # docutils buglets (?)
+        doc.settings.tab_width = 2
+        doc.settings.pep_references = doc.settings.rfc_references = False
+        doc.settings.trim_footnote_reference_space = None
+        # and we're off...
+        parser = docutils.parsers.rst.Parser()
+        parser.parse(text, doc)
+        return stream.getvalue()
 
+def text(*args, **kwargs):
+    # note: distutils explicitly disallows unicode for setup values :-/
+    # http://docs.python.org/dist/meta-data.html
+    tmp = []
+    for a in args:
+        if a.endswith('.txt'):
+            f = open(os.path.join(*a.split('/')))
+            tmp.append(f.read())
+            f.close()
+            tmp.append('\n\n')
+        else:
+            tmp.append(a)
+    if len(tmp) == 1:
+        res = tmp[0]
+    else:
+        res = ''.join(tmp)
+    out = kwargs.get('out')
+    if out is True:
+        out = 'TEST_THIS_REST_BEFORE_REGISTERING.txt'
+    if out:
+        f = open(out, 'w')
+        f.write(res)
+        f.close()
+        report = validateReST(res)
+        if report:
+            print report
+            raise ValueError('ReST validation error')
+    return res
+# end helpers; below this line should be code custom to this package
+
 methodwrapper = Extension(
     name='zc.twist._methodwrapper',
     sources=[os.path.join('src','zc','twist','_methodwrapper.c')],
@@ -13,14 +64,18 @@
 
 setup(
     name='zc.twist',
-    version='1.2',
+    version='1.3',
     packages=find_packages('src'),
+    url='http://pypi.python.org/pypi/zc.twist',
     package_dir={'':'src'},
     zip_safe=False,
     author='Zope Project',
     author_email='zope-dev at zope.org',
     description=open('README.txt').read(),
-    long_description=long_description,
+    long_description=text(
+        "src/zc/twist/README.txt",
+        '=======\nChanges\n=======\n\n',
+        "CHANGES.txt"),
     license='ZPL',
     install_requires=[
         'ZODB3',

Modified: zc.twist/trunk/src/zc/twist/README.txt
===================================================================
--- zc.twist/trunk/src/zc/twist/README.txt	2008-06-19 21:01:21 UTC (rev 87565)
+++ zc.twist/trunk/src/zc/twist/README.txt	2008-06-19 21:24:26 UTC (rev 87566)
@@ -61,7 +61,7 @@
     >>> demo.count # ah-ha!
     1
 
-We can use the deferred returned from the call to do somethin with the
+We can use the deferred returned from the call to do something with the
 return value.  In this case, the deferred is already completed, so
 adding a callback gets instant execution.
 
@@ -139,7 +139,7 @@
     >>> demo.count = demo2.count = 0 # cleanup
     >>> transaction.commit()
 
-ConflictErrors make it retry.  
+ConflictErrors make it retry.
 
 In order to have a chance to simulate a ConflictError, this time imagine
 we have a runner that can switch execution from the call to our code
@@ -169,7 +169,7 @@
     >>> demo.count
     6
 
-After five retries (currently hard-coded), the retry fails, raising the
+By default, after five ConflictError retries, the partial fails, raising the
 last ConflictError.  This is returned to the deferred.  The failure put
 on the deferred will have a sanitized traceback.  Here, imagine we have
 a deferred (named `deferred`) created from such an event
@@ -186,9 +186,17 @@
     ...
     ZODB.POSException.ConflictError: database conflict error...
 
-Other errors are returned to the deferred as well, as sanitized failures
-[#use_original_demo]_.
+You can control how many ConflictError (and other transaction error) retries
+should be performed by setting the ``max_transaction_errors`` attribute
+[#max_transaction_errors]_.
 
+ZEO ClientDisconnected errors are always retried, with a backoff that, by
+default, begins at 5 seconds and is never greater than 60 seconds
+[#relies_on_twisted_reactor]_ [#use_original_demo]_ [#client_disconnected]_.
+
+Other errors, like a transaction error that has exceeded its available
+retries, are returned to the deferred as sanitized failures.
+
     >>> call = Partial(demo)
     >>> d = call('I do not add well with integers')
     >>> d = d.addErrback(get_result)
@@ -200,13 +208,12 @@
 The failure is sanitized in that the traceback is gone and the frame values
 are turned into reprs.  If you pickle the failure then it truncates the
 reprs to a maximum of 20 characters plus "[...]" to indicate the
-truncation[#show_sanitation]_.
+truncation [#show_sanitation]_.
 
 The call tries to be a good connection citizen, waiting for a connection
 if the pool is at its maximum size.  This code relies on the twisted
 reactor; we'll use a `time_flies` function, which takes seconds to move
-ahead, to simulate time passing in the reactor
-[#relies_on_twisted_reactor]_.
+ahead, to simulate time passing in the reactor.
 
     >>> db.setPoolSize(1)
     >>> db.getPoolSize()
@@ -254,7 +261,7 @@
     >>> call.attempt_count
     0
     >>> res # None
-    >>> time_flies(1.9) >= 2 # for a total of at least 3
+    >>> time_flies(2.0) >= 2 # for a total of at least 3
     True
     >>> res
     2
@@ -316,6 +323,7 @@
     >>> class Demo(persistent.Persistent):
     ...     count = 0
     ...     def __call__(self, amount=1):
+    ...         self._p_deactivate() # to be able to trigger ClientDisconnected
     ...         self.count += amount
     ...         return self.count
     ...     def decrement(self, amount=1):
@@ -390,11 +398,10 @@
     ...         assert _thread.locked()
     ...         _thread.release()
     ...         _main.acquire()
-    ...     def resume(self, retry=True):
+    ...     def resume(self):
     ...         while self.running:
     ...             self.retry()
-    ...         while self.thread.isAlive():
-    ...             pass
+    ...         self.thread.join()
     ...         assert not _thread.locked()
     ...         assert _main.locked()
     ...         _main.release()
@@ -406,36 +413,306 @@
     >>> runner = Runner(call)
     >>> for i in range(5):
     ...     demo.count = i
-    ...     transaction.commit()
+    ...     transaction.commit() # creates a write conflict
     ...     runner.retry()
     ...
-    >>> runner.resume(retry=False)
     >>> demo.count
     4
+
+    When we resume without a conflict error, it is too late: the result is a
+    ConflictError.  The ConflictError is actually shown in the main text.
+
+    >>> runner.resume()
+    >>> demo.count
+    4
     >>> call.attempt_count
     5
     >>> deferred = runner.result
 
+.. [#max_transaction_errors] As the main text mentions, the
+   ``max_transaction_errors`` attribute lets you set how many conflict errors
+   should be retried.
+
+    >>> call = Partial(demo)
+    >>> call.max_transaction_errors
+    5
+    >>> call.max_transaction_errors = 10
+    >>> call.max_transaction_errors
+    10
+    >>> runner = Runner(call)
+    >>> for i in range(10):
+    ...     demo.count = i
+    ...     transaction.commit()
+    ...     runner.retry()
+    ...
+
+   When we resume without a conflict error, it is too late: the result is a
+   ConflictError.
+
+    >>> runner.resume()
+    >>> demo.count
+    9
+    >>> call.attempt_count
+    10
+    >>> deferred = runner.result
+
+    >>> res = None
+    >>> def get_result(r):
+    ...     global res
+    ...     res = r # we return None to quiet Twisted down on the command line
+    ...
+    >>> d = deferred.addErrback(get_result)
+    >>> print res.getTraceback() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
+    Traceback (most recent call last):
+    ...
+    ZODB.POSException.ConflictError: database conflict error...
+
+   Setting ``None`` means to retry conflict errors forever.  For our example,
+   we will arbitrarily choose 50 iterations to show "forever".
+
+    >>> call = Partial(demo)
+    >>> call.max_transaction_errors
+    5
+    >>> call.max_transaction_errors = None
+    >>> print call.max_transaction_errors
+    None
+    >>> runner = Runner(call)
+    >>> for i in range(50):
+    ...     demo.count = i
+    ...     transaction.commit()
+    ...     runner.retry()
+    ...
+
+   Now when we resume without a conflict error, we get a successful result: it
+   never gave up.
+
+    >>> runner.resume()
+    >>> runner.result # doctest: +ELLIPSIS
+    <Deferred at ...  current result: 50>
+    >>> _ = transaction.begin() # we need to sync to get changes
+    >>> demo.count # notice, 49 + 1
+    50
+
+.. [#relies_on_twisted_reactor] We monkeypatch twisted.internet.reactor
+    (and revert it in another footnote below).
+
+    >>> import twisted.internet.reactor
+    >>> oldCallLater = twisted.internet.reactor.callLater
+    >>> import bisect
+    >>> class FauxReactor(object):
+    ...     def __init__(self):
+    ...         self.time = 0
+    ...         self.calls = []
+    ...     def callLater(self, delay, callable, *args, **kw):
+    ...         res = (delay + self.time, callable, args, kw)
+    ...         bisect.insort(self.calls, res)
+    ...         # normally we're supposed to return something but not needed
+    ...     def time_flies(self, time):
+    ...         end = self.time + time
+    ...         ct = 0
+    ...         while self.calls and self.calls[0][0] <= end:
+    ...             self.time, callable, args, kw = self.calls.pop(0)
+    ...             callable(*args, **kw) # normally this would get try...except
+    ...             ct += 1
+    ...         self.time = end
+    ...         return ct
+    ...
+    >>> faux = FauxReactor()
+    >>> twisted.internet.reactor.callLater = faux.callLater
+    >>> time_flies = faux.time_flies
+
 .. [#use_original_demo] The second demo has too much thread code in it:
     we'll use the old demo for the rest of the discussion.
 
     >>> demo = root['demo']
 
+.. [#client_disconnected] As the main text describes,
+   ZEO.Exceptions.ClientDisconnected errors will always be retried, but with a
+   backoff.
+
+   First we'll mimic a disconnected ZEO at the start of a transaction.
+
+    >>> from ZEO.Exceptions import ClientDisconnected
+    >>> raise_error = [1]
+    >>> storage_class = db._storage.__class__
+    >>> original_load = storage_class.load
+    >>> def load(self, oid, version):
+    ...     if raise_error:
+    ...         raise_error.pop()
+    ...         raise ClientDisconnected()
+    ...     else:
+    ...         return original_load(self, oid, version)
+    ...
+    >>> _ = transaction.begin()
+    >>> demo.count
+    0
+    >>> call = Partial(demo)
+    >>> storage_class.load = load
+
+   We rely on a reactor to implement delayed calls.  We have a fake reactor
+   called ``faux`` for these examples.  It has a list of pending calls, and
+   we can call ``time_flies`` to make time appear to pass.
+
+    >>> len(faux.calls)
+    0
+
+   When we first call the partial, it will fail, and reschedule for later.
+
+    >>> len(faux.calls)
+    0
+    >>> deferred = call()
+    >>> deferred.called
+    0
+    >>> len(faux.calls)
+    1
+
+   The rescheduling is initially for five seconds later, by default.  In this
+   first example, after the first retry, the call will succeed.
+
+    >>> faux.calls[0][0] - faux.time
+    5
+    >>> time_flies(1) # 1 second
+    0
+    >>> deferred.called
+    0
+    >>> time_flies(1) # 1 second
+    0
+    >>> deferred.called
+    0
+    >>> time_flies(1) # 1 second
+    0
+    >>> deferred.called
+    0
+    >>> time_flies(1) # 1 second
+    0
+    >>> deferred.called
+    0
+    >>> time_flies(1) # 1 second
+    1
+    >>> deferred.called
+    True
+    >>> deferred.result
+    1
+    >>> len(faux.calls)
+    0
+
+   By default, the rescheduling backoff increases by five seconds for every
+   retry, to a maximum of a 60 second backoff.
+
+    >>> call = Partial(demo)
+    >>> raise_error.extend([1] * 30)
+    >>> len(faux.calls)
+    0
+    >>> deferred = call()
+    >>> deferred.called
+    0
+    >>> len(faux.calls)
+    1
+    >>> def run(deferred):
+    ...     sleeps = []
+    ...     for i in range(31):
+    ...         if deferred.called:
+    ...             break
+    ...         else:
+    ...             sleep = faux.calls[0][0] - faux.time
+    ...             sleeps.append(sleep)
+    ...             time_flies(sleep)
+    ...     else:
+    ...         print 'oops'
+    ...     return sleeps
+    ...
+    >>> sleeps = run(deferred)
+    >>> deferred.result
+    2
+    >>> len(sleeps)
+    30
+    >>> sleeps # doctest: +ELLIPSIS
+    [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 60, 60,..., 60, 60, 60, 60]
+
+   The default backoff values can be changed by setting the instance attributes
+   ``initial_backoff``, ``backoff_increment``, and ``max_backoff``.
+
+    >>> call = Partial(demo)
+    >>> call.initial_backoff
+    5
+    >>> call.backoff_increment
+    5
+    >>> call.max_backoff
+    60
+    >>> call.initial_backoff = 4
+    >>> call.backoff_increment = 2
+    >>> call.max_backoff = 21
+    >>> raise_error.extend([1] * 30)
+    >>> len(faux.calls)
+    0
+    >>> deferred = call()
+    >>> deferred.called
+    0
+    >>> len(faux.calls)
+    1
+    >>> sleeps = run(deferred)
+    >>> deferred.result
+    3
+    >>> len(sleeps)
+    30
+    >>> sleeps # doctest: +ELLIPSIS
+    [4, 6, 8, 10, 12, 14, 16, 18, 20, 21, 21, 21, 21, 21, 21,..., 21, 21, 21]
+
+   A ClientDisconnected error can also occur during transaction commit.
+
+    >>> storage_class.load = original_load
+
+    >>> from transaction import TransactionManager
+    >>> old_commit = TransactionManager.commit
+    >>> commit_count = 0
+    >>> error = None
+    >>> max = 2
+    >>> raise_error = [1] # change state to active
+    >>> def new_commit(self):
+    ...     if raise_error:
+    ...         raise_error.pop()
+    ...         raise ClientDisconnected()
+    ...     else:
+    ...         old_commit(self) # changing state to "active" or similar
+    ...
+    >>> TransactionManager.commit = new_commit
+    >>> call = Partial(demo)
+    >>> len(faux.calls)
+    0
+    >>> deferred = call()
+    >>> deferred.called
+    0
+    >>> len(faux.calls)
+    1
+
+    >>> faux.calls[0][0] - faux.time
+    5
+    >>> time_flies(4) # 4 seconds
+    0
+    >>> deferred.called
+    0
+    >>> time_flies(1) # 1 second
+    1
+    >>> deferred.called
+    True
+    >>> deferred.result
+    4
+    >>> len(faux.calls)
+    0
+
+    >>> TransactionManager.commit = old_commit
+    >>> _ = transaction.begin()
+
 .. [#show_sanitation] Before pickling, the failure includes full information
     about before and after the exception was caught, as well as locals and
     globals.  Everything has been repr'd, though, and the traceback object
     removed.
-    
+
     >>> print res.getTraceback() # doctest: +ELLIPSIS
     Traceback (most recent call last):
       File ".../zc/twist/__init__.py", line ..., in __call__
         get_connection(db, reactor=self.getReactor()).addCallback(
-      File ".../twisted/internet/defer.py", line ..., in addCallback
-        callbackKeywords=kw)
-      File ".../twisted/internet/defer.py", line ..., in addCallbacks
-        self._runCallbacks()
-      File ".../twisted/internet/defer.py", line ..., in _runCallbacks
-        self.result = callback(self.result, *args, **kw)
+      File ".../twisted/internet/defer.py", line ..., in addCallback...
     --- <exception caught here> ---
       File ".../zc/twist/__init__.py", line ..., in _call
         res = call(*args, **kwargs)
@@ -459,12 +736,6 @@
       args : '(<Deferred at ...
      ( Globals )...
       Deferred : '<class twisted.internet.defer.Deferred at ...
-    .../twisted/internet/defer.py:...: addCallbacks(...)
-     [ Locals ]...
-     ( Globals )...
-    .../twisted/internet/defer.py:...: _runCallbacks(...)
-     [ Locals ]...
-     ( Globals )...
     --- <exception caught here> ---
     .../zc/twist/__init__.py:...: _call(...)
      [ Locals ]...
@@ -501,55 +772,18 @@
     >>> print pickle.loads(pickle.dumps(res)).getTraceback(detail='verbose')
     ... # doctest: +ELLIPSIS
     *--- Failure #... (pickled) ---
-    /Users/gary/opt/zc.twist/src/zc/twist/__init__.py:232: _call(...)
-     [ Locals ]
-      tm : '<transaction._manager[...]'
-      call : '<zc.twist.README.Demo[...]'
-      d : '<Deferred at ...[...]'
-      kwargs : '{}'
-      self : '<zc.twist.Partial obj[...]'
-      args : "['I do not add well w[...]"
-      conn : '<Connection at ...[...]'
+    /Users/gary/opt/zc.twist/src/zc/twist/__init__.py:...: _call(...)
+     [ Locals...
+      self : '<zc.twist.Partial obj[...]...
      ( Globals )
     <doctest README.txt[...]>:...: __call__(...)
-     [ Locals ]
-      amount : "'I do not add well wi[...]"
-      self : '<zc.twist.README.Demo[...]'
+     [ Locals...
+      amount : "'I do not add well wi[...]...
      ( Globals )
     exceptions.TypeError: unsupported operand type(s) for +=: 'int' and 'str'
     *--- End of Failure #... ---
     <BLANKLINE>
 
-
-
-.. [#relies_on_twisted_reactor] We monkeypatch twisted.internet.reactor
-    (and revert it in another footnote below).
-
-    >>> import twisted.internet.reactor
-    >>> oldCallLater = twisted.internet.reactor.callLater
-    >>> import bisect
-    >>> class FauxReactor(object):
-    ...     def __init__(self):
-    ...         self.time = 0
-    ...         self.calls = []
-    ...     def callLater(self, delay, callable, *args, **kw):
-    ...         res = (delay + self.time, callable, args, kw)
-    ...         bisect.insort(self.calls, res)
-    ...         # normally we're supposed to return something but not needed
-    ...     def time_flies(self, time):
-    ...         end = self.time + time
-    ...         ct = 0
-    ...         while self.calls and self.calls[0][0] <= end:
-    ...             self.time, callable, args, kw = self.calls.pop(0)
-    ...             callable(*args, **kw) # normally this would get try...except
-    ...             ct += 1
-    ...         self.time = end
-    ...         return ct
-    ...
-    >>> faux = FauxReactor()
-    >>> twisted.internet.reactor.callLater = faux.callLater
-    >>> time_flies = faux.time_flies
-
 .. [#teardown_monkeypatch]
 
     >>> twisted.internet.reactor.callLater = oldCallLater

Modified: zc.twist/trunk/src/zc/twist/__init__.py
===================================================================
--- zc.twist/trunk/src/zc/twist/__init__.py	2008-06-19 21:01:21 UTC (rev 87565)
+++ zc.twist/trunk/src/zc/twist/__init__.py	2008-06-19 21:24:26 UTC (rev 87566)
@@ -2,6 +2,7 @@
 
 import ZODB.interfaces
 import ZODB.POSException
+import ZEO.Exceptions
 import transaction
 import transaction.interfaces
 import persistent
@@ -151,7 +152,7 @@
         self, file=None, elideFrameworkCode=0, detail='default'):
         return twisted.python.failure.Failure.printTraceback(
             self, file, elideFrameworkCode or self.sanitized, detail)
-            
+
 class _Dummy: # twisted.python.failure.Failure is an old-style class
     pass # so we use old-style hacks instead of __new__
 
@@ -168,6 +169,17 @@
 
 class Partial(object):
 
+    # for TransactionErrors, such as ConflictErrors
+    transaction_error_count = 0
+    max_transaction_errors = 5
+
+    # for ClientDisconnected errors
+    backoff = None
+    initial_backoff = 5 # seconds
+    backoff_increment = 5 # seconds
+    max_backoff = 60 # seconds
+
+    # more general values
     attempt_count = 0
     _reactor = None
 
@@ -191,7 +203,6 @@
         else: # no persistent bits
             call, args, kwargs = self._resolve(None)
             return call(*args, **kwargs)
-        self.attempt_count = 0
         d = twisted.internet.defer.Deferred()
         get_connection(db, reactor=self.getReactor()).addCallback(
             self._call, d)
@@ -226,21 +237,37 @@
     def _call(self, conn, d):
         self.attempt_count += 1
         tm = transaction.interfaces.ITransactionManager(conn)
-        tm.begin() # syncs
         try:
+            tm.begin() # syncs; inside try:except because of ClientDisconnected
             call, args, kwargs = self._resolve(conn)
             res = call(*args, **kwargs)
             tm.commit()
         except ZODB.POSException.TransactionError:
+            self.transaction_error_count += 1
             tm.abort()
             db = conn.db()
             conn.close()
-            if self.attempt_count >= 5: # TODO configurable
+            if (self.max_transaction_errors is not None and
+                self.transaction_error_count >= self.max_transaction_errors):
                 res = Failure()
                 d.errback(res)
             else:
                 get_connection(db, reactor=self.getReactor()).addCallback(
                     self._call, d)
+        except ZEO.Exceptions.ClientDisconnected:
+            tm.abort()
+            db = conn.db()
+            conn.close()
+            if self.backoff is None:
+                backoff = self.backoff = self.initial_backoff
+            else:
+                backoff = self.backoff = min(
+                    self.backoff + self.backoff_increment, self.max_backoff)
+            reactor = self.getReactor()
+            reactor.callLater(
+                backoff,
+                lambda: get_connection(db, reactor=reactor).addCallback(
+                    self._call, d))
         except EXPLOSIVE_ERRORS:
             tm.abort()
             conn.close()



More information about the Checkins mailing list