[Zodb-checkins] CVS: StandaloneZODB/ZEO - logger.py:1.2 ClientStorage.py:1.40 StorageServer.py:1.36 smac.py:1.16 start.py:1.31 trigger.py:1.5 zrpc.py:1.23

Jeremy Hylton jeremy@zope.com
Thu, 4 Apr 2002 18:09:31 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv14866

Modified Files:
	ClientStorage.py StorageServer.py smac.py start.py trigger.py 
	zrpc.py 
Added Files:
	logger.py 
Log Message:
Commit the zeo-1_0-debug-branch to the trunk.

I expect this code will become ZEO 1.1.


=== StandaloneZODB/ZEO/logger.py 1.1 => 1.2 ===
+from types import StringType
+from zLOG import *
+
+__all__ = ["zLogger", "format_msg"]
+
+_MAX_MSG_SIZE = 120
+
+def format_msg(*args):
+    accum = []
+    total_len = 0
+    for arg in args:
+        if not isinstance(arg, StringType):
+            arg = str(arg)
+        accum.append(arg)
+        total_len = total_len + len(arg)
+        if total_len >= _MAX_MSG_SIZE:
+            break
+    m = string.join(accum)
+    if len(m) > _MAX_MSG_SIZE:
+        m = m[:_MAX_MSG_SIZE] + ' ...'
+    return m
+
+class zLogger:
+
+    def __init__(self, channel):
+        self.channel = channel
+
+    def __str__(self):
+        raise RuntimeError, "don't print me"
+
+    def trace(self, msg):
+        LOG(self.channel, TRACE, msg)
+
+    def debug(self, msg):
+        LOG(self.channel, DEBUG, msg)
+
+    def blather(self, msg):
+        LOG(self.channel, BLATHER, msg)
+
+    def info(self, msg):
+        LOG(self.channel, INFO, msg)
+
+    def problem(self, msg):
+        LOG(self.channel, PROBLEM, msg)
+
+    def warning(self, msg):
+        LOG(self.channel, WARNING, msg)
+
+    def error(self, msg, error=None):
+        LOG(self.channel, ERROR, msg, error=error)
+
+    def panic(self, msg):
+        LOG(self.channel, PANIC, msg)


=== StandaloneZODB/ZEO/ClientStorage.py 1.39 => 1.40 ===
 __version__='$Revision$'[11:-2]
 
-import struct, time, os, socket, string, Sync, zrpc, ClientCache
-import tempfile, Invalidator, ExtensionClass, thread
-import ThreadedAsync
-
-now=time.time
+import struct, time, os, socket, string
+import tempfile, thread
 from struct import pack, unpack
+from types import TupleType
+
+import Invalidator, ExtensionClass
+import ThreadedAsync, Sync, zrpc, ClientCache
+
 from ZODB import POSException, BaseStorage
 from ZODB.TimeStamp import TimeStamp
-from zLOG import LOG, PROBLEM, INFO
 
-try: from ZODB.ConflictResolution import ResolvedSerial
-except: ResolvedSerial='rs'
+from ZEO.logger import zLogger
+
+log = zLogger("ZEO Client")
 
-TupleType=type(())
+try:
+    from ZODB.ConflictResolution import ResolvedSerial
+except:
+    ResolvedSerial='rs'
 
 class ClientStorageError(POSException.StorageError):
     """An error occurred in the ZEO Client Storage"""
@@ -62,8 +67,12 @@
         self._info={'length': 0, 'size': 0, 'name': 'ZEO Client',
                     'supportsUndo':0, 'supportsVersions': 0,
                     }
-        
-        self._call=zrpc.asyncRPC(connection, debug=debug,
+
+        if debug:
+            debug_log = log
+        else:
+            debug_log = None
+        self._call=zrpc.asyncRPC(connection, debug=debug_log,
                                  tmin=min_disconnect_poll,
                                  tmax=max_disconnect_poll)
 
@@ -132,7 +141,7 @@
             # If we can't connect right away, go ahead and open the cache
             # and start a separate thread to try and reconnect.
 
-            LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
+            log.problem("Failed to connect to storage")
             self._cache.open()
             thread.start_new_thread(self._call.connect,(0,))
 
@@ -140,7 +149,7 @@
             # notifyConnected
 
     def notifyConnected(self, s):
-        LOG("ClientStorage", INFO, "Connected to storage")
+        log.info("Connected to storage")
         self._lock_acquire()
         try:
             
@@ -197,7 +206,7 @@
     ### responsible for starting the thread that makes the connection.
 
     def notifyDisconnected(self, ignored):
-        LOG("ClientStorage", PROBLEM, "Disconnected from storage")
+        log.problem("Disconnected from storage")
         self._connected=0
         self._transaction=None
         thread.start_new_thread(self._call.connect,(0,))
@@ -233,7 +242,7 @@
     def close(self):
         self._lock_acquire()
         try:
-            LOG("ClientStorage", INFO, "close")
+            log.info("close")
             self._call.closeIntensionally()
             try:
                 self._tfile.close()
@@ -549,6 +558,9 @@
         finally: self._lock_release()
 
     def sync(self): self._call.sync()
+
+    def status(self):
+        self._call.sendMessage('status')
 
 def getWakeup(_w=[]):
     if _w: return _w[0]


=== StandaloneZODB/ZEO/StorageServer.py 1.35 => 1.36 ===
 
 import asyncore, socket, string, sys, os
-from smac import SizedMessageAsyncConnection
-from ZODB import POSException
 import cPickle
 from cPickle import Unpickler
+from cStringIO import StringIO
+from thread import start_new_thread
+import time
+from types import StringType
+
+from ZODB import POSException
 from ZODB.POSException import TransactionError, UndoError, VersionCommitError
 from ZODB.Transaction import Transaction
-import traceback
-from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
 from ZODB.referencesf import referencesf
-from thread import start_new_thread
-from cStringIO import StringIO
+from ZODB.utils import U64
+
 from ZEO import trigger
 from ZEO import asyncwrap
-from ZEO.smac import Disconnected
-from types import StringType
-
-class StorageServerError(POSException.StorageError): pass
-
-max_blather=120
-def blather(*args):
-    accum = []
-    total_len = 0
-    for arg in args:
-        if not isinstance(arg, StringType):
-            arg = str(arg)
-        accum.append(arg)
-        total_len = total_len + len(arg)
-        if total_len >= max_blather:
-            break
-    m = string.join(accum)
-    if len(m) > max_blather: m = m[:max_blather] + ' ...'
-    LOG('ZEO Server', TRACE, m)
+from ZEO.smac import Disconnected, SizedMessageAsyncConnection
+from ZEO.logger import zLogger, format_msg
 
+class StorageServerError(POSException.StorageError):
+    pass
 
 # We create a special fast pickler! This allows us
 # to create slightly more efficient pickles and
@@ -56,6 +43,8 @@
 pickler.fast=1 # Don't use the memo
 dump=pickler.dump
 
+log = zLogger("ZEO Server")
+
 class StorageServer(asyncore.dispatcher):
 
     def __init__(self, connection, storages):
@@ -80,14 +69,14 @@
             self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
             self.set_reuse_addr()
 
-        LOG('ZEO Server', INFO, 'Listening on %s' % repr(connection))
+        log.info('Listening on %s' % repr(connection))
         self.bind(connection)
         self.listen(5)
 
     def register_connection(self, connection, storage_id):
         storage=self.__storages.get(storage_id, None)
         if storage is None:
-            LOG('ZEO Server', ERROR, "Unknown storage_id: %s" % storage_id)
+            log.error("Unknown storage_id: %s" % storage_id)
             connection.close()
             return None, None
         
@@ -126,18 +115,29 @@
     
     def handle_accept(self):
         try:
-            sock, addr = self.accept()
-        except socket.error:
-            sys.stderr.write('warning: accept failed\n')
+            r = self.accept()
+            if r is None:
+                return
+            sock, addr = r
+        except socket.error, err:
+            log.warning("accept() failed: %s" % err)
         else:
             ZEOConnection(self, sock, addr)
 
-    def log_info(self, message, type='info'):
-        if type=='error': type=ERROR
-        else: type=INFO
-        LOG('ZEO Server', type, message)
+    def status(self):
+        """Log status information about connections and storages"""
 
-    log=log_info
+        lines = []
+        for storage_id, connections in self.__connections.items():
+            s = "Storage %s has %d connections" % (storage_id,
+                                                   len(connections))
+            lines.append(s)
+            for c in connections:
+                lines.append("%s readable=%s writeable=%s" % (
+                   c, c.readable(), c.writable()))
+                lines.append("\t" + c.stats())
+        log.info(string.join(lines, "\n"))
+        return _noreturn
 
 storage_methods={}
 for n in (
@@ -148,6 +148,7 @@
     'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions',
     'transactionalUndo',
     'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
+    'status'
     ):
     storage_methods[n]=1
 storage_method=storage_methods.has_key
@@ -159,7 +160,8 @@
         raise StorageServerError, (
             "Couldn\'t import global module %s" % module)
 
-    try: r=getattr(m, name)
+    try:
+        r=getattr(m, name)
     except:
         raise StorageServerError, (
             "Couldn\'t find global %s in module %s" % (name, module))
@@ -177,12 +179,52 @@
 
     def __init__(self, server, sock, addr):
         self.__server=server
+        self.status = server.status
         self.__invalidated=[]
         self.__closed=None
-        if __debug__: debug='ZEO Server'
-        else: debug=0
+        if __debug__:
+            debug = log
+        else:
+            debug = None
+
+        if __debug__:
+            # store some detailed statistics about method calls
+            self._last_method = None
+            self._t_begin = None
+            self._t_end = None
+            self._ncalls = 0
+            
         SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
-        LOG('ZEO Server', INFO, 'Connect %s %s' % (id(self), `addr`))
+        self.logaddr = repr(addr) # form of addr suitable for logging
+        log.info('Connect %s %s' % (id(self), self.logaddr))
+
+    def stats(self):
+        # This method is called via the status() command.  The stats
+        # are of limited use for the current command, because the
+        # actual invocation of status() will clobber the previous
+        # method's statistics.
+        #
+        # When there are multiple connections active, a new connection
+        # can always get detailed statistics about other connections.
+        if __debug__:
+            if self._last_method == "status":
+                return "method=status begin=%s end=... ncalls=%d" % (
+                    self._t_begin, self._ncalls)
+            if self._t_end is not None and self._t_begin is not None:
+                delta = self._t_end - self._t_begin
+            else:
+                delta = -1
+            return "method=%s begin=%s end=%s delta=%.3f ncalls=%d" % (
+                self._last_method, self._t_begin, self._t_end, delta,
+                self._ncalls)
+        else:
+            return ""
+
+    def __repr__(self):
+        return "<ZEOConnection %s%s" % (`self.addr`,
+                         # sort of messy way to add tag 'closed' to
+                         # connections that are closed
+                         (self.__closed is None and '>' or ' closed>'))
 
     def close(self):
         t=self._transaction
@@ -196,19 +238,26 @@
         self.__server.unregister_connection(self, self.__storage_id)
         self.__closed=1
         SizedMessageAsyncConnection.close(self)
-        LOG('ZEO Server', INFO, 'Close %s' % id(self))
+        log.info('Close %s' % id(self))
 
     def message_input(self, message,
                       dump=dump, Unpickler=Unpickler, StringIO=StringIO,
                       None=None):
         if __debug__:
-            if len(message) > max_blather:
-                tmp = `message[:max_blather]`
+
+            self._t_begin = time.time()
+            self._t_end = None
+            
+            if len(message) > 120: # XXX need constant from logger
+                tmp = `message[:120]`
             else:
                 tmp = `message`
-            blather('message_input', id(self), tmp)
+            log.trace("message_input %s" % tmp)
 
         if self.__storage is None:
+            if __debug__:
+                log.blather("register connection to %s from %s" % (message,
+                                                                self.logaddr))
             # This is the first communication from the client
             self.__storage, self.__storage_id = (
                 self.__server.register_connection(self, message))
@@ -226,27 +275,42 @@
             
             name, args = args[0], args[1:]
             if __debug__:
-                apply(blather,
-                      ("call", id(self), ":", name,) + args)
+                self._last_method = name
+                self._ncalls = self._ncalls + 1
+                log.debug("call %s%s from %s" % (name, format_msg(args),
+                                                 self.logaddr))
                 
             if not storage_method(name):
+                log.warning("Invalid method name: %s" % name)
+                if __debug__:
+                    self._t_end = time.time()
                 raise 'Invalid Method Name', name
             if hasattr(self, name):
                 r=apply(getattr(self, name), args)
             else:
                 r=apply(getattr(self.__storage, name), args)
-            if r is _noreturn: return
-        except (UndoError, VersionCommitError):
-            # These are normal usage errors. No need to leg them
+            if r is _noreturn:
+                if __debug__:
+                    log.debug("no return to %s" % self.logaddr)
+                    self._t_end = time.time()
+                return
+        except (UndoError, VersionCommitError), err:
+            if __debug__:
+                log.debug("return error %s to %s" % (err, self.logaddr))
+                self._t_end = time.time()
+            # These are normal usage errors. No need to log them.
             self.return_error(sys.exc_info()[0], sys.exc_info()[1])
             return
         except:
-            LOG('ZEO Server', ERROR, 'error', error=sys.exc_info())
+            if __debug__:
+                self._t_end = time.time()
+            log.error("error", error=sys.exc_info())
             self.return_error(sys.exc_info()[0], sys.exc_info()[1])
             return
 
         if __debug__:
-            blather("%s R: %s" % (id(self), `r`))
+            log.debug("return %s to %s" % (format_msg(r), self.logaddr))
+            self._t_end = time.time()
             
         r=dump(r,1)            
         self.message_output('R'+r)
@@ -256,7 +320,7 @@
             err_value = err_type, err_value
 
         if __debug__:
-            blather("%s E: %s" % (id(self), `err_value`))
+            log.trace("%s E: %s" % (id(self), `err_value`))
                     
         try: r=dump(err_value, 1)
         except:
@@ -292,6 +356,8 @@
             }
 
     def zeoLoad(self, oid):
+        if __debug__:
+            log.blather("zeoLoad(%s) %s" % (U64(oid), self.logaddr))
         storage=self.__storage
         v=storage.modifiedInVersion(oid)
         if v: pv, sv = storage.load(oid, v)
@@ -308,6 +374,8 @@
             
 
     def beginZeoVerify(self):
+        if __debug__:
+            log.blather("beginZeoVerify() %s" % self.logaddr)
         self.message_output('bN.')            
         return _noreturn
 
@@ -324,6 +392,8 @@
         return _noreturn
 
     def endZeoVerify(self):
+        if __debug__:
+            log.blather("endZeoVerify() %s" % self.logaddr)
         self.message_output('eN.')
         return _noreturn
 
@@ -340,11 +410,11 @@
 
     def _pack(self, t, wait=0):
         try:
-            LOG('ZEO Server', BLATHER, 'pack begin')
+            log.blather('pack begin')
             self.__storage.pack(t, referencesf)
-            LOG('ZEO Server', BLATHER, 'pack end')
+            log.blather('pack end')
         except:
-            LOG('ZEO Server', ERROR,
+            log.error(
                 'Pack failed for %s' % self.__storage_id,
                 error=sys.exc_info())
             if wait:
@@ -381,6 +451,9 @@
 
     def storea(self, oid, serial, data, version, id,
                dump=dump):
+        if __debug__:
+            log.blather("storea(%s, [%d], %s) %s" % (U64(oid), len(data),
+                                                   U64(id), self.logaddr))
         try:
             t=self._transaction
             if t is None or id != t.id:
@@ -396,7 +469,7 @@
             # all errors need to be serialized to prevent unexpected
             # returns, which would screw up the return handling.
             # IOW, Anything that ends up here is evil enough to be logged.
-            LOG('ZEO Server', ERROR, 'store error', error=sys.exc_info())
+            log.error('store error', error=sys.exc_info())
             newserial=sys.exc_info()[1]
         else:
             if serial != '\0\0\0\0\0\0\0\0':
@@ -420,12 +493,17 @@
         return self.__storage.tpc_vote(t)
 
     def transactionalUndo(self, trans_id, id):
+        if __debug__:
+            log.blather("transactionalUndo(%s, %s) %s" % (trans_id,
+                                                        U64(id), self.logaddr))
         t=self._transaction
         if t is None or id != t.id:
             raise POSException.StorageTransactionError(self, id)
         return self.__storage.transactionalUndo(trans_id, self._transaction)
         
     def undo(self, transaction_id):
+        if __debug__:
+            log.blather("undo(%s) %s" % (transaction_id, self.logaddr))
         oids=self.__storage.undo(transaction_id)
         if oids:
             self.__server.invalidate(
@@ -457,11 +535,15 @@
 
     def commitlock_suspend(self, resume, args, onerror):
         self.__storage._waiting.append((resume, args, onerror))
+        log.blather("suspend %s.  %d queued clients" % (resume.im_self,
+                                            len(self.__storage._waiting)))
 
     def commitlock_resume(self):
         waiting = self.__storage._waiting
         while waiting:
             resume, args, onerror = waiting.pop(0)
+            log.blather("resuming queued client %s, %d still queued" % (
+                resume.im_self, len(waiting)))
             try:
                 if apply(resume, args):
                     break
@@ -471,12 +553,18 @@
                 # disconnect will have generated its own log event.
                 onerror()
             except:
-                LOG('ZEO Server', ERROR,
+                log.error(
                     "Unexpected error handling queued tpc_begin()",
                     error=sys.exc_info())
                 onerror()
 
     def tpc_abort(self, id):
+        if __debug__:
+            try:
+                log.blather("tpc_abort(%s) %s" % (U64(id), self.logaddr))
+            except:
+                print repr(id)
+                raise
         t = self._transaction
         if t is None or id != t.id:
             return
@@ -492,6 +580,10 @@
         self.message_output('UN.')
 
     def tpc_begin(self, id, user, description, ext):
+        if __debug__:
+            log.blather("tpc_begin(%s, %s, %s) %s" % (U64(id), `user`,
+                                                      `description`,
+                                                      self.logaddr))
         t = self._transaction
         if t is not None:
             if id == t.id:
@@ -505,7 +597,8 @@
         if storage._transaction is not None:
             self.commitlock_suspend(self.unlock, (), self.close)
             return 1 # Return a flag indicating a lock condition.
-            
+
+        assert id != 't'
         self._transaction=t=Transaction()
         t.id=id
         t.user=user
@@ -542,6 +635,8 @@
         return 1
 
     def tpc_finish(self, id, user, description, ext):
+        if __debug__:
+            log.blather("tpc_finish(%s) %s" % (U64(id), self.logaddr))
         t = self._transaction
         if id != t.id:
             return
@@ -564,7 +659,7 @@
 if __name__=='__main__':
     import ZODB.FileStorage
     name, port = sys.argv[1:3]
-    blather(name, port)
+    log.trace(format_msg(name, port))
     try:
         port='', int(port)
     except:


=== StandaloneZODB/ZEO/smac.py 1.15 => 1.16 ===
 import asyncore, string, struct, zLOG, sys, Acquisition
 import socket, errno
-from zLOG import LOG, TRACE, ERROR, INFO
+from logger import zLogger
 
 # Use the dictionary to make sure we get the minimum number of errno
 # entries.   We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -48,10 +48,10 @@
         SizedMessageAsyncConnection.inheritedAttribute(
             '__init__')(self, sock, map)
         self.addr=addr
-        if debug is not None:
-            self._debug=debug
-        elif not hasattr(self, '_debug'):
-            self._debug=__debug__ and 'smac'
+        if debug is None and __debug__:
+            self._debug = zLogger("smac")
+        else:
+            self._debug = debug
         self.__state=None
         self.__inp=None
         self.__inpl=0
@@ -132,23 +132,18 @@
 
     def message_output(self, message,
                        pack=struct.pack, len=len):
-        if self._debug:
-            if len(message) > 40: m=message[:40]+' ...'
-            else: m=message
-            LOG(self._debug, TRACE, 'message_output %s' % `m`)
+        if self._debug is not None:
+            if len(message) > 40:
+                m = message[:40]+' ...'
+            else:
+                m = message
+            self._debug.trace('message_output %s' % `m`)
 
         append=self.__append
         if append is None:
             raise Disconnected("This action is temporarily unavailable.<p>")
         
         append(pack(">i",len(message))+message)
-
-    def log_info(self, message, type='info'):
-        if type=='error': type=ERROR
-        else: type=INFO
-        LOG('ZEO', type, message)
-
-    log=log_info
 
     def close(self):
         if self.__append is not None:


=== StandaloneZODB/ZEO/start.py 1.30 => 1.31 ===
                            )
 
-    opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
+    fs = os.path.join(var, 'Data.fs')
 
-    fs=os.path.join(var, 'Data.fs')
-
-    usage="""%s [options] [filename]
+    usage = """%s [options] [filename]
 
     where options are:
 
@@ -121,6 +119,13 @@
     if no file name is specified, then %s is used.
     """ % (me, fs)
 
+    try:
+        opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
+    except getopt.error, err:
+        print err
+        print usage
+        sys.exit(1)
+
     port=None
     debug=detailed=0
     host=''
@@ -217,15 +222,15 @@
             import signal
 
             signal.signal(signal.SIGTERM,
-                          lambda sig, frame, s=storages: shutdown(s)
-                          )
+                          lambda sig, frame, s=storages: shutdown(s))
             signal.signal(signal.SIGINT,
-                          lambda sig, frame, s=storages: shutdown(s, 0)
-                          )
-            try: signal.signal(signal.SIGHUP, rotate_logs_handler)
-            except: pass
-
-        except: pass
+                          lambda sig, frame, s=storages: shutdown(s, 0))
+            try:
+                signal.signal(signal.SIGHUP, rotate_logs_handler)
+            except:
+                pass
+        except:
+            pass
 
         items=storages.items()
         items.sort()
@@ -236,13 +241,16 @@
 
         ZEO.StorageServer.StorageServer(unix, storages)
 
-        try: ppid, pid = os.getppid(), os.getpid()
-        except: pass # getpid not supported
-        else: open(zeo_pid,'w').write("%s %s" % (ppid, pid))
+        try:
+            ppid, pid = os.getppid(), os.getpid()
+        except:
+            pass # getpid not supported
+        else:
+            open(zeo_pid,'w').write("%s %s" % (ppid, pid))
 
     except:
         # Log startup exception and tell zdaemon not to restart us.
-        info=sys.exc_info()
+        info = sys.exc_info()
         try:
             import zLOG
             zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
@@ -280,21 +288,29 @@
     # unnecessary, since we now use so_reuseaddr.
     for ignored in 1,2:
         for socket in asyncore.socket_map.values():
-            try: socket.close()
-            except: pass
+            try:
+                socket.close()
+            except:
+                pass
 
     for storage in storages.values():
-        try: storage.close()
-        except: pass
+        try:
+            storage.close()
+        except:
+            pass
 
     try:
         from zLOG import LOG, INFO
         LOG('ZEO Server', INFO,
             "Shutting down (%s)" % (die and "shutdown" or "restart")
             )
-    except: pass
+    except:
+        pass
     
-    if die: sys.exit(0)
-    else: sys.exit(1)
+    if die:
+        sys.exit(0)
+    else:
+        sys.exit(1)
 
-if __name__=='__main__': main(sys.argv)
+if __name__ == '__main__':
+    main(sys.argv)


=== StandaloneZODB/ZEO/trigger.py 1.4 => 1.5 ===
 # from Sam Rushing's Medusa server.
 
-
 import asyncore
-#import asynchat
-
+import errno
 import os
 import socket
 import string
@@ -26,7 +24,7 @@
     
 if os.name == 'posix':
 
-    class trigger (asyncore.file_dispatcher):
+    class trigger(asyncore.file_dispatcher):
 
         "Wake up a call to select() running in the main thread"
 
@@ -58,10 +56,10 @@
         # new data onto a channel's outgoing data queue at the same time that
         # the main thread is trying to remove some]
 
-        def __init__ (self):
+        def __init__(self):
             r, w = self._fds = os.pipe()
             self.trigger = w
-            asyncore.file_dispatcher.__init__ (self, r)
+            asyncore.file_dispatcher.__init__(self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
 
@@ -69,30 +67,35 @@
             os.close(self._fds[0])
             os.close(self._fds[1])
 
-        def __repr__ (self):
-            return '<select-trigger (pipe) at %x>' % id(self)
+        def __repr__(self):
+            return '<select-trigger(pipe) at %x>' % id(self)
 
-        def readable (self):
+        def readable(self):
             return 1
 
-        def writable (self):
+        def writable(self):
             return 0
 
-        def handle_connect (self):
+        def handle_connect(self):
             pass
 
-        def pull_trigger (self, thunk=None):
+        def pull_trigger(self, thunk=None):
             # print 'PULL_TRIGGER: ', len(self.thunks)
             if thunk:
                 try:
                     self.lock.acquire()
-                    self.thunks.append (thunk)
+                    self.thunks.append(thunk)
                 finally:
                     self.lock.release()
-            os.write (self.trigger, 'x')
+            os.write(self.trigger, 'x')
 
-        def handle_read (self):
-            self.recv (8192)
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except os.error, err:
+                if err[0] == errno.EAGAIN: # resource temporarily unavailable
+                    return
+                raise
             try:
                 self.lock.acquire()
                 for thunk in self.thunks:
@@ -101,7 +104,7 @@
                     except:
                         nil, t, v, tbinfo = asyncore.compact_traceback()
                         print ('exception in trigger thunk:'
-                               ' (%s:%s %s)' % (t, v, tbinfo))
+                               '(%s:%s %s)' % (t, v, tbinfo))
                 self.thunks = []
             finally:
                 self.lock.release()
@@ -113,13 +116,13 @@
 
     # win32-safe version
 
-    class trigger (asyncore.dispatcher):
+    class trigger(asyncore.dispatcher):
 
         address = ('127.9.9.9', 19999)
 
-        def __init__ (self):
-            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
-            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+        def __init__(self):
+            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
             # set TCP_NODELAY to true to avoid buffering
             w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -137,45 +140,50 @@
                         raise 'Bind Error', 'Cannot bind trigger!'
                     port=port - 1
             
-            a.listen (1)
-            w.setblocking (0)
+            a.listen(1)
+            w.setblocking(0)
             try:
-                w.connect (self.address)
+                w.connect(self.address)
             except:
                 pass
             r, addr = a.accept()
             a.close()
-            w.setblocking (1)
+            w.setblocking(1)
             self.trigger = w
 
-            asyncore.dispatcher.__init__ (self, r)
+            asyncore.dispatcher.__init__(self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
             self._trigger_connected = 0
 
-        def __repr__ (self):
+        def __repr__(self):
             return '<select-trigger (loopback) at %x>' % id(self)
 
-        def readable (self):
+        def readable(self):
             return 1
 
-        def writable (self):
+        def writable(self):
             return 0
 
-        def handle_connect (self):
+        def handle_connect(self):
             pass
 
-        def pull_trigger (self, thunk=None):
+        def pull_trigger(self, thunk=None):
             if thunk:
                 try:
                     self.lock.acquire()
-                    self.thunks.append (thunk)
+                    self.thunks.append(thunk)
                 finally:
                     self.lock.release()
-            self.trigger.send ('x')
+            self.trigger.send('x')
 
-        def handle_read (self):
-            self.recv (8192)
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except os.error, err:
+                if err[0] == errno.EAGAIN: # resource temporarily unavailable
+                    return
+                raise
             try:
                 self.lock.acquire()
                 for thunk in self.thunks:


=== StandaloneZODB/ZEO/zrpc.py 1.22 => 1.23 ===
         self.__call_lr=l.release
 
-    def connect(self, tryonce=1, log_type='client'):
+    def connect(self, tryonce=1):
         t=self._tmin
         connection = self._connection
         debug=self._debug
         while self.__closed == 0:
-            if log_type: LOG(log_type, INFO,
-                             'Trying to connect to server: %s' % `connection`)
+            LOG("client", INFO,
+                'Trying to connect to server: %s' % `connection`)
             try:
                 if type(connection) is type(''):
                     s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -75,15 +75,15 @@
                     s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                 s.connect(connection)    
             except Exception, err:
-                if debug:
-                    LOG(debug, DEBUG, "Failed to connect to server: %s" % err)
+                if debug is not None:
+                    debug.blather("Failed to connect to server: %s" % err)
                 if tryonce: return 0
                 time.sleep(t)
                 t=t*2
                 if t > self._tmax: t=self._tmax
             else:
-                if debug:
-                    LOG(debug, DEBUG, "Connected to server")
+                if debug is not None:
+                    debug.blather("Connected to server")
                     
                 # Make sure the result lock is set, so we don't
                 # get an old result (e.g. the exception that
@@ -199,12 +199,12 @@
         self._outOfBand=f
 
     def message_input(self, m):
-        if self._debug:
+        if self._debug is not None:
             if len(m) > 60:
                 md = repr(m[:60]) + ' ...'
             else:
                 md = repr(m)
-            LOG(self._debug, TRACE, 'message_input %s' % md)
+            self._debug.trace('message_input %s' % md)
 
         c=m[:1]
         if c in 'RE':