[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.9.2.1

Jeremy Hylton jeremy@zope.com
Sat, 1 Mar 2003 21:36:57 -0500


Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv19408/src/zodb

Modified Files:
      Tag: jeremy-atomic-invalidation-branch
	connection.py 
Log Message:
New invalidation API and better handling of read conflicts.

The begin_ and finish_invalidation() methods are no longer needed.
Change invalidate() to take a set of oids instead of just one.

If a read conflict is raised, add the oid to connections' _conflicts
variable.  If a commit() occurs for a txn that got a read conflict,
raise ReadConflictError again.


=== Zope3/src/zodb/connection.py 1.9 => 1.9.2.1 ===
--- Zope3/src/zodb/connection.py:1.9	Thu Feb 27 15:16:45 2003
+++ Zope3/src/zodb/connection.py	Sat Mar  1 21:36:54 2003
@@ -55,7 +55,7 @@
 from zodb import interfaces
 from zodb.conflict import ResolvedSerial
 from zodb.export import ExportImport
-from zodb.interfaces import IConnection, ConflictError, IAppConnection
+from zodb.interfaces import *
 from zodb.serialize import ConnectionObjectReader, ObjectWriter
 from zodb.utils import p64, u64, Set, z64
 
@@ -102,6 +102,11 @@
 
         self._modified = Set()
         self._created = Set()
+        # _conflicts: set of objects that failed to load because
+        # of read conflicts.  We must track these explicitly
+        # because they occur outside the two-phase commit and
+        # we must not allow the transaction they occur in to commit.
+        self._conflicts = Set()
 
         # new_oid is used by serialize
         self.newObjectId = self._storage.newObjectId
@@ -152,11 +157,11 @@
     # IPersistentDataManager requires the next three methods:
     # setstate(), register(), mtime()
 
-    def setstate(self, object):
+    def setstate(self, obj):
         oid = None
         # XXX Is it possible to reorganize the method-level try/except?
         try:
-            oid = object._p_oid
+            oid = obj._p_oid
 
             # XXX this is quite conservative!
 
@@ -170,19 +175,20 @@
             p, serial = self._storage.load(oid, self._version)
 
             if oid in self._invalidated:
-                if not (hasattr(object, '_p_independent')
-                        and object._p_independent()):
+                if not (hasattr(obj, '_p_independent')
+                        and obj._p_independent()):
                     get_transaction().join(self)
-                    raise ConflictError(object=object)
+                    self._conflicts.add(obj._p_oid)
+                    raise ReadConflictError(object=obj)
                 invalid = 1
             else:
                 invalid = 0
 
-            self._reader.setGhostState(object, p)
-            object._p_serial = serial
+            self._reader.setGhostState(obj, p)
+            obj._p_serial = serial
 
             if invalid:
-                if object._p_independent():
+                if obj._p_independent():
                     self._inv_lock.acquire()
                     try:
                         try:
@@ -193,7 +199,7 @@
                         self._inv_lock.release()
                 else:
                     get_transaction().join(self)
-                    raise ConflictError(object=object)
+                    raise ConflictError(object=obj)
 
         except ConflictError:
             raise
@@ -202,7 +208,7 @@
             raise
         else:
             # Add the object to the cache active list
-            self._cache.setstate(oid, object)
+            self._cache.setstate(oid, obj)
 
     def register(self, object):
         txn = get_transaction()
@@ -239,17 +245,16 @@
     def cacheGC(self):
         self._cache.incrgc()
 
-    def invalidate(self, oid):
+    def invalidate(self, oids):
         """Invalidate a particular oid
 
         This marks the oid as invalid, but doesn't actually invalidate
         it.  The object data will be actually invalidated at certain
         transaction boundaries.
         """
-        assert oid is not None
         self._inv_lock.acquire()
         try:
-            self._invalidated.add(oid)
+            self._invalidated.update(oids)
         finally:
             self._inv_lock.release()
 
@@ -264,6 +269,12 @@
     # prepare(), abort(), commit(), savepoint()
 
     def prepare(self, txn):
+        if self._conflicts:
+            # XXX should raise all of the conflicting oids, but
+            # haven't gotten around to changing the exception
+            # to store them.
+            oid = list(self._conflicts)[0]
+            raise ReadConflictError(oid)
         self._modified.clear()
         self._created.clear()
         if self._tmp is not None:
@@ -295,24 +306,24 @@
         self._invalidate_created(self._created)
         self._created = Set()
         self._modified.clear()
+        self._conflicts.clear()
 
     def commit(self, txn):
         # It's important that the storage call the function we pass
-        # (self._invalidate_modified) while it still has its
-        # lock.  We don't want another thread to be able to read any
+        # (self._invalidate_modified) while it still has its lock.
+        # We don't want another thread to be able to read any
         # updated data until we've had a chance to send an
         # invalidation message to all of the other connections!
 
-        self._db.begin_invalidation()
-        # XXX We should really have a try/finally because the begin
-        # call acquired a lock that will only be released in
-        # _invalidate_modified().
+        # If another thread could read the newly committed data
+        # before the invalidation is delivered, the connection would
+        # not be able to detect a read conflict.
         self._storage.tpcFinish(txn, self._invalidate_modified)
         try:
             del self._txns[txn]
         except KeyError:
             pass
-
+        self._conflicts.clear()
         self._flush_invalidations()
 
     def savepoint(self, txn):
@@ -335,6 +346,18 @@
         self._created = Set()
         return Rollback(self, undo)
 
+    def _invalidate_created(self, created):
+        # Dis-own new objects from uncommitted transaction.
+        for oid in created:
+            o = self._cache.get(oid)
+            if o is not None:
+                del o._p_jar
+                del o._p_oid
+                del self._cache[oid]
+
+    def _invalidate_modified(self):
+        self._db.invalidate(self._modified, self)
+
     def _flush_invalidations(self):
         self._inv_lock.acquire()
         try:
@@ -457,24 +480,6 @@
 
         self._cache.invalidateMany(tmp._index)
         self._invalidate_created(tmp._created)
-
-    def _invalidate_created(self, created):
-        # Dis-own new objects from uncommitted transaction.
-        for oid in created:
-            o = self._cache.get(oid)
-            if o is not None:
-                del o._p_jar
-                del o._p_oid
-                del self._cache[oid]
-
-    def _invalidate_modified(self):
-        # Called from the storage's tpc_finish() method after
-        # self._db.begin_invalidation() is called.  The begin_
-        # and finish_invalidation() methods acquire and release
-        # a lock.
-        for oid in self._modified:
-            self._db.invalidate(oid, self)
-        self._db.finish_invalidation()
 
 class Rollback:
     """Rollback changes associated with savepoint"""