[Checkins] SVN: ZODB/branches/jim-dev/src/ZEO/ checkpoint

Jim Fulton jim at zope.com
Sat May 1 11:49:27 EDT 2010


Log message for revision 111839:
  checkpoint

Changed:
  U   ZODB/branches/jim-dev/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-dev/src/ZEO/tests/forker.py
  U   ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py
  U   ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py

-=-
Modified: ZODB/branches/jim-dev/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/StorageServer.py	2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/StorageServer.py	2010-05-01 15:49:27 UTC (rev 111839)
@@ -57,20 +57,8 @@
 
 logger = logging.getLogger('ZEO.StorageServer')
 
-# TODO:  This used to say "ZSS", which is now implied in the logger name.
-# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
-_label = "" # default label used for logging.
-
-
-def set_label():
-    """Internal helper to reset the logging label (e.g. after fork())."""
-    global _label
-    _label = "%s" % os.getpid()
-
-
-def log(message, level=logging.INFO, label=None, exc_info=False):
+def log(message, level=logging.INFO, label='', exc_info=False):
     """Internal helper to log a message."""
-    label = label or _label
     if label:
         message = "(%s) %s" % (label, message)
     logger.log(level, message, exc_info=exc_info)
@@ -97,10 +85,10 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
+        self.log_label = 'unconnected'
         self.locked = False             # Don't have storage lock
         self.verifying = 0
         self.store_failed = 0
-        self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
         self.blob_tempfile = None
@@ -131,13 +119,7 @@
             conn.register_object(ZEOStorage308Adapter(self))
         else:
             self.client = ClientStub(conn)
-        addr = conn.addr
-        if isinstance(addr, type("")):
-            label = addr
-        else:
-            host, port = addr
-            label = str(host) + ":" + str(port)
-        self.log_label = _label + "/" + label
+        self.log_label = _addr_label(conn.addr)
 
     def notifyDisconnected(self):
         self.connection = None
@@ -146,7 +128,7 @@
         # any pending transaction.
         if self.transaction is not None:
             self.log("disconnected during %s transaction"
-                     % self.locked and 'locked' or 'unlocked')
+                     % (self.locked and 'locked' or 'unlocked'))
             self.tpc_abort(self.transaction.id)
         else:
             self.log("disconnected")
@@ -451,7 +433,9 @@
         if self.locked:
             self.server.unlock_storage(self)
             self.locked = 0
-        self.transaction = None
+        if self.transaction is not None:
+            self.server.stop_waiting(self)
+            self.transaction = None
         self.stats.active_txns -= 1
         if self.txnlog is not None:
             self.txnlog.close()
@@ -462,13 +446,20 @@
 
     def vote(self, tid):
         self._check_tid(tid, exc=StorageTransactionError)
+        if self.locked or self.server.already_waiting(self):
+            raise StorageTransactionError('Already voting %s' % self.locked)
         return self._try_to_vote()
 
     def _try_to_vote(self, delay=None):
         if self.connection is None:
             return # We're disconnected
-        self.locked = self.server.lock_storage(self)
         if self.locked:
+            # as a consequence of the unlocking strategy,
+            # _try_to_vote may be called multiple times.
+            # Once we're locked, we should stop trying. :)
+            return
+        self.locked, delay = self.server.lock_storage(self, delay)
+        if self.locked:
             try:
                 self.log(
                     "Preparing to commit transaction: %d objects, %d bytes"
@@ -511,17 +502,16 @@
             else:
                 if delay is not None:
                     delay.reply(None)
+                else:
+                    return None
         else:
-            if delay == None:
-                self.log("(%r) queue lock: transactions waiting: %s"
-                         % (self.storage_id, self.server.waiting(self)+1))
-                delay = Delay()
-            self.server.unlock_callback(self, delay)
             return delay
 
     def _unlock_callback(self, delay):
         connection = self.connection
-        if connection is not None:
+        if connection is None:
+            self.server.stop_waiting(self)
+        else:
             connection.call_from_thread(self._try_to_vote, delay)
 
     # The public methods of the ZEO client API do not do the real work.
@@ -764,6 +754,8 @@
         for iid in iids:
             self._iterators.pop(iid, None)
 
+    def server_status(self):
+        return self.server.server_status(self)
 
 class StorageServerDB:
 
@@ -873,7 +865,6 @@
 
         self.addr = addr
         self.storages = storages
-        set_label()
         msg = ", ".join(
             ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
                            storage.getName())
@@ -884,7 +875,7 @@
 
         self._lock = threading.Lock()
         self._commit_locks = {}
-        self._unlock_callbacks = dict((name, []) for name in storages)
+        self._waiting = dict((name, []) for name in storages)
 
         self.read_only = read_only
         self.auth_protocol = auth_protocol
@@ -1171,29 +1162,54 @@
             if conn.obj in cl:
                 cl.remove(conn.obj)
 
-    def lock_storage(self, zeostore):
+    def lock_storage(self, zeostore, delay):
         storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
         with self._lock:
+
             if storage_id in self._commit_locks:
-                return False
-            self._commit_locks[storage_id] = zeostore
-            self.timeouts[storage_id].begin(zeostore)
-            self.stats[storage_id].lock_time = time.time()
-        return True
+                # The lock is held by another zeostore
 
+                assert self._commit_locks[storage_id] is not zeostore
+
+                if delay is None:
+                    # New request, queue it
+                    delay = Delay()
+                    waiting.append((zeostore, delay))
+                    zeostore.log("(%r) queue lock: transactions waiting: %s"
+                                 % (storage_id, len(waiting)),
+                                 _level_for_waiting(waiting)
+                                 )
+
+                return False, delay
+            else:
+                self._commit_locks[storage_id] = zeostore
+                self.timeouts[storage_id].begin(zeostore)
+                self.stats[storage_id].lock_time = time.time()
+                if delay:
+                    # we were waiting, stop
+                    waiting[:] = [i for i in waiting if i[0] is not zeostore]
+                zeostore.log("(%r) lock: transactions waiting: %s"
+                             % (storage_id, len(waiting)),
+                             _level_for_waiting(waiting)
+                             )
+                return True, delay
+
     def unlock_storage(self, zeostore):
         storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
         with self._lock:
             assert self._commit_locks[storage_id] is zeostore
             del self._commit_locks[storage_id]
             self.timeouts[storage_id].end(zeostore)
             self.stats[storage_id].lock_time = None
-            callbacks = self._unlock_callbacks[storage_id][:]
-            del self._unlock_callbacks[storage_id][:]
+            callbacks = waiting[:]
 
         if callbacks:
             zeostore.log("(%r) unlock: transactions waiting: %s"
-                         % (storage_id, len(callbacks)-1))
+                         % (storage_id, len(callbacks)),
+                         _level_for_waiting(callbacks)
+                         )
 
             for zeostore, delay in callbacks:
                 try:
@@ -1203,14 +1219,41 @@
                 except Exception:
                     logger.exception("Calling unlock callback")
 
-    def unlock_callback(self, zeostore, delay):
+    def stop_waiting(self, zeostore):
         storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
         with self._lock:
-            self._unlock_callbacks[storage_id].append((zeostore, delay))
+            new_waiting = [i for i in waiting if i[0] is not zeostore]
+            if len(new_waiting) == len(waiting):
+                return
+            waiting[:] = new_waiting
 
-    def waiting(self, zeostore):
-        return len(self._unlock_callbacks[zeostore.storage_id])
+        zeostore.log("(%r) dequeue lock: transactions waiting: %s"
+                     % (storage_id, len(waiting)),
+                     _level_for_waiting(waiting)
+                     )
 
+    def already_waiting(self, zeostore):
+        storage_id = zeostore.storage_id
+        waiting = self._waiting[storage_id]
+        with self._lock:
+            return bool([i for i in waiting if i[0] is zeostore])
+
+    def server_status(self, zeostore):
+        storage_id = zeostore.storage_id
+        status = self.stats[storage_id].__dict__.copy()
+        status['connections'] = len(status['connections'])
+        status['waiting'] = len(self._waiting[storage_id])
+        return status
+
+def _level_for_waiting(waiting):
+    if len(waiting) > 9:
+        return logging.CRITICAL
+    if len(waiting) > 3:
+        return logging.WARNING
+    else:
+        return logging.DEBUG
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -1454,4 +1497,10 @@
     def __getattr__(self, name):
         return getattr(self.storage, name)
 
+def _addr_label(addr):
+    if isinstance(addr, type("")):
+        return addr
+    else:
+        host, port = addr
+        return str(host) + ":" + str(port)
 

Modified: ZODB/branches/jim-dev/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/forker.py	2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/tests/forker.py	2010-05-01 15:49:27 UTC (rev 111839)
@@ -42,7 +42,7 @@
         self.authentication_protocol = None
         self.authentication_database = None
         self.authentication_realm = None
-        self.loglevel = 'INFO'
+        self.loglevel = 'DEBUG'
 
     def dump(self, f):
         print >> f, "<zeo>"

Modified: ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py	2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/tests/servertesting.py	2010-05-01 15:49:27 UTC (rev 111839)
@@ -31,9 +31,15 @@
 import ZEO.StorageServer
 import ZEO.zrpc.connection
 import ZEO.zrpc.error
+import ZODB.MappingStorage
 
 class StorageServer(ZEO.StorageServer.StorageServer):
 
+    def __init__(self, addr='test_addr', storages=None, **kw):
+        if storages is None:
+            storages = {'1': ZODB.MappingStorage.MappingStorage()}
+        ZEO.StorageServer.StorageServer.__init__(self, addr, storages, **kw)
+
     def DispatcherClass(*args, **kw):
         pass
 
@@ -42,9 +48,10 @@
     peer_protocol_version = ZEO.zrpc.connection.Connection.current_protocol
     connected = True
 
-    def __init__(self, name='connection', addr='test-addr'):
+    def __init__(self, name='connection', addr=''):
+        name = str(name)
         self.name = name
-        self.addr = addr
+        self.addr = addr or 'test-addr-'+name
 
     def close(self):
         print self.name, 'closed'
@@ -65,3 +72,9 @@
 
     def send_reply(self, *args):
         pass
+
+def client(server, name='client', addr=''):
+    zs = ZEO.StorageServer.ZEOStorage(server)
+    zs.notifyConnected(Connection(name, addr))
+    zs.register('1', 0)
+    return zs

Modified: ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py	2010-05-01 15:45:55 UTC (rev 111838)
+++ ZODB/branches/jim-dev/src/ZEO/tests/testZEO2.py	2010-05-01 15:49:27 UTC (rev 111839)
@@ -13,6 +13,7 @@
 ##############################################################################
 from zope.testing import doctest, setupstack, renormalizing
 import logging
+import pprint
 import re
 import sys
 import transaction
@@ -93,7 +94,6 @@
 handled correctly:
 
     >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
-    (511/test-addr) ('1') unlock: transactions waiting: 0
     2 callAsync serialnos ...
     reply 1 None
 
@@ -200,12 +200,210 @@
     """
 
 
+def some_basic_locking_tests():
+    r"""
+
+    >>> itid = 0
+    >>> def start_trans(zs):
+    ...     global itid
+    ...     itid += 1
+    ...     tid = str(itid)
+    ...     zs.tpc_begin(tid, '', '', {})
+    ...     zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', tid)
+    ...     return tid
+
+    >>> server = ZEO.tests.servertesting.StorageServer()
+
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> handler.setFormatter(logging.Formatter(
+    ...     '%(name)s %(levelname)s\n%(message)s'))
+    >>> logging.getLogger('ZEO').addHandler(handler)
+    >>> logging.getLogger('ZEO').setLevel(logging.DEBUG)
+
+We start a transaction and vote; this leads to getting the lock.
+
+    >>> zs1 = ZEO.tests.servertesting.client(server, '1')
+    >>> tid1 = start_trans(zs1)
+    >>> zs1.vote(tid1) # doctest: +ELLIPSIS
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+    1 callAsync serialnos ...
+
+If another client tries to vote, its lock request will be queued and
+a delay will be returned:
+
+    >>> zs2 = ZEO.tests.servertesting.client(server, '2')
+    >>> tid2 = start_trans(zs2)
+    >>> delay = zs2.vote(tid2)
+    ZEO.StorageServer DEBUG
+    (test-addr-2) ('1') queue lock: transactions waiting: 1
+
+    >>> delay.set_sender(0, zs2.connection)
+
+When we end the first transaction, the queued vote gets the lock.
+
+    >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') unlock: transactions waiting: 1
+    ZEO.StorageServer DEBUG
+    (test-addr-2) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-2) Preparing to commit transaction: 1 objects, 36 bytes
+    2 callAsync serialnos ...
+
+Let's try again with the first client. The vote will be queued:
+
+    >>> tid1 = start_trans(zs1)
+    >>> delay = zs1.vote(tid1)
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+If the queued transaction is aborted, it will be dequeued:
+
+    >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') dequeue lock: transactions waiting: 0
+
+BTW, voting multiple times will error:
+
+    >>> zs2.vote(tid2)
+    Traceback (most recent call last):
+    ...
+    StorageTransactionError: Already voting
+
+    >>> tid1 = start_trans(zs1)
+    >>> delay = zs1.vote(tid1)
+    ZEO.StorageServer DEBUG
+    (test-addr-1) ('1') queue lock: transactions waiting: 1
+
+    >>> delay.set_sender(0, zs1.connection)
+
+    >>> zs1.vote(tid1)
+    Traceback (most recent call last):
+    ...
+    StorageTransactionError: Already voting
+
+Note that the locking activity is logged at debug level to avoid
+cluttering log files; however, as the number of waiting votes
+increases, so does the logging level:
+
+    >>> clients = []
+    >>> for i in range(9):
+    ...     client = ZEO.tests.servertesting.client(server, str(i+10))
+    ...     tid = start_trans(client)
+    ...     delay = client.vote(tid)
+    ...     clients.append(client)
+    ZEO.StorageServer DEBUG
+    (test-addr-10) ('1') queue lock: transactions waiting: 2
+    ZEO.StorageServer DEBUG
+    (test-addr-11) ('1') queue lock: transactions waiting: 3
+    ZEO.StorageServer WARNING
+    (test-addr-12) ('1') queue lock: transactions waiting: 4
+    ZEO.StorageServer WARNING
+    (test-addr-13) ('1') queue lock: transactions waiting: 5
+    ZEO.StorageServer WARNING
+    (test-addr-14) ('1') queue lock: transactions waiting: 6
+    ZEO.StorageServer WARNING
+    (test-addr-15) ('1') queue lock: transactions waiting: 7
+    ZEO.StorageServer WARNING
+    (test-addr-16) ('1') queue lock: transactions waiting: 8
+    ZEO.StorageServer WARNING
+    (test-addr-17) ('1') queue lock: transactions waiting: 9
+    ZEO.StorageServer CRITICAL
+    (test-addr-18) ('1') queue lock: transactions waiting: 10
+
+If a client with the transaction lock disconnects, it will abort and
+release the lock and one of the waiting clients will get the lock.
+
+    >>> zs2.notifyDisconnected() # doctest: +ELLIPSIS
+    ZEO.StorageServer INFO
+    (test-addr-2) disconnected during locked transaction
+    ZEO.StorageServer CRITICAL
+    (test-addr-2) ('1') unlock: transactions waiting: 10
+    ZEO.StorageServer WARNING
+    (test-addr-1) ('1') lock: transactions waiting: 9
+    ZEO.StorageServer BLATHER
+    (test-addr-1) Preparing to commit transaction: 1 objects, 36 bytes
+    1 callAsync serialnos ...
+
+(In practice, waiting clients won't necessarily get the lock in order.)
+
+We can find out about the current lock state, and get other server
+statistics using the server_status method:
+
+    >>> pprint.pprint(zs1.server_status(), width=1)
+    {'aborts': 3,
+     'active_txns': 10,
+     'commits': 0,
+     'conflicts': 0,
+     'conflicts_resolved': 0,
+     'connections': 11,
+     'loads': 0,
+     'lock_time': 1272653598.693882,
+     'start': 'Fri Apr 30 14:53:18 2010',
+     'stores': 13,
+     'verifying_clients': 0,
+     'waiting': 9}
+
+(Note that the connections count above is off by 1 due to the way the
+test infrastructure works.)
+
+If clients disconnect while waiting, they will be dequeued:
+
+    >>> for client in clients:
+    ...     client.notifyDisconnected()
+    ZEO.StorageServer INFO
+    (test-addr-10) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-10) ('1') dequeue lock: transactions waiting: 8
+    ZEO.StorageServer INFO
+    (test-addr-11) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-11) ('1') dequeue lock: transactions waiting: 7
+    ZEO.StorageServer INFO
+    (test-addr-12) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-12) ('1') dequeue lock: transactions waiting: 6
+    ZEO.StorageServer INFO
+    (test-addr-13) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-13) ('1') dequeue lock: transactions waiting: 5
+    ZEO.StorageServer INFO
+    (test-addr-14) disconnected during unlocked transaction
+    ZEO.StorageServer WARNING
+    (test-addr-14) ('1') dequeue lock: transactions waiting: 4
+    ZEO.StorageServer INFO
+    (test-addr-15) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-15) ('1') dequeue lock: transactions waiting: 3
+    ZEO.StorageServer INFO
+    (test-addr-16) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-16) ('1') dequeue lock: transactions waiting: 2
+    ZEO.StorageServer INFO
+    (test-addr-17) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-17) ('1') dequeue lock: transactions waiting: 1
+    ZEO.StorageServer INFO
+    (test-addr-18) disconnected during unlocked transaction
+    ZEO.StorageServer DEBUG
+    (test-addr-18) ('1') dequeue lock: transactions waiting: 0
+
+    >>> logging.getLogger('ZEO').setLevel(logging.NOTSET)
+    >>> logging.getLogger('ZEO').removeHandler(handler)
+    """
+
+
 def test_suite():
     return unittest.TestSuite((
         doctest.DocTestSuite(
             setUp=ZODB.tests.util.setUp, tearDown=setupstack.tearDown,
             checker=renormalizing.RENormalizing([
                 (re.compile('\d+/test-addr'), ''),
+                (re.compile("'lock_time': \d+.\d+"), 'lock_time'),
+                (re.compile(r"'start': '[^\n]+'"), 'start'),
                 ]),
             ),
         ))



More information about the checkins mailing list