[Zodb-checkins] CVS: StandaloneZODB/ZEO - zrpc2.py:1.1.2.17

Jeremy Hylton jeremy@zope.com
Thu, 3 Jan 2002 19:18:54 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv23687

Modified Files:
      Tag: ZEO-ZRPC-Dev
	zrpc2.py 
Log Message:
Three sets of changes.

In a blocking _do_io() call, raise Disconnected() if the connection is closed.

Refactor __connect() to put some of the socket logic in a separate
method.  _connect_socket() returns either a connected socket or None.
Put __connect() in a try/finally that always clears _thread.

Add readable() and writable() implementations to Connection() that
prevent a closed connection from getting in a socket map.




=== StandaloneZODB/ZEO/zrpc2.py 1.1.2.16 => 1.1.2.17 ===
 import zeolog
 import ThreadedAsync
+from Exceptions import Disconnected
 
 REPLY = ".reply" # message name used for replies
 ASYNC = 1
@@ -140,6 +141,7 @@
     """
     __super_init = smac.SizedMessageAsyncConnection.__init__
     __super_close = smac.SizedMessageAsyncConnection.close
+    __super_writable = smac.SizedMessageAsyncConnection.writable
 
     def __init__(self, sock, addr, obj=None, pickle=None):
         self.msgid = 0
@@ -163,6 +165,16 @@
     def __repr__(self):
         return "<%s %s>" % (self.__class__.__name__, self.addr)
 
+    # XXX are the readable() and writable() methods necessary?
+
+    def readable(self):
+        return not self.closed
+
+    def writable(self):
+        if self.closed:
+            return 0
+        return self.__super_writable()
+
     def close(self):
         if self.closed:
             return
@@ -366,7 +378,9 @@
             if wait:
                 # do loop only if lock is already acquired
                 while not self.__reply_lock.acquire(0):
-                    asyncore.poll(60.0, self._map)
+                    asyncore.poll(10.0, self._map)
+                    if self.closed:
+                        raise Disconnected()
                 self.__reply_lock.release()
             else:
                 asyncore.poll(0.0, self._map)
@@ -394,6 +408,8 @@
         self.tmax = tmax
         self.debug = debug
         self.connected = 0
+        # If _thread is not None, then there is a helper thread
+        # attempting to connect.  _thread is protected by _connect_lock.
         self._thread = None
         self._connect_lock = threading.Lock()
         self.trigger = None
@@ -446,34 +462,51 @@
         This method should always be called by attempt_connect() or by
         connect().
         """
-        
-        tries = 0
-        t = self.tmin
-        while not (self.connected or self.closed) \
-              and (repeat or (tries == 0)):
-            tries = tries + 1
-            log("Trying to connect to server")
-            try:
-                if type(self.addr) is types.StringType:
-                    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+        try:
+            tries = 0
+            t = self.tmin
+            while not (self.connected or self.closed) \
+                  and (repeat or (tries == 0)):
+                tries = tries + 1
+                print self, tries, self.closed
+                log("Trying to connect to server")
+                s = self._connect_socket()
+                if s is None:
+                    if repeat:
+                        t = self._wait(t)
                 else:
-                    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                s.connect(self.addr)
-            except socket.error, msg:
-                if self.debug:
-                    log("Failed to connect to server: %s" % msg,
-                        level=zeolog.DEBUG)
-                if repeat:
-                    t = self._wait(t)
+                    if self.debug:
+                        log("Connected to server", level=zeolog.DEBUG)
+                    self.connected = 1
+            if self.connected and not self.closed:
+                print "connected"
+                c = ManagedConnection(s, self.addr, self.obj, self)
+                log("Connection created: %s" % c)
+                try:
+                    self.obj.notifyConnected(c)
+                except:
+                    # XXX
+                    c.close()
+                    raise
+        finally:
+            # must always clear _thread on the way out
+            self._thread = None
+
+    def _connect_socket(self):
+        try:
+            if type(self.addr) is types.StringType:
+                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
             else:
-                if self.debug:
-                    log("Connected to server", level=zeolog.DEBUG)
-                self.connected = 1
-        if self.connected and not self.closed:
-            c = ManagedConnection(s, self.addr, self.obj, self)
-            log("Connection created: %s" % c)
-            self.obj.notifyConnected(c)
-        self._thread = None
+                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.connect(self.addr)
+        except socket.error, msg:
+            if self.debug:
+                log("Failed to connect to server: %s" % msg,
+                    level=zeolog.DEBUG)
+            s.close()
+            return None
+        return s
 
     def _wait(self, t):
         time.sleep(t)