[Zope3-checkins] CVS: Zope3/src/zodb/zeo - component.xml:1.2.2.1 runzeo.py:1.2.2.1 schema.xml:1.2.2.1 cache.py:1.4.20.1 client.py:1.11.2.1 interfaces.py:1.3.26.1 server.py:1.11.2.1 stubs.py:1.7.2.1 threadedasync.py:1.2.40.1

Grégoire Weber zope@i-con.ch
Sun, 22 Jun 2003 10:24:04 -0400


Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv24874/src/zodb/zeo

Modified Files:
      Tag: cw-mail-branch
	cache.py client.py interfaces.py server.py stubs.py 
	threadedasync.py 
Added Files:
      Tag: cw-mail-branch
	component.xml runzeo.py schema.xml 
Log Message:
Synced up with HEAD

=== Added File Zope3/src/zodb/zeo/component.xml ===
<component>

  <!-- stub out the type until we figure out how to zconfig logging -->
  <sectiontype name="eventlog" />

  <sectiontype name="zeo">

    <description>
      The content of a ZEO section describe operational parameters
      of a ZEO server except for the storage(s) to be served.
    </description>

    <key name="address" datatype="socket-address"
         required="yes">
      <description>
        The address at which the server should listen.  This can be in
        the form 'host:port' to signify a TCP/IP connection or a
        pathname string to signify a Unix domain socket connection (at
        least one '/' is required).  A hostname may be a DNS name or a
        dotted IP address.  If the hostname is omitted, the platform's
        default behavior is used when binding the listening socket (''
        is passed to socket.bind() as the hostname portion of the
        address).
      </description>
    </key>

    <key name="read-only" datatype="boolean"
         required="no"
         default="false">
      <description>
        Flag indicating whether the server should operate in read-only
        mode.  Defaults to false.  Note that even if the server is
        operating in writable mode, individual storages may still be
        read-only.  But if the server is in read-only mode, no write
        operations are allowed, even if the storages are writable.  Note
        that pack() is considered a read-only operation.
      </description>
    </key>

    <key name="invalidation-queue-size" datatype="integer"
         required="no"
         default="100">
      <description>
        The storage server keeps a queue of the objects modified by the
        last N transactions, where N == invalidation_queue_size.  This
        queue is used to speed client cache verification when a client
        disconnects for a short period of time.
      </description>
    </key>

    <key name="monitor-address" datatype="socket-address"
         required="no">
      <description>
        The address at which the monitor server should listen.  If
        specified, a monitor server is started.  The monitor server
        provides server statistics in a simple text format.  This can
        be in the form 'host:port' to signify a TCP/IP connection or a
        pathname string to signify a Unix domain socket connection (at
        least one '/' is required).  A hostname may be a DNS name or a
        dotted IP address.  If the hostname is omitted, the platform's
        default behavior is used when binding the listening socket (''
        is passed to socket.bind() as the hostname portion of the
        address).
      </description>
    </key>

    <key name="transaction-timeout" datatype="integer"
         required="no">
      <description>
        The maximum amount of time to wait for a transaction to commit
        after acquiring the storage lock, specified in seconds.  If the
        transaction takes too long, the client connection will be closed
        and the transaction aborted.
      </description>
    </key>

    <key name="authentication-protocol" required="no">
      <description>
        The name of the protocol used for authentication.  The
        only protocol provided with ZEO is "digest," but extensions
        may provide other protocols.
      </description>
    </key>

    <key name="authentication-database" required="no">
      <description>
        The path of the database containing authentication credentials.
      </description>
    </key>

    <key name="authentication-realm" required="no">
      <description>
        The authentication realm of the server.  Some authentication
        schemes use a realm to identify the logic set of usernames
        that are accepted by this server.
      </description>
    </key>

  </sectiontype>

</component>


=== Added File Zope3/src/zodb/zeo/runzeo.py ===
#!python
##############################################################################
#
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Start the ZEO storage server.

Usage: %s [-C URL] [-a ADDRESS] [-f FILENAME] [-h]

Options:
-C/--configuration URL -- configuration file or URL
-a/--address ADDRESS -- server address of the form PORT, HOST:PORT, or PATH
                        (a PATH must contain at least one "/")
-f/--filename FILENAME -- filename for FileStorage
-t/--timeout TIMEOUT -- transaction timeout in secondes (default no timeout)
-h/--help -- print this usage message and exit
-m/--monitor ADDRESS -- address of monitor server ([HOST:]PORT or PATH)

Unless -C is specified, -a and -f are required.
"""

# The code here is designed to be reused by other, similar servers.
# For the forseeable future, it must work under Python 2.1 as well as
# 2.2 and above.

import os
import sys
import getopt
import signal
import socket
import logging

import ZConfig
from zdaemon.zdoptions import ZDOptions
from zodb import zeo

def parse_address(arg):
    # XXX Not part of the official ZConfig API
    obj = ZConfig.datatypes.SocketAddress(arg)
    return obj.family, obj.address

class ZEOOptionsMixin:

    storages = None

    def handle_address(self, arg):
        self.family, self.address = parse_address(arg)

    def handle_monitor_address(self, arg):
        self.monitor_family, self.monitor_address = parse_address(arg)

    def handle_filename(self, arg):
        from zodb.config import FileStorage # That's a FileStorage *opener*!
        class FSConfig:
            def __init__(self, name, path):
                self._name = name
                self.path = path
                self.create = 0
                self.read_only = 0
                self.stop = None
                self.quota = None
            def getSectionName(self):
                return self._name
        if not self.storages:
            self.storages = []
        name = str(1 + len(self.storages))
        conf = FileStorage(FSConfig(name, arg))
        self.storages.append(conf)

    def add_zeo_options(self):
        self.add(None, None, "a:", "address=", self.handle_address)
        self.add(None, None, "f:", "filename=", self.handle_filename)
        self.add("family", "zeo.address.family")
        self.add("address", "zeo.address.address",
                 required="no server address specified; use -a or -C")
        self.add("read_only", "zeo.read_only", default=0)
        self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
                 default=100)
        self.add("transaction_timeout", "zeo.transaction_timeout",
                 "t:", "timeout=", float)
        self.add("monitor_address", "zeo.monitor_address", "m:", "monitor=",
                 self.handle_monitor_address)
        self.add('auth_protocol', 'zeo.authentication_protocol',
                 None, 'auth-protocol=', default=None)
        self.add('auth_database', 'zeo.authentication_database',
                 None, 'auth-database=')
        self.add('auth_realm', 'zeo.authentication_realm',
                 None, 'auth-realm=')

class ZEOOptions(ZDOptions, ZEOOptionsMixin):

    logsectionname = "eventlog"

    def __init__(self):
        self.schemadir = os.path.dirname(zeo.__file__)
        ZDOptions.__init__(self)
        self.add_zeo_options()
        self.add("storages", "storages",
                 required="no storages specified; use -f or -C")


class ZEOServer:

    def __init__(self, options):
        self.options = options

    def main(self):
        self.setup_default_logging()
        self.check_socket()
        self.clear_socket()
        try:
            self.open_storages()
            self.setup_signals()
            self.create_server()
            self.loop_forever()
        finally:
            self.close_storages()
            self.clear_socket()

    def setup_default_logging(self):
        if self.options.config_logger is not None:
            return
        if os.getenv("EVENT_LOG_FILE") is not None:
            return
        if os.getenv("STUPID_LOG_FILE") is not None:
            return
        # No log file is configured; default to stderr.  The logging
        # level can still be controlled by {STUPID,EVENT}_LOG_SEVERITY.
        os.environ["EVENT_LOG_FILE"] = ""

    def check_socket(self):
        if self.can_connect(self.options.family, self.options.address):
            self.options.usage("address %s already in use" %
                               repr(self.options.address))

    def can_connect(self, family, address):
        s = socket.socket(family, socket.SOCK_STREAM)
        try:
            s.connect(address)
        except socket.error:
            return 0
        else:
            s.close()
            return 1

    def clear_socket(self):
        if isinstance(self.options.address, type("")):
            try:
                os.unlink(self.options.address)
            except os.error:
                pass

    def open_storages(self):
        self.storages = {}
        for opener in self.options.storages:
            _logger.info("opening storage %r using %s"
                 % (opener.name, opener.__class__.__name__))
            self.storages[opener.name] = opener.open()

    def setup_signals(self):
        """Set up signal handlers.

        The signal handler for SIGFOO is a method handle_sigfoo().
        If no handler method is defined for a signal, the signal
        action is not changed from its initial value.  The handler
        method is called without additional arguments.
        """
        if os.name != "posix":
            return
        if hasattr(signal, 'SIGXFSZ'):
            signal.signal(signal.SIGXFSZ, signal.SIG_IGN) # Special case
        init_signames()
        for sig, name in signames.items():
            method = getattr(self, "handle_" + name.lower(), None)
            if method is not None:
                def wrapper(sig_dummy, frame_dummy, method=method):
                    method()
                signal.signal(sig, wrapper)

    def create_server(self):
        from zodb.zeo.server import StorageServer
        self.server = StorageServer(
            self.options.address,
            self.storages,
            read_only=self.options.read_only,
            invalidation_queue_size=self.options.invalidation_queue_size,
            transaction_timeout=self.options.transaction_timeout,
            monitor_address=self.options.monitor_address,
            auth_protocol=self.options.auth_protocol,
            auth_database=self.options.auth_database,
            auth_realm=self.options.auth_realm)

    def loop_forever(self):
        from zodb.zeo.threadedasync import LoopCallback
        LoopCallback.loop()

    def handle_sigterm(self):
        _logger.info("terminated by SIGTERM")
        sys.exit(0)

    def handle_sigint(self):
        _logger.info("terminated by SIGINT")
        sys.exit(0)

    def handle_sighup(self):
        _logger.info("restarted by SIGHUP")
        sys.exit(1)

    def handle_sigusr2(self):
        # How should this work with new logging?
        
        # This requires a modern zLOG (from Zope 2.6 or later); older
        # zLOG packages don't have the initialize() method
        _logger.info("reinitializing zLOG")
        # XXX Shouldn't this be below with _log()?
        import zLOG
        zLOG.initialize()
        _logger.info("reinitialized zLOG")

    def close_storages(self):
        for name, storage in self.storages.items():
            _logger.info("closing storage %r" % name)
            try:
                storage.close()
            except: # Keep going
                _logging.exception("failed to close storage %r" % name)


# Signal names

signames = None

def signame(sig):
    """Return a symbolic name for a signal.

    Return "signal NNN" if there is no corresponding SIG name in the
    signal module.
    """

    if signames is None:
        init_signames()
    return signames.get(sig) or "signal %d" % sig

def init_signames():
    global signames
    signames = {}
    for name, sig in signal.__dict__.items():
        k_startswith = getattr(name, "startswith", None)
        if k_startswith is None:
            continue
        if k_startswith("SIG") and not k_startswith("SIG_"):
            signames[sig] = name


# Main program

def main(args=None):
    global _logger
    _logger = logging.getLogger("runzeo")

    options = ZEOOptions()
    options.realize(args)
    s = ZEOServer(options)
    s.main()

if __name__ == "__main__":
    main()


=== Added File Zope3/src/zodb/zeo/schema.xml ===
<schema>

  <description>
    This schema describes the configuration of the ZEO storage server
    process.
  </description>

  <!-- Use the storage types defined by ZODB. -->
  <import package="zodb"/>

  <!-- Use the ZEO server information structure. -->
  <import package="zodb/zeo"/>

  <section type="zeo" name="*" required="yes" attribute="zeo" />

  <multisection name="+" type="ZODB.storage"
                attribute="storages"
                required="yes">
    <description>
      One or more storages that are provided by the ZEO server.  The
      section names are used as the storage names, and must be unique
      within each ZEO storage server.  Traditionally, these names
      represent small integers starting at '1'.
    </description>
  </multisection>

  <section name="*" type="eventlog" attribute="eventlog" required="no" />

</schema>


=== Zope3/src/zodb/zeo/cache.py 1.4 => 1.4.20.1 ===
--- Zope3/src/zodb/zeo/cache.py:1.4	Thu Mar 13 16:32:30 2003
+++ Zope3/src/zodb/zeo/cache.py	Sun Jun 22 10:22:30 2003
@@ -13,10 +13,11 @@
 ##############################################################################
 
 # XXX TO DO
-# use two indices rather than the sign bit of the index??????
-# add a shared routine to read + verify a record???
-# redesign header to include vdlen???
-# rewrite the cache using a different algorithm???
+# Add a shared routine to read + verify a record.  Have that routine
+#   return a record object rather than a string.
+# Use two indices rather than the sign bit of the index??????
+# Redesign header to include vdlen???
+# Rewrite the cache using a different algorithm???
 
 """Implement a client cache
 
@@ -44,7 +45,9 @@
 
   offset in record: name -- description
 
-  0: oid -- 8-byte object id
+  0: oidlen -- 2-byte unsigned object id length
+
+  2: reserved (6 bytes)
 
   8: status -- 1-byte status 'v': valid, 'n': non-version valid, 'i': invalid
                ('n' means only the non-version data in the record is valid)
@@ -57,23 +60,25 @@
 
   19: serial -- 8-byte non-version serial (timestamp)
 
-  27: data -- non-version data
+  27: oid -- object id
+
+  27+oidlen: data -- non-version data
 
-  27+dlen: version -- Version string (if vlen > 0)
+  27+oidlen+dlen: version -- Version string (if vlen > 0)
 
-  27+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
+  27+oidlen+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
 
-  31+dlen+vlen: vdata -- version data (if vlen > 0)
+  31+oidlen+dlen+vlen: vdata -- version data (if vlen > 0)
 
-  31+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
+  31+oidlen+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
                                  (if vlen > 0)
 
-  27+dlen (if vlen == 0) **or**
-  39+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
-                              redundancy and backward traversal)
+  27+oidlen+dlen (if vlen == 0) **or**
+  39+oidlen+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
+                                     redundancy and backward traversal)
 
-  31+dlen (if vlen == 0) **or**
-  43+dlen+vlen+vdlen: -- total record length (equal to tlen)
+  31+oidlen+dlen (if vlen == 0) **or**
+  43+oidlen+dlen+vlen+vdlen: -- total record length (equal to tlen)
 
 There is a cache size limit.
 
@@ -105,7 +110,6 @@
 file 0 and file 1.
 """
 
-import logging
 import os
 import time
 import logging
@@ -114,9 +118,9 @@
 from thread import allocate_lock
 
 from zodb.utils import u64
-from zodb.interfaces import ZERO
+from zodb.interfaces import ZERO, _fmt_oid
 
-magic = 'ZEC1'
+magic = 'ZEC2'
 headersize = 12
 
 MB = 1024**2
@@ -158,15 +162,13 @@
                 if os.path.exists(p[i]):
                     fi = open(p[i],'r+b')
                     if fi.read(4) == magic: # Minimal sanity
-                        fi.seek(0, 2)
-                        if fi.tell() > headersize:
-                            # Read serial at offset 19 of first record
-                            fi.seek(headersize + 19)
-                            s[i] = fi.read(8)
+                        # Read the ltid for this file.  If it never
+                        # saw a transaction commit, it will get tossed,
+                        # even if it has valid data.
+                        s[i] = fi.read(8)
                     # If we found a non-zero serial, then use the file
                     if s[i] != ZERO:
                         f[i] = fi
-                    fi = None
 
             # Whoever has the larger serial is the current
             if s[1] > s[0]:
@@ -186,11 +188,16 @@
             self._p = p = [None, None]
             f[0].write(magic + '\0' * (headersize - len(magic)))
             current = 0
+        self._current = current
 
-        self.log("%s: storage=%r, size=%r; file[%r]=%r",
-                 self.__class__.__name__, storage, size, current, p[current])
+        if self._ltid:
+            ts = "; last txn=%x" % u64(self._ltid)
+        else:
+            ts = ""
+        self.log("%s: storage=%r, size=%r; file[%r]=%r%s" %
+                 (self.__class__.__name__, storage, size, current, p[current],
+                  ts))
 
-        self._current = current
         self._setup_trace()
 
     def open(self):
@@ -224,6 +231,18 @@
                 except OSError:
                     pass
 
+    def _read_header(self, f, pos):
+        # Read record header from f at pos, returning header and oid.
+        f.seek(pos)
+        h = f.read(27)
+        if len(h) != 27:
+            self.log("_read_header: short record at %s in %s", pos, f.name)
+            return None, None
+        oidlen = unpack(">H", h[:2])[0]
+        oid = f.read(oidlen)
+        return h, oid
+        
+
     def getLastTid(self):
         """Get the last transaction id stored by setLastTid().
 
@@ -243,7 +262,7 @@
         f = self._f[self._current]
         f.seek(4)
         tid = f.read(8)
-        if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+        if len(tid) < 8 or tid == ZERO:
             return None
         else:
             return tid
@@ -255,7 +274,7 @@
         cache file; otherwise it's an instance variable.
         """
         if self._client is None:
-            if tid == '\0\0\0\0\0\0\0\0':
+            if tid == ZERO:
                 tid = None
             self._ltid = tid
         else:
@@ -267,7 +286,7 @@
 
     def _setLastTid(self, tid):
         if tid is None:
-            tid = '\0\0\0\0\0\0\0\0'
+            tid = ZERO
         else:
             tid = str(tid)
             assert len(tid) == 8
@@ -292,18 +311,14 @@
                 return None
             f = self._f[p < 0]
             ap = abs(p)
-            f.seek(ap)
-            h = f.read(27)
-            if len(h) != 27:
-                self.log("invalidate: short record for oid %16x "
-                         "at position %d in cache file %d",
-                         U64(oid), ap, p < 0)
+            h, rec_oid = self._read_header(f, ap)
+            if h is None:
                 del self._index[oid]
                 return None
-            if h[:8] != oid:
-                self.log("invalidate: oid mismatch: expected %16x read %16x "
+            if rec_oid != oid:
+                self.log("invalidate: oid mismatch: expected %s read %s "
                          "at position %d in cache file %d",
-                         U64(oid), U64(h[:8]), ap, p < 0)
+                         _fmt_oid(oid), _fmt_oid(rec_oid), ap, p < 0)
                 del self._index[oid]
                 return None
             f.seek(ap+8) # Switch from reading to writing
@@ -329,16 +344,18 @@
             ap = abs(p)
             seek = f.seek
             read = f.read
-            seek(ap)
-            h = read(27)
-            if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+            h, rec_oid = self._read_header(f, ap)
+            if h is None:
+                del self._index[oid]
+                return None
+            if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
                 tlen, vlen, dlen = unpack(">iHi", h[9:19])
             else:
                 tlen = -1
             if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
-                self.log("load: bad record for oid %16x "
+                self.log("load: bad record for oid %s "
                          "at position %d in cache file %d",
-                         U64(oid), ap, p < 0)
+                         _fmt_oid(oid), ap, p < 0)
                 del self._index[oid]
                 return None
 
@@ -357,7 +374,14 @@
                     data = read(dlen)
                     self._trace(0x2A, oid, version, h[19:], dlen)
                     if (p < 0) != self._current:
-                        self._copytocurrent(ap, tlen, dlen, vlen, h, data)
+                        # If the cache read we are copying has version info,
+                        # we need to pass the header to copytocurrent().
+                        if vlen:
+                            vheader = read(vlen + 4)
+                        else:
+                            vheader = None
+                        self._copytocurrent(ap, tlen, dlen, vlen, h, oid,
+                                            data, vheader)
                     return data, h[19:]
                 else:
                     self._trace(0x26, oid, version)
@@ -369,12 +393,12 @@
             v = vheader[:-4]
             if version != v:
                 if dlen:
-                    seek(ap+27)
+                    seek(ap+27+len(oid))
                     data = read(dlen)
                     self._trace(0x2C, oid, version, h[19:], dlen)
                     if (p < 0) != self._current:
                         self._copytocurrent(ap, tlen, dlen, vlen, h,
-                                            data, vheader)
+                                            oid, data, vheader)
                     return data, h[19:]
                 else:
                     self._trace(0x28, oid, version)
@@ -386,12 +410,12 @@
             self._trace(0x2E, oid, version, vserial, vdlen)
             if (p < 0) != self._current:
                 self._copytocurrent(ap, tlen, dlen, vlen, h,
-                                    None, vheader, vdata, vserial)
+                                    oid, None, vheader, vdata, vserial)
             return vdata, vserial
         finally:
             self._release()
 
-    def _copytocurrent(self, pos, tlen, dlen, vlen, header,
+    def _copytocurrent(self, pos, tlen, dlen, vlen, header, oid,
                        data=None, vheader=None, vdata=None, vserial=None):
         """Copy a cache hit from the non-current file to the current file.
 
@@ -402,29 +426,32 @@
         if self._pos + tlen > self._limit:
             return # Don't let this cause a cache flip
         assert len(header) == 27
+        oidlen = len(oid)
         if header[8] == 'n':
             # Rewrite the header to drop the version data.
             # This shortens the record.
-            tlen = 31 + dlen
+            tlen = 31 + oidlen + dlen
             vlen = 0
-            # (oid:8, status:1, tlen:4, vlen:2, dlen:4, serial:8)
+            vheader = None
+            # (oidlen:2, reserved:6, status:1, tlen:4,
+            #  vlen:2, dlen:4, serial:8)
             header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
         else:
             assert header[8] == 'v'
         f = self._f[not self._current]
         if data is None:
-            f.seek(pos+27)
+            f.seek(pos + 27 + len(oid))
             data = f.read(dlen)
             if len(data) != dlen:
                 return
-        l = [header, data]
+        l = [header, oid, data]
         if vlen:
             assert vheader is not None
             l.append(vheader)
             assert (vdata is None) == (vserial is None)
             if vdata is None:
                 vdlen = unpack(">I", vheader[-4:])[0]
-                f.seek(pos+27+dlen+vlen+4)
+                f.seek(pos + 27 + len(oid) + dlen + vlen + 4)
                 vdata = f.read(vdlen)
                 if len(vdata) != vdlen:
                     return
@@ -434,19 +461,20 @@
             l.append(vdata)
             l.append(vserial)
         else:
-            assert None is vheader is vdata is vserial
+            assert None is vheader is vdata is vserial, (
+                vlen, vheader, vdata, vserial)
+            
         l.append(header[9:13]) # copy of tlen
         g = self._f[self._current]
         g.seek(self._pos)
         g.writelines(l)
         assert g.tell() == self._pos + tlen
-        oid = header[:8]
         if self._current:
             self._index[oid] = - self._pos
         else:
             self._index[oid] = self._pos
         self._pos += tlen
-        self._trace(0x6A, header[:8], vlen and vheader[:-4] or '',
+        self._trace(0x6A, oid, vlen and vheader[:-4] or '',
                     vlen and vserial or header[-8:], dlen)
 
     def update(self, oid, serial, version, data, refs):
@@ -462,9 +490,11 @@
                 ap = abs(p)
                 seek = f.seek
                 read = f.read
-                seek(ap)
-                h = read(27)
-                if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+                h, rec_oid = self._read_header(f, ap)
+                if h is None:
+                    del self._index[oid]
+                    return None
+                if len(h)==27 and h[8] in 'nv' and rec_oid == oid:
                     tlen, vlen, dlen = unpack(">iHi", h[9:19])
                 else:
                     return self._store(oid, '', '', version, data, serial)
@@ -500,16 +530,19 @@
             ap = abs(p)
             seek = f.seek
             read = f.read
-            seek(ap)
-            h = read(27)
-            if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+            h, rec_oid = self._read_header(f, ap)
+            if h is None:
+                del self._index[oid]
+                return None
+                
+            if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
                 tlen, vlen, dlen = unpack(">iHi", h[9:19])
             else:
                 tlen = -1
             if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
-                self.log("modifiedInVersion: bad record for oid %16x "
+                self.log("modifiedInVersion: bad record for oid %s "
                          "at position %d in cache file %d",
-                         U64(oid), ap, p < 0)
+                         _fmt_oid(oid), ap, p < 0)
                 del self._index[oid]
                 return None
 
@@ -581,7 +614,7 @@
         if not s:
             p = ''
             s = ZERO
-        tlen = 31 + len(p)
+        tlen = 31 + len(oid) + len(p)
         if version:
             tlen = tlen + len(version) + 12 + len(pv)
             vlen = len(version)
@@ -590,7 +623,11 @@
 
         stlen = pack(">I", tlen)
         # accumulate various data to write into a list
-        l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
+        assert len(oid) < 2**16
+        assert vlen < 2**16
+        assert tlen < 2L**32
+        l = [pack(">H6x", len(oid)), 'v', stlen,
+             pack(">HI", vlen, len(p)), s, oid]
         if p:
             l.append(p)
         if version:
@@ -643,11 +680,11 @@
         if version:
             code |= 0x80
         self._tracefile.write(
-            struct_pack(">ii8s8s",
+            struct_pack(">iiH8s",
                         time_time(),
                         (dlen+255) & 0x7fffff00 | code | self._current,
-                        oid,
-                        serial))
+                        len(oid),
+                        serial) + oid)
 
     def read_index(self, serial, fileindex):
         index = self._index
@@ -658,9 +695,8 @@
         count = 0
 
         while 1:
-            f.seek(pos)
-            h = read(27)
-            if len(h) != 27:
+            h, oid = self._read_header(f, pos)
+            if h is None:
                 # An empty read is expected, anything else is suspect
                 if h:
                     self.rilog("truncated header", pos, fileindex)
@@ -674,8 +710,6 @@
                 self.rilog("invalid header data", pos, fileindex)
                 break
 
-            oid = h[:8]
-
             if h[8] == 'v' and vlen:
                 seek(dlen+vlen, 1)
                 vdlen = read(4)
@@ -683,7 +717,7 @@
                     self.rilog("truncated record", pos, fileindex)
                     break
                 vdlen = unpack(">i", vdlen)[0]
-                if vlen+dlen+43+vdlen != tlen:
+                if vlen + dlen + 43 + len(oid) + vdlen != tlen:
                     self.rilog("inconsistent lengths", pos, fileindex)
                     break
                 seek(vdlen, 1)
@@ -693,7 +727,7 @@
                     break
             else:
                 if h[8] in 'vn' and vlen == 0:
-                    if dlen+31 != tlen:
+                    if dlen + len(oid) + 31 != tlen:
                         self.rilog("inconsistent nv lengths", pos, fileindex)
                     seek(dlen, 1)
                     if read(4) != h[9:13]:


=== Zope3/src/zodb/zeo/client.py 1.11 => 1.11.2.1 ===
--- Zope3/src/zodb/zeo/client.py:1.11	Mon May 19 11:02:51 2003
+++ Zope3/src/zodb/zeo/client.py	Sun Jun 22 10:22:30 2003
@@ -30,9 +30,10 @@
 import time
 import logging
 
-from zope.interface import directlyProvides
+from zope.interface import directlyProvides, implements
 
 from zodb.zeo import cache
+from zodb.zeo.auth import get_module
 from zodb.zeo.stubs import StorageServerStub
 from zodb.zeo.tbuf import TransactionBuffer
 from zodb.zeo.zrpc.client import ConnectionManager
@@ -57,7 +58,7 @@
     the argument.
     """
     t = time.time()
-    t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
+    t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
     if prev_ts is not None:
         t = t.laterThan(prev_ts)
     return t
@@ -98,15 +99,16 @@
 
     # The exact storage interfaces depend on the server that the client
     # connects to.  We know that every storage must implement IStorage,
-    # but once connected we may change the instance's __implements__
-    # to reflect features available on the storage.
+    # but once connected we may change the interfaces that the instance
+    # provides to reflect features available on the storage.
 
-    __implements__ = IStorage
+    implements(IStorage)
 
     def __init__(self, addr, storage='1', cache_size=20 * MB,
                  name='', client=None, var=None,
                  min_disconnect_poll=5, max_disconnect_poll=300,
-                 wait=True, read_only=False, read_only_fallback=False):
+                 wait=True, read_only=False, read_only_fallback=False,
+                 username='', password='', realm=None):
 
         """ClientStorage constructor.
 
@@ -161,6 +163,17 @@
             writable storages are available.  Defaults to false.  At
             most one of read_only and read_only_fallback should be
             true.
+
+        username -- string with username to be used when authenticating.
+            These only need to be provided if you are connecting to an
+            authenticated server storage.
+
+        password -- string with plaintext password to be used
+            when authenticated.
+
+        Note that the authentication protocol is defined by the server
+        and is detected by the ClientStorage upon connecting (see
+        testConnection() and doAuth() for details).
         """
 
         self.logger = logging.getLogger("ZCS.%d" % os.getpid())
@@ -196,9 +209,15 @@
         self._pending_server = None
         self._ready = threading.Event()
 
+        # _is_read_only stores the constructor argument
         self._is_read_only = read_only
+        # _conn_is_read_only stores the status of the current connection
+        self._conn_is_read_only = 0
         self._storage = storage
         self._read_only_fallback = read_only_fallback
+        self._username = username
+        self._password = password
+        self._realm = realm
         # _server_addr is used by sortKey()
         self._server_addr = None
         self._tfile = None
@@ -233,6 +252,21 @@
         self._oid_lock = threading.Lock()
         self._oids = [] # Object ids retrieved from newObjectIds()
 
+        # load() and tpc_finish() must be serialized to guarantee
+        # that cache modifications from each occur atomically.
+        # It also prevents multiple load calls occuring simultaneously,
+        # which simplifies the cache logic.
+        self._load_lock = threading.Lock()
+        # _load_oid and _load_status are protected by _lock
+        self._load_oid = None
+        self._load_status = None
+
+        # Can't read data in one thread while writing data
+        # (tpc_finish) in another thread.  In general, the lock
+        # must prevent access to the cache while _update_cache
+        # is executing.
+        self._lock = threading.Lock()
+
         t = self._ts = get_timestamp()
         self._serial = `t`
         self._oid = '\0\0\0\0\0\0\0\0'
@@ -247,12 +281,6 @@
                                                     tmax=max_disconnect_poll)
 
         if wait:
-            self._rpc_mgr.connect(sync=1)
-        else:
-            if not self._rpc_mgr.attempt_connect():
-                self._rpc_mgr.connect()
-
-        if wait:
             self._wait()
         else:
             # attempt_connect() will make an attempt that doesn't block
@@ -279,18 +307,21 @@
                     break
                 self.logger.warn("Wait for cache verification to finish")
         else:
-            # If there is no mainloop running, this code needs
-            # to call poll() to cause asyncore to handle events.
-            while 1:
-                if self._ready.isSet():
-                    break
-                self.logger.warn("Wait for cache verification to finish")
-                if self._connection is None:
-                    # If the connection was closed while we were
-                    # waiting for it to become ready, start over.
-                    return self._wait()
-                else:
-                    self._connection.pending(30)
+            self._wait_sync()
+
+    def _wait_sync(self):
+        # If there is no mainloop running, this code needs
+        # to call poll() to cause asyncore to handle events.
+        while 1:
+            if self._ready.isSet():
+                break
+            self.logger.warn("Wait for cache verification to finish")
+            if self._connection is None:
+                # If the connection was closed while we were
+                # waiting for it to become ready, start over.
+                return self._wait()
+            else:
+                self._connection.pending(30)
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -330,6 +361,29 @@
         if cn is not None:
             cn.pending()
 
+    def doAuth(self, protocol, stub):
+        if not (self._username and self._password):
+            raise AuthError, "empty username or password"
+
+        module = get_module(protocol)
+        if not module:
+            log2(PROBLEM, "%s: no such an auth protocol: %s" %
+                 (self.__class__.__name__, protocol))
+            return
+
+        storage_class, client, db_class = module
+
+        if not client:
+            log2(PROBLEM,
+                 "%s: %s isn't a valid protocol, must have a Client class" %
+                 (self.__class__.__name__, protocol))
+            raise AuthError, "invalid protocol"
+
+        c = client(stub)
+
+        # Initiate authentication, returns boolean specifying whether OK
+        return c.start(self._username, self._realm, self._password)
+
     def testConnection(self, conn):
         """Internal: test the given connection.
 
@@ -353,7 +407,18 @@
         """
         self.logger.warn("Testing connection %r", conn)
         # XXX Check the protocol version here?
+        self._conn_is_read_only = 0
         stub = self.StorageServerStubClass(conn)
+        
+        auth = stub.getAuthProtocol()
+        self.logger.info("Client authentication successful")
+        if auth:
+            if self.doAuth(auth, stub):
+                self.logger.info("Client authentication successful")
+            else:
+                self.logger.error("Authentication failed")
+                raise AuthError, "Authentication failed"
+
         try:
             stub.register(str(self._storage), self._is_read_only)
             return 1
@@ -363,6 +428,7 @@
             self.logger.warn(
                 "Got ReadOnlyError; trying again with read_only=1")
             stub.register(str(self._storage), read_only=1)
+            self._conn_is_read_only = 1
             return 0
 
     def notifyConnected(self, conn):
@@ -401,9 +467,18 @@
         self._info.update(stub.get_info())
         self.update_interfaces()
         self.verify_cache(stub)
+        if not conn.is_async():
+            self.logger.warn("Waiting for cache verification to finish")
+            self._wait_sync()
+        self._handle_extensions()
+
+    def _handle_extensions(self):
+        for name in self.getExtensionMethods().keys():
+            if not hasattr(self, name):
+                setattr(self, name, self._server.extensionMethod(name))
 
     def update_interfaces(self):
-        # Update instance's __implements__ based on the server.
+        # Update what interfaces the instance provides based on the server.
         L = [IStorage]
         for name in self._info.get("implements", ()):
             if name == "IUndoStorage":
@@ -532,12 +607,14 @@
         return self._info['extensionMethods']
 
     def isReadOnly(self):
-        """Storage API: return whether we are in read-only mode.
-
-        XXX In read-only fallback mode, this returns false, even if we
-        are currently connected to a read-only server.
-        """
-        return self._is_read_only
+        """Storage API: return whether we are in read-only mode."""
+        if self._is_read_only:
+            return 1
+        else:
+            # If the client is configured for a read-write connection
+            # but has a read-only fallback connection, _conn_is_read_only
+            # will be True.
+            return self._conn_is_read_only
 
     # XXX version should really be part of _info
 
@@ -593,12 +670,6 @@
         """
         return self._server.history(oid, version, length)
 
-    def __getattr__(self, name):
-        if self.getExtensionMethods().has_key(name):
-            return self._server.extensionMethod(name)
-        else:
-            raise AttributeError(name)
-
     def loadSerial(self, oid, serial):
         """Storage API: load a historical revision of an object."""
         return self._server.loadSerial(oid, serial)
@@ -614,14 +685,39 @@
         specified by the given object id and version, if they exist;
         otherwise a KeyError is raised.
         """
-        p = self._cache.load(oid, version)
-        if p:
-            return p
+        self._lock.acquire()    # for atomic processing of invalidations
+        try:
+            pair = self._cache.load(oid, version)
+            if pair:
+                return pair
+        finally:
+            self._lock.release()
+
         if self._server is None:
             raise ClientDisconnected()
-        p, s, v, pv, sv = self._server.zeoLoad(oid)
-        self._cache.checkSize(0)
-        self._cache.store(oid, p, s, v, pv, sv)
+
+        self._load_lock.acquire()
+        try:
+            self._lock.acquire()
+            try:
+                self._load_oid = oid
+                self._load_status = 1
+            finally:
+                self._lock.release()
+
+            p, s, v, pv, sv = self._server.zeoLoad(oid)
+
+            self._lock.acquire()    # for atomic processing of invalidations
+            try:
+                if self._load_status:
+                    self._cache.checkSize(0)
+                    self._cache.store(oid, p, s, v, pv, sv)
+                self._load_oid = None
+            finally:
+                self._lock.release()
+        finally:
+            self._load_lock.release()
+
         if v and version and v == version:
             return pv, sv
         else:
@@ -634,9 +730,13 @@
 
         If no version modified the object, return an empty string.
         """
-        v = self._cache.modifiedInVersion(oid)
-        if v is not None:
-            return v
+        self._lock.acquire()
+        try:
+            v = self._cache.modifiedInVersion(oid)
+            if v is not None:
+                return v
+        finally:
+            self._lock.release()
         return self._server.modifiedInVersion(oid)
 
     def newObjectId(self):
@@ -733,6 +833,7 @@
 
         self._serial = id
         self._seriald.clear()
+        self._tbuf.clear()
         del self._serials[:]
 
     def end_transaction(self):
@@ -772,18 +873,23 @@
         """Storage API: finish a transaction."""
         if transaction is not self._transaction:
             return
+        self._load_lock.acquire()
         try:
-            if f is not None:
-                f()
+            self._lock.acquire()  # for atomic processing of invalidations
+            try:
+                self._update_cache()
+                if f is not None:
+                    f()
+            finally:
+                self._lock.release()
 
             tid = self._server.tpcFinish(self._serial)
+            self._cache.setLastTid(tid)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
-
-            self._update_cache()
-            self._cache.setLastTid(tid)
         finally:
+            self._load_lock.release()
             self.end_transaction()
 
     def _update_cache(self):
@@ -792,6 +898,13 @@
         This iterates over the objects in the transaction buffer and
         update or invalidate the cache.
         """
+        # Must be called with _lock already acquired.
+
+        # XXX not sure why _update_cache() would be called on
+        # a closed storage.
+        if self._cache is None:
+            return
+
         self._cache.checkSize(self._tbuf.get_size())
         try:
             self._tbuf.begin_iterate()
@@ -880,44 +993,69 @@
             return
         self._pickler.dump(args)
 
+    def _process_invalidations(self, invs):
+        # Invalidations are sent by the ZEO server as a sequence of
+        # oid, version pairs.  The DB's invalidate() method expects a
+        # dictionary of oids.
+
+        self._lock.acquire()
+        try:
+            # versions maps version names to dictionary of invalidations
+            versions = {}
+            for oid, version in invs:
+                if oid == self._load_oid:
+                    self._load_status = 0
+                self._cache.invalidate(oid, version=version)
+                versions.setdefault(version, {})[oid] = 1
+
+            if self._db is not None:
+                for v, d in versions.items():
+                    self._db.invalidate(d, version=v)
+        finally:
+            self._lock.release()
+
     def endVerify(self):
         """Server callback to signal end of cache validation."""
         if self._pickler is None:
             return
-        self._pickler.dump((0,0))
+        # write end-of-data marker
+        self._pickler.dump((None, None))
         self._pickler = None
         self._tfile.seek(0)
-        unpick = cPickle.Unpickler(self._tfile)
         f = self._tfile
         self._tfile = None
-
-        while 1:
-            oid, version = unpick.load()
-            self.logger.debug("verify invalidate %r", oid)
-            if not oid:
-                break
-            self._cache.invalidate(oid, version=version)
-            if self._db is not None:
-                self._db.invalidate(oid, version=version)
+        self._process_invalidations(InvalidationLogIterator(f))
         f.close()
 
+        self.logger.info("endVerify finishing")
         self._server = self._pending_server
         self._ready.set()
         self._pending_conn = None
-        self.logger.debug("verification finished")
+        self.logger.info("endVerify finished")
 
     def invalidateTransaction(self, tid, args):
         """Invalidate objects modified by tid."""
         self._cache.setLastTid(tid)
         if self._pickler is not None:
-            self.logger.info("Transactional invalidation "
-                             "during cache verification")
+            self.logger.debug(
+                "Transactional invalidation during cache verification")
             for t in args:
-                self.self._pickler.dump(t)
+                self._pickler.dump(t)
             return
-        db = self._db
-        for oid, version in args:
-            self._cache.invalidate(oid, version=version)
-            if db is not None:
-                db.invalidate(oid, version=version)
+        self._process_invalidations(args)
+
+class InvalidationLogIterator:
+    """Helper class for reading invalidations in endVerify."""
+
+    def __init__(self, fileobj):
+        self._unpickler = cPickle.Unpickler(fileobj)
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        oid, version = self._unpickler.load()
+        if oid is None:
+            raise StopIteration
+        return oid, version
 


=== Zope3/src/zodb/zeo/interfaces.py 1.3 => 1.3.26.1 ===
--- Zope3/src/zodb/zeo/interfaces.py:1.3	Tue Feb 25 13:55:05 2003
+++ Zope3/src/zodb/zeo/interfaces.py	Sun Jun 22 10:22:30 2003
@@ -27,3 +27,5 @@
 class ClientDisconnected(ClientStorageError):
     """The database storage is disconnected from the storage."""
 
+class AuthError(StorageError):
+    """The client provided invalid authentication credentials."""


=== Zope3/src/zodb/zeo/server.py 1.11 => 1.11.2.1 ===
--- Zope3/src/zodb/zeo/server.py:1.11	Mon May 19 11:02:51 2003
+++ Zope3/src/zodb/zeo/server.py	Sun Jun 22 10:22:30 2003
@@ -44,7 +44,7 @@
 from zodb.utils import u64
 from zodb.ztransaction import Transaction
 
-from zope.interface.implements import objectImplements
+from zope.interface import providedBy
 
 from transaction.interfaces import TransactionError
 
@@ -58,7 +58,11 @@
 
     ClientStorageStubClass = ClientStorageStub
 
-    def __init__(self, server, read_only=0):
+    # A list of extension methods.  A subclass with extra methods
+    # should override.
+    extensions = []
+
+    def __init__(self, server, read_only=0, auth_realm=None):
         self.server = server
         # timeout and stats will be initialized in register()
         self.timeout = None
@@ -73,7 +77,22 @@
         self.verifying = 0
         self.logger = logging.getLogger("ZSS.%d.ZEO" % os.getpid())
         self.log_label = ""
+        self.authenticated = 0
+        self.auth_realm = auth_realm
+        # The authentication protocol may define extra methods.
+        self._extensions = {}
+        for func in self.extensions:
+            self._extensions[func.func_name] = None
+
+    def finish_auth(self, authenticated):
+        if not self.auth_realm:
+            return 1
+        self.authenticated = authenticated
+        return authenticated
 
+    def set_database(self, database):
+        self.database = database
+        
     def notifyConnected(self, conn):
         self.connection = conn # For restart_other() below
         self.client = self.ClientStorageStubClass(conn)
@@ -110,6 +129,7 @@
         """Delegate several methods to the storage"""
         self.versionEmpty = self.storage.versionEmpty
         self.versions = self.storage.versions
+        self.getSerial = self.storage.getSerial
         self.load = self.storage.load
         self.modifiedInVersion = self.storage.modifiedInVersion
         self.getVersion = self.storage.getVersion
@@ -125,9 +145,11 @@
             # can be removed
             pass
         else:
-            for name in fn().keys():
-                if not hasattr(self,name):
-                    setattr(self, name, getattr(self.storage, name))
+            d = fn()
+            self._extensions.update(d)
+            for name in d.keys():
+                assert not hasattr(self, name)
+                setattr(self, name, getattr(self.storage, name))
         self.lastTransaction = self.storage.lastTransaction
 
     def _check_tid(self, tid, exc=None):
@@ -149,6 +171,15 @@
                 return 0
         return 1
 
+    def getAuthProtocol(self):
+        """Return string specifying name of authentication module to use.
+
+        The module name should be auth_%s where %s is auth_protocol."""
+        protocol = self.server.auth_protocol
+        if not protocol or protocol == 'none':
+            return None
+        return protocol
+    
     def register(self, storage_id, read_only):
         """Select the storage that this client will use
 
@@ -173,19 +204,14 @@
                                                                    self)
 
     def get_info(self):
-        return {'name': self.storage.getName(),
-                'extensionMethods': self.getExtensionMethods(),
+        return {"name": self.storage.getName(),
+                "extensionMethods": self.getExtensionMethods(),
                  "implements": [iface.__name__
-                                for iface in objectImplements(self.storage)],
+                                for iface in providedBy(self.storage)],
                 }
 
     def getExtensionMethods(self):
-        try:
-            e = self.storage.getExtensionMethods
-        except AttributeError:
-            return {}
-        else:
-            return e()
+        return self._extensions
 
     def zeoLoad(self, oid):
         self.stats.loads += 1
@@ -564,7 +590,10 @@
     def __init__(self, addr, storages, read_only=0,
                  invalidation_queue_size=100,
                  transaction_timeout=None,
-                 monitor_address=None):
+                 monitor_address=None,
+                 auth_protocol=None,
+                 auth_filename=None,
+                 auth_realm=None):
         """StorageServer constructor.
 
         This is typically invoked from the start.py script.
@@ -606,6 +635,21 @@
             should listen.  If specified, a monitor server is started.
             The monitor server provides server statistics in a simple
             text format.
+
+        auth_protocol -- The name of the authentication protocol to use.
+            Examples are "digest" and "srp".
+            
+        auth_filename -- The name of the password database filename.
+            It should be in a format compatible with the authentication
+            protocol used; for instance, "sha" and "srp" require different
+            formats.
+            
+            Note that to implement an authentication protocol, a server
+            and client authentication mechanism must be implemented in a
+            auth_* module, which should be stored inside the "auth"
+            subdirectory. This module may also define a DatabaseClass
+            variable that should indicate what database should be used
+            by the authenticator.
         """
 
         self.addr = addr
@@ -621,6 +665,12 @@
         for s in storages.values():
             s._waiting = []
         self.read_only = read_only
+        self.auth_protocol = auth_protocol
+        self.auth_filename = auth_filename
+        self.auth_realm = auth_realm
+        self.database = None
+        if auth_protocol:
+            self._setup_auth(auth_protocol)
         # A list of at most invalidation_queue_size invalidations
         self.invq = []
         self.invq_bound = invalidation_queue_size
@@ -643,6 +693,40 @@
         else:
             self.monitor = None
 
+    def _setup_auth(self, protocol):
+        # Can't be done in global scope, because of cyclic references
+        from zodb.zeo.auth import get_module
+
+        name = self.__class__.__name__
+
+        module = get_module(protocol)
+        if not module:
+            self.logger.info("%s: no such an auth protocol: %s",
+                             name, protocol)
+            return
+        
+        storage_class, client, db_class = module
+        
+        if not storage_class or not issubclass(storage_class, ZEOStorage):
+            self.logger.info("%s: %s isn't a valid protocol, "
+                             "must have a StorageClass", name, protocol)
+            self.auth_protocol = None
+            return
+        self.ZEOStorageClass = storage_class
+
+        self.logger.info("%s: using auth protocol: %s", name, protocol)
+        
+        # We create a Database instance here for use with the authenticator
+        # modules. Having one instance allows it to be shared between multiple
+        # storages, avoiding the need to bloat each with a new authenticator
+        # Database that would contain the same info, and also avoiding any
+        # possibly synchronization issues between them.
+        self.database = db_class(self.auth_filename)
+        if self.database.realm != self.auth_realm:
+            raise ValueError("password database realm %r "
+                             "does not match storage realm %r"
+                             % (self.database.realm, self.auth_realm))
+
     def new_connection(self, sock, addr):
         """Internal: factory to create a new connection.
 
@@ -650,8 +734,13 @@
         whenever accept() returns a socket for a new incoming
         connection.
         """
-        z = self.ZEOStorageClass(self, self.read_only)
-        c = self.ManagedServerConnectionClass(sock, addr, z, self)
+        if self.auth_protocol and self.database:
+            zstorage = self.ZEOStorageClass(self, self.read_only,
+                                            auth_realm=self.auth_realm)
+            zstorage.set_database(self.database)
+        else:
+            zstorage = self.ZEOStorageClass(self, self.read_only)
+        c = self.ManagedServerConnectionClass(sock, addr, zstorage, self)
         self.logger.warn("new connection %s: %s", addr, `c`)
         return c
 
@@ -749,7 +838,8 @@
             try:
                 s.close()
             except:
-                pass
+                self.logger.exception(
+                    "Unexpected error shutting down mainloop")
 
     def close_conn(self, conn):
         """Internal: remove the given connection from self.connections.


=== Zope3/src/zodb/zeo/stubs.py 1.7 => 1.7.2.1 ===
--- Zope3/src/zodb/zeo/stubs.py:1.7	Mon May 19 11:02:51 2003
+++ Zope3/src/zodb/zeo/stubs.py	Sun Jun 22 10:22:30 2003
@@ -52,7 +52,7 @@
         self.rpc.callAsync('endVerify')
 
     def invalidateTransaction(self, tid, invlist):
-        self.rpc.callAsync('invalidateTransaction', tid, invlist)
+        self.rpc.callAsyncNoPoll('invalidateTransaction', tid, invlist)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)
@@ -102,6 +102,12 @@
 
     def get_info(self):
         return self.rpc.call('get_info')
+
+    def getAuthProtocol(self):
+        return self.rpc.call('getAuthProtocol')
+    
+    def lastTransaction(self):
+        return self.rpc.call('lastTransaction')
 
     def getInvalidations(self, tid):
         return self.rpc.call('getInvalidations', tid)


=== Zope3/src/zodb/zeo/threadedasync.py 1.2 => 1.2.40.1 ===
--- Zope3/src/zodb/zeo/threadedasync.py:1.2	Wed Dec 25 09:12:22 2002
+++ Zope3/src/zodb/zeo/threadedasync.py	Sun Jun 22 10:22:30 2003
@@ -52,7 +52,7 @@
     _loop_lock.acquire()
     try:
         if _looping is not None:
-            apply(callback, (_looping,) + args, kw or {})
+            callback(_looping, *args, **(kw or {}))
         else:
             _loop_callbacks.append((callback, args, kw))
     finally:
@@ -65,7 +65,7 @@
         _looping = map
         while _loop_callbacks:
             cb, args, kw = _loop_callbacks.pop()
-            apply(cb, (map,) + args, kw or {})
+            cb(map, *args, **(kw or {}))
     finally:
         _loop_lock.release()