[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZEO/ Fixed bug that caused server to stop committing transactions when there was a conflict error on blobs.

Jim Fulton jim at zope.com
Fri Oct 2 13:18:18 EDT 2009


Log message for revision 104753:
  Fixed bug that caused server to stop committing transactions when
  there was a conflict error on blobs.
  
  Refactored an optimization that speeds server output by writing ahead
  of select loop.
  

Changed:
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
  A   ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py
  U   ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py	2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/StorageServer.py	2009-10-02 17:18:18 UTC (rev 104753)
@@ -273,6 +273,8 @@
         if self.auth_realm and not self.authenticated:
             raise AuthError("Client was never authenticated with server!")
 
+        self.connection.auth_done()
+
         if self.storage is not None:
             self.log("duplicate register() call")
             raise ValueError("duplicate register() call")
@@ -290,7 +292,6 @@
         self.storage = storage
         self.setup_delegation()
         self.stats = self.server.register_connection(storage_id, self)
-        self.connection.thread_ident = self.connection.unregistered_thread_ident
 
     def get_info(self):
         storage = self.storage
@@ -546,29 +547,35 @@
         else:
             self.storage.tpc_begin(self.transaction)
 
-        loads, loader = self.txnlog.get_loader()
-        for i in range(loads):
-            store = loader.load()
-            store_type = store[0]
-            store_args = store[1:]
+        try:
+            loads, loader = self.txnlog.get_loader()
+            for i in range(loads):
+                store = loader.load()
+                store_type = store[0]
+                store_args = store[1:]
 
-            if store_type == 'd':
-                do_store = self._delete
-            elif store_type == 's':
-                do_store = self._store
-            elif store_type == 'r':
-                do_store = self._restore
-            else:
-                raise ValueError('Invalid store type: %r' % store_type)
+                if store_type == 'd':
+                    do_store = self._delete
+                elif store_type == 's':
+                    do_store = self._store
+                elif store_type == 'r':
+                    do_store = self._restore
+                else:
+                    raise ValueError('Invalid store type: %r' % store_type)
 
-            if not do_store(*store_args):
-                break
+                if not do_store(*store_args):
+                    break
 
-        # Blob support
-        for oid, oldserial, data, blobfilename in self.blob_log:
-            self.storage.storeBlob(oid, oldserial, data, blobfilename,
-                                   '', self.transaction,)
+            # Blob support
+            while self.blob_log and not self.store_failed:
+                oid, oldserial, data, blobfilename = self.blob_log.pop()
+                self._store(oid, oldserial, data, blobfilename)
 
+        except:
+            self.storage.tpc_abort(self.transaction)
+            self._clear_transaction()
+            raise
+
         thunk = self._thunk
         delay = self._delay
         self._thunk = self._delay = None
@@ -664,11 +671,15 @@
 
         return err is None
 
-    def _store(self, oid, serial, data):
+    def _store(self, oid, serial, data, blobfile=None):
         err = None
         try:
-            newserial = self.storage.store(oid, serial, data, '',
-                                           self.transaction)
+            if blobfile is None:
+                newserial = self.storage.store(
+                    oid, serial, data, '', self.transaction)
+            else:
+                newserial = self.storage.storeBlob(
+                    oid, serial, data, blobfile, '', self.transaction)
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py	2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testConversionSupport.py	2009-10-02 17:18:18 UTC (rev 104753)
@@ -57,6 +57,9 @@
     peer_protocol_version = (
         ZEO.zrpc.connection.Connection.current_protocol)
 
+    def auth_done(self):
+        pass
+
 def test_server_record_iternext():
     """
     

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py	2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO.py	2009-10-02 17:18:18 UTC (rev 104753)
@@ -722,10 +722,12 @@
 
 class FauxConn:
     addr = 'x'
-    thread_ident = unregistered_thread_ident = None
     peer_protocol_version = (
         ZEO.zrpc.connection.Connection.current_protocol)
 
+    def auth_done(self):
+        pass
+
 class StorageServerClientWrapper:
 
     def __init__(self):

Copied: ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py (from rev 104716, ZODB/trunk/src/ZEO/tests/testZEO2.py)
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py	                        (rev 0)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/tests/testZEO2.py	2009-10-02 17:18:18 UTC (rev 104753)
@@ -0,0 +1,171 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+from zope.testing import doctest, setupstack, renormalizing
+import logging
+import re
+import sys
+import transaction
+import unittest
+import ZEO.StorageServer
+import ZEO.tests.servertesting
+import ZODB.blob
+import ZODB.FileStorage
+import ZODB.tests.util
+import ZODB.utils
+
+def proper_handling_of_blob_conflicts():
+    r"""
+
+Conflict errors weren't properly handled when storing blobs, the
+result being that the storage was left in a transaction.
+
+We originally saw this when restarting a blocked transaction, although
+it doesn't really matter.
+
+Set up the storage with some initial blob data.
+
+    >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+    >>> db = ZODB.DB(fs)
+    >>> conn = db.open()
+    >>> conn.root.b = ZODB.blob.Blob('x')
+    >>> transaction.commit()
+
+Get the oid and first serial. We'll use the serial later to provide
+out-of-date data.
+
+    >>> oid = conn.root.b._p_oid
+    >>> serial = conn.root.b._p_serial
+    >>> conn.root.b.open('w').write('y')
+    >>> transaction.commit()
+    >>> data = fs.load(oid)[0]
+
+Create the server:
+
+    >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+    >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn1 = ZEO.tests.servertesting.Connection(1)
+    >>> zs1.notifyConnected(conn1)
+    >>> zs1.register('1', 0)
+    >>> zs1.tpc_begin('0', '', '', {})
+    >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+    >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+    1 callAsync serialnos ...
+
+In a second client, we'll try to commit using the old serial. This
+will conflict. It will be blocked at the vote call.
+
+    >>> zs2 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn2 = ZEO.tests.servertesting.Connection(2)
+    >>> zs2.notifyConnected(conn2)
+    >>> zs2.register('1', 0)
+    >>> zs2.tpc_begin('1', '', '', {})
+    >>> zs2.storeBlobStart()
+    >>> zs2.storeBlobChunk('z')
+    >>> zs2.storeBlobEnd(oid, serial, data, '1')
+    >>> delay = zs2.vote('1')
+
+    >>> def send_reply(id, reply):
+    ...     print 'reply', id, reply
+    >>> delay.set_sender(1, send_reply, None)
+
+    >>> logger = logging.getLogger('ZEO')
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> logger.setLevel(logging.INFO)
+    >>> logger.addHandler(handler)
+
+Now, when we abort the transaction for the first client, the second
+client will be restarted.  It will get a conflict error, which is
+handled correctly:
+
+    >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+    (511/test-addr) ('1') unlock: transactions waiting: 0
+    2 callAsync serialnos ...
+    reply 1 None
+
+    >>> fs.tpc_transaction() is not None
+    True
+    >>> conn2.connected
+    True
+
+    >>> logger.setLevel(logging.NOTSET)
+    >>> logger.removeHandler(handler)
+    >>> zs2.tpc_abort('1')
+    >>> fs.close()
+    """
+
+def proper_handling_of_errors_in_restart():
+    r"""
+
+It's critical that if there is an error in _restart (i.e. vote) that the
+storage isn't left in tpc.
+
+    >>> fs = ZODB.FileStorage.FileStorage('t.fs', blob_dir='t.blobs')
+    >>> server = ZEO.tests.servertesting.StorageServer('x', {'1': fs})
+
+And an initial client.
+
+    >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn1 = ZEO.tests.servertesting.Connection(1)
+    >>> zs1.notifyConnected(conn1)
+    >>> zs1.register('1', 0)
+    >>> zs1.tpc_begin('0', '', '', {})
+    >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
+
+Intentionally break zs1:
+
+    >>> zs1._store = lambda : None
+    >>> _ = zs1.vote('0') # doctest: +ELLIPSIS
+    Traceback (most recent call last):
+    ...
+    TypeError: <lambda>() takes no arguments (3 given)
+
+We're not in a transaction:
+
+    >>> fs.tpc_transaction() is None
+    True
+
+We can start another client and get the storage lock.
+
+    >>> zs1 = ZEO.StorageServer.ZEOStorage(server)
+    >>> conn1 = ZEO.tests.servertesting.Connection(1)
+    >>> zs1.notifyConnected(conn1)
+    >>> zs1.register('1', 0)
+    >>> zs1.tpc_begin('1', '', '', {})
+    >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
+    >>> _ = zs1.vote('1') # doctest: +ELLIPSIS
+    1 callAsync serialnos ...
+
+    >>> zs1.tpc_finish('1') is not None
+    True
+
+    >>> fs.close()
+    """
+
+
+def test_suite():
+    return unittest.TestSuite((
+        doctest.DocTestSuite(
+            setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
+            checker=renormalizing.RENormalizing([
+                (re.compile('\d+/test-addr'), ''),
+                ]),
+            ),
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')
+

Modified: ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py	2009-10-02 17:18:16 UTC (rev 104752)
+++ ZODB/branches/jim-thready-zeo2/src/ZEO/zrpc/connection.py	2009-10-02 17:18:18 UTC (rev 104753)
@@ -783,8 +783,13 @@
         else:
             self.trigger.pull_trigger()
 
+    def auth_done(self):
+        # We're done with the auth dance. We can be fast now.
+        self.thread_ident = self.unregistered_thread_ident
+
 def server_loop(map, conn):
     conn.unregistered_thread_ident = thread.get_ident()
+
     while len(map) > 1:
         asyncore.poll(30.0, map)
     for o in map.values():



More information about the Zodb-checkins mailing list