[Zodb-checkins] SVN: ZODB/branches/ctheune-blobsupport/s - different cleanups

Christian Theune ct at gocept.com
Mon Mar 21 23:16:04 EST 2005


Log message for revision 29637:
   - different cleanups
   - merged from head
   - added configuration methods to configure a blobfilestorage
   - made stuff work ;)
  
  

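For orientation, the intended wiring is a BlobStorage proxy wrapped around an
ordinary storage, as exercised by the connection.txt doctest further down in
this checkin.  A minimal sketch, assuming the branch as patched in this
revision:

    from tempfile import mkdtemp
    import transaction

    from ZODB.MappingStorage import MappingStorage
    from ZODB.Blobs.BlobStorage import BlobStorage
    from ZODB.Blobs.Blob import Blob
    from ZODB.DB import DB

    # Wrap any conventional storage with BlobStorage and point it at a
    # directory that will hold the blob files.
    base_storage = MappingStorage("test")
    blob_storage = BlobStorage(mkdtemp(), base_storage)
    database = DB(blob_storage)

    # Blobs then behave like any other persistent object.
    connection = database.open()
    root = connection.root()
    root['myblob'] = blob = Blob()
    f = blob.open("w")
    f.write("I'm a happy Blob.")
    f.close()
    transaction.commit()
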
Changed:
  U   ZODB/branches/ctheune-blobsupport/setup.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
  A   ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py
  A   ZODB/branches/ctheune-blobsupport/src/ZODB/tests/multidb.txt
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py
  U   ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
  U   ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py
  U   ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py
  A   ZODB/branches/ctheune-blobsupport/src/zope/proxy/
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/SETUP.cfg
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/__init__.py
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/_zope_proxy_proxy.c
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/interfaces.py
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/proxy.h
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/tests/__init__.py
  _U  ZODB/branches/ctheune-blobsupport/src/zope/proxy/tests/test_proxy.py

-=-
Modified: ZODB/branches/ctheune-blobsupport/setup.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/setup.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/setup.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -128,21 +128,37 @@
             sources= ['src/zope/interface/_zope_interface_coptimizations.c']
             )
 
-exts += [cPersistence, cPickleCache, TimeStamp, winlock, cZopeInterface]
+cZopeProxy = Extension(
+            name = 'zope.proxy._zope_proxy_proxy',
+            sources= ['src/zope/proxy/_zope_proxy_proxy.c']
+            )
 
+exts += [cPersistence,
+         cPickleCache,
+         TimeStamp,
+         winlock,
+         cZopeInterface,
+         cZopeProxy,
+        ]
+
 # The ZODB.zodb4 code is not being packaged, because it is only
 # need to convert early versions of Zope3 databases to ZODB3.
 
 packages = ["BTrees", "BTrees.tests",
             "ZEO", "ZEO.auth", "ZEO.zrpc", "ZEO.tests",
-            "ZODB", "ZODB.FileStorage", "ZODB.Blobs",
+            "ZODB", "ZODB.FileStorage", "ZODB.Blobs", "ZODB.Blobs.tests",
             "ZODB.tests",
             "Persistence", "Persistence.tests",
             "persistent", "persistent.tests",
             "transaction", "transaction.tests",
             "ThreadedAsync",
             "zdaemon", "zdaemon.tests",
-            "zope", "zope.interface", "zope.testing",
+
+            "zope",
+            "zope.interface", "zope.interface.tests",
+            "zope.proxy", "zope.proxy.tests",
+            "zope.testing",
+
             "ZopeUndo", "ZopeUndo.tests",
             "ZConfig", "ZConfig.tests",
             "ZConfig.components",
@@ -187,6 +203,7 @@
         "ZODB/tests",
         "zdaemon",
         "zdaemon/tests",
+        "zope/interface", "zope/interface/tests",
         ]:
         dir = convert_path(dir)
         inputdir = os.path.join("src", dir)

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/BaseStorage.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -252,6 +252,12 @@
         pass
 
     def tpc_finish(self, transaction, f=None):
+        # It's important that the storage calls the function we pass
+        # while it still has its lock.  We don't want another thread
+        # to be able to read any updated data until we've had a chance
+        # to send an invalidation message to all of the other
+        # connections!
+
         self._lock_acquire()
         try:
             if transaction is not self._transaction:

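The comment added to tpc_finish above states the invariant that the blob-aware
Connection.tpc_finish later in this checkin relies on: the storage runs the
caller's callback before releasing its commit lock, so invalidation messages
go out before any other thread can load the freshly committed data.  A
simplified, hypothetical sketch of that pattern (not the actual BaseStorage
code; the lock and the _do_finish helper are stand-ins):

    import threading

    class SketchStorage(object):
        """Illustrative only -- not the real BaseStorage."""

        def __init__(self):
            self._lock = threading.RLock()
            self._tid = None

        def _do_finish(self, transaction):
            # Stand-in for the real commit bookkeeping; returns the new tid.
            self._tid = "\0" * 8
            return self._tid

        def tpc_finish(self, transaction, f=None):
            self._lock.acquire()
            try:
                tid = self._do_finish(transaction)
                if f is not None:
                    # Connection passes a callback that sends invalidations;
                    # it runs while the lock is still held, so no other
                    # thread can read the new data before the invalidations
                    # went out.
                    f(tid)
                return tid
            finally:
                self._lock.release()
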
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/Blob.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -1,5 +1,6 @@
 
 import os
+import tempfile
 
 from zope.interface import implements
 
@@ -8,33 +9,22 @@
 from ZODB import utils
 from persistent import Persistent
 
-class TempFileHandler(object):
-    """Handles holding a tempfile around.
+try:
+    from ZPublisher.Iterators import IStreamIterator
+except ImportError:
+    IStreamIterator = None
 
-    The tempfile is unlinked when the tempfilehandler is GCed.
-    """
-    
-    def __init__(self, directory, mode)
-        self.handle, self.filename = tempfile.mkstemp(dir=directory,
-                                                      text=mode)
-        
-    def __del__(self):
-        self.handle
-        os.unlink(self.filename)
-
 class Blob(Persistent):
  
     implements(IBlob)
 
-    def __init__(self):
-        self._p_blob_readers = 0
-        self._p_blob_writers = 0
-        self._p_blob_uncommitted = None
-        self._p_blob_data = None
+    _p_blob_readers = 0
+    _p_blob_writers = 0
+    _p_blob_uncommitted = None
+    _p_blob_data = None
 
     def open(self, mode):
         """Returns a file(-like) object for handling the blob data."""
-
         if mode == "r":
             if self._current_filename() is None:
                 raise BlobError, "Blob does not exist."
@@ -43,17 +33,17 @@
                 raise BlobError, "Already opened for writing."
 
             self._p_blob_readers += 1
-            return BlobTempFile(self._current_filename(), "rb", self)
+            return BlobFile(self._current_filename(), "rb", self)
 
         if mode == "w":
             if self._p_blob_readers != 0:
                 raise BlobError, "Already opened for reading."
 
             if self._p_blob_uncommitted is None:
-                self._p_blob_uncommitted = self._get_uncommitted_filename()
+                self._p_blob_uncommitted = utils.mktemp()
 
             self._p_blob_writers += 1
-            return BlobTempFile(self._p_blob_uncommitted, "wb", self)
+            return BlobFile(self._p_blob_uncommitted, "wb", self)
 
         if mode =="a":
             if self._current_filename() is None:
@@ -62,15 +52,15 @@
             if self._p_blob_readers != 0:
                 raise BlobError, "Already opened for reading."
 
-            if not self._p_blob_uncommitted:
+            if self._p_blob_uncommitted is None:
                 # Create a new working copy
-                self._p_blob_uncommitted = self._get_uncommitted_filename()
-                uncommitted = BlobTempFile(self._p_blob_uncommitted, "wb", self)
+                self._p_blob_uncommitted = utils.mktemp()
+                uncommitted = BlobFile(self._p_blob_uncommitted, "wb", self)
                 utils.cp(file(self._p_blob_data), uncommitted)
                 uncommitted.seek(0)
             else:
                 # Re-use existing working copy
-                uncommitted = BlobTempFile(self._p_blob_uncommitted, "ab", self)
+                uncommitted = BlobFile(self._p_blob_uncommitted, "ab", self)
             
             self._p_blob_writers +=1
             return uncommitted
@@ -80,28 +70,29 @@
     def _current_filename(self):
         return self._p_blob_uncommitted or self._p_blob_data
 
-    def _get_uncommitted_filename(self):
-        return os.tempnam()
+class BlobFile(file):
 
-class BlobFileBase:
-
     # XXX those files should be created in the same partition as
     # the storage later puts them to avoid copying them ...
 
+    if IStreamIterator is not None:
+        __implements__ = (IStreamIterator,)
+
     def __init__(self, name, mode, blob):
-        file.__init__(self, name, mode)
+        super(BlobFile, self).__init__(name, mode)
         self.blob = blob
+        self.streamsize = 1<<16
 
     def write(self, data):
-        file.write(self, data)
+        super(BlobFile, self).write(data)
         self.blob._p_changed = 1
 
     def writelines(self, lines):
-        file.writelines(self, lines)
+        super(BlobFile, self).writelines(lines)
         self.blob._p_changed = 1
 
     def truncate(self, size):
-        file.truncate(self, size)
+        super(BlobFile, self).truncate(size)
         self.blob._p_changed = 1
         
     def close(self):
@@ -110,15 +101,20 @@
             self.blob._p_blob_writers -= 1
         else:
             self.blob._p_blob_readers -= 1
-        file.close(self)
+        super(BlobFile, self).close()
 
-class BlobFile(BlobFileBase, file):
-    pass
+    def next(self):
+        data = self.read(self.streamsize)
+        if not data:
+            self.blob._p_blob_readers -= 1
+            raise StopIteration
+        return data
 
-class BlobTempFile(BlobFileBase, NamedTempFile)
-    pass
+    def __len__(self):
+        cur_pos = self.tell()
+        self.seek(0, 2)
+        size = self.tell()
+        self.seek(cur_pos, 0)
+        return size
 
-def copy_file(old, new):
-    for chunk in old.read(4096):
-        new.write(chunk)
-    new.seek(0)
+

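To make the reader/writer bookkeeping above concrete, a small usage sketch of
the three open() modes implemented in this revision (assuming the patched
branch):

    from ZODB.Blobs.Blob import Blob

    blob = Blob()

    # "w" creates an uncommitted working copy and bumps _p_blob_writers.
    f = blob.open("w")
    f.write("some data")
    f.close()

    # "r" opens the current file (the uncommitted copy if present, else the
    # committed data) and bumps _p_blob_readers.
    f = blob.open("r")
    data = f.read()
    f.close()

    # "a" reuses (or creates) the working copy and appends to it.
    f = blob.open("a")
    f.write(" and more")
    f.close()
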
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/BlobStorage.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -12,41 +12,85 @@
 #
 ##############################################################################
 
+import os
+
 from zope.interface import implements
-from zope.proxy import ProxyBase
+from zope.proxy import ProxyBase, getProxiedObject
 
-from ZODB.interfaces import \
-        IStorageAdapter, IUndoableStorage, IVersioningStorage, IBlobStorage
+from ZODB import utils
+from ZODB.Blobs.interfaces import IBlobStorage, IBlob
 
 class BlobStorage(ProxyBase):
     """A storage to support blobs."""
 
     implements(IBlobStorage)
 
-    __slots__ = ('base_directory',)
+    __slots__ = ('base_directory', 'dirty_oids')
 
-    def __init__(self, base_directory, storage):
+    def __new__(self, base_directory, storage):
+        return ProxyBase.__new__(self, storage)
+
+    def __init__(self, base_directory, storage):    
+        # TODO Complain if storage is ClientStorage
         ProxyBase.__init__(self, storage)
         self.base_directory = base_directory
+        self.dirty_oids = []
         
-    def storeBlob(oid, serial, data, blob, version, transaction):
+    def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction):
         """Stores data that has a BLOB attached."""
-        if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
+        serial = self.store(oid, oldserial, data, version, transaction)
+        assert isinstance(serial, str) # XXX in theory serials could be 
+                                       # something else
 
         self._lock_acquire()
         try:
-            # 
+            targetname = self._getCleanFilename(oid, serial)
+            try:
+                os.rename(blobfilename, targetname)
+            except OSError:
+                target = file(targetname, "wb")
+                source = file(blobfilename, "rb")
+                utils.cp(source, target)
+                target.close()
+                source.close()
+                os.unlink(blobfilename)
 
-
+            # XXX if oid already in there, something is really hosed.
+            # The underlying storage should have complained anyway
+            self.dirty_oids.append((oid, serial))
         finally:
             self._lock_release()
         return self._tid
 
+    def _getDirtyFilename(self, oid):
+        """Generate an intermediate filename for two-phase commit.
 
+        XXX Not used right now due to conceptual flux. Please keep it around
+        anyway. 
+        """
+        return self._getCleanFilename(oid, "store")
 
+    def _getCleanFilename(self, oid, tid):
+        return "%s/%s-%s.blob" % \
+                (self.base_directory, 
+                 utils.oid_repr(oid),
+                 utils.tid_repr(tid),
+                 )
 
+    def _finish(self, tid, u, d, e): 
+        ProxyBase._finish(self, tid, u, d, e)
+        self.dirty_oids = []
 
-    def loadBlob(oid, serial, version, blob):
-        """Loads the BLOB data for 'oid' into the given blob object.
+    def _abort(self):
+        ProxyBase._abort(self)
+
+        # Throw away the blob files we would have committed
+        while self.dirty_oids:
+            oid, serial = self.dirty_oids.pop()
+            os.unlink(self._getCleanFilename(oid, serial))
+        
+    def loadBlob(self, oid, serial, version):
+        """Return the filename where the blob file can be found.
         """
+        return self._getCleanFilename(oid, serial)
+

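The on-disk layout implied by _getCleanFilename above is one file per
(oid, serial) revision, placed directly under base_directory.  A tiny sketch
of the naming scheme (the same format string as above; blob_filename is just
an illustrative helper, not part of the patch):

    from ZODB import utils

    def blob_filename(base_directory, oid, tid):
        # One blob file per object revision: <base>/<oid>-<tid>.blob
        return "%s/%s-%s.blob" % (base_directory,
                                  utils.oid_repr(oid),
                                  utils.tid_repr(tid))

Keeping one file per revision means older blob data stays addressable by
(oid, serial), which is what loadBlob returns a path for.
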
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/TODO.txt	2005-03-22 04:16:03 UTC (rev 29637)
@@ -1,2 +1,4 @@
 
 - Blob instances should clean up temporary files after committing
+
+- Support database import/export

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/interfaces.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -13,3 +13,20 @@
     # XXX need a method to initialize the blob from the storage
     # this means a) setting the _p_blob_data filename and b) putting
     # the current data in that file
+
+class IBlobStorage(Interface):
+    """A storage supporting BLOBs."""
+
+    def storeBlob(oid, oldserial, data, blob, version, transaction):
+        """Stores data that has a BLOB attached."""
+
+    def loadBlob(oid, serial, version):
+        """Return the filename of the Blob data responding to this OID and
+        serial.
+
+        Returns a filename or None if no Blob data is connected with this OID. 
+        """
+
+    def getBlobDirectory():
+        """Return the directory used to store the blob data files.
+        """

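The Connection changes later in this checkin use this interface to decide
between the plain store() and the blob-aware storeBlob() call.  Roughly, and
simplified from the commit() hunk below (store_object is an illustrative
helper, not part of the patch):

    from ZODB.Blobs.interfaces import IBlob, IBlobStorage
    from ZODB.POSException import Unsupported

    def store_object(storage, oid, serial, data, obj, version, transaction):
        # Blobs need a blob-capable storage; everything else goes through
        # the ordinary store() call.
        if IBlob.providedBy(obj):
            if not IBlobStorage.providedBy(storage):
                raise Unsupported(
                    "Storing Blobs in %s is not supported." % repr(storage))
            return storage.storeBlob(oid, serial, data,
                                     obj._p_blob_uncommitted,
                                     version, transaction)
        return storage.store(oid, serial, data, version, transaction)
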
Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/connection.txt	2005-03-22 04:16:03 UTC (rev 29637)
@@ -23,15 +23,18 @@
     >>> blob = Blob()
     >>> data = blob.open("w")
     >>> data.write("I'm a happy Blob.")
+    >>> data.close()
 
 We also need a database with a blob supporting storage:
 
     >>> from ZODB.MappingStorage import MappingStorage
+    >>> from ZODB.Blobs.BlobStorage import BlobStorage
+    >>> from ZODB.DB import DB
     >>> from tempfile import mkdtemp
     >>> base_storage = MappingStorage("test")
     >>> blob_dir = mkdtemp()
     >>> blob_storage = BlobStorage(blob_dir, base_storage)
-    >>> database = DB(storage)
+    >>> database = DB(blob_storage)
     
 Putting a Blob into a Connection works like every other object:
 
@@ -40,12 +43,11 @@
     >>> root['myblob'] = blob
     >>> import transaction
     >>> transaction.commit()
-    >>> connection.close()
 
 Getting stuff out of there works similar:
 
-    >>> connection = database.open()
-    >>> root = connection.root()
+    >>> connection2 = database.open()
+    >>> root = connection2.root()
     >>> blob2 = root['myblob']
     >>> IBlob.isImplementedBy(blob2)
     True
@@ -56,17 +58,18 @@
 
     >>> no_blob_storage = MappingStorage()
     >>> database2 = DB(no_blob_storage)
-    >>> connection = database.open()
-    >>> root = connection.root()
-    >>> root['myblob'] = blob
-    >>> transaction.commit()
+    >>> connection3 = database2.open()
+    >>> root = connection3.root()
+    >>> root['myblob'] = Blob()
+    >>> transaction.commit()        # doctest: +ELLIPSIS
     Traceback (most recent call last):
         ...
-    POSException.Unsupported: Storing Blobs is not supported.
+    Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage instance at ...> is not supported.
 
 While we are testing this, we don't need the storage directory and databases anymore:
 
-    >>> import os
-    >>> os.unlink(blob_dir)
+    >>> import shutil
+    >>> shutil.rmtree(blob_dir)
+    >>> transaction.abort()
     >>> database.close()
     >>> database2.close()

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Blobs/tests/test_doctests.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -15,4 +15,4 @@
 from zope.testing.doctestunit import DocFileSuite
 
 def test_suite():
-    return DocFileSuite("../README.txt")
+    return DocFileSuite("../Blob.txt",  "connection.txt")

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/Connection.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -23,21 +23,25 @@
 
 from persistent import PickleCache
 
+# interfaces
+from persistent.interfaces import IPersistentDataManager 
+from ZODB.interfaces import IConnection 
+from ZODB.Blobs.interfaces import IBlob, IBlobStorage
+from transaction.interfaces import IDataManager
+from zope.interface import implements
+
 import transaction
 
 from ZODB.ConflictResolution import ResolvedSerial
 from ZODB.ExportImport import ExportImport
 from ZODB.POSException \
      import ConflictError, ReadConflictError, InvalidObjectReference, \
-            ConnectionStateError
+            ConnectionStateError, Unsupported
 from ZODB.TmpStore import TmpStore
-from ZODB.utils import u64, oid_repr, z64, positive_id
 from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
-from ZODB.interfaces import IConnection
-from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
+from ZODB.utils import u64, oid_repr, z64, positive_id, \
+        DEPRECATED_ARGUMENT, deprecated36
 
-from zope.interface import implements
-
 global_reset_counter = 0
 
 def resetCaches():
@@ -54,127 +58,19 @@
     global_reset_counter += 1
 
 class Connection(ExportImport, object):
-    """Connection to ZODB for loading and storing objects.
+    """Connection to ZODB for loading and storing objects."""
 
-    The Connection object serves as a data manager.  The root() method
-    on a Connection returns the root object for the database.  This
-    object and all objects reachable from it are associated with the
-    Connection that loaded them.  When a transaction commits, it uses
-    the Connection to store modified objects.
+    implements(IConnection, IDataManager, IPersistentDataManager)
 
-    Typical use of ZODB is for each thread to have its own
-    Connection and that no thread should have more than one Connection
-    to the same database.  A thread is associated with a Connection by
-    loading objects from that Connection.  Objects loaded by one
-    thread should not be used by another thread.
-
-    A Connection can be associated with a single version when it is
-    created.  By default, a Connection is not associated with a
-    version; it uses non-version data.
-
-    Each Connection provides an isolated, consistent view of the
-    database, by managing independent copies of objects in the
-    database.  At transaction boundaries, these copies are updated to
-    reflect the current state of the database.
-
-    You should not instantiate this class directly; instead call the
-    open() method of a DB instance.
-
-    In many applications, root() is the only method of the Connection
-    that you will need to use.
-
-    Synchronization
-    ---------------
-
-    A Connection instance is not thread-safe.  It is designed to
-    support a thread model where each thread has its own transaction.
-    If an application has more than one thread that uses the
-    connection or the transaction the connection is registered with,
-    the application should provide locking.
-
-    The Connection manages movement of objects in and out of object
-    storage.
-
-    TODO:  We should document an intended API for using a Connection via
-    multiple threads.
-
-    TODO:  We should explain that the Connection has a cache and that
-    multiple calls to get() will return a reference to the same
-    object, provided that one of the earlier objects is still
-    referenced.  Object identity is preserved within a connection, but
-    not across connections.
-
-    TODO:  Mention the database pool.
-
-    A database connection always presents a consistent view of the
-    objects in the database, although it may not always present the
-    most current revision of any particular object.  Modifications
-    made by concurrent transactions are not visible until the next
-    transaction boundary (abort or commit).
-
-    Two options affect consistency.  By default, the mvcc and synch
-    options are enabled by default.
-
-    If you pass mvcc=True to db.open(), the Connection will never read
-    non-current revisions of an object.  Instead it will raise a
-    ReadConflictError to indicate that the current revision is
-    unavailable because it was written after the current transaction
-    began.
-
-    The logic for handling modifications assumes that the thread that
-    opened a Connection (called db.open()) is the thread that will use
-    the Connection.  If this is not true, you should pass synch=False
-    to db.open().  When the synch option is disabled, some transaction
-    boundaries will be missed by the Connection; in particular, if a
-    transaction does not involve any modifications to objects loaded
-    from the Connection and synch is disabled, the Connection will
-    miss the transaction boundary.  Two examples of this behavior are
-    db.undo() and read-only transactions.
-
-
-    :Groups:
-
-      - `User Methods`: root, get, add, close, db, sync, isReadOnly,
-        cacheGC, cacheFullSweep, cacheMinimize, getVersion,
-        modifiedInVersion
-      - `Experimental Methods`: setLocalTransaction, getTransaction,
-        onCloseCallbacks
-      - `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
-        tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
-        abort_sub
-      - `Database Invalidation Methods`: invalidate, _setDB
-      - `IPersistentDataManager Methods`: setstate, register,
-        setklassstate
-      - `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
-        getTransferCounts
-
-    """
-    implements(IConnection)
-
     _tmp = None
     _code_timestamp = 0
 
+    # ZODB.IConnection
+
     def __init__(self, version='', cache_size=400,
                  cache_deactivate_after=None, mvcc=True, txn_mgr=None,
                  synch=True):
-        """Create a new Connection.
-
-        A Connection instance should by instantiated by the DB
-        instance that it is connected to.
-
-        :Parameters:
-          - `version`: the "version" that all changes will be made
-             in, defaults to no version.
-          - `cache_size`: the target size of the in-memory object
-             cache, measured in objects.
-          - `cache_deactivate_after`: deprecated, ignored
-          - `mvcc`: boolean indicating whether MVCC is enabled
-          - `txn_mgr`: transaction manager to use.  None means
-             used the default transaction manager.
-          - `synch`: boolean indicating whether Connection should
-             register for afterCompletion() calls.
-        """
-
+        """Create a new Connection."""
         self._log = logging.getLogger("ZODB.Connection")
         self._storage = None
         self._debug_info = ()
@@ -220,7 +116,7 @@
         # from a single transaction should be applied atomically, so
         # the lock must be held when reading _invalidated.
 
-        # It sucks that we have to hold the lock to read _invalidated. 
+        # It sucks that we have to hold the lock to read _invalidated.
         # Normally, _invalidated is written by calling dict.update, which
         # will execute atomically by virtue of the GIL.  But some storage
         # might generate oids where hash or compare invokes Python code.  In
@@ -253,79 +149,20 @@
         # to pass to _importDuringCommit().
         self._import = None
 
-    def getTransaction(self):
-        """Get the current transaction for this connection.
+        self.connections = None
 
-        :deprecated:
+    def get_connection(self, database_name):
+        """Return a Connection for the named database."""
+        connection = self.connections.get(database_name)
+        if connection is None:
+            new_con = self._db.databases[database_name].open()
+            self.connections.update(new_con.connections)
+            new_con.connections = self.connections
+            connection = new_con
+        return connection
 
-        The transaction manager's get method works the same as this
-        method.  You can pass a transaction manager (TM) to DB.open()
-        to control which TM the Connection uses.
-        """
-        deprecated36("getTransaction() is deprecated. "
-                     "Use the txn_mgr argument to DB.open() instead.")
-        return self._txn_mgr.get()
-
-    def setLocalTransaction(self):
-        """Use a transaction bound to the connection rather than the thread.
-
-        :deprecated:
-
-        Returns the transaction manager used by the connection.  You
-        can pass a transaction manager (TM) to DB.open() to control
-        which TM the Connection uses.
-        """
-        deprecated36("setLocalTransaction() is deprecated. "
-                     "Use the txn_mgr argument to DB.open() instead.")
-        if self._txn_mgr is transaction.manager:
-            if self._synch:
-                self._txn_mgr.unregisterSynch(self)
-            self._txn_mgr = transaction.TransactionManager()
-            if self._synch:
-                self._txn_mgr.registerSynch(self)
-        return self._txn_mgr
-
-    def _cache_items(self):
-        # find all items on the lru list
-        items = self._cache.lru_items()
-        # fine everything. some on the lru list, some not
-        everything = self._cache.cache_data
-        # remove those items that are on the lru list
-        for k,v in items:
-            del everything[k]
-        # return a list of [ghosts....not recently used.....recently used]
-        return everything.items() + items
-
-    def __repr__(self):
-        if self._version:
-            ver = ' (in version %s)' % `self._version`
-        else:
-            ver = ''
-        return '<Connection at %08x%s>' % (positive_id(self), ver)
-
     def get(self, oid):
-        """Return the persistent object with oid 'oid'.
-
-        If the object was not in the cache and the object's class is
-        ghostable, then a ghost will be returned.  If the object is
-        already in the cache, a reference to the cached object will be
-        returned.
-
-        Applications seldom need to call this method, because objects
-        are loaded transparently during attribute lookup.
-
-        :return: persistent object corresponding to `oid`
-
-        :Parameters:
-          - `oid`: an object id
-
-        :Exceptions:
-          - `KeyError`: if oid does not exist.  It is possible that an
-            object does not exist as of the current transaction, but
-            existed in the past.  It may even exist again in the
-            future, if the transaction that removed it is undone.
-          - `ConnectionStateError`:  if the connection is closed.
-        """
+        """Return the persistent object with oid 'oid'."""
         if self._storage is None:
             raise ConnectionStateError("The database connection is closed")
 
@@ -347,33 +184,8 @@
         self._cache[oid] = obj
         return obj
 
-    # deprecate this method?
-    __getitem__ = get
-
     def add(self, obj):
-        """Add a new object 'obj' to the database and assign it an oid.
-
-        A persistent object is normally added to the database and
-        assigned an oid when it becomes reachable to an object already in
-        the database.  In some cases, it is useful to create a new
-        object and use its oid (_p_oid) in a single transaction.
-
-        This method assigns a new oid regardless of whether the object
-        is reachable.
-
-        The object is added when the transaction commits.  The object
-        must implement the IPersistent interface and must not
-        already be associated with a Connection.
-
-        :Parameters:
-          - `obj`: a Persistent object
-
-        :Exceptions:
-          - `TypeError`: if obj is not a persistent object.
-          - `InvalidObjectReference`: if obj is already associated
-            with another connection.
-          - `ConnectionStateError`: if the connection is closed.
-        """
+        """Add a new object 'obj' to the database and assign it an oid."""
         if self._storage is None:
             raise ConnectionStateError("The database connection is closed")
 
@@ -397,72 +209,11 @@
             raise InvalidObjectReference(obj, obj._p_jar)
 
     def sortKey(self):
-        # If two connections use the same storage, give them a
-        # consistent order using id().  This is unique for the
-        # lifetime of a connection, which is good enough.
-        return "%s:%s" % (self._sortKey(), id(self))
+        """Return a consistent sort key for this connection."""
+        return "%s:%s" % (self._storage.sortKey(), id(self))
 
-    def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
-        """Register odb, the DB that this Connection uses.
-
-        This method is called by the DB every time a Connection
-        is opened.  Any invalidations received while the Connection
-        was closed will be processed.
-
-        If the global module function resetCaches() was called, the
-        cache will be cleared.
-
-        :Parameters:
-          - `odb`: database that owns the Connection
-          - `mvcc`: boolean indicating whether MVCC is enabled
-          - `txn_mgr`: transaction manager to use.  None means
-             used the default transaction manager.
-          - `synch`: boolean indicating whether Connection should
-             register for afterCompletion() calls.
-        """
-
-        # TODO:  Why do we go to all the trouble of setting _db and
-        # other attributes on open and clearing them on close?
-        # A Connection is only ever associated with a single DB
-        # and Storage.
-
-        self._db = odb
-        self._storage = odb._storage
-        self._sortKey = odb._storage.sortKey
-        self.new_oid = odb._storage.new_oid
-        self._opened = time()
-        if synch is not None:
-            self._synch = synch
-        if mvcc is not None:
-            self._mvcc = mvcc
-        self._txn_mgr = txn_mgr or transaction.manager
-        if self._reset_counter != global_reset_counter:
-            # New code is in place.  Start a new cache.
-            self._resetCache()
-        else:
-            self._flush_invalidations()
-        if self._synch:
-            self._txn_mgr.registerSynch(self)
-        self._reader = ConnectionObjectReader(self, self._cache,
-                                              self._db.classFactory)
-
-    def _resetCache(self):
-        """Creates a new cache, discarding the old one.
-
-        See the docstring for the resetCaches() function.
-        """
-        self._reset_counter = global_reset_counter
-        self._invalidated.clear()
-        cache_size = self._cache.cache_size
-        self._cache = cache = PickleCache(self, cache_size)
-
     def abort(self, transaction):
-        """Abort modifications to registered objects.
-
-        This tells the cache to invalidate changed objects.  _p_jar
-        and _p_oid are deleted from new objects.
-        """
-
+        """Abort a transaction and forget all changes."""
         for obj in self._registered_objects:
             oid = obj._p_oid
             assert oid is not None
@@ -475,70 +226,22 @@
 
         self._tpc_cleanup()
 
-    # Should there be a way to call incrgc directly?
-    # Perhaps "full sweep" should do that?
+    # TODO: we should test what happens when cacheGC is called mid-transaction.
 
-    # TODO: we should test what happens when these methods are called
-    # mid-transaction.
-
-    def cacheFullSweep(self, dt=None):
-        deprecated36("cacheFullSweep is deprecated. "
-                     "Use cacheMinimize instead.")
-        if dt is None:
-            self._cache.full_sweep()
-        else:
-            self._cache.full_sweep(dt)
-
-    def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
-        """Deactivate all unmodified objects in the cache.
-
-        Call _p_deactivate() on each cached object, attempting to turn
-        it into a ghost.  It is possible for individual objects to
-        remain active.
-
-        :Parameters:
-          - `dt`: ignored.  It is provided only for backwards compatibility.
-        """
-        if dt is not DEPRECATED_ARGUMENT:
-            deprecated36("cacheMinimize() dt= is ignored.")
-        self._cache.minimize()
-
     def cacheGC(self):
-        """Reduce cache size to target size.
-
-        Call _p_deactivate() on cached objects until the cache size
-        falls under the target size.
-        """
+        """Reduce cache size to target size."""
         self._cache.incrgc()
 
     __onCloseCallbacks = None
 
     def onCloseCallback(self, f):
-        """Register a callable, f, to be called by close().
-
-        The callable, f, will be called at most once, the next time
-        the Connection is closed.
-
-        :Parameters:
-          - `f`: object that will be called on `close`
-        """
+        """Register a callable, f, to be called by close()."""
         if self.__onCloseCallbacks is None:
             self.__onCloseCallbacks = []
         self.__onCloseCallbacks.append(f)
 
     def close(self):
-        """Close the Connection.
-
-        A closed Connection should not be used by client code.  It
-        can't load or store objects.  Objects in the cache are not
-        freed, because Connections are re-used and the cache are
-        expected to be useful to the next client.
-
-        When the Connection is closed, all callbacks registered by
-        onCloseCallback() are invoked and the cache is scanned for
-        old objects.
-        """
-
+        """Close the Connection."""
         if not self._needs_to_join:
             # We're currently joined to a transaction.
             raise ConnectionStateError("Cannot close a connection joined to "
@@ -575,7 +278,10 @@
             # assert that here, because self may have been reused (by
             # another thread) by the time we get back here.
 
+    # transaction.interfaces.IDataManager
+
     def commit(self, transaction):
+        """Commit changes to an object"""
         if self._import:
             # TODO:  This code seems important for Zope, but needs docs
             # to explain why.
@@ -636,8 +342,20 @@
                 self._modified.append(oid)
             p = writer.serialize(obj)  # This calls __getstate__ of obj
 
-            s = self._storage.store(oid, serial, p, self._version, transaction)
+            if IBlob.providedBy(obj):
+                if not IBlobStorage.providedBy(self._storage):
+                    raise Unsupported(
+                        "Storing Blobs in %s is not supported." % 
+                        repr(self._storage))
+                s = self._storage.storeBlob(oid, serial, p,
+                                            obj._p_blob_uncommitted,
+                                            self._version, transaction)
+            else:
+                s = self._storage.store(oid, serial, p, self._version,
+                                        transaction)
             self._store_count += 1
+
+                                        
             # Put the object in the cache before handling the
             # response, just in case the response contains the
             # serial number for a newly created object
@@ -652,9 +370,17 @@
                     raise
 
             self._handle_serial(s, oid)
+            
+            if IBlob.providedBy(obj):
+                # We need to update internals of the blobs here
+                obj._p_blob_uncommitted = None
+                obj._p_blob_data = \
+                        self._storage.loadBlob(oid, obj._p_serial, 
+                                               self._version )
 
     def commit_sub(self, t):
-        """Commit all work done in all subtransactions for this transaction."""
+        """Commit all changes made in subtransactions and begin 2-phase commit
+        """
         if self._tmp is None:
             return
         src = self._storage
@@ -671,11 +397,16 @@
 
         for oid in oids:
             data, serial = src.load(oid, src)
-            s = self._storage.store(oid, serial, data, self._version, t)
+            blobfile = src.loadBlob(oid, serial, self._version)
+            if blobfile is not None:
+                s = self._storage.storeBlob(oid, serial, data, blobfile,
+                                        self._version, t)
+            else:
+                s = self._storage.store(oid, serial, data, self._version, t)
             self._handle_serial(s, oid, change=False)
 
     def abort_sub(self, t):
-        """Abort work done in all subtransactions for this transaction."""
+        """Discard all subtransaction data."""
         if self._tmp is None:
             return
         src = self._storage
@@ -686,7 +417,7 @@
         self._invalidate_creating(src._creating)
 
     def _invalidate_creating(self, creating=None):
-        """Dissown any objects newly saved in an uncommitted transaction."""
+        """Disown any objects newly saved in an uncommitted transaction."""
         if creating is None:
             creating = self._creating
             self._creating = []
@@ -698,34 +429,42 @@
                 del o._p_jar
                 del o._p_oid
 
+    # The next two methods are callbacks for transaction synchronization.
+
+    def beforeCompletion(self, txn):
+        # We don't do anything before a commit starts.
+        pass
+
+    def afterCompletion(self, txn):
+        self._flush_invalidations()
+
+    def _flush_invalidations(self):
+        self._inv_lock.acquire()
+        try:
+            self._cache.invalidate(self._invalidated)
+            self._invalidated.clear()
+            self._txn_time = None
+        finally:
+            self._inv_lock.release()
+        # Now is a good time to collect some garbage
+        self._cache.incrgc()
+
+    def root(self):
+        """Return the database root object."""
+        return self.get(z64)
+
     def db(self):
+        """Returns a handle to the database this connection belongs to."""
         return self._db
 
-    def getVersion(self):
-        if self._storage is None:
-            raise ConnectionStateError("The database connection is closed")
-        return self._version
-
     def isReadOnly(self):
+        """Returns True if the storage for this connection is read only."""
         if self._storage is None:
             raise ConnectionStateError("The database connection is closed")
         return self._storage.isReadOnly()
 
     def invalidate(self, tid, oids):
-        """Notify the Connection that transaction 'tid' invalidated oids.
-
-        When the next transaction boundary is reached, objects will be
-        invalidated.  If any of the invalidated objects is accessed by
-        the current transaction, the revision written before C{tid}
-        will be used.
-
-        The DB calls this method, even when the Connection is closed.
-
-        :Parameters:
-          - `tid`: the storage-level id of the transaction that committed
-          - `oids`: oids is a set of oids, represented as a dict with oids
-            as keys.
-        """
+        """Notify the Connection that transaction 'tid' invalidated oids."""
         self._inv_lock.acquire()
         try:
             if self._txn_time is None:
@@ -734,72 +473,149 @@
         finally:
             self._inv_lock.release()
 
-    # The next two methods are callbacks for transaction synchronization.
+    # IDataManager
 
-    def beforeCompletion(self, txn):
-        # We don't do anything before a commit starts.
-        pass
+    def tpc_begin(self, transaction, sub=False):
+        """Begin commit of a transaction, starting the two-phase commit."""
+        self._modified = []
 
-    def afterCompletion(self, txn):
-        self._flush_invalidations()
+        # _creating is a list of oids of new objects, which is used to
+        # remove them from the cache if a transaction aborts.
+        self._creating = []
+        if sub and self._tmp is None:
+            # Sub-transaction!
+            self._tmp = self._storage
+            self._storage = TmpStore(self._version, self._storage)
 
-    def _flush_invalidations(self):
-        self._inv_lock.acquire()
-        try:
-            self._cache.invalidate(self._invalidated)
-            self._invalidated.clear()
-            self._txn_time = None
-        finally:
-            self._inv_lock.release()
-        # Now is a good time to collect some garbage
-        self._cache.incrgc()
+        self._storage.tpc_begin(transaction)
 
-    def modifiedInVersion(self, oid):
+    def tpc_vote(self, transaction):
+        """Verify that a data manager can commit the transaction."""
         try:
-            return self._db.modifiedInVersion(oid)
-        except KeyError:
-            return self._version
+            vote = self._storage.tpc_vote
+        except AttributeError:
+            return
+        s = vote(transaction)
+        self._handle_serial(s)
 
-    def register(self, obj):
-        """Register obj with the current transaction manager.
+    def _handle_serial(self, store_return, oid=None, change=1):
+        """Handle the returns from store() and tpc_vote() calls."""
 
-        A subclass could override this method to customize the default
-        policy of one transaction manager for each thread.
+        # These calls can return different types depending on whether
+        # ZEO is used.  ZEO uses asynchronous returns that may be
+        # returned in batches by the ClientStorage.  ZEO1 can also
+        # return an exception object and expect that the Connection
+        # will raise the exception.
 
-        obj must be an object loaded from this Connection.
-        """
-        assert obj._p_jar is self
-        if obj._p_oid is None:
-            # There is some old Zope code that assigns _p_jar
-            # directly.  That is no longer allowed, but we need to
-            # provide support for old code that still does it.
+        # When commit_sub() executes a store, there is no need to
+        # update the _p_changed flag, because the subtransaction
+        # tpc_vote() calls already did this.  The change=1 argument
+        # exists to allow commit_sub() to avoid setting the flag
+        # again.
 
-            # The actual complaint here is that an object without
-            # an oid is being registered.  I can't think of any way to
-            # achieve that without assignment to _p_jar.  If there is
-            # a way, this will be a very confusing warning.
-            deprecated36("Assigning to _p_jar is deprecated, and will be "
-                         "changed to raise an exception.")
-        elif obj._p_oid in self._added:
-            # It was registered before it was added to _added.
+        # When conflict resolution occurs, the object state held by
+        # the connection does not match what is written to the
+        # database.  Invalidate the object here to guarantee that
+        # the new state is read the next time the object is used.
+
+        if not store_return:
             return
-        self._register(obj)
+        if isinstance(store_return, str):
+            assert oid is not None
+            self._handle_one_serial(oid, store_return, change)
+        else:
+            for oid, serial in store_return:
+                self._handle_one_serial(oid, serial, change)
 
-    def _register(self, obj=None):
-        if obj is not None:
-            self._registered_objects.append(obj)
-        if self._needs_to_join:
-            self._txn_mgr.get().join(self)
-            self._needs_to_join = False
+    def _handle_one_serial(self, oid, serial, change):
+        if not isinstance(serial, str):
+            raise serial
+        obj = self._cache.get(oid, None)
+        if obj is None:
+            return
+        if serial == ResolvedSerial:
+            del obj._p_changed # transition from changed to ghost
+        else:
+            if change:
+                obj._p_changed = 0 # transition from changed to up-to-date
+            obj._p_serial = serial
 
-    def root(self):
-        """Return the database root object.
+    def tpc_finish(self, transaction):
+        """Indicate confirmation that the transaction is done."""
+        if self._tmp is not None:
+            # Committing a subtransaction!
+            # There is no need to invalidate anything.
+            self._storage.tpc_finish(transaction)
+            self._storage._creating[:0]=self._creating
+            del self._creating[:]
+        else:
+            def callback(tid):
+                d = {}
+                for oid in self._modified:
+                    d[oid] = 1
+                self._db.invalidate(tid, d, self)
+            self._storage.tpc_finish(transaction, callback)
+        self._tpc_cleanup()
 
-        The root is a persistent.mapping.PersistentMapping.
-        """
-        return self.get(z64)
+    def tpc_abort(self, transaction):
+        """Abort a transaction."""
+        if self._import:
+            self._import = None
+        self._storage.tpc_abort(transaction)
+        self._cache.invalidate(self._modified)
+        self._invalidate_creating()
+        while self._added:
+            oid, obj = self._added.popitem()
+            del obj._p_oid
+            del obj._p_jar
+        self._tpc_cleanup()
 
+    def _tpc_cleanup(self):
+        """Performs cleanup operations to support tpc_finish and tpc_abort."""
+        self._conflicts.clear()
+        if not self._synch:
+            self._flush_invalidations()
+        self._needs_to_join = True
+        self._registered_objects = []
+
+    def sync(self):
+        """Manually update the view on the database."""
+        self._txn_mgr.get().abort()
+        sync = getattr(self._storage, 'sync', 0)
+        if sync:
+            sync()
+        self._flush_invalidations()
+
+    def getDebugInfo(self):
+        """Returns a tuple with different items for debugging the
+        connection.
+        """ 
+        return self._debug_info
+
+    def setDebugInfo(self, *args):
+        """Add the given items to the debug information of this connection."""
+        self._debug_info = self._debug_info + args
+
+    def getTransferCounts(self, clear=False):
+        """Returns the number of objects loaded and stored."""
+        res = self._load_count, self._store_count
+        if clear:
+            self._load_count = 0
+            self._store_count = 0
+        return res
+
+    ##############################################
+    # persistent.interfaces.IPersistentDatamanager
+
+    def oldstate(self, obj, tid):
+        """Return copy of 'obj' that was written by transaction 'tid'."""
+        assert obj._p_jar is self
+        p = self._storage.loadSerial(obj._p_oid, tid)
+        return self._reader.getState(p)
+
     def setstate(self, obj):
+        """Turns the ghost 'obj' into a real object by loading it's from the
+        database."""
         oid = obj._p_oid
 
         if self._storage is None:
@@ -845,12 +661,15 @@
             self._load_before_or_conflict(obj)
             return
 
-        p, serial = self._storage.load(obj._p_oid, self._version)
+        oid = obj._p_oid
+
+        p, serial = self._storage.load(oid, self._version)
         self._load_count += 1
 
+
         self._inv_lock.acquire()
         try:
-            invalid = obj._p_oid in self._invalidated
+            invalid = oid in self._invalidated
         finally:
             self._inv_lock.release()
 
@@ -866,9 +685,13 @@
         self._reader.setGhostState(obj, p)
         obj._p_serial = serial
 
+        # Blob support
+        if IBlob.providedBy(obj):
+            obj._p_blob_data = \
+                    self._storage.loadBlob(oid, serial, self._version)
+
     def _load_before_or_conflict(self, obj):
         """Load non-current state for obj or raise ReadConflictError."""
-
         if not (self._mvcc and self._setstate_noncurrent(obj)):
             self._register(obj)
             self._conflicts[obj._p_oid] = True
@@ -917,27 +740,137 @@
             self._register(obj)
             raise ReadConflictError(object=obj)
 
-    def oldstate(self, obj, tid):
-        """Return copy of obj that was written by tid.
+    def register(self, obj):
+        """Register obj with the current transaction manager.
 
-        The returned object does not have the typical metadata
-        (_p_jar, _p_oid, _p_serial) set.  I'm not sure how references
-        to other peristent objects are handled.
+        A subclass could override this method to customize the default
+        policy of one transaction manager for each thread.
 
-        :return: a persistent object
+        obj must be an object loaded from this Connection.
+        """
+        assert obj._p_jar is self
+        if obj._p_oid is None:
+            # There is some old Zope code that assigns _p_jar
+            # directly.  That is no longer allowed, but we need to
+            # provide support for old code that still does it.
 
-        :Parameters:
-          - `obj`: a persistent object from this Connection.
-          - `tid`: id of a transaction that wrote an earlier revision.
+            # The actual complaint here is that an object without
+            # an oid is being registered.  I can't think of any way to
+            # achieve that without assignment to _p_jar.  If there is
+            # a way, this will be a very confusing warning.
+            deprecated36("Assigning to _p_jar is deprecated, and will be "
+                         "changed to raise an exception.")
+        elif obj._p_oid in self._added:
+            # It was registered before it was added to _added.
+            return
+        self._register(obj)
 
-        :Exceptions:
-          - `KeyError`: if tid does not exist or if tid deleted a revision
-            of obj.
+    def _register(self, obj=None):
+        if obj is not None:
+            self._registered_objects.append(obj)
+        if self._needs_to_join:
+            self._txn_mgr.get().join(self)
+            self._needs_to_join = False
+
+    # PROTECTED stuff (used by e.g. ZODB.DB.DB)
+
+    def _cache_items(self):
+        # find all items on the lru list
+        items = self._cache.lru_items()
+        # find everything. some on the lru list, some not
+        everything = self._cache.cache_data
+        # remove those items that are on the lru list
+        for k,v in items:
+            del everything[k]
+        # return a list of [ghosts....not recently used.....recently used]
+        return everything.items() + items
+
+    def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
+        """Register odb, the DB that this Connection uses.
+
+        This method is called by the DB every time a Connection
+        is opened.  Any invalidations received while the Connection
+        was closed will be processed.
+
+        If the global module function resetCaches() was called, the
+        cache will be cleared.
+
+        Parameters:
+        odb: database that owns the Connection
+        mvcc: boolean indicating whether MVCC is enabled
+        txn_mgr: transaction manager to use.  None means
+            use the default transaction manager.
+        synch: boolean indicating whether Connection should
+            register for afterCompletion() calls.
         """
-        assert obj._p_jar is self
-        p = self._storage.loadSerial(obj._p_oid, tid)
-        return self._reader.getState(p)
 
+        # TODO:  Why do we go to all the trouble of setting _db and
+        # other attributes on open and clearing them on close?
+        # A Connection is only ever associated with a single DB
+        # and Storage.
+
+        self._db = odb
+        self._storage = odb._storage
+        self.new_oid = odb._storage.new_oid
+        self._opened = time()
+        if synch is not None:
+            self._synch = synch
+        if mvcc is not None:
+            self._mvcc = mvcc
+        self._txn_mgr = txn_mgr or transaction.manager
+        if self._reset_counter != global_reset_counter:
+            # New code is in place.  Start a new cache.
+            self._resetCache()
+        else:
+            self._flush_invalidations()
+        if self._synch:
+            self._txn_mgr.registerSynch(self)
+        self._reader = ConnectionObjectReader(self, self._cache,
+                                              self._db.classFactory)
+
+        # Multi-database support
+        self.connections = {self._db.database_name: self}
+
+    def _resetCache(self):
+        """Creates a new cache, discarding the old one.
+
+        See the docstring for the resetCaches() function.
+        """
+        self._reset_counter = global_reset_counter
+        self._invalidated.clear()
+        cache_size = self._cache.cache_size
+        self._cache = cache = PickleCache(self, cache_size)
+
+    # Python protocol
+
+    def __repr__(self):
+        if self._version:
+            ver = ' (in version %s)' % `self._version`
+        else:
+            ver = ''
+        return '<Connection at %08x%s>' % (positive_id(self), ver)
+
+    # DEPRECATION candidates
+
+    __getitem__ = get
+
+    def modifiedInVersion(self, oid):
+        """Returns the version the object with the given oid was modified in.
+
+        If it wasn't modified in a version, the current version of this 
+        connection is returned.
+        """
+        try:
+            return self._db.modifiedInVersion(oid)
+        except KeyError:
+            return self.getVersion()
+
+    def getVersion(self):
+        """Returns the version this connection is attached to."""
+        if self._storage is None:
+            raise ConnectionStateError("The database connection is closed")
+        return self._version
+
     def setklassstate(self, obj):
         # Special case code to handle ZClasses, I think.
         # Called the cache when an object of type type is invalidated.
@@ -959,141 +892,60 @@
             self._log.error("setklassstate failed", exc_info=sys.exc_info())
             raise
 
-    def tpc_begin(self, transaction, sub=False):
-        self._modified = []
+    def exchange(self, old, new):
+        # called by a ZClasses method that isn't executed by the test suite
+        oid = old._p_oid
+        new._p_oid = oid
+        new._p_jar = self
+        new._p_changed = 1
+        self._register(new)
+        self._cache[oid] = new
 
-        # _creating is a list of oids of new objects, which is used to
-        # remove them from the cache if a transaction aborts.
-        self._creating = []
-        if sub and self._tmp is None:
-            # Sub-transaction!
-            self._tmp = self._storage
-            self._storage = TmpStore(self._version, self._storage)
+    # DEPRECATED methods
 
-        self._storage.tpc_begin(transaction)
+    def getTransaction(self):
+        """Get the current transaction for this connection.
 
-    def tpc_vote(self, transaction):
-        try:
-            vote = self._storage.tpc_vote
-        except AttributeError:
-            return
-        s = vote(transaction)
-        self._handle_serial(s)
+        :deprecated:
 
-    def _handle_serial(self, store_return, oid=None, change=1):
-        """Handle the returns from store() and tpc_vote() calls."""
+        The transaction manager's get method works the same as this
+        method.  You can pass a transaction manager (TM) to DB.open()
+        to control which TM the Connection uses.
+        """
+        deprecated36("getTransaction() is deprecated. "
+                     "Use the txn_mgr argument to DB.open() instead.")
+        return self._txn_mgr.get()
 
-        # These calls can return different types depending on whether
-        # ZEO is used.  ZEO uses asynchronous returns that may be
-        # returned in batches by the ClientStorage.  ZEO1 can also
-        # return an exception object and expect that the Connection
-        # will raise the exception.
+    def setLocalTransaction(self):
+        """Use a transaction bound to the connection rather than the thread.
 
-        # When commit_sub() exceutes a store, there is no need to
-        # update the _p_changed flag, because the subtransaction
-        # tpc_vote() calls already did this.  The change=1 argument
-        # exists to allow commit_sub() to avoid setting the flag
-        # again.
+        :deprecated:
 
-        # When conflict resolution occurs, the object state held by
-        # the connection does not match what is written to the
-        # database.  Invalidate the object here to guarantee that
-        # the new state is read the next time the object is used.
+        Returns the transaction manager used by the connection.  You
+        can pass a transaction manager (TM) to DB.open() to control
+        which TM the Connection uses.
+        """
+        deprecated36("setLocalTransaction() is deprecated. "
+                     "Use the txn_mgr argument to DB.open() instead.")
+        if self._txn_mgr is transaction.manager:
+            if self._synch:
+                self._txn_mgr.unregisterSynch(self)
+            self._txn_mgr = transaction.TransactionManager()
+            if self._synch:
+                self._txn_mgr.registerSynch(self)
+        return self._txn_mgr
 
-        if not store_return:
-            return
-        if isinstance(store_return, str):
-            assert oid is not None
-            self._handle_one_serial(oid, store_return, change)
+    def cacheFullSweep(self, dt=None):
+        deprecated36("cacheFullSweep is deprecated. "
+                     "Use cacheMinimize instead.")
+        if dt is None:
+            self._cache.full_sweep()
         else:
-            for oid, serial in store_return:
-                self._handle_one_serial(oid, serial, change)
+            self._cache.full_sweep(dt)
 
-    def _handle_one_serial(self, oid, serial, change):
-        if not isinstance(serial, str):
-            raise serial
-        obj = self._cache.get(oid, None)
-        if obj is None:
-            return
-        if serial == ResolvedSerial:
-            del obj._p_changed # transition from changed to ghost
-        else:
-            if change:
-                obj._p_changed = 0 # transition from changed to up-to-date
-            obj._p_serial = serial
+    def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
+        """Deactivate all unmodified objects in the cache."""
+        if dt is not DEPRECATED_ARGUMENT:
+            deprecated36("cacheMinimize() dt= is ignored.")
+        self._cache.minimize()
 
-    def tpc_finish(self, transaction):
-        # It's important that the storage calls the function we pass
-        # while it still has its lock.  We don't want another thread
-        # to be able to read any updated data until we've had a chance
-        # to send an invalidation message to all of the other
-        # connections!
-
-        if self._tmp is not None:
-            # Commiting a subtransaction!
-            # There is no need to invalidate anything.
-            self._storage.tpc_finish(transaction)
-            self._storage._creating[:0]=self._creating
-            del self._creating[:]
-        else:
-            def callback(tid):
-                d = {}
-                for oid in self._modified:
-                    d[oid] = 1
-                self._db.invalidate(tid, d, self)
-            self._storage.tpc_finish(transaction, callback)
-        self._tpc_cleanup()
-
-    def tpc_abort(self, transaction):
-        if self._import:
-            self._import = None
-        self._storage.tpc_abort(transaction)
-        self._cache.invalidate(self._modified)
-        self._invalidate_creating()
-        while self._added:
-            oid, obj = self._added.popitem()
-            del obj._p_oid
-            del obj._p_jar
-        self._tpc_cleanup()
-
-    # Common cleanup actions after tpc_finish/tpc_abort.
-    def _tpc_cleanup(self):
-        self._conflicts.clear()
-        if not self._synch:
-            self._flush_invalidations()
-        self._needs_to_join = True
-        self._registered_objects = []
-
-
-    def sync(self):
-        self._txn_mgr.get().abort()
-        sync = getattr(self._storage, 'sync', 0)
-        if sync:
-            sync()
-        self._flush_invalidations()
-
-    def getDebugInfo(self):
-        return self._debug_info
-
-    def setDebugInfo(self, *args):
-        self._debug_info = self._debug_info + args
-
-    def getTransferCounts(self, clear=False):
-        """Returns the number of objects loaded and stored.
-
-        If clear is True, reset the counters.
-        """
-        res = self._load_count, self._store_count
-        if clear:
-            self._load_count = 0
-            self._store_count = 0
-        return res
-
-    def exchange(self, old, new):
-        # called by a ZClasses method that isn't executed by the test suite
-        oid = old._p_oid
-        new._p_oid = oid
-        new._p_jar = self
-        new._p_changed = 1
-        self._register(new)
-        self._cache[oid] = new
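
For reference, a minimal sketch of the replacement that the deprecation
warnings above point to: pass a transaction manager to DB.open() via the
txn_mgr argument instead of calling setLocalTransaction() or
getTransaction().  Not part of the patch; MappingStorage is used only to
keep the sketch self-contained.

    import transaction
    from ZODB.DB import DB
    from ZODB.MappingStorage import MappingStorage

    db = DB(MappingStorage())
    tm = transaction.TransactionManager()
    conn = db.open(txn_mgr=tm)     # instead of conn.setLocalTransaction()
    root = conn.root()
    root['answer'] = 42
    tm.get().commit()              # instead of conn.getTransaction().commit()
    conn.close()
    db.close()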

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/DB.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -27,6 +27,9 @@
 from ZODB.utils import WeakSet
 from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
 
+from zope.interface import implements
+from ZODB.interfaces import IDatabase
+
 import transaction
 
 logger = logging.getLogger('ZODB.DB')
@@ -178,6 +181,7 @@
         setCacheDeactivateAfter,
         getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter
     """
+    implements(IDatabase)
 
     klass = Connection  # Class to use for connections
     _activity_monitor = None
@@ -188,6 +192,8 @@
                  cache_deactivate_after=DEPRECATED_ARGUMENT,
                  version_pool_size=3,
                  version_cache_size=100,
+                 database_name='unnamed',
+                 databases=None,
                  version_cache_deactivate_after=DEPRECATED_ARGUMENT,
                  ):
         """Create an object database.
@@ -248,6 +254,16 @@
             storage.tpc_vote(t)
             storage.tpc_finish(t)
 
+        # Multi-database setup.
+        if databases is None:
+            databases = {}
+        self.databases = databases
+        self.database_name = database_name
+        if database_name in databases:
+            raise ValueError("database_name %r already in databases" %
+                             database_name)
+        databases[database_name] = self
+
         # Pass through methods:
         for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
                   'versionEmpty', 'versions']:
@@ -565,7 +581,7 @@
         def get_info(c):
             # `result`, `time` and `version` are lexically inherited.
             o = c._opened
-            d = c._debug_info
+            d = c.getDebugInfo()
             if d:
                 if len(d) == 1:
                     d = d[0]
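
The database_name and databases arguments added to DB.__init__ above wire a
DB into a multi-database collection.  A hedged sketch (MappingStorage used
only to keep it self-contained):

    from ZODB.DB import DB
    from ZODB.MappingStorage import MappingStorage

    databases = {}
    root_db = DB(MappingStorage(), database_name='root',
                 databases=databases)
    catalog_db = DB(MappingStorage(), database_name='catalog',
                    databases=databases)

    # Every member of the collection shares the same name -> DB mapping.
    assert root_db.databases is catalog_db.databases is databases
    assert 'root' in databases and 'catalog' in databases

    # Re-using a name is rejected, per the ValueError raised above.
    try:
        DB(MappingStorage(), database_name='root', databases=databases)
    except ValueError:
        pass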

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/FileStorage.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -547,6 +547,7 @@
             self._lock_release()
 
     def load(self, oid, version):
+        """Return pickle data and serial number."""
         self._lock_acquire()
         try:
             pos = self._lookup_pos(oid)
@@ -629,7 +630,7 @@
         finally:
             self._lock_release()
 
-    def store(self, oid, serial, data, version, transaction):
+    def store(self, oid, oldserial, data, version, transaction):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         if transaction is not self._transaction:
@@ -652,12 +653,12 @@
                         pnv = h.pnv
                     cached_tid = h.tid
 
-                if serial != cached_tid:
+                if oldserial != cached_tid:
                     rdata = self.tryToResolveConflict(oid, cached_tid,
-                                                     serial, data)
+                                                     oldserial, data)
                     if rdata is None:
                         raise POSException.ConflictError(
-                            oid=oid, serials=(cached_tid, serial), data=data)
+                            oid=oid, serials=(cached_tid, oldserial), data=data)
                     else:
                         data = rdata
 
@@ -687,7 +688,7 @@
                 raise FileStorageQuotaError(
                     "The storage quota has been exceeded.")
 
-            if old and serial != cached_tid:
+            if old and oldserial != cached_tid:
                 return ConflictResolution.ResolvedSerial
             else:
                 return self._tid

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/FileStorage/format.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -68,16 +68,16 @@
 #
 #   - 8-byte data length
 #
-#   ? 8-byte position of non-version data
+#   ? 8-byte position of non-version data record
 #     (if version length > 0)
 #
 #   ? 8-byte position of previous record in this version
 #     (if version length > 0)
 #
-#   ?   version string
+#   ? version string
 #     (if version length > 0)
 #
-#   ?   data
+#   ? data
 #     (data length > 0)
 #
 #   ? 8-byte position of data record containing data

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/TmpStore.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -12,8 +12,11 @@
 #
 ##############################################################################
 
+from zope.interface import implements
+
+from ZODB.Blobs.interfaces import IBlobStorage
 from ZODB import POSException
-from ZODB.utils import p64, u64, z64
+from ZODB.utils import p64, u64, z64, cp, mktemp
 
 import tempfile
 
@@ -22,6 +25,8 @@
 
     _bver = ''
 
+    implements(IBlobStorage)
+
     def __init__(self, base_version, storage):
         self._transaction = None
         self._storage = storage
@@ -37,6 +42,8 @@
         self._tindex = {}
         self._creating = []
 
+        self.blob_files = {}
+
     def close(self):
         self._file.close()
 
@@ -61,6 +68,9 @@
         serial = h[:8]
         return self._file.read(size), serial
 
+    def sortKey(self):
+        return self._storage.sortKey()
+
     # TODO: clarify difference between self._storage & self._db._storage
 
     def modifiedInVersion(self, oid):
@@ -119,5 +129,27 @@
 
     def versionEmpty(self, version):
         # TODO: what is this supposed to do?
+        # NOTE: This appears to implement the opposite of what it should.
         if version == self._bver:
             return len(self._index)
+
+    # Blob support
+
+    def loadBlob(self, oid, serial, version):
+        return self.blob_files.get(oid)
+
+    def storeBlob(self, oid, oldserial, data, blobfile, version, transaction):
+        result = self.store(oid, oldserial, data, version, transaction)
+
+        # Copy the blob data in binary mode and close both files when done.
+        target = file(self.generateBlobFile(oid), "wb")
+        src = file(blobfile, "rb")
+        cp(src, target)
+        src.close()
+        target.close()
+
+        return result
+
+    def generateBlobFile(self, oid):
+        if not self.blob_files.has_key(oid):
+            # mkstemp() returns an OS-level file descriptor (an int) that has
+            # no close() method; use the mktemp() helper added to ZODB.utils
+            # in this revision, which closes the descriptor via os.close().
+            self.blob_files[oid] = mktemp()
+        return self.blob_files[oid]

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/component.xml	2005-03-22 04:16:03 UTC (rev 29637)
@@ -158,4 +158,15 @@
     <key name="version-cache-size" datatype="integer" default="100"/>
   </sectiontype>
 
+  <sectiontype name="blobfilestorage" datatype=".BlobFileStorage"
+    implements="ZODB.storage" extends="filestorage">
+    <key name="blob-dir" required="yes">
+      <description>
+        Path name to the blob storage directory.
+      </description>
+    </key>
+  </sectiontype>
+
 </component>

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/config.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/config.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/config.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -132,6 +132,15 @@
                            read_only=self.config.read_only,
                            quota=self.config.quota)
 
+class BlobFileStorage(FileStorage):
+
+    def open(self):
+        from ZODB.Blobs.BlobStorage import BlobStorage
+        base_storage = FileStorage.open(self)
+        return BlobStorage(self.config.blob_dir, base_storage)
+
+
 class ZEOClient(BaseConfig):
 
     def open(self):
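
Assuming ZODB.config.databaseFromString() behaves as on the trunk, the new
blobfilestorage section can be exercised roughly like this (the paths are
illustrative only):

    from ZODB import config

    db = config.databaseFromString("""
    <zodb>
      <blobfilestorage>
        path     /tmp/Data.fs
        blob-dir /tmp/blobs
      </blobfilestorage>
    </zodb>
    """)
    # db._storage is the BlobStorage proxy built by BlobFileStorage.open()
    # above, wrapping an ordinary FileStorage.
    db.close()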

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/interfaces.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -16,14 +16,122 @@
 $Id$
 """
 
-import zope.interface
+from zope.interface import Interface, Attribute
 
-class IConnection(zope.interface.Interface):
-    """ZODB connection.
+class IConnection(Interface):
+    """Connection to ZODB for loading and storing objects.
 
-    TODO: This interface is incomplete.
+    The Connection object serves as a data manager.  The root() method
+    on a Connection returns the root object for the database.  This
+    object and all objects reachable from it are associated with the
+    Connection that loaded them.  When a transaction commits, it uses
+    the Connection to store modified objects.
+
+    In typical use of ZODB, each thread has its own Connection, and no
+    thread has more than one Connection to the same database.  A thread
+    is associated with a Connection by loading objects from that
+    Connection.  Objects loaded by one thread should not be used by
+    another thread.
+
+    A Connection can be associated with a single version when it is
+    created.  By default, a Connection is not associated with a
+    version; it uses non-version data.
+
+    Each Connection provides an isolated, consistent view of the
+    database, by managing independent copies of objects in the
+    database.  At transaction boundaries, these copies are updated to
+    reflect the current state of the database.
+
+    You should not instantiate this class directly; instead call the
+    open() method of a DB instance.
+
+    In many applications, root() is the only method of the Connection
+    that you will need to use.
+
+    Synchronization
+    ---------------
+
+    A Connection instance is not thread-safe.  It is designed to
+    support a thread model where each thread has its own transaction.
+    If an application has more than one thread that uses the
+    connection or the transaction the connection is registered with,
+    the application should provide locking.
+
+    The Connection manages movement of objects in and out of object
+    storage.
+
+    TODO:  We should document an intended API for using a Connection via
+    multiple threads.
+
+    TODO:  We should explain that the Connection has a cache and that
+    multiple calls to get() will return a reference to the same
+    object, provided that one of the earlier objects is still
+    referenced.  Object identity is preserved within a connection, but
+    not across connections.
+
+    TODO:  Mention the database pool.
+
+    A database connection always presents a consistent view of the
+    objects in the database, although it may not always present the
+    most current revision of any particular object.  Modifications
+    made by concurrent transactions are not visible until the next
+    transaction boundary (abort or commit).
+
+    Two options affect consistency: mvcc and synch.  Both are enabled
+    by default.
+
+    If you pass mvcc=False to db.open(), the Connection will never read
+    non-current revisions of an object.  Instead it will raise a
+    ReadConflictError to indicate that the current revision is
+    unavailable because it was written after the current transaction
+    began.
+
+    The logic for handling modifications assumes that the thread that
+    opened a Connection (called db.open()) is the thread that will use
+    the Connection.  If this is not true, you should pass synch=False
+    to db.open().  When the synch option is disabled, some transaction
+    boundaries will be missed by the Connection; in particular, if a
+    transaction does not involve any modifications to objects loaded
+    from the Connection and synch is disabled, the Connection will
+    miss the transaction boundary.  Two examples of this behavior are
+    db.undo() and read-only transactions.
+
+    Groups of methods:
+
+        User Methods:
+            root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep, 
+            cacheMinimize, getVersion, modifiedInVersion
+
+        Experimental Methods: 
+            onCloseCallbacks
+
+        Database Invalidation Methods:
+            invalidate
+
+        Other Methods: exchange, getDebugInfo, setDebugInfo, 
+            getTransferCounts
     """
 
+    def __init__(version='', cache_size=400,
+                 cache_deactivate_after=None, mvcc=True, txn_mgr=None,
+                 synch=True):
+        """Create a new Connection.
+
+        A Connection instance should be instantiated by the DB
+        instance that it is connected to.
+
+        Parameters:
+        version: the "version" that all changes will be made in, defaults 
+            to no version.
+        cache_size: the target size of the in-memory object cache, measured 
+            in objects.
+        mvcc: boolean indicating whether MVCC is enabled
+        txn_mgr: transaction manager to use. None means use the default
+            transaction manager.
+        synch: boolean indicating whether Connection should register for 
+            afterCompletion() calls.
+        """
+
     def add(ob):
         """Add a new object 'obj' to the database and assign it an oid.
 
@@ -38,15 +146,330 @@
         The object is added when the transaction commits.  The object
         must implement the IPersistent interface and must not
         already be associated with a Connection.
+
+        Parameters:
+        obj: a Persistent object
+
+        Raises TypeError if obj is not a persistent object.
+
+        Raises InvalidObjectReference if obj is already associated with another
+        connection.
+
+        Raises ConnectionStateError if the connection is closed.
         """
 
-class IBlobStorage(zope.interface.Interface):
-    """A storage supporting BLOBs."""
+    def get(oid):
+        """Return the persistent object with oid 'oid'.
 
-    def storeBlob(oid, serial, data, blob, version, transaction):
-        """Stores data that has a BLOB attached."""
+        If the object was not in the cache and the object's class is
+        ghostable, then a ghost will be returned.  If the object is
+        already in the cache, a reference to the cached object will be
+        returned.
 
-    def loadBlob(oid, serial, version, blob):
-        """Loads the BLOB data for 'oid' into the given blob object.
+        Applications seldom need to call this method, because objects
+        are loaded transparently during attribute lookup.
+
+        Parameters:
+        oid: an object id
+
+        Raises KeyError if oid does not exist.  
+        
+            It is possible that an object does not exist as of the current
+            transaction, but existed in the past.  It may even exist again in
+            the future, if the transaction that removed it is undone.
+
+        Raises ConnectionStateError if the connection is closed.
         """
 
+    def cacheMinimize():
+        """Deactivate all unmodified objects in the cache.
+
+        Call _p_deactivate() on each cached object, attempting to turn
+        it into a ghost.  It is possible for individual objects to
+        remain active.
+        """
+
+    def cacheGC():
+        """Reduce cache size to target size.
+
+        Call _p_deactivate() on cached objects until the cache size
+        falls under the target size.
+        """
+
+    def onCloseCallback(f):
+        """Register a callable, f, to be called by close().
+
+        f will be called with no arguments before the Connection is closed.
+
+        Parameters:
+        f: method that will be called on `close`
+        """
+
+    def close():
+        """Close the Connection.
+
+        When the Connection is closed, all callbacks registered by
+        onCloseCallback() are invoked and the cache is garbage collected.
+
+        A closed Connection should not be used by client code.  It can't load
+        or store objects.  Objects in the cache are not freed, because
+        Connections are re-used and the cache is expected to be useful to the
+        next client.
+        """
+
+    def db():
+        """Returns a handle to the database this connection belongs to."""
+
+    def isReadOnly():
+        """Returns True if the storage for this connection is read only."""
+
+    def invalidate(tid, oids):
+        """Notify the Connection that transaction 'tid' invalidated oids.
+
+        When the next transaction boundary is reached, objects will be
+        invalidated.  If any of the invalidated objects are accessed by the
+        current transaction, the revision written before Connection.tid will be
+        used.
+
+        The DB calls this method, even when the Connection is closed.
+
+        Parameters:
+        tid: the storage-level id of the transaction that committed
+        oids: a set of oids, represented as a dict with oids as keys.
+        """
+
+    def root():
+        """Return the database root object.
+
+        The root is a persistent.mapping.PersistentMapping.
+        """
+
+    def getVersion():
+        """Returns the version this connection is attached to."""
+
+    # Multi-database support.
+
+    connections = Attribute("""\
+        A mapping from database name to a Connection to that database.
+
+        In multi-database use, the Connections of all members of a database
+        collection share the same .connections object.
+
+        In single-database use, of course this mapping contains a single
+        entry.
+        """)
+
+    # TODO:  should this accept all the arguments one may pass to DB.open()?
+    def get_connection(database_name):
+        """Return a Connection for the named database.
+
+        This is intended to be called from an open Connection associated with
+        a multi-database.  In that case, database_name must be the name of a
+        database within the database collection (probably the name of a
+        different database than is associated with the calling Connection
+        instance, but it's fine to use the name of the calling Connection
+        object's database).  A Connection for the named database is
+        returned.  If no connection to that database is already open, a new
+        Connection is opened.  So long as the multi-database remains open,
+        passing the same name to get_connection() multiple times returns the
+        same Connection object each time.
+        """
+
+    def sync():
+        """Manually update the view on the database.
+
+        This includes aborting the current transaction, getting a fresh and
+        consistent view of the data (synchronizing with the storage if
+        possible) and calling cacheGC() for this connection.
+        
+        This method was especially useful in ZODB 3.2 to better support
+        read-only connections that were affected by a couple of problems.  
+        """
+
+    # Debug information
+
+    def getDebugInfo():
+        """Returns a tuple with different items for debugging the connection. 
+
+        Debug information can be added to a connection by using setDebugInfo.
+        """
+
+    def setDebugInfo(*items):
+        """Add the given items to the debug information of this connection."""
+
+    def getTransferCounts(clear=False):
+        """Returns the number of objects loaded and stored.
+
+        If clear is True, reset the counters.
+        """
+
+class IDatabase(Interface):
+    """ZODB DB.
+
+    TODO: This interface is incomplete.
+    """
+
+    def __init__(storage,
+                 pool_size=7,
+                 cache_size=400,
+                 version_pool_size=3,
+                 version_cache_size=100,
+                 database_name='unnamed',
+                 databases=None,
+                 ):
+        """Create an object database.
+
+        storage: the storage used by the database, e.g. FileStorage
+        pool_size: expected maximum number of open connections
+        cache_size: target size of Connection object cache, in number of
+            objects
+        version_pool_size: expected maximum number of connections (per
+            version)
+        version_cache_size: target size of Connection object cache for
+             version connections, in number of objects
+        database_name: when using a multi-database, the name of this DB
+            within the database group.  It's a (detected) error if databases
+            is specified too and database_name is already a key in it.
+            This becomes the value of the DB's database_name attribute.
+        databases: when using a multi-database, a mapping to use as the
+            binding of this DB's .databases attribute.  It's intended
+            that the second and following DB's added to a multi-database
+            pass the .databases attribute set on the first DB added to the
+            collection.
+        """
+
+    databases = Attribute("""\
+        A mapping from database name to DB (database) object.
+
+        In multi-database use, all DB members of a database collection share
+        the same .databases object.
+
+        In single-database use, of course this mapping contains a single
+        entry.
+        """)
+
+class IStorage(Interface):
+    """A storage is responsible for storing and retrieving data of objects.
+    """
+
+    def load(oid, version):
+        """XXX"""
+
+    def close():
+        """XXX"""
+        
+    def cleanup():
+        """XXX"""
+        
+    def lastSerial():
+        """XXX"""
+        
+    def lastTransaction():
+        """XXX"""
+
+    def lastTid(oid):
+        """Return last serialno committed for object oid."""
+
+    def loadSerial(oid, serial):
+        """XXX"""
+        
+    def loadBefore(oid, tid):
+        """XXX"""
+        
+    def iterator(start=None, stop=None):
+        """XXX"""
+    
+    def sortKey():
+        """XXX"""
+
+    def getName():
+        """XXX"""
+        
+    def getSize():
+        """XXX"""
+
+    def history(oid, version, length=1, filter=None):
+        """XXX"""
+    
+    def new_oid(last=None):
+        """XXX"""
+        
+    def set_max_oid(possible_new_max_oid):
+        """XXX"""
+
+    def registerDB(db, limit):
+        """XXX"""
+    
+    def isReadOnly():
+        """XXX"""
+    
+    def supportsUndo():
+        """XXX"""
+    
+    def supportsVersions():
+        """XXX"""
+
+    def tpc_abort(transaction):
+        """XXX"""
+        
+    def tpc_begin(transaction):
+        """XXX"""
+
+    def tpc_vote(transaction):
+        """XXX"""
+
+    def tpc_finish(transaction, f=None):
+        """XXX"""
+
+    def getSerial(oid):
+        """XXX"""
+
+    def getExtensionMethods():
+        """XXX"""
+
+    def copyTransactionsFrom():
+        """XXX"""
+
+    def store(oid, oldserial, data, version, transaction):
+        """
+
+        may return the new serial or not
+        """
+
+class IUndoableStorage(IStorage):
+
+    def undo(transaction_id, txn):
+        """XXX"""
+    
+    def undoInfo():
+        """XXX"""
+    
+    def undoLog(first, last, filter=None):
+        """XXX"""
+    
+    def pack(t, referencesf):
+        """XXX"""
+
+class IVersioningStorage(IStorage):
+
+    def abortVersion(src, transaction):
+        """XXX"""
+    
+    def commitVersion(src, dest, transaction):
+        """XXX"""
+    
+    def modifiedInVersion(oid):
+        """XXX"""
+    
+    def versionEmpty(version):
+        """XXX"""
+    
+    def versions(max=None):
+        """XXX"""
+
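
A hedged, self-contained sketch of the multi-database API documented for
IConnection above (the connections attribute and get_connection()),
assuming the trunk multi-database support merged in this revision:

    from ZODB.DB import DB
    from ZODB.MappingStorage import MappingStorage

    databases = {}
    root_db = DB(MappingStorage(), database_name='root',
                 databases=databases)
    DB(MappingStorage(), database_name='catalog', databases=databases)

    conn = root_db.open()
    catalog_conn = conn.get_connection('catalog')

    # All Connections of one collection share the same mapping.
    assert conn.connections is catalog_conn.connections
    assert 'root' in conn.connections and 'catalog' in conn.connections

    # Repeated lookups return the same Connection object.
    assert conn.get_connection('catalog') is catalog_conn
    conn.close()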

Added: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/tests/loggingsupport.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -0,0 +1,121 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Support for testing logging code
+
+If you want to test that your code generates proper log output, you
+can create and install a handler that collects output:
+
+  >>> handler = InstalledHandler('foo.bar')
+
+The handler is installed into loggers for all of the names passed. In
+addition, the logger level is set to 1, which means log
+everything. If you want to log less than everything, you can provide a
+level keyword argument.  The level setting affects only the named
+loggers.
+
+Then, any log output is collected in the handler:
+
+  >>> logging.getLogger('foo.bar').exception('eek')
+  >>> logging.getLogger('foo.bar').info('blah blah')
+
+  >>> for record in handler.records:
+  ...     print record.name, record.levelname
+  ...     print ' ', record.getMessage()
+  foo.bar ERROR
+    eek
+  foo.bar INFO
+    blah blah
+
+A similar effect can be gotten by just printing the handler:
+
+  >>> print handler
+  foo.bar ERROR
+    eek
+  foo.bar INFO
+    blah blah
+
+After checking the log output, you need to uninstall the handler:
+
+  >>> handler.uninstall()
+
+At which point, the handler won't get any more log output.
+Let's clear the handler:
+
+  >>> handler.clear()
+  >>> handler.records
+  []
+
+And then log something:
+
+  >>> logging.getLogger('foo.bar').info('blah')
+
+and, sure enough, we still have no output:
+
+  >>> handler.records
+  []
+
+$Id: loggingsupport.py 28349 2004-11-06 00:10:32Z tim_one $
+"""
+
+import logging
+
+class Handler(logging.Handler):
+
+    def __init__(self, *names, **kw):
+        logging.Handler.__init__(self)
+        self.names = names
+        self.records = []
+        self.setLoggerLevel(**kw)
+
+    def setLoggerLevel(self, level=1):
+        self.level = level
+        self.oldlevels = {}
+
+    def emit(self, record):
+        self.records.append(record)
+
+    def clear(self):
+        del self.records[:]
+
+    def install(self):
+        for name in self.names:
+            logger = logging.getLogger(name)
+            self.oldlevels[name] = logger.level
+            logger.setLevel(self.level)
+            logger.addHandler(self)
+
+    def uninstall(self):
+        for name in self.names:
+            logger = logging.getLogger(name)
+            logger.setLevel(self.oldlevels[name])
+            logger.removeHandler(self)
+
+    def __str__(self):
+        return '\n'.join(
+            [("%s %s\n  %s" %
+              (record.name, record.levelname,
+               '\n'.join([line
+                          for line in record.getMessage().split('\n')
+                          if line.strip()])
+               )
+              )
+              for record in self.records]
+              )
+
+
+class InstalledHandler(Handler):
+
+    def __init__(self, *names):
+        Handler.__init__(self, *names)
+        self.install()
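
Beyond the doctest above, a typical (hypothetical) way to use the handler
from a unittest-based test:

    import logging
    import unittest

    from ZODB.tests.loggingsupport import InstalledHandler

    class LoggingTests(unittest.TestCase):

        def setUp(self):
            self.handler = InstalledHandler('ZODB.DB')

        def tearDown(self):
            self.handler.uninstall()

        def test_collects_records(self):
            logging.getLogger('ZODB.DB').info('opened database')
            self.assertEqual(len(self.handler.records), 1)
            self.assertEqual(self.handler.records[0].getMessage(),
                             'opened database')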

Copied: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/multidb.txt (from rev 29627, ZODB/trunk/src/ZODB/tests/multidb.txt)

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/tests/testConnection.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -647,6 +647,8 @@
         self._storage = StubStorage()
 
     classFactory = None
+    database_name = 'stubdatabase'
+    databases = {'stubdatabase': database_name}
 
     def invalidate(self, transaction, dict_with_oid_keys, connection):
         pass

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/tests/test_doctest_files.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -15,4 +15,6 @@
 from zope.testing.doctestunit import DocFileSuite
 
 def test_suite():
-    return DocFileSuite("dbopen.txt")
+    return DocFileSuite("dbopen.txt",
+                        "multidb.txt",
+                        )

Modified: ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/ZODB/utils.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -21,6 +21,8 @@
 from cStringIO import StringIO
 import weakref
 import warnings
+from tempfile import mkstemp
+import os
 
 from persistent.TimeStamp import TimeStamp
 
@@ -305,3 +307,10 @@
         # We're cheating by breaking into the internals of Python's
         # WeakValueDictionary here (accessing its .data attribute).
         return self.data.data.values()
+
+
+def mktemp():
+    """Create a temp file, known by name, in a semi-secure manner."""
+    handle, filename = mkstemp()
+    os.close(handle)
+    return filename
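
A short usage sketch for the new mktemp() helper; the caller is responsible
for removing the file when it is no longer needed:

    import os
    from ZODB.utils import mktemp

    filename = mktemp()        # the file exists, its descriptor is closed
    f = open(filename, 'wb')
    f.write('some blob data')
    f.close()
    os.remove(filename)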

Modified: ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/persistent/interfaces.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -257,18 +257,35 @@
     def setstate(object):
         """Load the state for the given object.
 
-        The object should be in the ghost state.
-        The object's state will be set and the object will end up
-        in the saved state.
+        The object should be in the ghost state. The object's state will be
+        set and the object will end up in the saved state.
 
         The object must provide the IPersistent interface.
         """
 
+    def oldstate(obj, tid):
+        """Return copy of 'obj' that was written by transaction 'tid'.
+
+        The returned object does not have the typical metadata (_p_jar, _p_oid,
+        _p_serial) set. I'm not sure how references to other persistent objects
+        are handled.
+
+        Parameters:
+        obj: a persistent object from this Connection.
+        tid: id of a transaction that wrote an earlier revision.
+
+        Raises KeyError if tid does not exist or if tid deleted a revision of 
+            obj. 
+        """
+
     def register(object):
         """Register an IPersistent with the current transaction.
 
         This method must be called when the object transitions to
         the changed state.
+
+        A subclass could override this method to customize the default
+        policy of one transaction manager for each thread.
         """
 
     def mtime(object):

Modified: ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py
===================================================================
--- ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py	2005-03-21 23:51:47 UTC (rev 29636)
+++ ZODB/branches/ctheune-blobsupport/src/transaction/interfaces.py	2005-03-22 04:16:03 UTC (rev 29637)
@@ -18,104 +18,7 @@
 
 import zope.interface
 
-class IResourceManager(zope.interface.Interface):
-    """Objects that manage resources transactionally.
-
-    These objects may manage data for other objects, or they may manage
-    non-object storages, such as relational databases.
-
-    IDataManagerOriginal is the interface currently provided by ZODB
-    database connections, but the intent is to move to the newer
-    IDataManager.
-    """
-
-    # Two-phase commit protocol.  These methods are called by the
-    # ITransaction object associated with the transaction being
-    # committed.
-
-    def tpc_begin(transaction):
-        """Begin two-phase commit, to save data changes.
-
-        An implementation should do as much work as possible without
-        making changes permanent.  Changes should be made permanent
-        when tpc_finish is called (or aborted if tpc_abort is called).
-        The work can be divided between tpc_begin() and tpc_vote(), and
-        the intent is that tpc_vote() be as fast as possible (to minimize
-        the period of uncertainty).
-
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-        """
-
-    def tpc_vote(transaction):
-        """Verify that a resource manager can commit the transaction.
-
-        This is the last chance for a resource manager to vote 'no'.  A
-        resource manager votes 'no' by raising an exception.
-
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-        """
-
-    def tpc_finish(transaction):
-        """Indicate confirmation that the transaction is done.
-
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-
-        This should never fail. If this raises an exception, the
-        database is not expected to maintain consistency; it's a
-        serious error.
-        """
-
-    def tpc_abort(transaction):
-        """Abort a transaction.
-
-        transaction is the ITransaction instance associated with the
-        transaction being committed.
-
-        All changes made by the current transaction are aborted.  Note
-        that this includes all changes stored in any savepoints that may
-        be associated with the current transaction.
-
-        tpc_abort() can be called at any time, either in or out of the
-        two-phase commit.
-
-        This should never fail.
-        """
-
-    # The savepoint/rollback API.
-
-    def savepoint(transaction):
-        """Save partial transaction changes.
-
-        There are two purposes:
-
-        1) To allow discarding partial changes without discarding all
-           dhanges.
-
-        2) To checkpoint changes to disk that would otherwise live in
-           memory for the duration of the transaction.
-
-        Returns an object implementing ISavePoint2 that can be used
-        to discard changes made since the savepoint was captured.
-
-        An implementation that doesn't support savepoints should implement
-        this method by returning a savepoint object that raises an
-        exception when its rollback method is called.  The savepoint method
-        shouldn't raise an error.  This way, transactions that create
-        savepoints can proceed as long as an attempt is never made to roll
-        back a savepoint.
-        """
-
-    def discard(transaction):
-        """Discard changes within the transaction since the last savepoint.
-
-        That means changes made since the last savepoint if one exists, or
-        since the start of the transaction.
-        """
-
-class IDataManagerOriginal(zope.interface.Interface):
+class IDataManager(zope.interface.Interface):
     """Objects that manage transactional storage.
 
     These objects may manage data for other objects, or they may manage
@@ -155,7 +58,7 @@
         has been called; this is only used when the transaction is
         being committed.
 
-        This call also implied the beginning of 2-phase commit.
+        This call also implies the beginning of 2-phase commit.
         """
 
     # Two-phase commit protocol.  These methods are called by the
@@ -180,10 +83,12 @@
 
         """
 
-
     def tpc_abort(transaction):
         """Abort a transaction.
 
+        This is called by a transaction manager to end a two-phase commit on
+        the data manager.
+
         This is always called after a tpc_begin call.
 
         transaction is the ITransaction instance associated with the
@@ -202,6 +107,11 @@
         database is not expected to maintain consistency; it's a
         serious error.
 
+        It's important that the storage calls the passed function 
+        while it still has its lock.  We don't want another thread
+        to be able to read any updated data until we've had a chance
+        to send an invalidation message to all of the other
+        connections!
         """
 
     def tpc_vote(transaction):
@@ -214,126 +124,47 @@
         transaction being committed.
         """
 
-    def commit(object, transaction):
-        """CCCommit changes to an object
+    def commit(transaction):
+        """Commit modifications to registered objects.
 
         Save the object as part of the data to be made persistent if
         the transaction commits.
-        """
 
-    def abort(object, transaction):
-        """Abort changes to an object
-
-        Only changes made since the last transaction or
-        sub-transaction boundary are discarded.
-
-        This method may be called either:
-
-        o Outside of two-phase commit, or
-
-        o In the first phase of two-phase commit
-
+        This includes conflict detection and handling. If no conflicts or
+        errors occur, it saves the objects in the storage.
         """
 
-    def sortKey():
-        """
-        Return a key to use for ordering registered DataManagers
-
-        ZODB uses a global sort order to prevent deadlock when it commits
-        transactions involving multiple resource managers.  The resource
-        manager must define a sortKey() method that provides a global ordering
-        for resource managers.
-        """
-
-class IDataManager(zope.interface.Interface):
-    """Data management interface for storing objects transactionally.
-
-    ZODB database connections currently provides the older
-    IDataManagerOriginal interface, but the intent is to move to this newer
-    IDataManager interface.
-
-    Our hope is that this interface will evolve and become the standard
-    interface.  There are some issues to be resolved first, like:
-
-    - Probably want separate abort methods for use in and out of
-      two-phase commit.
-
-    - The savepoint api may need some more thought.
-
-    """
-
-    def prepare(transaction):
-        """Perform the first phase of a 2-phase commit
-
-        The data manager prepares for commit any changes to be made
-        persistent.  A normal return from this method indicated that
-        the data manager is ready to commit the changes.
-
-        The data manager must raise an exception if it is not prepared
-        to commit the transaction after executing prepare().
-
-        The transaction must match that used for preceeding
-        savepoints, if any.
-        """
-
-        # This is equivalent to zodb3's tpc_begin, commit, and
-        # tpc_vote combined.
-
     def abort(transaction):
-        """Abort changes made by transaction
+        """Abort a transaction and forget all changes.
 
-        This may be called before two-phase commit or in the second
-        phase of two-phase commit.
+        Abort must be called outside of a two-phase commit.
 
-        The transaction must match that used for preceeding
-        savepoints, if any.
-
+        Abort is called by the transaction manager to abort transactions 
+        that are not yet in a two-phase commit. 
         """
 
-        # This is equivalent to *both* zodb3's abort and tpc_abort
-        # calls. This should probably be split into 2 methods.
-
-    def commit(transaction):
-        """Finish two-phase commit
-
-        The prepare method must be called, with the same transaction,
-        before calling commit.
-
-        """
-
-        # This is equivalent to zodb3's tpc_finish
-
-    def savepoint(transaction):
-        """Do tentative commit of changes to this point.
-
-        Should return an object implementing IRollback that can be used
-        to rollback to the savepoint.
-
-        Note that (unlike zodb3) this doesn't use a 2-phase commit
-        protocol.  If this call fails, or if a rollback call on the
-        result fails, the (containing) transaction should be
-        aborted.  Aborting the containing transaction is *not* the
-        responsibility of the data manager, however.
-
-        An implementation that doesn't support savepoints should
-        implement this method by returning a rollback implementation
-        that always raises an error when it's rollback method is
-        called. The savepoing method shouldn't raise an error. This
-        way, transactions that create savepoints can proceed as long
-        as an attempt is never made to roll back a savepoint.
-
-        """
-
     def sortKey():
-        """
-        Return a key to use for ordering registered DataManagers
+        """Return a key to use for ordering registered DataManagers
 
         ZODB uses a global sort order to prevent deadlock when it commits
         transactions involving multiple resource managers.  The resource
         manager must define a sortKey() method that provides a global ordering
         for resource managers.
         """
+        # XXX: Alternate version:
+        #"""Return a consistent sort key for this connection.
+        #
+        #This allows ordering multiple connections that use the same storage in
+        #a consistent manner. This is unique for the lifetime of a connection,
+        #which is good enough to avoid ZEO deadlocks.
+        #"""
 
+    def beforeCompletion(transaction):
+        """Hook that is called by the transaction before completing a commit."""
+
+    def afterCompletion(transaction):
+        """Hook that is called by the transaction after completing a commit."""
+
 class ITransaction(zope.interface.Interface):
     """Object representing a running transaction.
 
@@ -414,35 +245,7 @@
         # Unsure:  is this allowed to cause an exception here, during
         # the two-phase commit, or can it toss data silently?
 
-class ISavePoint(zope.interface.Interface):
-    """ISavePoint objects represent partial transaction changes.
 
-    Sequences of savepoint objects are associated with transactions,
-    and with IResourceManagers.
-    """
-
-    def rollback():
-        """Discard changes made after this savepoint.
-
-        This includes discarding (call the discard method on) all
-        subsequent savepoints.
-        """
-
-    def discard():
-        """Discard changes saved by this savepoint.
-
-        That means changes made since the immediately preceding
-        savepoint if one exists, or since the start of the transaction,
-        until this savepoint.
-
-        Once a savepoint has been discarded, it's an error to attempt
-        to rollback or discard it again.
-        """
-
-    next_savepoint = zope.interface.Attribute(
-        """The next savepoint (later in time), or None if self is the
-           most recent savepoint.""")
-
 class IRollback(zope.interface.Interface):
 
     def rollback():
@@ -457,3 +260,4 @@
 
         - The transaction has ended.
         """
+
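
To make the protocol above concrete, a hedged sketch of a minimal
do-nothing data manager that joins the current transaction and records the
two-phase commit calls in the order the transaction machinery issues them;
it is illustrative only and stores no real data:

    import transaction

    class LoggingDataManager:
        """Records the IDataManager calls made during commit."""

        def __init__(self):
            self.calls = []

        def abort(self, txn):              # outside two-phase commit
            self.calls.append('abort')

        def tpc_begin(self, txn):
            self.calls.append('tpc_begin')

        def commit(self, txn):
            self.calls.append('commit')

        def tpc_vote(self, txn):
            self.calls.append('tpc_vote')

        def tpc_finish(self, txn):
            self.calls.append('tpc_finish')

        def tpc_abort(self, txn):
            self.calls.append('tpc_abort')

        def sortKey(self):
            # Global ordering key, used to commit resource managers in a
            # consistent order and avoid deadlocks.
            return 'LoggingDataManager'

    dm = LoggingDataManager()
    txn = transaction.get()
    txn.join(dm)
    txn.commit()
    assert dm.calls == ['tpc_begin', 'commit', 'tpc_vote', 'tpc_finish']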

Copied: ZODB/branches/ctheune-blobsupport/src/zope/proxy (from rev 29627, ZODB/trunk/src/zope/proxy)


Property changes on: ZODB/branches/ctheune-blobsupport/src/zope/proxy
___________________________________________________________________
Name: svn:ignore
   + *so




More information about the Zodb-checkins mailing list