[Zodb-checkins] SVN: ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py Inlined poll function in preparation for adding a heartbeat.

Jim Fulton jim at zope.com
Mon Jul 17 10:50:27 EDT 2006


Log message for revision 69153:
  Inlined poll function in preparation for adding a heartbeat.
  

Changed:
  U   ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py	2006-07-17 14:22:33 UTC (rev 69152)
+++ ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py	2006-07-17 14:50:26 UTC (rev 69153)
@@ -19,6 +19,8 @@
 import types
 import logging
 
+import traceback, time
+
 import ThreadedAsync
 from ZEO.zrpc import smac
 from ZEO.zrpc.error import ZRPCError, DisconnectedError
@@ -34,17 +36,116 @@
 # Dedicated Client select loop:
 client_map = {}
 client_trigger = trigger(client_map)
+client_timeout = 30.0
 
 def client_loop():
     map = client_map
-    poll_fun = asyncore.poll
     logger = logging.getLogger('ZEO.zrpc.client_loop')
+    logger.addHandler(logging.StreamHandler())
+
+    read = asyncore.read
+    write = asyncore.write
+    _exception = asyncore._exception
+    
     while map:
         try:
-            poll_fun(30.0, map)
+            r = []; w = []; e = []
+            for fd, obj in map.items():
+                is_r = obj.readable()
+                is_w = obj.writable()
+                if is_r:
+                    r.append(fd)
+                if is_w:
+                    w.append(fd)
+                if is_r or is_w:
+                    e.append(fd)
+
+            try:
+                r, w, e = select.select(r, w, e, client_timeout)
+            except select.error, err:
+                if err[0] != errno.EINTR:
+                    if err[0] == errno.EBADF:
+
+                        # If a connection is closed while we are
+                        # calling select on it, we can get a bad
+                        # file-descriptor error.  We'll check for this
+                        # case by looking for entries in r and w that
+                        # are not in the socket map.
+
+                        if [fd for fd in r if fd not in client_map]:
+                            continue
+                        if [fd for fd in w if fd not in client_map]:
+                            continue
+                        
+#                        print 'BADF', list(client_map), r, w, e
+                    raise
+                else:
+                    continue
+
+            for fd in r:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                read(obj)
+
+            for fd in w:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                write(obj)
+
+            for fd in e:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                _exception(obj)
+
         except:
+#            print 'poll failure', sys.exc_info()[1], time.time()
             logger.exception('poll failure')
+            raise
 
+#import time
+def poll(timeout, map):
+    if map:
+        r = []; w = []; e = []
+        for fd, obj in map.items():
+            is_r = obj.readable()
+            is_w = obj.writable()
+            if is_r:
+                r.append(fd)
+            if is_w:
+                w.append(fd)
+            if is_r or is_w:
+                e.append(fd)
+
+        try:
+            r, w, e = select.select(r, w, e, timeout)
+        except select.error, err:
+            if err[0] != errno.EINTR:
+                raise
+            else:
+                return
+
+        for fd in r:
+            obj = map.get(fd)
+            if obj is None:
+                continue
+            asyncore.read(obj)
+
+        for fd in w:
+            obj = map.get(fd)
+            if obj is None:
+                continue
+            asyncore.write(obj)
+
+        for fd in e:
+            obj = map.get(fd)
+            if obj is None:
+                continue
+            asyncore._exception(obj)
+
+
 client_thread = threading.Thread(target=client_loop)
 client_thread.setDaemon(True)
 client_thread.start()



More information about the Zodb-checkins mailing list