[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.10.8.2

Jeremy Hylton jeremy at zope.com
Wed Aug 20 17:29:52 EDT 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv21966/ZEO

Modified Files:
      Tag: ZODB3-vote-backoff-branch
	StorageServer.py 
Log Message:
Run prepare phase of 2PC in a separate thread for large txns.

Rename one of the restart() methods prepare().  prepare() starts a
transaction from its commit log.  It runs from tpc_begin() through to
the call that blocked the txn, usually vote().

Look for a magic number of store calls in the txn (hard-coded to 25
for now).  If the number of stores calls is bigger than the magic
number, then the prepare is considered "slow" and run in a separate
thread.  The use of a thread allows the mainloop to handle other
requests.

Change the signature of prepare() so that it explicitly takes a delay
argument, which can be None.  This simplifies the control flow a
little, but it is still too complicated.

Extend run_in_thread() so that you can pass it a delay.  When a client
has an already blocked txn with a delay, just use that delay for the
restarted thread.


=== ZODB3/ZEO/StorageServer.py 1.74.2.10.8.1 => 1.74.2.10.8.2 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.10.8.1	Wed Aug 20 13:41:13 2003
+++ ZODB3/ZEO/StorageServer.py	Wed Aug 20 16:29:51 2003
@@ -569,11 +569,10 @@
         self.timeout.begin(self)
         self.strategy = self.ImmediateCommitStrategyClass(
             self.storage, self.client, self.log)
-        resp = old_strategy.restart(self.strategy)
-        if delay is not None:
-            delay.reply(resp)
-        else:
-            return resp
+        resp = old_strategy.prepare(self.strategy, delay)
+        # If delay is None, then we were called from restart_other()
+        # and the return value doesn't matter.
+        return resp
 
 # A ZEOStorage instance can use different strategies to commit a
 # transaction.  The current implementation uses different strategies
@@ -760,11 +759,26 @@
         self.args = trans_id,
         return self.block()
 
-    def restart(self, new_strategy):
-        # called by the storage when the storage is available
+    def prepare(self, new_strategy, delay):
+        # Called when the storage is available: acquires the lock
+        # and performs all the stores up to the method that actually
+        # blocked.  The prepare() method is a wrapper that decides
+        # whether to use another thread.  The real work is done in
+        # _prepare().
         assert isinstance(new_strategy, ImmediateCommitStrategy)
+        if self.log.stores > 25:
+            # If there are a lot of stores, fire off a separate thread
+            # to avoid blocking other clients.
+            return run_in_thread(self._prepare, new_strategy, delay=delay)
+        else:
+            r = self._prepare(new_strategy)
+            if delay is not None:
+                delay.reply(r)
+            else:
+                return r
+        
+    def _prepare(self, new_strategy):
         new_strategy.tpc_begin(self.txn, self.tid, self.status)
-        print "restarting blocked txn", self.log.stores
         loads, loader = self.log.get_loader()
         for i in range(loads):
             oid, serial, data, version = loader.load()
@@ -850,8 +864,12 @@
                 time.sleep(howlong)
         self.trigger.close()
 
-def run_in_thread(method, *args):
-    t = SlowMethodThread(method, args)
+def run_in_thread(method, *args, **kw):
+    # support kw only to allow delay keyword arg to be used
+    delay = None
+    if kw:
+        delay = kw["delay"]
+    t = SlowMethodThread(method, args, delay)
     t.start()
     return t.delay
 
@@ -868,11 +886,11 @@
     # avoid blocking, we spawn a separate thread, return an MTDelay()
     # object, and have the thread reply() when it finishes.
 
-    def __init__(self, method, args):
+    def __init__(self, method, args, delay=None):
         threading.Thread.__init__(self)
         self._method = method
         self._args = args
-        self.delay = MTDelay()
+        self.delay = delay or MTDelay()
 
     def run(self):
         try:




More information about the Zodb-checkins mailing list