[Zodb-checkins] SVN: ZODB/trunk/src/ Refactored storage server to support multiple client threads.

Jim Fulton jim at zope.com
Sun Jan 31 14:59:54 EST 2010


Log message for revision 108679:
  Refactored storage server to support multiple client threads.
  
  Changed ZEO undo protocol. (Undo is disabled with older clients.)
  Now use one-way undoa.  Undone oids are now returned by (tpc_)vote for
  ZEO. Undo no-longer gets commit lock.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/Cache.py
  U   ZODB/trunk/src/ZEO/tests/CommitLockTests.py
  U   ZODB/trunk/src/ZEO/tests/InvalidationTests.py
  U   ZODB/trunk/src/ZEO/tests/servertesting.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/tests/testZEO2.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py
  U   ZODB/trunk/src/ZODB/Connection.py
  U   ZODB/trunk/src/ZODB/tests/ConflictResolution.py
  U   ZODB/trunk/src/ZODB/tests/RevisionStorage.py
  U   ZODB/trunk/src/ZODB/tests/StorageTestBase.py
  U   ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/CHANGES.txt	2010-01-31 19:59:54 UTC (rev 108679)
@@ -14,6 +14,10 @@
   database's undo method multiple times in the same transaction now
   raises an exception.
 
+- The ZEO protocol for undo has changed.  The only user-visible
+  consequence of this is that when ZODB 3.10 ZEO servers won't support
+  undo for older clients.
+
 - The storage API (IStorage) has been tightened. Now, storages should
   raise a StorageTransactionError when invalid transactions are passed
   to tpc_begin, tpc_vote, or tpc_finish.

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -1198,14 +1198,19 @@
         if self._cache is None:
             return
 
+        for oid, _ in self._seriald.iteritems():
+            self._cache.invalidate(oid, tid, False)
+
         for oid, data in self._tbuf:
-            self._cache.invalidate(oid, tid, False)
             # If data is None, we just invalidate.
             if data is not None:
                 s = self._seriald[oid]
                 if s != ResolvedSerial:
                     assert s == tid, (s, tid)
                     self._cache.store(oid, s, None, data)
+            else:
+                # object deletion
+                self._cache.invalidate(oid, tid, False)
 
         if self.fshelper is not None:
             blobs = self._tbuf.blobs
@@ -1241,10 +1246,7 @@
 
         """
         self._check_trans(txn)
-        tid, oids = self._server.undo(trans_id, id(txn))
-        for oid in oids:
-            self._tbuf.invalidate(oid)
-        return tid, oids
+        self._server.undoa(trans_id, id(txn))
 
     def undoInfo(self, first=0, last=-20, specification=None):
         """Storage API: return undo information."""

Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -272,8 +272,8 @@
     def new_oid(self):
         return self.rpc.call('new_oid')
 
-    def undo(self, trans_id, trans):
-        return self.rpc.call('undo', trans_id, trans)
+    def undoa(self, trans_id, trans):
+        self.rpc.callAsync('undoa', trans_id, trans)
 
     def undoLog(self, first, last):
         return self.rpc.call('undoLog', first, last)

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -20,6 +20,8 @@
 exported for invocation by the server.
 """
 
+from __future__ import with_statement
+
 import asyncore
 import cPickle
 import logging
@@ -32,6 +34,7 @@
 
 import transaction
 
+import ZODB.blob
 import ZODB.serialize
 import ZODB.TimeStamp
 import ZEO.zrpc.error
@@ -40,7 +43,7 @@
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
 from ZEO.zrpc.server import Dispatcher
-from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
 from ZEO.zrpc.trigger import trigger
 from ZEO.Exceptions import AuthError
 
@@ -48,7 +51,7 @@
 from ZODB.POSException import StorageError, StorageTransactionError
 from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
 from ZODB.serialize import referencesf
-from ZODB.utils import u64, p64, oid_repr, mktemp
+from ZODB.utils import oid_repr, p64, u64, z64
 from ZODB.loglevels import BLATHER
 
 
@@ -87,7 +90,6 @@
     def __init__(self, server, read_only=0, auth_realm=None):
         self.server = server
         # timeout and stats will be initialized in register()
-        self.timeout = None
         self.stats = None
         self.connection = None
         self.client = None
@@ -95,14 +97,13 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
-        self.locked = 0
+        self.locked = False             # Don't have storage lock
         self.verifying = 0
         self.store_failed = 0
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
         self.blob_tempfile = None
-        self.blob_log = []
         # The authentication protocol may define extra methods.
         self._extensions = {}
         for func in self.extensions:
@@ -139,24 +140,13 @@
         self.log_label = _label + "/" + label
 
     def notifyDisconnected(self):
+        self.connection = None
+
         # When this storage closes, we must ensure that it aborts
         # any pending transaction.
         if self.transaction is not None:
             self.log("disconnected during transaction %s" % self.transaction)
-            if not self.locked:
-                # Delete (d, zeo_storage) from the _waiting list, if found.
-                waiting = self.storage._waiting
-                for i in range(len(waiting)):
-                    d, z = waiting[i]
-                    if z is self:
-                        del waiting[i]
-                        self.log("Closed connection removed from waiting list."
-                                 " Clients waiting: %d." % len(waiting))
-                        break
-
-            if self.transaction:
-                self.tpc_abort(self.transaction.id)
-
+            self.tpc_abort(self.transaction.id)
         else:
             self.log("disconnected")
 
@@ -176,6 +166,7 @@
     def setup_delegation(self):
         """Delegate several methods to the storage
         """
+        # Called from register
 
         storage = self.storage
 
@@ -183,9 +174,6 @@
 
         if not info['supportsUndo']:
             self.undoLog = self.undoInfo = lambda *a,**k: ()
-            def undo(*a, **k):
-                raise NotImplementedError
-            self.undo = undo
 
         self.getTid = storage.getTid
         self.load = storage.load
@@ -268,6 +256,7 @@
         if self.storage is not None:
             self.log("duplicate register() call")
             raise ValueError("duplicate register() call")
+
         storage = self.server.storages.get(storage_id)
         if storage is None:
             self.log("unknown storage_id: %s" % storage_id)
@@ -280,19 +269,15 @@
         self.storage_id = storage_id
         self.storage = storage
         self.setup_delegation()
-        self.timeout, self.stats = self.server.register_connection(storage_id,
-                                                                   self)
+        self.stats = self.server.register_connection(storage_id, self)
 
     def get_info(self):
         storage = self.storage
 
-        try:
-            supportsUndo = storage.supportsUndo
-        except AttributeError:
-            supportsUndo = False
-        else:
-            supportsUndo = supportsUndo()
 
+        supportsUndo = (getattr(storage, 'supportsUndo', lambda : False)()
+                        and self.connection.peer_protocol_version >= 'Z310')
+
         # Communicate the backend storage interfaces to the client
         storage_provides = zope.interface.providedBy(storage)
         interfaces = []
@@ -419,6 +404,7 @@
         self.serials = []
         self.invalidated = []
         self.txnlog = CommitLog()
+        self.blob_log = []
         self.tid = tid
         self.status = status
         self.store_failed = 0
@@ -437,19 +423,23 @@
     def tpc_finish(self, id):
         if not self._check_tid(id):
             return
-        assert self.locked
+        assert self.locked, "finished called wo lock"
+
         self.stats.commits += 1
-        self.storage.tpc_finish(self.transaction)
+        self.storage.tpc_finish(self.transaction, self._invalidate)
+        # Note that the tid is still current because we still hold the
+        # commit lock. We'll relinquish it in _clear_transaction.
         tid = self.storage.lastTransaction()
+        # Return the tid, for cache invalidation optimization
+        return Result(tid, self._clear_transaction)
+
+    def _invalidate(self, tid):
         if self.invalidated:
             self.server.invalidate(self, self.storage_id, tid,
                                    self.invalidated, self.get_size_info())
-        self._clear_transaction()
-        # Return the tid, for cache invalidation optimization
-        return tid
 
-    def tpc_abort(self, id):
-        if not self._check_tid(id):
+    def tpc_abort(self, tid):
+        if not self._check_tid(tid):
             return
         self.stats.aborts += 1
         if self.locked:
@@ -458,111 +448,68 @@
 
     def _clear_transaction(self):
         # Common code at end of tpc_finish() and tpc_abort()
-        self.stats.active_txns -= 1
-        self.transaction = None
-        self.txnlog.close()
         if self.locked:
+            self.server.unlock_storage(self)
             self.locked = 0
-            self.timeout.end(self)
-            self.stats.lock_time = None
-            self.log("Transaction released storage lock", BLATHER)
+        self.transaction = None
+        self.stats.active_txns -= 1
+        if self.txnlog is not None:
+            self.txnlog.close()
+            self.txnlog = None
+            for oid, oldserial, data, blobfilename in self.blob_log:
+                ZODB.blob.remove_committed(blobfilename)
+            del self.blob_log
 
-            # Restart any client waiting for the storage lock.
-            while self.storage._waiting:
-                delay, zeo_storage = self.storage._waiting.pop(0)
-                try:
-                    zeo_storage._restart(delay)
-                except:
-                    self.log("Unexpected error handling waiting transaction",
-                             level=logging.WARNING, exc_info=True)
-                    zeo_storage.connection.close()
-                    continue
+    def vote(self, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+        return self._try_to_vote()
 
-                if self.storage._waiting:
-                    n = len(self.storage._waiting)
-                    self.log("Blocked transaction restarted.  "
-                             "Clients waiting: %d" % n)
+    def _try_to_vote(self, delay=None):
+        if self.connection is None:
+            return # We're disconnected
+        self.locked = self.server.lock_storage(self)
+        if self.locked:
+            try:
+                self._vote()
+            except Exception:
+                if delay is not None:
+                    delay.error()
                 else:
-                    self.log("Blocked transaction restarted.")
-
-                break
-
-    # The following two methods return values, so they must acquire
-    # the storage lock and begin the transaction before returning.
-
-    # It's a bit vile that undo can cause us to get the lock before vote.
-
-    def undo(self, trans_id, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
-            return self._undo(trans_id)
+                    raise
+            else:
+                if delay is not None:
+                    delay.reply(None)
         else:
-            return self._wait(lambda: self._undo(trans_id))
+            if delay == None:
+                self.log("(%r) queue lock: transactions waiting: %s"
+                         % (self.storage_id, self.server.waiting(self)+1))
+                delay = Delay()
+            self.server.unlock_callback(self, delay)
+            return delay
 
-    def vote(self, id):
-        self._check_tid(id, exc=StorageTransactionError)
-        if self.locked:
-            return self._vote()
-        else:
-            return self._wait(lambda: self._vote())
+    def _unlock_callback(self, delay):
+        connection = self.connection
+        if connection is not None:
+            connection.call_from_thread(self._try_to_vote, delay)
 
-    # When a delayed transaction is restarted, the dance is
-    # complicated.  The restart occurs when one ZEOStorage instance
-    # finishes as a transaction and finds another instance is in the
-    # _waiting list.
+    def _vote(self):
 
-    # It might be better to have a mechanism to explicitly send
-    # the finishing transaction's reply before restarting the waiting
-    # transaction.  If the restart takes a long time, the previous
-    # client will be blocked until it finishes.
-
-    def _wait(self, thunk):
-        # Wait for the storage lock to be acquired.
-        self._thunk = thunk
-        if self.tpc_transaction():
-            d = Delay()
-            self.storage._waiting.append((d, self))
-            self.log("Transaction blocked waiting for storage. "
-                     "Clients waiting: %d." % len(self.storage._waiting))
-            return d
-        else:
-            self.log("Transaction acquired storage lock.", BLATHER)
-            return self._restart()
-
-    def _restart(self, delay=None):
-        # Restart when the storage lock is available.
         if self.txnlog.stores == 1:
             template = "Preparing to commit transaction: %d object, %d bytes"
         else:
             template = "Preparing to commit transaction: %d objects, %d bytes"
+
         self.log(template % (self.txnlog.stores, self.txnlog.size()),
                  level=BLATHER)
 
-        self.locked = 1
-        self.timeout.begin(self)
-        self.stats.lock_time = time.time()
         if (self.tid is not None) or (self.status != ' '):
             self.storage.tpc_begin(self.transaction, self.tid, self.status)
         else:
             self.storage.tpc_begin(self.transaction)
 
         try:
-            loads, loader = self.txnlog.get_loader()
-            for i in range(loads):
-                store = loader.load()
-                store_type = store[0]
-                store_args = store[1:]
-
-                if store_type == 'd':
-                    do_store = self._delete
-                elif store_type == 's':
-                    do_store = self._store
-                elif store_type == 'r':
-                    do_store = self._restore
-                else:
-                    raise ValueError('Invalid store type: %r' % store_type)
-
-                if not do_store(*store_args):
+            for op, args in self.txnlog:
+                if not getattr(self, op)(*args):
                     break
 
             # Blob support
@@ -575,12 +522,17 @@
             self._clear_transaction()
             raise
 
-        resp = self._thunk()
-        if delay is not None:
-            delay.reply(resp)
-        else:
-            return resp
 
+        if not self.store_failed:
+            # Only call tpc_vote of no store call failed, otherwise
+            # the serialnos() call will deliver an exception that will be
+            # handled by the client in its tpc_vote() method.
+            serials = self.storage.tpc_vote(self.transaction)
+            if serials:
+                self.serials.extend(serials)
+
+        self.client.serialnos(self.serials)
+
     # The public methods of the ZEO client API do not do the real work.
     # They defer work until after the storage lock has been acquired.
     # Most of the real implementations are in methods beginning with
@@ -610,14 +562,18 @@
         os.write(self.blob_tempfile[0], chunk)
 
     def storeBlobEnd(self, oid, serial, data, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        assert self.txnlog is not None # effectively not allowed after undo
         fd, tempname = self.blob_tempfile
         self.blob_tempfile = None
         os.close(fd)
         self.blob_log.append((oid, serial, data, tempname))
 
     def storeBlobShared(self, oid, serial, data, filename, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        assert self.txnlog is not None # effectively not allowed after undo
+
         # Reconstruct the full path from the filename in the OID directory
-
         if (os.path.sep in filename
             or not (filename.endswith('.tmp')
                     or filename[:-1].endswith('.tmp')
@@ -635,6 +591,13 @@
     def sendBlob(self, oid, serial):
         self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
 
+    def undo(*a, **k):
+        raise NotImplementedError
+
+    def undoa(self, trans_id, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+        self.txnlog.undo(trans_id)
+
     def _delete(self, oid, serial):
         err = None
         try:
@@ -721,6 +684,27 @@
 
         return err is None
 
+    def _undo(self, trans_id):
+        err = None
+        try:
+            tid, oids = self.storage.undo(trans_id, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            err = self._marshal_error(err)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((z64, err))
+        else:
+            self.invalidated.extend(oids)
+            self.serials.extend((oid, ResolvedSerial) for oid in oids)
+
+        return err is None
+
     def _marshal_error(self, error):
         # Try to pickle the exception.  If it can't be pickled,
         # the RPC response would fail, so use something that can be pickled.
@@ -734,23 +718,6 @@
             error = StorageServerError(msg)
         return error
 
-    def _vote(self):
-        if not self.store_failed:
-            # Only call tpc_vote of no store call failed, otherwise
-            # the serialnos() call will deliver an exception that will be
-            # handled by the client in its tpc_vote() method.
-            serials = self.storage.tpc_vote(self.transaction)
-            if serials:
-                self.serials.extend(serials)
-
-        self.client.serialnos(self.serials)
-        return
-
-    def _undo(self, trans_id):
-        tid, oids = self.storage.undo(trans_id, self.transaction)
-        self.invalidated.extend(oids)
-        return tid, oids
-
     # IStorageIteration support
 
     def iterator_start(self, start, stop):
@@ -929,8 +896,12 @@
              for name, storage in storages.items()])
         log("%s created %s with storages: %s" %
             (self.__class__.__name__, read_only and "RO" or "RW", msg))
-        for s in storages.values():
-            s._waiting = []
+
+
+        self._lock = threading.Lock()
+        self._commit_locks = {}
+        self._unlock_callbacks = dict((name, []) for name in storages)
+
         self.read_only = read_only
         self.auth_protocol = auth_protocol
         self.auth_database = auth_database
@@ -1044,7 +1015,7 @@
         Returns the timeout and stats objects for the appropriate storage.
         """
         self.connections[storage_id].append(conn)
-        return self.timeouts[storage_id], self.stats[storage_id]
+        return self.stats[storage_id]
 
     def _invalidateCache(self, storage_id):
         """We need to invalidate any caches we have.
@@ -1195,8 +1166,6 @@
         self.dispatcher.close()
         if self.monitor is not None:
             self.monitor.close()
-        for storage in self.storages.values():
-            storage.close()
         # Force the asyncore mainloop to exit by hackery, i.e. close
         # every socket in the map.  loop() will return when the map is
         # empty.
@@ -1206,6 +1175,8 @@
             except:
                 pass
         asyncore.socket_map.clear()
+        for storage in self.storages.values():
+            storage.close()
 
     def close_conn(self, conn):
         """Internal: remove the given connection from self.connections.
@@ -1216,7 +1187,46 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
+    def lock_storage(self, zeostore):
+        storage_id = zeostore.storage_id
+        with self._lock:
+            if storage_id in self._commit_locks:
+                return False
+            self._commit_locks[storage_id] = zeostore
+            self.timeouts[storage_id].begin(zeostore)
+            self.stats[storage_id].lock_time = time.time()
+        return True
 
+    def unlock_storage(self, zeostore):
+        storage_id = zeostore.storage_id
+        with self._lock:
+            assert self._commit_locks[storage_id] is zeostore
+            del self._commit_locks[storage_id]
+            self.timeouts[storage_id].end(zeostore)
+            self.stats[storage_id].lock_time = None
+            callbacks = self._unlock_callbacks[storage_id][:]
+            del self._unlock_callbacks[storage_id][:]
+
+        if callbacks:
+            zeostore.log("(%r) unlock: transactions waiting: %s"
+                         % (storage_id, len(callbacks)-1))
+
+            for zeostore, delay in callbacks:
+                try:
+                    zeostore._unlock_callback(delay)
+                except (SystemExit, KeyboardInterrupt):
+                    raise
+                except Exception:
+                    logger.exception("Calling unlock callback")
+
+    def unlock_callback(self, zeostore, delay):
+        storage_id = zeostore.storage_id
+        with self._lock:
+            self._unlock_callbacks[storage_id].append((zeostore, delay))
+
+    def waiting(self, zeostore):
+        return len(self._unlock_callbacks[zeostore.storage_id])
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -1238,7 +1248,6 @@
         self._client = None
         self._deadline = None
         self._cond = threading.Condition() # Protects _client and _deadline
-        self._trigger = trigger()
 
     def begin(self, client):
         # Called from the restart code the "main" thread, whenever the
@@ -1281,7 +1290,7 @@
             if howlong <= 0:
                 client.log("Transaction timeout after %s seconds" %
                            self._timeout)
-                self._trigger.pull_trigger(lambda: client.connection.close())
+                client.connection.trigger.pull_trigger(client.connection.close)
             else:
                 time.sleep(howlong)
 

Modified: ZODB/trunk/src/ZEO/tests/Cache.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/Cache.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/Cache.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -37,14 +37,11 @@
         # Now start an undo transaction
         t = Transaction()
         t.note('undo1')
-        self._storage.tpc_begin(t)
+        oids = self._begin_undos_vote(t, tid)
 
-        tid, oids = self._storage.undo(tid, t)
-
         # Make sure this doesn't load invalid data into the cache
         self._storage.load(oid, '')
 
-        self._storage.tpc_vote(t)
         self._storage.tpc_finish(t)
 
         assert len(oids) == 1

Modified: ZODB/trunk/src/ZEO/tests/CommitLockTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/CommitLockTests.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/CommitLockTests.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -181,64 +181,3 @@
 
         self._finish_threads()
         self._cleanup()
-
-class CommitLockUndoTests(CommitLockTests):
-
-    def _get_trans_id(self):
-        self._dostore()
-        L = self._storage.undoInfo()
-        return L[0]['id']
-
-    def _begin_undo(self, trans_id, txn):
-        rpc = self._storage._server.rpc
-        return rpc._deferred_call('undo', trans_id, id(txn))
-
-    def _finish_undo(self, msgid):
-        return self._storage._server.rpc._deferred_wait(msgid)
-
-    def checkCommitLockUndoFinish(self):
-        trans_id = self._get_trans_id()
-        oid, txn = self._start_txn()
-        msgid = self._begin_undo(trans_id, txn)
-
-        self._begin_threads()
-
-        self._finish_undo(msgid)
-        self._storage.tpc_vote(txn)
-        self._storage.tpc_finish(txn)
-        self._storage.load(oid, '')
-
-        self._finish_threads()
-
-        self._dostore()
-        self._cleanup()
-
-    def checkCommitLockUndoAbort(self):
-        trans_id = self._get_trans_id()
-        oid, txn = self._start_txn()
-        msgid = self._begin_undo(trans_id, txn)
-
-        self._begin_threads()
-
-        self._finish_undo(msgid)
-        self._storage.tpc_vote(txn)
-        self._storage.tpc_abort(txn)
-
-        self._finish_threads()
-
-        self._dostore()
-        self._cleanup()
-
-    def checkCommitLockUndoClose(self):
-        trans_id = self._get_trans_id()
-        oid, txn = self._start_txn()
-        msgid = self._begin_undo(trans_id, txn)
-        self._begin_threads()
-
-        self._finish_undo(msgid)
-        self._storage.tpc_vote(txn)
-        self._storage.close()
-
-        self._finish_threads()
-
-        self._cleanup()

Modified: ZODB/trunk/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/InvalidationTests.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/InvalidationTests.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -318,9 +318,9 @@
         # tearDown then immediately, but if other threads are still
         # running that can lead to a cascade of spurious exceptions.
         for t in threads:
-            t.join(10)
+            t.join(30)
         for t in threads:
-            t.cleanup()
+            t.cleanup(10)
 
     def checkConcurrentUpdates2Storages_emulated(self):
         self._storage = storage1 = self.openClientStorage()
@@ -378,6 +378,34 @@
         db1.close()
         db2.close()
 
+    def checkConcurrentUpdates19Storages(self):
+        n = 19
+        dbs = [DB(self.openClientStorage()) for i in range(n)]
+        self._storage = dbs[0].storage
+        stop = threading.Event()
+
+        cn = dbs[0].open()
+        tree = cn.root()["tree"] = OOBTree()
+        transaction.commit()
+        cn.close()
+
+        # Run threads that update the BTree
+        cd = {}
+        threads = [self.StressThread(dbs[i], stop, i, cd, i, n)
+                   for i in range(n)]
+        self.go(stop, cd, *threads)
+
+        while len(set(db.lastTransaction() for db in dbs)) > 1:
+            _ = [db._storage.sync() for db in dbs]
+
+        cn = dbs[0].open()
+        tree = cn.root()["tree"]
+        self._check_tree(cn, tree)
+        self._check_threads(tree, *threads)
+
+        cn.close()
+        _ = [db.close() for db in dbs]
+
     def checkConcurrentUpdates1Storage(self):
         self._storage = storage1 = self.openClientStorage()
         db1 = DB(storage1)

Modified: ZODB/trunk/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/servertesting.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/servertesting.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -58,3 +58,7 @@
         print self.name, 'callAsync', meth, repr(args)
 
     callAsyncNoPoll = callAsync
+
+    def call_from_thread(self, *args):
+        if args:
+            args[0](*args[1:])

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -25,7 +25,6 @@
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle
 
-import asyncore
 import doctest
 import logging
 import os
@@ -244,7 +243,6 @@
 class FullGenericTests(
     GenericTests,
     Cache.TransUndoStorageWithCache,
-    CommitLockTests.CommitLockUndoTests,
     ConflictResolution.ConflictResolvingStorage,
     ConflictResolution.ConflictResolvingTransUndoStorage,
     PackableStorage.PackableUndoStorage,
@@ -727,6 +725,10 @@
     blob_cache_dir = 'blobs'
     shared_blob_dir = True
 
+class FauxConn:
+    addr = 'x'
+    peer_protocol_version = ZEO.zrpc.connection.Connection.current_protocol
+
 class StorageServerClientWrapper:
 
     def __init__(self):
@@ -743,8 +745,8 @@
     def __init__(self, server, storage_id):
         self.storage_id = storage_id
         self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+        self.server.notifyConnected(FauxConn())
         self.server.register(storage_id, False)
-        self.server._thunk = lambda : None
         self.server.client = StorageServerClientWrapper()
 
     def sortKey(self):
@@ -766,8 +768,7 @@
         self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
 
     def tpc_vote(self, transaction):
-        self.server._restart()
-        self.server.vote(id(transaction))
+        assert self.server.vote(id(transaction)) is None
         result = self.server.client.serials[:]
         del self.server.client.serials[:]
         return result
@@ -775,8 +776,11 @@
     def store(self, oid, serial, data, version_ignored, transaction):
         self.server.storea(oid, serial, data, id(transaction))
 
+    def send_reply(self, *args):        # Masquerade as conn
+        pass
+
     def tpc_finish(self, transaction, func = lambda: None):
-        self.server.tpc_finish(id(transaction))
+        self.server.tpc_finish(id(transaction)).set_sender(0, self)
 
 
 def multiple_storages_invalidation_queue_is_not_insane():
@@ -849,6 +853,7 @@
     >>> fs = FileStorage('t.fs')
     >>> sv = StorageServer(('', get_port()), dict(fs=fs))
     >>> s = ZEOStorage(sv, sv.read_only)
+    >>> s.notifyConnected(FauxConn())
     >>> s.register('fs', False)
 
 If we ask for the last transaction, we should get the last transaction
@@ -941,7 +946,7 @@
     ...     def close(self):
     ...         print 'connection closed'
     ...     trigger = property(lambda self: self)
-    ...     pull_trigger = lambda self, func: func()
+    ...     pull_trigger = lambda self, func, *args: func(*args)
 
     >>> class ConnectionManager:
     ...     def __init__(self, addr, client, tmin, tmax):
@@ -1251,6 +1256,8 @@
     >>> thread.join(1)
     """
 
+
+
 if sys.version_info >= (2, 6):
     import multiprocessing
 
@@ -1259,28 +1266,32 @@
         q.put((name, conn.root.x))
         conn.close()
 
-    def work_with_multiprocessing():
-        """Client storage should work with multi-processing.
+    class MultiprocessingTests(unittest.TestCase):
 
-        >>> import StringIO
-        >>> sys.stdin = StringIO.StringIO()
-        >>> addr, _ = start_server()
-        >>> conn = ZEO.connection(addr)
-        >>> conn.root.x = 1
-        >>> transaction.commit()
-        >>> q = multiprocessing.Queue()
-        >>> processes = [multiprocessing.Process(
-        ...     target=work_with_multiprocessing_process,
-        ...     args=(i, addr, q))
-        ...     for i in range(3)]
-        >>> _ = [p.start() for p in processes]
-        >>> sorted(q.get(timeout=60) for p in processes)
-        [(0, 1), (1, 1), (2, 1)]
+        def test_work_with_multiprocessing(self):
+            "Client storage should work with multi-processing."
 
-        >>> _ = [p.join(30) for p in processes]
-        >>> conn.close()
-        """
+            self.globs = {}
+            forker.setUp(self)
+            addr, adminaddr = self.globs['start_server']()
+            conn = ZEO.connection(addr)
+            conn.root.x = 1
+            transaction.commit()
+            q = multiprocessing.Queue()
+            processes = [multiprocessing.Process(
+                target=work_with_multiprocessing_process,
+                args=(i, addr, q))
+                         for i in range(3)]
+            _ = [p.start() for p in processes]
+            self.assertEqual(sorted(q.get(timeout=300) for p in processes),
+                             [(0, 1), (1, 1), (2, 1)])
 
+            _ = [p.join(30) for p in processes]
+            conn.close()
+            zope.testing.setupstack.tearDown(self)
+else:
+    class MultiprocessingTests(unittest.TestCase):
+        pass
 
 slow_test_classes = [
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
@@ -1353,6 +1364,7 @@
     # unit test layer
     zeo = unittest.TestSuite()
     zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack))
+    zeo.addTest(unittest.makeSuite(MultiprocessingTests))
     zeo.addTest(doctest.DocTestSuite(
         setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown))
     zeo.addTest(doctest.DocTestSuite(ZEO.tests.IterationTests,

Modified: ZODB/trunk/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/testZEO2.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -93,9 +93,9 @@
 handled correctly:
 
     >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+    (511/test-addr) ('1') unlock: transactions waiting: 0
     2 callAsync serialnos ...
     reply 1 None
-    (511/test-addr) Blocked transaction restarted.
 
     >>> fs.tpc_transaction() is not None
     True

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -55,6 +55,16 @@
         log("Error raised in delayed method", logging.ERROR, exc_info=True)
         self.conn.return_error(self.msgid, *exc_info[:2])
 
+class Result(Delay):
+
+    def __init__(self, *args):
+        self.args = args
+
+    def set_sender(self, msgid, conn):
+        reply, callback = self.args
+        conn.send_reply(msgid, reply, False)
+        callback()
+
 class MTDelay(Delay):
 
     def __init__(self):
@@ -218,18 +228,25 @@
     #             restorea, iterator_start, iterator_next,
     #             iterator_record_start, iterator_record_next,
     #             iterator_gc
+    #
+    # Z310 -- named after the ZODB release 3.10
+    #         New server methods:
+    #             undoa
+    #         Doesn't support undo for older clients.
+    #         Undone oid info returned by vote.
 
     # Protocol variables:
     # Our preferred protocol.
-    current_protocol = "Z309"
+    current_protocol = "Z310"
 
     # If we're a client, an exhaustive list of the server protocols we
     # can accept.
-    servers_we_can_talk_to = ["Z308", current_protocol]
+    servers_we_can_talk_to = ["Z308", "Z309", current_protocol]
 
     # If we're a server, an exhaustive list of the client protocols we
     # can accept.
-    clients_we_can_talk_to = ["Z200", "Z201", "Z303", "Z308", current_protocol]
+    clients_we_can_talk_to = [
+        "Z200", "Z201", "Z303", "Z308", "Z309", current_protocol]
 
     # This is pretty excruciating.  Details:
     #

Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/Connection.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -666,32 +666,11 @@
             self._cache.update_object_size_estimation(oid, len(p))
             obj._p_estimated_size = len(p)
 
-            self._handle_serial(s, oid)
+            self._handle_serial(oid, s)
 
-    def _handle_serial(self, store_return, oid=None, change=1):
-        """Handle the returns from store() and tpc_vote() calls."""
-
-        # These calls can return different types depending on whether
-        # ZEO is used.  ZEO uses asynchronous returns that may be
-        # returned in batches by the ClientStorage.  ZEO1 can also
-        # return an exception object and expect that the Connection
-        # will raise the exception.
-
-        # When conflict resolution occurs, the object state held by
-        # the connection does not match what is written to the
-        # database.  Invalidate the object here to guarantee that
-        # the new state is read the next time the object is used.
-
-        if not store_return:
+    def _handle_serial(self, oid, serial, change=True):
+        if not serial:
             return
-        if isinstance(store_return, str):
-            assert oid is not None
-            self._handle_one_serial(oid, store_return, change)
-        else:
-            for oid, serial in store_return:
-                self._handle_one_serial(oid, serial, change)
-
-    def _handle_one_serial(self, oid, serial, change):
         if not isinstance(serial, str):
             raise serial
         obj = self._cache.get(oid, None)
@@ -757,7 +736,9 @@
         except AttributeError:
             return
         s = vote(transaction)
-        self._handle_serial(s)
+        if s:
+            for oid, serial in s:
+                self._handle_serial(oid, serial)
 
     def tpc_finish(self, transaction):
         """Indicate confirmation that the transaction is done."""
@@ -1171,7 +1152,7 @@
                 s = self._storage.store(oid, serial, data,
                                         '', transaction)
 
-            self._handle_serial(s, oid, change=False)
+            self._handle_serial(oid, s, change=False)
         src.close()
 
     def _abort_savepoint(self):

Modified: ZODB/trunk/src/ZODB/tests/ConflictResolution.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/ConflictResolution.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/ConflictResolution.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -158,6 +158,7 @@
         t = Transaction()
         self._storage.tpc_begin(t)
         self._storage.undo(tid, t)
+        self._storage.tpc_vote(t)
         self._storage.tpc_finish(t)
 
     def checkUndoUnresolvable(self):
@@ -177,7 +178,5 @@
         info = self._storage.undoInfo()
         tid = info[1]['id']
         t = Transaction()
-        self._storage.tpc_begin(t)
-        self.assertRaises(UndoError, self._storage.undo,
-                          tid, t)
+        self.assertRaises(UndoError, self._begin_undos_vote, t, tid)
         self._storage.tpc_abort(t)

Modified: ZODB/trunk/src/ZODB/tests/RevisionStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/RevisionStorage.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/RevisionStorage.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -122,7 +122,7 @@
             tid = info[0]["id"]
             # Always undo the most recent txn, so the value will
             # alternate between 3 and 4.
-            self._undo(tid, [oid], note="undo %d" % i)
+            self._undo(tid, note="undo %d" % i)
             revs.append(self._storage.load(oid, ""))
 
         prev_tid = None

Modified: ZODB/trunk/src/ZODB/tests/StorageTestBase.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/StorageTestBase.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/StorageTestBase.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -209,10 +209,12 @@
         t = transaction.Transaction()
         t.note(note or "undo")
         self._storage.tpc_begin(t)
-        tid, oids = self._storage.undo(tid, t)
-        self._storage.tpc_vote(t)
+        undo_result = self._storage.undo(tid, t)
+        vote_result = self._storage.tpc_vote(t)
         self._storage.tpc_finish(t)
         if expected_oids is not None:
+            oids = undo_result and undo_result[1] or []
+            oids.extend(oid for (oid, _) in vote_result or ())
             self.assertEqual(len(oids), len(expected_oids), repr(oids))
             for oid in expected_oids:
                 self.assert_(oid in oids)

Modified: ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py	2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py	2010-01-31 19:59:54 UTC (rev 108679)
@@ -101,12 +101,20 @@
             for rec in txn:
                 pass
 
+    def _begin_undos_vote(self, t, *tids):
+        self._storage.tpc_begin(t)
+        oids = []
+        for tid in tids:
+            undo_result = self._storage.undo(tid, t)
+            if undo_result:
+                oids.extend(undo_result[1])
+        oids.extend(oid for (oid, _) in self._storage.tpc_vote(t) or ())
+        return oids
+
     def undo(self, tid, note):
         t = Transaction()
         t.note(note)
-        self._storage.tpc_begin(t)
-        oids = self._storage.undo(tid, t)
-        self._storage.tpc_vote(t)
+        oids = self._begin_undos_vote(t, tid)
         self._storage.tpc_finish(t)
         return oids
 
@@ -152,9 +160,7 @@
         tid = info[0]['id']
         t = Transaction()
         t.note('undo1')
-        self._storage.tpc_begin(t)
-        self._storage.undo(tid, t)
-        self._storage.tpc_vote(t)
+        self._begin_undos_vote(t, tid)
         self._storage.tpc_finish(t)
         # Check that calling getTid on an uncreated object raises a KeyError
         # The current version of FileStorage fails this test
@@ -281,14 +287,10 @@
         tid = info[0]['id']
         tid1 = info[1]['id']
         t = Transaction()
-        self._storage.tpc_begin(t)
-        tid, oids = self._storage.undo(tid, t)
-        tid, oids1 = self._storage.undo(tid1, t)
-        self._storage.tpc_vote(t)
+        oids = self._begin_undos_vote(t, tid, tid1)
         self._storage.tpc_finish(t)
         # We get the finalization stuff called an extra time:
-        eq(len(oids), 2)
-        eq(len(oids1), 2)
+        eq(len(oids), 4)
         unless(oid1 in oids)
         unless(oid2 in oids)
         data, revid1 = self._storage.load(oid1, '')
@@ -355,9 +357,7 @@
         info = self._storage.undoInfo()
         tid = info[1]['id']
         t = Transaction()
-        self._storage.tpc_begin(t)
-        tid, oids = self._storage.undo(tid, t)
-        self._storage.tpc_vote(t)
+        oids = self._begin_undos_vote(t, tid)
         self._storage.tpc_finish(t)
         eq(len(oids), 1)
         self.failUnless(oid1 in oids)
@@ -368,7 +368,6 @@
         eq(zodb_unpickle(data), MinPO(54))
         self._iterate()
 
-
     def checkNotUndoable(self):
         eq = self.assertEqual
         # Set things up so we've got a transaction that can't be undone
@@ -380,10 +379,7 @@
         info = self._storage.undoInfo()
         tid = info[1]['id']
         t = Transaction()
-        self._storage.tpc_begin(t)
-        self.assertRaises(POSException.UndoError,
-                          self._storage.undo,
-                          tid, t)
+        self.assertRaises(POSException.UndoError, self._begin_undos_vote, t, tid)
         self._storage.tpc_abort(t)
         # Now have more fun: object1 and object2 are in the same transaction,
         # which we'll try to undo to, but one of them has since modified in
@@ -419,10 +415,7 @@
         info = self._storage.undoInfo()
         tid = info[1]['id']
         t = Transaction()
-        self._storage.tpc_begin(t)
-        self.assertRaises(POSException.UndoError,
-                          self._storage.undo,
-                          tid, t)
+        self.assertRaises(POSException.UndoError, self._begin_undos_vote, t, tid)
         self._storage.tpc_abort(t)
         self._iterate()
 
@@ -439,7 +432,7 @@
         # So, basically, this makes sure that undo info doesn't depend
         # on file positions.  We change the file positions in an undo
         # record by packing.
-        
+
         # Add a few object revisions
         oid = '\0'*8
         revid0 = self._dostore(oid, data=MinPO(50))
@@ -462,9 +455,7 @@
         self.assertEqual(len(info2), 2)
         # And now attempt to undo the last transaction
         t = Transaction()
-        self._storage.tpc_begin(t)
-        tid, oids = self._storage.undo(tid, t)
-        self._storage.tpc_vote(t)
+        oids = self._begin_undos_vote(t, tid)
         self._storage.tpc_finish(t)
         self.assertEqual(len(oids), 1)
         self.assertEqual(oids[0], oid)



More information about the Zodb-checkins mailing list