[Zope-Checkins] CVS: ZODB3/ZEO - start.py:1.52 StorageServer.py:1.79 ClientStorage.py:1.78

Jeremy Hylton jeremy@zope.com
Mon, 18 Nov 2002 18:17:41 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv21897/ZEO

Modified Files:
	start.py StorageServer.py ClientStorage.py 
Log Message:
Merge ZODB 3.1 changes to the trunk.
XXX Not sure whether the Berkeley storage still works.


=== ZODB3/ZEO/start.py 1.51 => 1.52 ===
--- ZODB3/ZEO/start.py:1.51	Fri Nov  1 15:36:28 2002
+++ ZODB3/ZEO/start.py	Mon Nov 18 18:17:41 2002
@@ -17,6 +17,8 @@
 
 import sys, os, getopt
 import types
+import errno
+import socket
 
 def directory(p, n=1):
     d = p


=== ZODB3/ZEO/StorageServer.py 1.78 => 1.79 ===
--- ZODB3/ZEO/StorageServer.py:1.78	Wed Nov 13 06:24:36 2002
+++ ZODB3/ZEO/StorageServer.py	Mon Nov 18 18:17:41 2002
@@ -206,17 +206,16 @@
 
     def __init__(self, server, read_only=0):
         self.server = server
+        self.connection = None
         self.client = None
         self.storage = None
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
-        self.timeout = TimeoutThread()
-        self.timeout.start()
 
     def notifyConnected(self, conn):
+        self.connection = conn # For restart_other() below
         self.client = self.ClientStorageStubClass(conn)
-        self.timeout.notifyConnected(conn)
 
     def notifyDisconnected(self):
         # When this storage closes, we must ensure that it aborts
@@ -226,7 +225,6 @@
             self.abort()
         else:
             self.log("disconnected")
-        self.timeout.notifyDisconnected()
 
     def __repr__(self):
         tid = self.transaction and repr(self.transaction.id)
@@ -416,13 +414,8 @@
                                               " requests from one client.")
 
         # (This doesn't require a lock because we're using asyncore)
-        if self.storage._transaction is None:
-            self.strategy = self.ImmediateCommitStrategyClass(self.storage,
-                                                              self.client)
-            self.timeout.begin()
-        else:
-            self.strategy = self.DelayedCommitStrategyClass(self.storage,
-                                                            self.wait)
+        self.strategy = self.DelayedCommitStrategyClass(self.storage,
+                                                        self.wait)
 
         t = Transaction()
         t.id = id
@@ -436,7 +429,6 @@
     def tpc_finish(self, id):
         if not self.check_tid(id):
             return
-        self.timeout.end()
         invalidated = self.strategy.tpc_finish()
         if invalidated:
             self.server.invalidate(self, self.storage_id,
@@ -448,7 +440,6 @@
     def tpc_abort(self, id):
         if not self.check_tid(id):
             return
-        self.timeout.end()
         strategy = self.strategy
         strategy.tpc_abort()
         self.transaction = None
@@ -469,9 +460,7 @@
 
     def vote(self, id):
         self.check_tid(id, exc=StorageTransactionError)
-        r = self.strategy.tpc_vote()
-        self.timeout.begin()
-        return r
+        return self.strategy.tpc_vote()
 
     def abortVersion(self, src, id):
         self.check_tid(id, exc=StorageTransactionError)
@@ -503,8 +492,10 @@
                      "Clients waiting: %d." % len(self.storage._waiting))
             return d
         else:
-            self.restart()
-            return None
+            return self.restart()
+
+    def dontwait(self):
+        return self.restart()
 
     def handle_waiting(self):
         while self.storage._waiting:
@@ -526,7 +517,7 @@
         except:
             self.log("Unexpected error handling waiting transaction",
                      level=zLOG.WARNING, error=sys.exc_info())
-            zeo_storage._conn.close()
+            zeo_storage.connection.close()
             return 0
         else:
             return 1
@@ -539,6 +530,8 @@
         resp = old_strategy.restart(self.strategy)
         if delay is not None:
             delay.reply(resp)
+        else:
+            return resp
 
 # A ZEOStorage instance can use different strategies to commit a
 # transaction.  The current implementation uses different strategies
@@ -767,79 +760,6 @@
             self.delay.error(sys.exc_info())
         else:
             self.delay.reply(result)
-
-class TimeoutThread(threading.Thread):
-    # A TimeoutThread is associated with a ZEOStorage.  It trackes
-    # how long transactions take to commit.  If a transaction takes
-    # too long, it will close the connection.
-
-    TIMEOUT = 30
-
-    def __init__(self):
-        threading.Thread.__init__(self)
-        self._lock = threading.Lock()
-        self._timestamp = None
-        self._conn = None
-
-    def begin(self):
-        self._lock.acquire()
-        try:
-            self._timestamp = time.time()
-        finally:
-            self._lock.release()
-
-    def end(self):
-        self._lock.acquire()
-        try:
-            self._timestamp = None
-        finally:
-            self._lock.release()
-
-    # There's a race here, but I hope it is harmless.
-
-    def notifyConnected(self, conn):
-        self._conn = conn
-
-    def notifyDisconnected(self):
-        self._conn = None
-
-    def run(self):
-        timeout = self.TIMEOUT
-        while self._conn is not None:
-            time.sleep(timeout)
-            
-            self._lock.acquire()
-            try:
-                if self._timestamp is not None:
-                    deadline = self._timestamp + self.TIMEOUT
-                else:
-                    log("TimeoutThread no current transaction",
-                        zLOG.BLATHER)
-                    timeout = self.TIMEOUT
-                    continue
-            finally:
-                self._lock.release()
-                
-            timeout = deadline - time.time()
-            if deadline < time.time():
-                self._abort()
-                break
-            else:
-                elapsed = self.TIMEOUT - timeout
-                log("TimeoutThread transaction has %0.2f sec to complete"
-                    " (%.2f elapsed)" % (timeout, elapsed), zLOG.BLATHER)
-        log("TimeoutThread exiting.  Connection closed.", zLOG.BLATHER)
-
-    def _abort(self):
-        # It's possible for notifyDisconnected to remove the connection
-        # just before we use it.  I think that's harmless, since it means
-        # the connection was closed.
-        log("TimeoutThread aborting transaction", zLOG.WARNING)
-        try:
-            self._conn.close()
-        except AttributeError, msg:
-            log(msg)
-
 
 # Patch up class references
 StorageServer.ZEOStorageClass = ZEOStorage


=== ZODB3/ZEO/ClientStorage.py 1.77 => 1.78 ===
--- ZODB3/ZEO/ClientStorage.py:1.77	Wed Nov 13 06:24:36 2002
+++ ZODB3/ZEO/ClientStorage.py	Mon Nov 18 18:17:41 2002
@@ -28,9 +28,11 @@
 
 import cPickle
 import os
+import socket
 import tempfile
 import threading
 import time
+import types
 
 from ZEO import ClientCache, ServerStub
 from ZEO.TransactionBuffer import TransactionBuffer
@@ -204,6 +206,8 @@
         self._storage = storage
         self._read_only_fallback = read_only_fallback
         self._connection = None
+        # _server_addr is used by sortKey()
+        self._server_addr = None
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
                       'supportsUndo':0, 'supportsVersions': 0,
@@ -339,6 +343,7 @@
             log2(INFO, "Reconnected to storage")
         else:
             log2(INFO, "Connected to storage")
+        self.set_server_addr(conn.get_addr())
         stub = self.StorageServerStubClass(conn)
         self._oids = []
         self._info.update(stub.get_info())
@@ -350,6 +355,33 @@
         self._connection = conn
         self._server = stub
 
+    def set_server_addr(self, addr):
+        # Normalize server address and convert to string
+        if isinstance(addr, types.StringType):
+            self._server_addr = addr
+        else:
+            assert isinstance(addr, types.TupleType)
+            # If the server is on a remote host, we need to guarantee
+            # that all clients used the same name for the server.  If
+            # they don't, the sortKey() may be different for each client.
+            # The best solution seems to be the official name reported
+            # by gethostbyaddr().
+            host = addr[0]
+            try:
+                canonical, aliases, addrs = socket.gethostbyaddr(host)
+            except socket.error, err:
+                log2(BLATHER, "Error resoving host: %s (%s)" % (host, err))
+                canonical = host
+            self._server_addr = str((canonical, addr[1]))
+
+    def sortKey(self):
+        # If the client isn't connected to anything, it can't have a
+        # valid sortKey().  Raise an error to stop the transaction early.
+        if self._server_addr is None:
+            raise ClientDisconnected
+        else:
+            return self._server_addr
+
     def verify_cache(self, server):
         """Internal routine called to verify the cache."""
         # XXX beginZeoVerify ends up calling back to beginVerify() below.
@@ -622,10 +654,14 @@
         """Internal helper to end a transaction."""
         # the right way to set self._transaction to None
         # calls notify() on _tpc_cond in case there are waiting threads
+        self._ltid = self._serial
         self._tpc_cond.acquire()
         self._transaction = None
         self._tpc_cond.notify()
         self._tpc_cond.release()
+
+    def lastTransaction(self):
+        return self._ltid
 
     def tpc_abort(self, transaction):
         """Storage API: abort a transaction."""