[Zope3-checkins] CVS: Zope3/src/zodb - utils.py:1.3 interfaces.py:1.11 db.py:1.10 connection.py:1.11

Jeremy Hylton jeremy@zope.com
Wed, 5 Mar 2003 17:14:31 -0500


Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv18303/zodb

Modified Files:
	utils.py interfaces.py db.py connection.py 
Log Message:
Merge jeremy-atomic-invalidation-branch.

Change invalidation() APIs to pass a set of oids rather than an
individual oid.  All the oids in one call must be processed
atomically.


=== Zope3/src/zodb/utils.py 1.2 => 1.3 ===
--- Zope3/src/zodb/utils.py:1.2	Wed Dec 25 09:12:16 2002
+++ Zope3/src/zodb/utils.py	Wed Mar  5 17:14:30 2003
@@ -45,10 +45,21 @@
     # This must be Python 2.2, which doesn't have a standard sets module.
     # ZODB needs only a very limited subset of the Set API.
     class Set(dict):
+        def __init__(self, arg=None):
+            if arg:
+                if isinstance(arg, dict):
+                    self.update(arg)
+                else:
+                    # XXX the proper sets version is much more robust
+                    for o in arg:
+                        self[o] = 1
         def add(self, o):
             self[o] = 1
+        def remove(self, o):
+            del self[o]
         def __ior__(self, other):
             if not isinstance(other, Set):
                 return NotImplemented
             self.update(other)
             return self
+


=== Zope3/src/zodb/interfaces.py 1.10 => 1.11 ===
--- Zope3/src/zodb/interfaces.py:1.10	Thu Feb 27 15:11:47 2003
+++ Zope3/src/zodb/interfaces.py	Wed Mar  5 17:14:30 2003
@@ -256,22 +256,19 @@
         If there is a current transaction, it will be aborted.
         """
 
-class IDatabase(Interface):
-    """Interface between the database and its connections."""
+    def get(oid):
+        """Return object for `oid`.
 
-    # XXX This interface needs to be redesigned, so I'm not going to
-    # bother documenting the current interface.
-
-    def begin_invalidation():
-        pass
+        The object may be a ghost.
+        """
 
-    def invalidate(oid, conn):
-        pass
+class IDatabase(Interface):
+    """Interface between the database and its connections."""
 
-    def finish_invalidation():
+    def invalidate(oids, conn=None, version=""):
         pass
 
-    def _closeConnection():
+    def connectionClose(conn):
         pass
 
 


=== Zope3/src/zodb/db.py 1.9 => 1.10 ===
--- Zope3/src/zodb/db.py:1.9	Thu Feb 27 15:16:45 2003
+++ Zope3/src/zodb/db.py	Wed Mar  5 17:14:30 2003
@@ -28,7 +28,7 @@
 from zodb.connection import Connection
 from zodb.serialize import getDBRoot
 from zodb.ztransaction import Transaction
-from zodb.utils import z64
+from zodb.utils import z64, Set
 
 from transaction import get_transaction
 from transaction.interfaces import IDataManager
@@ -55,10 +55,12 @@
 
         self.log = logging.getLogger("zodb")
 
-        # Allocate locks:
-        l=Lock()
-        self._a=l.acquire
-        self._r=l.release
+        # The lock protects access to the pool data structures.
+        # Store the lock acquire and release methods as methods
+        # of the instance.
+        l = Lock()
+        self._a = l.acquire
+        self._r = l.release
 
         # Setup connection pools and cache info
         # _pool is currently available (closed) connections
@@ -148,17 +150,7 @@
     def getPoolSize(self):
         return self._pool_size
 
-    def begin_invalidation(self):
-        # Must be called before first call to invalidate and before
-        # the storage lock is held.
-        self._a()
-
-    def finish_invalidation(self):
-        # Must be called after begin_invalidation() and after final
-        # invalidate() call.
-        self._r()
-
-    def invalidate(self, oid, connection=None, version=''):
+    def invalidate(self, oids, connection=None, version=''):
         """Invalidate references to a given oid.
 
         This is used to indicate that one of the connections has committed a
@@ -166,7 +158,6 @@
         passed in to prevent useless (but harmless) messages to the
         connection.
         """
-        assert oid is not None
         if connection is not None:
             assert version == connection._version
             version = connection._version
@@ -174,18 +165,18 @@
         # Notify connections
         for cc in self._allocated:
             if cc is not connection:
-                self.invalidateConnection(cc, oid, version)
+                self.invalidateConnection(cc, oids, version)
 
         if self._temps:
             # t accumulates all the connections that aren't closed.
             t = []
             for cc in self._temps:
                 if cc is not connection:
-                    self.invalidateConnection(cc, oid, version,
+                    self.invalidateConnection(cc, oids, version,
                                               t.append)
             self._temps = t
 
-    def invalidateConnection(self, conn, oid, version, alive=None):
+    def invalidateConnection(self, conn, oids, version, alive=None):
         """Send invalidation message to conn for oid on version.
 
         If the modification occurred on a version, an invalidation is
@@ -205,7 +196,7 @@
             if alive is not None:
                 alive(conn)
         if not version or conn.getVersion() == version:
-            conn.invalidate(oid)
+            conn.invalidate(oids)
 
     def open(self, version='', transaction=None, temporary=0, force=None,
              waitflag=1):
@@ -369,18 +360,16 @@
         self._dest = dest
 
     def _prepare(self, txn):
-        self._oids = self._storage.commitVersion(self._version, self._dest,
-                                                 txn)
+        self._oids = Set(self._storage.commitVersion(self._version, self._dest,
+                                                     txn))
 
     def commit(self, txn):
         super(CommitVersion, self).commit(txn)
-        for oid in self._oids:
-            self._db.invalidate(oid, version=self._dest)
+        self._db.invalidate(self._oids, version=self._dest)
         if self._dest:
             # the code above just invalidated the dest version.
             # now we need to invalidate the source!
-            for oid in self._oids:
-                self._db.invalidate(oid, version=self._version)
+            self._db.invalidate(self._oids, version=self._version)
 
 class AbortVersion(SimpleDataManager):
     """An object that will see to version abortion."""
@@ -390,12 +379,11 @@
         self._version = version
 
     def _prepare(self, txn):
-        self._oids = self._storage.abortVersion(self._version, txn)
+        self._oids = Set(self._storage.abortVersion(self._version, txn))
 
     def commit(self, txn):
         super(AbortVersion, self).commit(txn)
-        for oid in self._oids:
-            self._db.invalidate(oid, version=self._version)
+        self._db.invalidate(self._oids, version=self._version)
 
 class TransactionalUndo(SimpleDataManager):
     """An object that will see to transactional undo."""
@@ -405,9 +393,8 @@
         self._tid = tid
 
     def _prepare(self, txn):
-        self._oids = self._storage.undo(self._tid, txn)
+        self._oids = Set(self._storage.undo(self._tid, txn))
 
     def commit(self, txn):
         super(TransactionalUndo, self).commit(txn)
-        for oid in self._oids:
-            self._db.invalidate(oid)
+        self._db.invalidate(self._oids)


=== Zope3/src/zodb/connection.py 1.10 => 1.11 ===
--- Zope3/src/zodb/connection.py:1.10	Wed Mar  5 15:24:38 2003
+++ Zope3/src/zodb/connection.py	Wed Mar  5 17:14:30 2003
@@ -49,7 +49,7 @@
 from zodb import interfaces
 from zodb.conflict import ResolvedSerial
 from zodb.export import ExportImport
-from zodb.interfaces import IConnection, ConflictError, IAppConnection
+from zodb.interfaces import *
 from zodb.serialize import ConnectionObjectReader, ObjectWriter
 from zodb.utils import p64, u64, Set, z64
 
@@ -88,7 +88,16 @@
 
         # _invalidated queues invalidate messages delivered from the DB
         # _inv_lock prevents one thread from modifying the set while
-        # another is processing invalidations
+        # another is processing invalidations.  All the invalidations
+        # from a single transaction should be applied atomically, so
+        # the lock must be held when reading _invalidated.
+
+        # XXX It sucks that we have to hold the lock to read
+        # _invalidated.  Normally, _invalidated is written by calling
+        # dict.update, which will execute atomically by virtue of the
+        # GIL.  But some storage might generate oids where hash or
+        # compare invokes Python code.  In that case, the GIL can't
+        # save us.
         self._inv_lock = threading.Lock()
         self._invalidated = Set()
         self._committed = []
@@ -100,6 +109,11 @@
         self._registered = Set()
         self._modified = Set() # XXX is this the same as registered?
         self._created = Set()
+        # _conflicts: set of objects that failed to load because
+        # of read conflicts.  We must track these explicitly
+        # because they occur outside the two-phase commit and
+        # we must not allow the transaction they occur in to commit.
+        self._conflicts = Set()
 
         # new_oid is used by serialize
         self.newObjectId = self._storage.newObjectId
@@ -153,13 +167,10 @@
     # setstate(), register(), mtime()
 
     def setstate(self, obj):
+        # extremely paranoid: guard against obj not having an _p_oid.
         oid = None
-        # XXX Is it possible to reorganize the method-level try/except?
         try:
             oid = obj._p_oid
-
-            # XXX this is quite conservative!
-
             # Avoid reading data from a transaction that committed
             # after the current transaction started, as that might
             # lead to mixing of cached data from earlier transactions
@@ -168,33 +179,11 @@
             # Wait for check until after data is loaded from storage
             # to avoid time-of-check to time-of-use race.
             p, serial = self._storage.load(oid, self._version)
-
-            if oid in self._invalidated:
-                if not (hasattr(obj, '_p_independent')
-                        and obj._p_independent()):
-                    self._get_transaction().join(self)
-                    raise ConflictError(object=obj)
-                invalid = 1
-            else:
-                invalid = 0
-
+            invalid = self._is_invalidated(obj)
             self._reader.setGhostState(obj, p)
             obj._p_serial = serial
-
             if invalid:
-                if obj._p_independent():
-                    self._inv_lock.acquire()
-                    try:
-                        try:
-                            del self._invalidated[oid]
-                        except KeyError:
-                            pass
-                    finally:
-                        self._inv_lock.release()
-                else:
-                    self._get_transaction().join(self)
-                    raise ConflictError(object=obj)
-
+                self._handle_independent(obj)
         except ConflictError:
             raise
         except:
@@ -204,6 +193,44 @@
             # Add the object to the cache active list
             self._cache.setstate(oid, obj)
 
+    def _is_invalidated(self, obj):
+        # Helper method for setstate() covers three cases:
+        # returns false if obj is valid
+        # returns true if obj was invalidated, but may be independent
+        # otherwise, raises ReadConflictError for invalidated objects
+        self._inv_lock.acquire()
+        try:
+            if obj._p_oid in self._invalidated:
+                # Defer _p_independent() call until state is loaded.
+                if hasattr(obj, "_p_independent"):
+                    return True
+                else:
+                    self._get_transaction().join(self)
+                    self._conflicts.add(obj._p_oid)
+                    raise ReadConflictError(object=obj)
+            else:
+                return False
+        finally:
+            self._inv_lock.release()
+
+    def _handle_independent(self, obj):
+        # Helper method for setstate(); handles possibly independent objects.
+        # Call _p_independent(); if it returns True, setstate() wins.
+        # Otherwise, raise a ReadConflictError.
+        
+        if obj._p_independent():
+            self._inv_lock.acquire()
+            try:
+                try:
+                    self._invalidated.remove(obj._p_oid)
+                except KeyError:
+                    pass
+            finally:
+                self._inv_lock.release()
+        else:
+            self._get_transaction().join(self)
+            raise ReadConflictError(object=obj)
+
     def register(self, obj):
         if not self._registered:
             self._get_transaction().join(self)
@@ -214,7 +241,7 @@
         return None
 
     ######################################################################
-    # IConnection requires the next three methods:
+    # IConnection requires the next five methods:
     # getVersion(), reset(), cacheGC(), invalidate(), close()
 
     def getVersion(self):
@@ -228,26 +255,28 @@
             # version.
             self._cache.clear()
             self._version = version
-        # This method doesn't acquire the lock, so it shouldn't be called
-        # when the DB is delivering invalidations.
-        self._cache.invalidateMany(self._invalidated)
-        self._invalidated.clear()
+        self._inv_lock.acquire()
+        try:
+            self._cache.invalidateMany(self._invalidated)
+            self._invalidated.clear()
+        finally:
+            self._inv_lock.release()
         self._open = True
 
     def cacheGC(self):
         self._cache.incrgc()
 
-    def invalidate(self, oid):
+    def invalidate(self, oids):
         """Invalidate a particular oid
 
         This marks the oid as invalid, but doesn't actually invalidate
         it.  The object data will be actually invalidated at certain
         transaction boundaries.
         """
-        assert oid is not None
+
         self._inv_lock.acquire()
         try:
-            self._invalidated.add(oid)
+            self._invalidated.update(oids)
         finally:
             self._inv_lock.release()
 
@@ -271,6 +300,12 @@
     # prepare(), abort(), commit(), savepoint()
 
     def prepare(self, txn):
+        if self._conflicts:
+            # XXX should raise all of the conflicting oids, but
+            # haven't gotten around to changing the exception
+            # to store them.
+            oid = list(self._conflicts)[0]
+            raise ReadConflictError(oid)
         self._modified.clear()
         self._created.clear()
         if self._tmp is not None:
@@ -303,20 +338,21 @@
         self._flush_invalidations()
         self._created.clear()
         self._modified.clear()
+        self._conflicts.clear()
 
     def commit(self, txn):
         # It's important that the storage call the function we pass
-        # (self._invalidate_modified) while it still has its
-        # lock.  We don't want another thread to be able to read any
+        # (self._invalidate_modified) while it still has its lock.
+        # We don't want another thread to be able to read any
         # updated data until we've had a chance to send an
         # invalidation message to all of the other connections!
 
-        self._db.begin_invalidation()
-        # XXX We should really have a try/finally because the begin
-        # call acquired a lock that will only be released in
-        # _invalidate_modified().
+        # If another thread could read the newly committed data
+        # before the invalidation is delivered, the connection would
+        # not be able to detect a read conflict.
         self._storage.tpcFinish(txn, self._invalidate_modified)
         self._txn = None
+        self._conflicts.clear()
         self._flush_invalidations()
         self._registered.clear()
         self._created.clear()
@@ -342,6 +378,18 @@
         self._created = Set()
         return Rollback(self, undo)
 
+    def _invalidate_created(self, created):
+        # Dis-own new objects from uncommitted transaction.
+        for oid in created:
+            o = self._cache.get(oid)
+            if o is not None:
+                del o._p_jar
+                del o._p_oid
+                del self._cache[oid]
+
+    def _invalidate_modified(self):
+        self._db.invalidate(self._modified, self)
+
     def _flush_invalidations(self):
         self._inv_lock.acquire()
         try:
@@ -404,9 +452,6 @@
             object._p_oid = oid
             self._created.add(oid)
         elif object._p_changed:
-            if (oid in self._invalidated and
-                not hasattr(object, '_p_resolveConflict')):
-                raise ConflictError(object=object)
             self._modified.add(oid)
         else:
             return # Nothing to do
@@ -421,10 +466,13 @@
         if serial is None:
             self._created.add(oid)
         else:
-            # XXX this seems to duplicate code on objcommit()
-            if (oid in self._invalidated and
-                not hasattr(pobject, '_p_resolveConflict')):
-                raise ConflictError(oid=oid)
+            self._inv_lock.acquire()
+            try:
+                if (oid in self._invalidated and
+                    not hasattr(pobject, '_p_resolveConflict')):
+                    raise ConflictError(oid=oid)
+            finally:
+                self._inv_lock.release()
             self._modified.add(oid)
 
         s = self._storage.store(oid, serial, writer.getState(pobject),
@@ -464,24 +512,6 @@
 
         self._cache.invalidateMany(tmp._index)
         self._invalidate_created(tmp._created)
-
-    def _invalidate_created(self, created):
-        # Dis-own new objects from uncommitted transaction.
-        for oid in created:
-            o = self._cache.get(oid)
-            if o is not None:
-                del o._p_jar
-                del o._p_oid
-                del self._cache[oid]
-
-    def _invalidate_modified(self):
-        # Called from the storage's tpc_finish() method after
-        # self._db.begin_invalidation() is called.  The begin_
-        # and finish_invalidation() methods acquire and release
-        # a lock.
-        for oid in self._modified:
-            self._db.invalidate(oid, self)
-        self._db.finish_invalidation()
 
 class Rollback:
     """Rollback changes associated with savepoint"""