[Zope-Checkins] CVS: Zope/lib/python/ZODB - Connection.py:1.149 ExportImport.py:1.24

Tim Peters tim.one at comcast.net
Thu Apr 15 21:08:43 EDT 2004


Update of /cvs-repository/Zope/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv10213/lib/python/ZODB

Modified Files:
	Connection.py ExportImport.py 
Log Message:
Finally changed Connection to use the new transaction join API.  This
essentially means a connection keeps track of which objects from
the connection are modified, instead of transactions keeping track of that.
A good consequence hasn't yet been implemented:  if a connection is
closed with modifications still pending, we can detect that efficiently
now, and complain.


=== Zope/lib/python/ZODB/Connection.py 1.148 => 1.149 ===
--- Zope/lib/python/ZODB/Connection.py:1.148	Thu Apr 15 12:22:38 2004
+++ Zope/lib/python/ZODB/Connection.py	Thu Apr 15 21:08:12 2004
@@ -158,7 +158,7 @@
 
         A Connection instance should by instantiated by the DB
         instance that it is connected to.
-        
+
         :Parameters:
           - `version`: the "version" that all changes will be made
              in, defaults to no version.
@@ -192,8 +192,17 @@
         self._reset_counter = global_reset_counter
         self._load_count = 0   # Number of objects unghosted
         self._store_count = 0  # Number of objects stored
+
+        # List of oids of modified objects (to be invalidated on an abort).
         self._modified = []
 
+        # List of all objects (not oids) registered as modified by the
+        # persistence machinery.
+        self._registered_objects = []
+
+        # Do we need to join a txn manager?
+        self._needs_to_join = True
+
         # If a transaction manager is passed to the constructor, use
         # it instead of the global transaction manager.  The instance
         # variable will hold a TM instance.
@@ -373,7 +382,7 @@
             obj._p_jar = self
             if self._added_during_commit is not None:
                 self._added_during_commit.append(obj)
-            self._txn_mgr.get().register(obj)
+            self._register(obj)
             # Add to _added after calling register(), so that _added
             # can be used as a test for whether the object has been
             # registered with the transaction.
@@ -440,22 +449,23 @@
         cache_size = self._cache.cache_size
         self._cache = cache = PickleCache(self, cache_size)
 
-    def abort(self, object, transaction):
+    def abort(self, transaction):
         """Abort the object in the transaction.
 
         This just deactivates the thing.
         """
-        if object is self:
-            self._flush_invalidations()
-        else:
-            oid = object._p_oid
+
+        for obj in self._registered_objects:
+            oid = obj._p_oid
             assert oid is not None
             if oid in self._added:
                 del self._added[oid]
-                del object._p_jar
-                del object._p_oid
+                del obj._p_jar
+                del obj._p_oid
             else:
-                self._cache.invalidate(object._p_oid)
+                self._cache.invalidate(oid)
+
+        self._tpc_cleanup()
 
     # XXX should there be a way to call incrgc directly?
     # perhaps "full sweep" should do that?
@@ -546,37 +556,35 @@
             # assert that here, because self may have been reused (by
             # another thread) by the time we get back here.
 
-    def commit(self, obj, transaction):
-        if obj is self:
-            # We registered ourself.  Execute a commit action, if any.
-            if self._import:
-                self._importDuringCommit(transaction, *self._import)
-                self._import = None
-            return
-
-        oid = obj._p_oid
-        if oid in self._conflicts:
-            raise ReadConflictError(object=obj)
+    def commit(self, transaction):
+        if self._import:
+            # XXX eh?
+            self._importDuringCommit(transaction, *self._import)
+            self._import = None
 
-        if oid is None or obj._p_jar is not self:
-            # new object
-            oid = self.new_oid()
-            obj._p_jar = self
-            obj._p_oid = oid
-            assert obj._p_serial == z64
-        elif oid in self._added:
-            assert obj._p_serial == z64
-        elif obj._p_changed:
-            if oid in self._invalidated:
-                resolve = getattr(obj, "_p_resolveConflict", None)
-                if resolve is None:
-                    raise ConflictError(object=obj)
-            self._modified.append(oid)
-        else:
-            # Nothing to do
-            return
+        for obj in self._registered_objects:
+            oid = obj._p_oid
+            assert oid
+            if oid in self._conflicts:
+                raise ReadConflictError(object=obj)
+
+            if obj._p_jar is not self:
+                raise InvalidObjectReference(obj, obj._p_jar)
+            elif oid in self._added:
+                assert obj._p_serial == z64
+            elif obj._p_changed:
+                if oid in self._invalidated:
+                    resolve = getattr(obj, "_p_resolveConflict", None)
+                    if resolve is None:
+                        raise ConflictError(object=obj)
+                self._modified.append(oid)
+            else:
+                # Nothing to do.  It's been said that it's legal, e.g., for
+                # an object to set _p_changed to false after it's been
+                # changed and registered.
+                continue
 
-        self._store_objects(ObjectWriter(obj), transaction)
+            self._store_objects(ObjectWriter(obj), transaction)
 
     def _store_objects(self, writer, transaction):
         self._added_during_commit = []
@@ -626,8 +634,8 @@
         self._storage.tpc_begin(t)
 
         # Copy invalidating and creating info from temporary storage:
-        self._modified[len(self._modified):] = oids
-        self._creating[len(self._creating):] = src._creating
+        self._modified.extend(oids)
+        self._creating.extend(src._creating)
 
         for oid in oids:
             data, serial = src.load(oid, src)
@@ -745,7 +753,14 @@
         elif obj._p_oid in self._added:
             # It was registered before it was added to _added.
             return
-        self._txn_mgr.get().register(obj)
+        self._register(obj)
+
+    def _register(self, obj=None):
+        if obj is not None:
+            self._registered_objects.append(obj)
+        if self._needs_to_join:
+            self._txn_mgr.get().join(self)
+            self._needs_to_join = False
 
     def root(self):
         """Return the database root object.
@@ -825,7 +840,7 @@
         """Load non-current state for obj or raise ReadConflictError."""
 
         if not (self._mvcc and self._setstate_noncurrent(obj)):
-            self._txn_mgr.get().register(obj)
+            self._register(obj)
             self._conflicts[obj._p_oid] = True
             raise ReadConflictError(object=obj)
 
@@ -867,7 +882,7 @@
             finally:
                 self._inv_lock.release()
         else:
-            self._txn_mgr.get().register(obj)
+            self._register(obj)
             raise ReadConflictError(object=obj)
 
     def oldstate(self, obj, tid):
@@ -912,20 +927,6 @@
             self._log.error("setklassstate failed", exc_info=sys.exc_info())
             raise
 
-    def tpc_abort(self, transaction):
-        if self._import:
-            self._import = None
-        self._storage.tpc_abort(transaction)
-        self._cache.invalidate(self._modified)
-        self._conflicts.clear()
-        if not self._synch:
-            self._flush_invalidations()
-        self._invalidate_creating()
-        while self._added:
-            oid, obj = self._added.popitem()
-            del obj._p_oid
-            del obj._p_jar
-
     def tpc_begin(self, transaction, sub=False):
         self._modified = []
 
@@ -1009,10 +1010,28 @@
                     d[oid] = 1
                 self._db.invalidate(tid, d, self)
             self._storage.tpc_finish(transaction, callback)
+        self._tpc_cleanup()
 
+    def tpc_abort(self, transaction):
+        if self._import:
+            self._import = None
+        self._storage.tpc_abort(transaction)
+        self._cache.invalidate(self._modified)
+        self._invalidate_creating()
+        while self._added:
+            oid, obj = self._added.popitem()
+            del obj._p_oid
+            del obj._p_jar
+        self._tpc_cleanup()
+
+    # Common cleanup actions after tpc_finish/tpc_abort.
+    def _tpc_cleanup(self):
         self._conflicts.clear()
         if not self._synch:
             self._flush_invalidations()
+        self._needs_to_join = True
+        self._registered_objects = []
+
 
     def sync(self):
         self._txn_mgr.get().abort()
@@ -1044,5 +1063,5 @@
         new._p_oid = oid
         new._p_jar = self
         new._p_changed = 1
-        self._txn_mgr.get().register(new)
+        self._register(new)
         self._cache[oid] = new


=== Zope/lib/python/ZODB/ExportImport.py 1.23 => 1.24 ===
--- Zope/lib/python/ZODB/ExportImport.py:1.23	Sun Mar 21 11:13:31 2004
+++ Zope/lib/python/ZODB/ExportImport.py	Thu Apr 15 21:08:12 2004
@@ -57,7 +57,7 @@
 
         if isinstance(f, str):
             f = open(f,'rb')
-            
+
         magic = f.read(4)
         if magic != 'ZEXP':
             if customImporters and customImporters.has_key(magic):
@@ -65,13 +65,13 @@
                 return customImporters[magic](self, f, clue)
             raise ExportError("Invalid export header")
 
-        t = self.getTransaction()
+        t = self._txn_mgr.get()
         if clue:
             t.note(clue)
 
         return_oid_list = []
         self._import = f, return_oid_list
-        self.getTransaction().register(self)
+        self._register()
         t.commit(1)
         # Return the root imported object.
         if return_oid_list:




More information about the Zope-Checkins mailing list