[Zodb-checkins] CVS: Packages/ZEO - StorageServer.py:1.21.4.7

jeremy@digicool.com jeremy@digicool.com
Wed, 25 Apr 2001 16:43:53 -0400 (EDT)


Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv28767

Modified Files:
      Tag: ZEO-ZRPC-Dev
	StorageServer.py 
Log Message:
Fixes to support multiple simultaneous clients of a single storage.

Add _check_tid() helper that does check if the current call is using
the current transaction id.  Complain if not and raise exception if
specified. 

Use _check_tid() helper for all interesting methods except
tpc_begin(). 

Use _restart_delayed_transaction() to do correct bookkeeping about
current transaction when one is restarted from the delayed queue.  This
new comment explains some of the details:

    # When multiple clients are using a single storage, there are several
    # different _transaction attributes to keep track of.  Each
    # StorageProxy object has a single _transaction that refers to its
    # current transaction.  The storage (self.__storage) has another
    # _transaction that is used for the *real* transaction.

    # The real trick comes with the __waiting queue for a storage.
    # When a StorageProxy pulls a new transaction from the queue, it
    # must inform the new transaction's proxy.  (The two proxies may
    # be the same.)  The new transaction's proxy sets its _transaction
    # and continues from there.

Use handle_waiting() to check if a queue transaction exists.

Remove old async tpc_begin() defs.

Add __repr__() and _log() methods that provide per-instance labels for
debugging purposes.









--- Updated File StorageServer.py in package Packages/ZEO --
--- StorageServer.py	2001/04/20 19:14:08	1.21.4.6
+++ StorageServer.py	2001/04/25 20:43:52	1.21.4.7
@@ -59,8 +59,8 @@
 #    labeled as unofficial distributions.  Modifications which do not
 #    carry the name Zope may be packaged in any form, as long as they
 #    conform to all of the clauses above.
+#
 # 
-# 
 # Disclaimer
 # 
 #   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
@@ -86,12 +86,16 @@
 
 This server acts as a front-end for one or more real storages, like
 file storage or Berkeley storage.
+
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
 """
 
 import cPickle
 import os
 import sys
 import threading
+import types
 
 import ClientStub
 import zrpc2
@@ -110,8 +114,9 @@
 pickler.fast = 1 # Don't use the memo
 dump = pickler.dump
 
-def log(message, level=zeolog.INFO, label="zeoserver:%s" % os.getpid()):
-    zeolog.LOG(label, level, message)
+def log(message, level=zeolog.INFO, label="ZEO Server:%s" % os.getpid(),
+        error=None):
+    zeolog.LOG(label, level, message, error=error)
 
 class StorageServerError(StorageError):
     pass
@@ -165,6 +170,59 @@
         self.__invalidated = []
         self._transaction = None
 
+    def __repr__(self):
+        tid = self._transaction and repr(self._transaction.id)
+        if self.__storage:
+            stid = self.__storage._transaction and \
+                   repr(self.__storage._transaction.id)
+        else:
+            stid = None
+        return "<StorageProxy %X trans=%s s_trans=%s>" % (id(self), tid,
+                                                          stid)
+
+    def _log(self, msg, level=zeolog.INFO, error=None, pid=os.getpid()):
+        zeolog.LOG("ZEO Server %s %X" % (pid, id(self)),
+                   level, msg, error=error)
+
+    def setup_delegation(self):
+        """Delegate several methods to the storage"""
+        self.undoInfo = self.__storage.undoInfo
+        self.undoLog = self.__storage.undoLog
+        self.versionEmpty = self.__storage.versionEmpty
+        self.versions = self.__storage.versions
+        self.history = self.__storage.history
+        self.load = self.__storage.load
+        self.loadSerial = self.__storage.loadSerial
+
+    def _check_tid(self, tid, exc=None):
+        caller = sys._getframe().f_back.f_code.co_name
+        if self._transaction is None:
+            self._log("no current transaction: %s()" % caller,
+                zeolog.PROBLEM)
+            if exc is not None:
+                raise exc(None, tid)
+            else:
+                return 0
+        if self._transaction.id != tid:
+            self._log("%s(%s) invalid; current transaction = %s" % \
+                (caller, repr(tid), repr(self._transaction.id)),
+                zeolog.PROBLEM)
+            if exc is not None:
+                raise exc(self._transaction.id, tid)
+            else:
+                return 0
+        return 1
+
+    def _restart_delayed_transaction(self, delay, tinfo):
+        self._transaction = t = Transaction()
+        t.id = tinfo[0]
+        t.user = tinfo[1]
+        t.description = tinfo[2]
+        self.__storage.tpc_begin(t)
+        self.__invalidated = []
+        assert self._transaction.id == self.__storage._transaction.id
+        delay.reply(None)
+
     def register(self, storage_id):
         """Select the storage that this client will use
 
@@ -172,25 +230,15 @@
         """
         storage = self.server.storages.get(storage_id)
         if storage is None:
-            log("unknown storage_id: %s" % storage_id)
+            self._log("unknown storage_id: %s" % storage_id)
             self.get_caller.close()
 
         self.__storage_id = storage_id
         self.__storage = storage
         self.setup_delegation()
         self.server.register(storage_id, self)
-        log("registered storage %s: %s" % (storage_id, storage))
+        self._log("registered storage %s: %s" % (storage_id, storage))
 
-    def setup_delegation(self):
-        """Delegate several methods to the storage"""
-        self.undoInfo = self.__storage.undoInfo
-        self.undoLog = self.__storage.undoLog
-        self.versionEmpty = self.__storage.versionEmpty
-        self.versions = self.__storage.versions
-        self.history = self.__storage.history
-        self.load = self.__storage.load
-        self.loadSerial = self.__storage.loadSerial
-
     def get_info(self):
         return {
             'length': len(self.__storage),
@@ -247,9 +295,9 @@
         try: 
             self.__storage.pack(t, referencesf)
         except:
-            log('ZEO Server', zeolog.ERROR,
-                'Pack failed for %s' % self.__storage_id,
-                error=sys.exc_info())
+            self._log('ZEO Server', zeolog.ERROR,
+                      'Pack failed for %s' % self.__storage_id,
+                      error=sys.exc_info())
             if wait:
                 raise
         else:
@@ -259,19 +307,15 @@
                                        self.get_size_info())
 
     def abortVersion(self, src, id):
-        t = self._transaction
-        if t is None or id != t.id:
-            raise StorageTransactionError(self._transaction, id)
-        oids = self.__storage.abortVersion(src, t)
+        self._check_tid(id, exc=StorageTransactionError)
+        oids = self.__storage.abortVersion(src, self._transaction)
         for oid in oids:
             self.__invalidated.append((oid, src))
         return oids
 
     def commitVersion(self, src, dest, id):
-        t = self._transaction
-        if t is None or id != t.id:
-            raise StorageTransactionError(self, id)
-        oids = self.__storage.commitVersion(src, dest, t)
+        self._check_tid(id, exc=StorageTransactionError)
+        oids = self.__storage.commitVersion(src, dest, self._transaction)
         for oid in oids:
             self.__invalidated.append((oid, dest))
             if dest:
@@ -279,26 +323,25 @@
         return oids
 
     def storea(self, oid, serial, data, version, id):
-        t = self._transaction
+        self._check_tid(id, exc=StorageTransactionError)
         try:
             # XXX does this stmt need to be in the try/except?
-            if t is None or id != t.id:
-                raise StorageTransactionError(self, id)
         
-            newserial = self.__storage.store(oid, serial, data, version, t)
+            newserial = self.__storage.store(oid, serial, data, version,
+                                             self._transaction)
         except TransactionError, v:
             # This is a normal transaction error such as a conflict error
             # or a version lock or conflict error. It doesn't need to be
             # logged.
-            log("transaction error: %s" % repr(v))
+            self._log("transaction error: %s" % repr(v))
             newserial = v
         except:
             # all errors need to be serialized to prevent unexpected
             # returns, which would screw up the return handling.
             # IOW, Anything that ends up here is evil enough to be logged.
             error = sys.exc_info()
-            log('store error: %s: %s' % (error[0], error[1]).
-                zeolog.ERROR)
+            self._log('store error: %s: %s' % (error[0], error[1]),
+                      zeolog.ERROR, error=error)
             newserial = sys.exc_info()[1]
         else:
             if serial != '\0\0\0\0\0\0\0\0':
@@ -307,8 +350,8 @@
         try:
             nil = dump(newserial, 1)
         except:
-            log("couldn't pickle newserial: %s" % repr(newserial),
-                zeolog.ERROR)
+            self._log("couldn't pickle newserial: %s" % repr(newserial),
+                      zeolog.ERROR)
             dump('', 1) # clear pickler
             r = StorageServerError("Couldn't pickle exception %s" % \
                                    `newserial`)
@@ -316,11 +359,9 @@
 
         self.client.serialno((oid, newserial))
 
-    def vote(self, id): 
-        t = self._transaction
-        if t is None or id != t.id:
-            raise StorageTransactionError(self._transaction, id)
-        return self.__storage.tpc_vote(t)
+    def vote(self, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        return self.__storage.tpc_vote(self._transaction)
         
     def undo(self, transaction_id):
         oids = self.__storage.undo(transaction_id)
@@ -330,38 +371,36 @@
             return oids
         return ()
 
-    def tpc_abort(self, id):
-        t = self._transaction
-        if t is None or id != t.id:
-            return
-        r = self.__storage.tpc_abort(t)
-
-        waiting = self.__storage.__waiting
-        while waiting:
-            f, args = waiting.pop(0)
-            if apply(f, args):
-                break
-
-        self._transaction = None
-        self.__invalidated = []
-        
     def unlock(self):
-#        if self.__closed:
-#            return
+##        if self.__closed:
+##            return
         self.client.unlock()
 
+    # When multiple clients are using a single storage, there are several
+    # different _transaction attributes to keep track of.  Each
+    # StorageProxy object has a single _transaction that refers to its
+    # current transaction.  The storage (self.__storage) has another
+    # _transaction that is used for the *real* transaction.
+
+    # The real trick comes with the __waiting queue for a storage.
+    # When a StorageProxy pulls a new transaction from the queue, it
+    # must inform the new transaction's proxy.  (The two proxies may
+    # be the same.)  The new transaction's proxy sets its _transaction
+    # and continues from there.
+
     def tpc_begin(self, id, user, description, ext):
-        t = self._transaction
-        if t is not None:
-            if id == t.id:
+        if self._transaction is not None:
+            if self._transaction.id == id:
+                self._log("duplicate tpc_begin(%s)" % repr(id))
                 return
             else:
-                raise StorageServerError("Multiple simultaneous tpc_begin "
-                                         "requests from the same client.")
+                raise StorageTransactionError("Multiple simultaneous tpc_begin"
+                                              " requests from one client.")
         if self.__storage._transaction is not None:
-            self.__storage.__waiting.append((self.unlock, ()))
-            return 1 # Return a flag indicating a lock condition.
-            
+            d = zrpc2.Delay()
+            self.__storage.__waiting.append((d, self, (id, user, description)))
+            return d
+
         self._transaction = t = Transaction()
         t.id = id
         t.user = user
@@ -369,62 +408,52 @@
         self.__storage.tpc_begin(t)
         self.__invalidated = []
 
-    def tpc_begin_sync(self, id, user, description, ext):
-#        if self.__closed:
-#            return
-        t = self._transaction
-        if t is not None and id == t.id:
+    def tpc_finish(self, id, user, description, ext):
+        if not self._check_tid(id):
             return
-        if self.__storage._transaction is None:
-            return self.try_again_sync(id, user, description, ext)
-        else:
-            d = Delay()
-            self.__storage.__waiting.append((self.try_again_sync,
-                                             (id, user, description, ext, d)))
-            return d
-        
-    def try_again_sync(self, id, user, description, ext, delay=None):
-        if self.__storage._transaction is None:
-            self._transaction = t = Transaction()
-            t.id = id
-            t.user = user
-            t.description = description
-            self.__storage.tpc_begin(t)
-            self.__invalidated = []
-            if delay is not None:
-                delay.reply(None)
-            else:
-                return None
-        return 1
 
-    def tpc_finish(self, id, user, description, ext):
+        # XXX Why do we do this for the begin and the end?
         t = self._transaction
-        if id != t.id:
-            return
         t.user = user
         t.description = description
         t.ext = ext
 
-        r = self.__storage.tpc_finish(t)
-        
-        while self.__storage.__waiting:
-            f, args = self.__storage.__waiting.pop(0)
-            if apply(f,args):
-                break
+        r = self.__storage.tpc_finish(self._transaction)
+        assert self.__storage._transaction is None
 
-        self._transaction = None
         if self.__invalidated:
             self.server.invalidate(self, self.__storage_id,
                                    self.__invalidated,
                                    self.get_size_info())
+
+        if not self.handle_waiting():
+            self._transaction = None
+            self.__invalidated = []
+            assert self._transaction is None
+
+    def tpc_abort(self, id):
+        if not self._check_tid(id):
+            return
+        r = self.__storage.tpc_abort(self._transaction)
+        assert self.__storage._transaction is None
+
+        if not self.handle_waiting():
+            self._transaction = None
             self.__invalidated = []
+            assert self._transaction is None
 
+    def handle_waiting(self):
+        if self.__storage.__waiting:
+            d, proxy, tinfo = self.__storage.__waiting.pop(0)
+            proxy._restart_delayed_transaction(d, tinfo)
+            if self is proxy:
+                return 1
+        
     def new_oids(self, n=100):
         """Return a sequence of n new oids, where n defaults to 100"""
         if n < 0:
             n = 1
         return map(lambda x, self=self: self.__storage.new_oid(), range(n))
-
 
 def fixup_storage(storage):
     # backwards compatibility hack