[Zodb-checkins] CVS: StandaloneZODB/ZEO - StorageServer.py:1.32.6.3.2.2

Jeremy Hylton jeremy@zope.com
Mon, 29 Apr 2002 18:41:28 -0400


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv29054

Modified Files:
      Tag: ZEO2-branch
	StorageServer.py 
Log Message:
Remove distributed commit lock!  Actually, just move it around a bit.

Instead of blocking a client as soon as it reaches tpc_begin(), accept
messages and queue them until the client calls a method that requires
a response -- tpc_vote(), transactionalUndo(), abortVersion(), or
commitVersion().  This lets one client send updates while another
client is committing.  In practice undo & versions are uncommon, so
most transactions will eventually block on the vote.

The new approach requires substantial changes to the implementation.
The code now uses the Strategy pattern, as explained by this comment
block: 

# A ZEOStorage instance can use different strategies to commit a
# transaction.  The current implementation uses different strategies
# depending on whether the underlying storage is available.  These
# strategies implement the distributed commit lock.

# If the underlying storage is available, start the commit immediately
# using the ImmediateCommitStrategy.  If the underlying storage is not
# available because another client is committing a transaction, delay
# the commit as long as possible.  At some point it will no longer be
# possible to delay; either the transaction will reach the vote stage
# or a synchronous method like transactionalUndo() will be called.
# When it is no longer possible to delay, the client must block until
# the storage is ready.  Then we switch back to the immediate strategy.



=== StandaloneZODB/ZEO/StorageServer.py 1.32.6.3.2.1 => 1.32.6.3.2.2 ===
 
 from ZEO import ClientStub
+from ZEO.CommitLog import CommitLog
 from ZEO.zrpc.server import Dispatcher
 from ZEO.zrpc.connection import ManagedServerConnection, Delay
 
@@ -35,6 +36,7 @@
      TransactionError, ReadOnlyError
 from ZODB.referencesf import referencesf
 from ZODB.Transaction import Transaction
+from ZODB.TmpStore import TmpStore
 
 # We create a special fast pickler! This allows us
 # to create slightly more efficient pickles and
@@ -47,6 +49,13 @@
         error=None):
     zLOG.LOG(label, level, message, error=error)
 
+# a version of log that includes the storage name
+def slog(storage, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
+    name = getattr(storage, '__name__', None)
+    if name is None:
+        name = str(self.storage)
+    zLOG.LOG("ZEO Server:%s:%s" % (pid, name), level, msg, error=error)
+
 class StorageServerError(StorageError):
     pass
 
@@ -55,6 +64,8 @@
         # XXX should read_only be a per-storage option? not yet...
         self.addr = addr
         self.storages = storages
+        for s in storages.values():
+            s._waiting = []
         self.read_only = read_only
         self.connections = {}
         self.dispatcher = Dispatcher(addr, factory=self.newConnection,
@@ -73,9 +84,6 @@
         l = self.connections.get(storage_id)
         if l is None:
             l = self.connections[storage_id] = []
-            # intialize waiting list
-            # XXX why are we using a mangled name ?!?
-            self.storages[storage_id]._ZEOStorage__waiting = []
         l.append(proxy)
 
     def invalidate(self, conn, storage_id, invalidated=(), info=0):
@@ -166,6 +174,7 @@
 
         This method must be the first one called by the client.
         """
+        self._log("register(%s, %s)" % (storage_id, read_only))
         storage = self.server.storages.get(storage_id)
         if storage is None:
             self._log("unknown storage_id: %s" % storage_id)
@@ -247,71 +256,11 @@
                 self.server.invalidate(0, self.__storage_id, (),
                                        self.get_size_info())
 
-    def abortVersion(self, src, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        oids = self.__storage.abortVersion(src, self._transaction)
-        for oid in oids:
-            self.__invalidated.append((oid, src))
-        return oids
-
-    def commitVersion(self, src, dest, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        oids = self.__storage.commitVersion(src, dest, self._transaction)
-        for oid in oids:
-            self.__invalidated.append((oid, dest))
-            if dest:
-                self.__invalidated.append((oid, src))
-        return oids
-
-    def storea(self, oid, serial, data, version, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        # XXX The try/except seems to be doing a lot of work.  How
-        # worried are we about errors that can't be pickled.
-        try:
-            newserial = self.__storage.store(oid, serial, data, version,
-                                             self._transaction)
-        except TransactionError, v:
-            # This is a normal transaction error such as a conflict error
-            # or a version lock or conflict error. It doesn't need to be
-            # logged.
-            self._log("transaction error: %s" % repr(v))
-            newserial = v
-        except:
-            # all errors need to be serialized to prevent unexpected
-            # returns, which would screw up the return handling.
-            # IOW, Anything that ends up here is evil enough to be logged.
-            error = sys.exc_info()
-            self._log('store error: %s: %s' % (error[0], error[1]),
-                      zLOG.ERROR, error=error)
-            newserial = error[1]
-            del error
-        else:
-            if serial != '\0\0\0\0\0\0\0\0':
-                self.__invalidated.append((oid, version))
-
-        # Is all this error checking necessary?
-        try:
-            nil = dump(newserial, 1)
-        except:
-            self._log("couldn't pickle newserial: %s" % repr(newserial),
-                      zLOG.ERROR)
-            dump('', 1) # clear pickler
-            r = StorageServerError("Couldn't pickle exception %s" % \
-                                   `newserial`)
-            newserial = r
-
-        self.client.serialno((oid, newserial))
-
-    def vote(self, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        self.__storage.tpc_vote(self._transaction)
-
-    def transactionalUndo(self, trans_id, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        oids = self.__storage.transactionalUndo(trans_id, self._transaction)
-        for oid in oids:
-            self.__invalidated.append((oid, None))
-        return oids
+    def new_oids(self, n=100):
+        """Return a sequence of n new oids, where n defaults to 100"""
+        if n < 0:
+            n = 1
+        return [self.__storage.new_oid() for i in range(n)]
 
     def undo(self, transaction_id):
         oids = self.__storage.undo(transaction_id)
@@ -321,18 +270,6 @@
             return oids
         return ()
 
-    # When multiple clients are using a single storage, there are several
-    # different _transaction attributes to keep track of.  Each
-    # StorageProxy object has a single _transaction that refers to its
-    # current transaction.  The storage (self.__storage) has another
-    # _transaction that is used for the *real* transaction.
-
-    # The real trick comes with the __waiting queue for a storage.
-    # When a StorageProxy pulls a new transaction from the queue, it
-    # must inform the new transaction's proxy.  (The two proxies may
-    # be the same.)  The new transaction's proxy sets its _transaction
-    # and continues from there.
-
     def tpc_begin(self, id, user, description, ext, tid, status):
         if self._transaction is not None:
             if self._transaction.id == id:
@@ -342,63 +279,261 @@
                 raise StorageTransactionError("Multiple simultaneous tpc_begin"
                                               " requests from one client.")
 
+        if self.__storage._transaction is None:
+            self.strategy = ImmediateCommitStrategy(self.__storage,
+                                                    self.client)
+        else:
+            self.strategy = DelayedCommitStrategy(self.__storage,
+                                                  self.wait)
+            
         t = Transaction()
         t.id = id
         t.user = user
         t.description = description
         t._extension = ext
 
-        if self.__storage._transaction is not None:
-            d = Delay()
-            self.__storage.__waiting.append((d, self, t, tid, status))
-            return d
-
+        self.strategy.tpc_begin(t, tid, status)
         self._transaction = t
-        self.__storage.tpc_begin(t, tid, status)
-        self.__invalidated = []
 
     def tpc_finish(self, id):
         if not self._check_tid(id):
             return
-
-        r = self.__storage.tpc_finish(self._transaction)
-        assert self.__storage._transaction is None
-
-        if self.__invalidated:
+        invalidated = self.strategy.tpc_finish()
+        if invalidated:
             self.server.invalidate(self, self.__storage_id,
-                                   self.__invalidated,
-                                   self.get_size_info())
-
+                                   self.__invalidated, self.get_size_info())
         if not self._handle_waiting():
             self._transaction = None
-            self.__invalidated = []
+            self.strategy = None
 
     def tpc_abort(self, id):
         if not self._check_tid(id):
             return
-        r = self.__storage.tpc_abort(self._transaction)
-        assert self.__storage._transaction is None
-
+        self.strategy.tpc_abort()
         if not self._handle_waiting():
             self._transaction = None
-            self.__invalidated = []
+            self.strategy = None
 
-    def _restart_delayed_transaction(self, delay, trans, tid, status):
-        self._transaction = trans
-        self.__storage.tpc_begin(trans, tid, status)
-        self.__invalidated = []
-        assert self._transaction.id == self.__storage._transaction.id
-        delay.reply(None)
+    # XXX handle new serialnos
+
+    def storea(self, oid, serial, data, version, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.strategy.store(oid, serial, data, version)
+
+    def vote(self, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        return self.strategy.tpc_vote()
+
+    def abortVersion(self, src, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        return self.strategy.abortVersion(src)
+
+    def commitVersion(self, src, dest, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        return self.strategy.commitVersion(src, dest)
+
+    def transactionalUndo(self, trans_id, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        return self.strategy.transactionalUndo(trans_id)
+
+    # When a delayed transaction is restarted, the dance is
+    # complicated.  The restart occurs when one ZEOStorage instance
+    # finishes a transaction and finds another instance is in the
+    # _waiting list.
+
+    # XXX It might be better to have a mechanism to explicitly send
+    # the finishing transaction's reply before restarting the waiting
+    # transaction.  If the restart takes a long time, the previous
+    # client will be blocked until it finishes.
+    
+    def wait(self):
+        d = Delay()
+        self.__storage._waiting.append((d, self))
+        return d
 
     def _handle_waiting(self):
-        if self.__storage.__waiting:
-            delay, proxy, trans, tid, status = self.__storage.__waiting.pop(0)
-            proxy._restart_delayed_transaction(delay, trans, tid, status)
-            if self is proxy:
-                return 1
+        if self.__storage._waiting:
+            delay, zeo_storage = self.__storage._waiting.pop(0)
+            self.restart(delay)
+            zeo_storage.restart(delay)
+
+    def restart(self, delay):
+        old_strategy = self.strategy
+        self.strategy = ImmediateCommitStrategy(self.__storage,
+                                                self.client)
+        delay.reply(old_strategy.restart(self.strategy))
+
+# A ZEOStorage instance can use different strategies to commit a
+# transaction.  The current implementation uses different strategies
+# depending on whether the underlying storage is available.  These
+# strategies implement the distributed commit lock.
+
+# If the underlying storage is available, start the commit immediately
+# using the ImmediateCommitStrategy.  If the underlying storage is not
+# available because another client is committing a transaction, delay
+# the commit as long as possible.  At some point it will no longer be
+# possible to delay; either the transaction will reach the vote stage
+# or a synchronous method like transactionalUndo() will be called.
+# When it is no longer possible to delay, the client must block until
+# the storage is ready.  Then we switch back to the immediate strategy.
+
+class ICommitStrategy:
+    """A class that describes that commit strategy interface.
+
+    The commit strategy interface does not require the transaction
+    argument, except for tpc_begin().  The storage interface requires
+    the client to pass a transaction object/id to each transactional
+    method.  The strategy does not; it requires the caller to only
+    call methods for a single transaction.
+    """
+    # This isn't a proper Zope interface, because I don't want to
+    # introduce a dependency between ZODB and Zope interfaces.
+
+    def tpc_begin(self, trans, tid, status): pass
+
+    def store(self, oid, serial, data, version): pass
+
+    def abortVersion(self, src): pass
+
+    def commitVersion(self, src, dest): pass
+
+    # the trans_id arg to transactionalUndo is not the current txn's id
+    def transactionalUndo(self, trans_id): pass
+
+    def tpc_vote(self): pass
+
+    def tpc_abort(self): pass
+
+    def tpc_finish(self): pass
+
+class ImmediateCommitStrategy:
+    """The storage is available so do a normal commit."""
+
+    def __init__(self, storage, client):
+        self.storage = storage
+        self.client = client
+        self.invalidated = []
+        self.serials = []
+
+    def tpc_begin(self, txn, tid, status):
+        self.txn = txn
+        self.storage.tpc_begin(txn, tid, status)
+
+    def tpc_vote(self):
+        # send all the serialnos as a batch
+        self.client.serialnos(self.serials)
+        return self.storage.tpc_vote(self.txn)
+
+    def tpc_finish(self):
+        self.storage.tpc_finish(self.txn)
+        return self.invalidated
 
-    def new_oids(self, n=100):
-        """Return a sequence of n new oids, where n defaults to 100"""
-        if n < 0:
-            n = 1
-        return [self.__storage.new_oid() for i in range(n)]
+    def tpc_abort(self):
+        self.storage.tpc_abort(self.txn)
+
+    def store(self, oid, serial, data, version):
+        try:
+            newserial = self.storage.store(oid, serial, data, version,
+                                           self.txn)
+        except TransactionError, err:
+            # Storage errors are passed to the client
+            newserial = err
+        except:
+            # Unexpected storage errors are logged and passed to the client
+            exc_info = sys.exc_info()
+            slog(self.storage, "store error: %s, %s" % exc_info[:2],
+                zLOG.ERROR, error=exc_info)
+            newserial = exc_info[1]
+            del exc_info
+        else:
+            if serial != "\0\0\0\0\0\0\0\0":
+                self.invalidated.append((oid, version))
+
+        try:
+            nil = dump(newserial, 1)
+        except:
+            msg = "Couldn't pickle storage exception: %s" % repr(newserial)
+            slog(self.storage, msg, zLOG.ERROR)
+            dump('', 1) # clear pickler
+            r = StorageServerError(msg)
+            newserial = r
+        self.serials.append((oid, newserial))
+
+    def commitVersion(self, src, dest):
+        oids = self.storage.commitVersion(src, dest, self.txn)
+        inv = [(oid, dest) for oid in oids]
+        self.invalidated.extend(inv)
+        if dest:
+            inv = [(oid, src) for oid in oids]
+            self.invalidated.extend(inv)
+        return oids
+
+    def abortVersion(self, src):
+        oids = self.storage.abortVersion(src, self.txn)
+        inv = [(oid, src) for oid in oids]
+        self.invalidated.extend(inv)
+        return oids
+
+    def transactionalUndo(self, trans_id):
+        oids = self.storage.transactionalUndo(trans_id, self.txn)
+        inv = [(oid, None) for oid in oids]
+        self.invalidated.extend(inv)
+        return oids
+
+class DelayedCommitStrategy:
+    """The storage is unavailable, so log to a file."""
+
+    def __init__(self, storage, block):
+        # the block argument is called when we can't delay any longer
+        self.storage = storage
+        self.block = block
+        self.log = CommitLog()
+        self.invalidated = []
+
+        # Store information about the call that blocks
+        self.name = None
+        self.args = None
+
+    def tpc_begin(self, txn, tid, status):
+        self.txn = txn
+        self.tid = tid
+        self.status = status
+
+    def store(self, oid, serial, data, version):
+        self.log.store(oid, serial, data, version)
+
+    def tpc_abort(self):
+        pass # just forget about this strategy
+
+    def tpc_finish(self):
+        raise RuntimeError, "Logic error.  This method must not be called."
+
+    def tpc_vote(self):
+        self.name = "tpc_vote"
+        self.args = ()
+        return self.block()
+
+    def commitVersion(self, src, dest):
+        self.name = "commitVersion"
+        self.args = src, dest
+        return self.block()
+
+    def abortVersion(self, src):
+        self.name = "abortVersion"
+        self.args = src,
+        return self.block()
+
+    def transactionalUndo(self, trans_id):
+        self.name = "transactionalUndo"
+        self.args = trans_id,
+        return self.block()
+
+    def restart(self, new_strategy):
+        # called by the storage when the storage is available
+        new_strategy.tpc_begin(self.txn, self.tid, self.status)
+        loads, loader = self.log.get_loader()
+        for i in len(loads):
+            oid, serial, data, version = loader.load()
+            new_strategy.store(oid, serial, data, version)
+        meth = getattr(new_strategy, self.name)
+        return meth(*self.args)