[Zope-Checkins] CVS: ZODB3/ZODB - MappingStorage.py:1.11 DemoStorage.py:1.23 DB.py:1.58 Connection.py:1.105 BaseStorage.py:1.39

Jeremy Hylton jeremy at zope.com
Wed Dec 24 11:02:32 EST 2003


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv27465/ZODB

Modified Files:
	MappingStorage.py DemoStorage.py DB.py Connection.py 
	BaseStorage.py 
Log Message:
Merge MVCC branch to the HEAD.


=== ZODB3/ZODB/MappingStorage.py 1.10 => 1.11 ===
--- ZODB3/ZODB/MappingStorage.py:1.10	Fri Nov 28 11:44:49 2003
+++ ZODB3/ZODB/MappingStorage.py	Wed Dec 24 11:02:00 2003
@@ -58,6 +58,16 @@
         finally:
             self._lock_release()
 
+    def loadEx(self, oid, version):
+        self._lock_acquire()
+        try:
+            # Since this storage doesn't support versions, tid and
+            # serial will always be the same.
+            p = self._index[oid]
+            return p[8:], p[:8], "" # pickle, serial, tid
+        finally:
+            self._lock_release()
+
     def store(self, oid, serial, data, version, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
@@ -75,11 +85,10 @@
                                                      serials=(oserial, serial),
                                                      data=data)
 
-            serial = self._serial
-            self._tindex.append((oid, serial+data))
+            self._tindex.append((oid, self._tid + data))
         finally:
             self._lock_release()
-        return serial
+        return self._tid
 
     def _clear_temp(self):
         self._tindex = []
@@ -87,7 +96,7 @@
     def _finish(self, tid, user, desc, ext):
         for oid, p in self._tindex:
             self._index[oid] = p
-        self._ltid = self._serial
+        self._ltid = self._tid
 
     def lastTransaction(self):
         return self._ltid
@@ -95,6 +104,8 @@
     def pack(self, t, referencesf):
         self._lock_acquire()
         try:
+            if not self._index:
+                return
             # Build an index of *only* those objects reachable from the root.
             rootl = ['\0\0\0\0\0\0\0\0']
             pindex = {}


=== ZODB3/ZODB/DemoStorage.py 1.22 => 1.23 ===
--- ZODB3/ZODB/DemoStorage.py:1.22	Fri Nov 28 11:44:49 2003
+++ ZODB3/ZODB/DemoStorage.py	Wed Dec 24 11:02:00 2003
@@ -45,14 +45,12 @@
 
 A record is a tuple:
 
-  oid, serial, pre, vdata, p,
+  oid, pre, vdata, p, tid
 
 where:
 
      oid -- object id
 
-     serial -- object serial number
-
      pre -- The previous record for this object (or None)
 
      vdata -- version data
@@ -62,6 +60,8 @@
 
      p -- the pickle data or None
 
+     tid -- the transaction id that wrote the record
+
 The pickle data will be None for a record for an object created in
 an aborted version.
 
@@ -93,12 +93,13 @@
         BaseStorage.BaseStorage.__init__(self, name, base)
 
         # We use a BTree because the items are sorted!
-        self._data=OOBTree.OOBTree()
-        self._index={}
-        self._vindex={}
-        self._base=base
-        self._size=0
-        self._quota=quota
+        self._data = OOBTree.OOBTree()
+        self._index = {}
+        self._vindex = {}
+        self._base = base
+        self._size = 0
+        self._quota = quota
+        self._ltid = None
         self._clear_temp()
         if base is not None and base.versions():
             raise POSException.StorageError, (
@@ -113,7 +114,7 @@
         s=100
         for tid, (p, u, d, e, t) in self._data.items():
             s=s+16+24+12+4+16+len(u)+16+len(d)+16+len(e)+16
-            for oid, serial, pre, vdata, p in t:
+            for oid, pre, vdata, p, tid in t:
                 s=s+16+24+24+4+4+(p and (16+len(p)) or 4)
                 if vdata: s=s+12+16+len(vdata[0])+4
 
@@ -139,16 +140,16 @@
 
             oids = []
             for r in v.values():
-                oid, serial, pre, (version, nv), p = r
+                oid, pre, (version, nv), p, tid = r
                 oids.append(oid)
                 if nv:
-                    oid, serial, pre, vdata, p = nv
-                    self._tindex.append([oid, serial, r, None, p])
+                    oid, pre, vdata, p, tid = nv
+                    self._tindex.append([oid, r, None, p, self._tid])
                 else:
                     # effectively, delete the thing
-                    self._tindex.append([oid, None, r, None, None])
+                    self._tindex.append([oid, r, None, None, self._tid])
 
-            return oids
+            return self._tid, oids
 
         finally: self._lock_release()
 
@@ -168,53 +169,60 @@
             if v is None:
                 return
 
-            newserial = self._serial
+            newserial = self._tid
             tindex = self._tindex
             oids = []
             for r in v.values():
-                oid, serial, pre, vdata, p = r
+                oid, pre, vdata, p, tid = r
                 assert vdata is not None
                 oids.append(oid)
                 if dest:
                     new_vdata = dest, vdata[1]
                 else:
                     new_vdata = None
-                tindex.append([oid, newserial, r, new_vdata, p])
+                tindex.append([oid, r, new_vdata, p, self._tid])
 
-            return oids
+            return self._tid, oids
 
         finally:
             self._lock_release()
 
-    def load(self, oid, version):
+    def loadEx(self, oid, version):
         self._lock_acquire()
         try:
             try:
-                oid, serial, pre, vdata, p = self._index[oid]
+                oid, pre, vdata, p, tid = self._index[oid]
             except KeyError:
                 if self._base:
                     return self._base.load(oid, '')
                 raise KeyError, oid
 
+            ver = ""
             if vdata:
                 oversion, nv = vdata
                 if oversion != version:
                     if nv:
-                        oid, serial, pre, vdata, p = nv
+                        # Return the current txn's tid with the non-version
+                        # data.
+                        oid, pre, vdata, p, skiptid = nv
                     else:
                         raise KeyError, oid
+                ver = oversion
 
             if p is None:
                 raise KeyError, oid
 
-            return p, serial
+            return p, tid, ver
         finally: self._lock_release()
 
+    def load(self, oid, version):
+        return self.loadEx(oid, version)[:2]
+
     def modifiedInVersion(self, oid):
         self._lock_acquire()
         try:
             try:
-                oid, serial, pre, vdata, p = self._index[oid]
+                oid, pre, vdata, p, tid = self._index[oid]
                 if vdata: return vdata[0]
                 return ''
             except: return ''
@@ -231,15 +239,15 @@
                 # Hm, nothing here, check the base version:
                 if self._base:
                     try:
-                        p, oserial = self._base.load(oid, '')
+                        p, tid = self._base.load(oid, '')
                     except KeyError:
                         pass
                     else:
-                        old = oid, oserial, None, None, p
+                        old = oid, None, None, p, tid
 
             nv=None
             if old:
-                oid, oserial, pre, vdata, p = old
+                oid, pre, vdata, p, tid = old
 
                 if vdata:
                     if vdata[0] != version:
@@ -249,12 +257,11 @@
                 else:
                     nv=old
 
-                if serial != oserial:
+                if serial != tid:
                     raise POSException.ConflictError(
-                        oid=oid, serials=(oserial, serial), data=data)
+                        oid=oid, serials=(tid, serial), data=data)
 
-            serial=self._serial
-            r=[oid, serial, old, version and (version, nv) or None, data]
+            r = [oid, old, version and (version, nv) or None, data, self._tid]
             self._tindex.append(r)
 
             s=self._tsize
@@ -268,15 +275,21 @@
                     has been exceeded.<br>Have a nice day.''')
 
         finally: self._lock_release()
-        return serial
+        return self._tid
 
-    def supportsUndo(self): return 1
-    def supportsVersions(self): return 1
+    def supportsUndo(self):
+        return 1
+
+    def supportsVersions(self):
+        return 1
 
     def _clear_temp(self):
         self._tindex = []
         self._tsize = self._size + 160
 
+    def lastTransaction(self):
+        return self._ltid
+
     def _begin(self, tid, u, d, e):
         self._tsize = self._size + 120 + len(u) + len(d) + len(e)
 
@@ -285,11 +298,11 @@
 
         self._data[tid] = None, user, desc, ext, tuple(self._tindex)
         for r in self._tindex:
-            oid, serial, pre, vdata, p = r
+            oid, pre, vdata, p, tid = r
             old = self._index.get(oid)
             # If the object had version data, remove the version data.
             if old is not None:
-                oldvdata = old[3]
+                oldvdata = old[2]
                 if oldvdata:
                     v = self._vindex[oldvdata[0]]
                     del v[oid]
@@ -306,6 +319,7 @@
                 if v is None:
                     v = self._vindex[version] = {}
                 v[oid] = r
+        self._ltid = self._tid
 
     def undo(self, transaction_id):
         self._lock_acquire()
@@ -324,7 +338,7 @@
 
             oids=[]
             for r in t:
-                oid, serial, pre, vdata, p = r
+                oid, pre, vdata, p, tid = r
                 if pre:
 
                     index[oid] = pre
@@ -337,7 +351,7 @@
                         if v: del v[oid]
 
                     # Add new version data (from pre):
-                    oid, serial, prepre, vdata, p = pre
+                    oid, prepre, vdata, p, tid = pre
                     if vdata:
                         version=vdata[0]
                         v=vindex.get(version, None)
@@ -404,17 +418,17 @@
 
     def _build_indexes(self, stop='\377\377\377\377\377\377\377\377'):
         # Rebuild index structures from transaction data
-        index={}
-        vindex={}
-        _data=self._data
-        for tid, (p, u, d, e, t) in _data.items():
-            if tid >= stop: break
+        index = {}
+        vindex = {}
+        for tid, (p, u, d, e, t) in self._data.items():
+            if tid >= stop:
+                break
             for r in t:
-                oid, serial, pre, vdata, p = r
+                oid, pre, vdata, p, tid = r
                 old=index.get(oid, None)
 
                 if old is not None:
-                    oldvdata=old[3]
+                    oldvdata=old[2]
                     if oldvdata:
                         v=vindex[oldvdata[0]]
                         del v[oid]
@@ -439,54 +453,56 @@
         try:
 
             stop=`TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
-            _data=self._data
 
             # Build indexes up to the pack time:
             index, vindex = self._build_indexes(stop)
 
             # Now build an index of *only* those objects reachable
             # from the root.
-            rootl=['\0\0\0\0\0\0\0\0']
-            pop=rootl.pop
-            pindex={}
-            referenced=pindex.has_key
+            rootl = ['\0\0\0\0\0\0\0\0']
+            pindex = {}
             while rootl:
-                oid=pop()
-                if referenced(oid): continue
+                oid = rootl.pop()
+                if oid in pindex:
+                    continue
 
                 # Scan non-version pickle for references
-                r=index.get(oid, None)
+                r = index.get(oid, None)
                 if r is None:
                     if self._base:
                         p, s = self._base.load(oid, '')
                         referencesf(p, rootl)
                 else:
-                    pindex[oid]=r
-                    oid, serial, pre, vdata, p = r
+                    pindex[oid] = r
+                    oid, pre, vdata, p, tid = r
                     referencesf(p, rootl)
                     if vdata:
-                        nv=vdata[1]
+                        nv = vdata[1]
                         if nv:
-                            oid, serial, pre, vdata, p = nv
+                            oid, pre, vdata, p, tid = nv
                             referencesf(p, rootl)
 
             # Now we're ready to do the actual packing.
             # We'll simply edit the transaction data in place.
             # We'll defer deleting transactions till the end
             # to avoid messing up the BTree items.
-            deleted=[]
-            for tid, (p, u, d, e, t) in _data.items():
-                if tid >= stop: break
-                o=[]
-                for r in t:
-                    c=pindex.get(r[0])
+            deleted = []
+            for tid, (p, u, d, e, records) in self._data.items():
+                if tid >= stop:
+                    break
+                o = []
+                for r in records:
+                    c = pindex.get(r[0])
                     if c is None:
                         # GC this record, no longer referenced
                         continue
-                    elif c is not r:
+                    if c == r:
+                        # This is the most recent revision.
+                        o.append(r)
+                    else:
                         # This record is not the indexed record,
                         # so it may not be current. Let's see.
-                        oid, serial, pre, vdata, p = r
+                        vdata = r[3]
                         if vdata:
                             # Version record are current *only* if they
                             # are indexed
@@ -494,7 +510,7 @@
                         else:
                             # OK, this isn't a version record, so it may be the
                             # non-version record for the indexed record.
-                            oid, serial, pre, vdata, p = c
+                            vdata = c[3]
                             if vdata:
                                 if vdata[1] != r:
                                     # This record is not the non-version
@@ -505,25 +521,25 @@
                                 # so this record can not be the non-version
                                 # record for it.
                                 continue
-                    o.append(r)
+                        o.append(r)
 
                 if o:
-                    if len(o) != len(t):
-                        _data[tid] = 1, u, d, e, tuple(o) # Reset data
+                    if len(o) != len(records):
+                        self._data[tid] = 1, u, d, e, tuple(o) # Reset data
                 else:
                     deleted.append(tid)
 
             # Now delete empty transactions
             for tid in deleted:
-                del _data[tid]
+                del self._data[tid]
 
             # Now reset previous pointers for "current" records:
             for r in pindex.values():
-                r[2] = None # Previous record
-                if r[3] and r[3][1]: # vdata
+                r[1] = None # Previous record
+                if r[2] and r[2][1]: # vdata
                     # If this record contains version data and
                     # non-version data, then clear it out.
-                    r[3][1][2] = None
+                    r[2][1][2] = None
 
             # Finally, rebuild indexes from transaction data:
             self._index, self._vindex = self._build_indexes()
@@ -541,21 +557,22 @@
         for tid, (p, u, d, e, t) in self._data.items():
             o.append("  %s %s" % (TimeStamp(tid), p))
             for r in t:
-                oid, serial, pre, vdata, p = r
-                oid=utils.u64(oid)
-                if serial is not None: serial=str(TimeStamp(serial))
+                oid, pre, vdata, p, tid = r
+                oid = utils.oid_repr(oid)
+                tid = utils.oid_repr(tid)
+##                if serial is not None: serial=str(TimeStamp(serial))
                 pre=id(pre)
                 if vdata and vdata[1]: vdata=vdata[0], id(vdata[1])
                 if p: p=''
                 o.append('    %s: %s' %
-                         (id(r), `(oid, serial, pre, vdata, p)`))
+                         (id(r), `(oid, pre, vdata, p, tid)`))
 
         o.append('\nIndex:')
         items=self._index.items()
         items.sort()
         for oid, r in items:
             if r: r=id(r)
-            o.append('  %s: %s' % (utils.u64(oid), r))
+            o.append('  %s: %s' % (utils.oid_repr(oid), r))
 
         o.append('\nVersion Index:')
         items=self._vindex.items()
@@ -566,7 +583,6 @@
             vitems.sort()
             for oid, r in vitems:
                 if r: r=id(r)
-                o.append('    %s: %s' % (utils.u64(oid), r))
-
+                o.append('    %s: %s' % (utils.oid_repr(oid), r))
 
         return string.join(o,'\n')


=== ZODB3/ZODB/DB.py 1.57 => 1.58 ===
--- ZODB3/ZODB/DB.py:1.57	Fri Nov 28 11:44:49 2003
+++ ZODB3/ZODB/DB.py	Wed Dec 24 11:02:00 2003
@@ -74,7 +74,7 @@
         self._version_cache_size=version_cache_size
         self._version_cache_deactivate_after = version_cache_deactivate_after
 
-        self._miv_cache={}
+        self._miv_cache = {}
 
         # Setup storage
         self._storage=storage
@@ -300,8 +300,7 @@
     def importFile(self, file):
         raise NotImplementedError
 
-    def invalidate(self, oids, connection=None, version='',
-                   rc=sys.getrefcount):
+    def invalidate(self, tid, oids, connection=None, version=''):
         """Invalidate references to a given oid.
 
         This is used to indicate that one of the connections has committed a
@@ -323,21 +322,21 @@
             for cc in allocated:
                 if (cc is not connection and
                     (not version or cc._version==version)):
-                    if rc(cc) <= 3:
+                    if sys.getrefcount(cc) <= 3:
                         cc.close()
-                    cc.invalidate(oids)
+                    cc.invalidate(tid, oids)
 
-        temps=self._temps
-        if temps:
+        if self._temps:
             t=[]
-            for cc in temps:
-                if rc(cc) > 3:
+            for cc in self._temps:
+                if sys.getrefcount(cc) > 3:
                     if (cc is not connection and
-                        (not version or cc._version==version)):
-                        cc.invalidate(oids)
+                        (not version or cc._version == version)):
+                        cc.invalidate(tid, oids)
                     t.append(cc)
-                else: cc.close()
-            self._temps=t
+                else:
+                    cc.close()
+            self._temps = t
 
     def modifiedInVersion(self, oid):
         h=hash(oid)%131
@@ -353,7 +352,7 @@
         return len(self._storage)
 
     def open(self, version='', transaction=None, temporary=0, force=None,
-             waitflag=1):
+             waitflag=1, mvcc=True):
         """Return a object space (AKA connection) to work in
 
         The optional version argument can be used to specify that a
@@ -371,25 +370,25 @@
         try:
 
             if transaction is not None:
-                connections=transaction._connections
+                connections = transaction._connections
                 if connections:
                     if connections.has_key(version) and not temporary:
                         return connections[version]
                 else:
-                    transaction._connections=connections={}
-                transaction=transaction._connections
-
+                    transaction._connections = connections = {}
+                transaction = transaction._connections
 
             if temporary:
                 # This is a temporary connection.
                 # We won't bother with the pools.  This will be
                 # a one-use connection.
-                c=self.klass(
-                    version=version,
-                    cache_size=self._version_cache_size)
+                c = self.klass(version=version,
+                               cache_size=self._version_cache_size,
+                               mvcc=mvcc)
                 c._setDB(self)
                 self._temps.append(c)
-                if transaction is not None: transaction[id(c)]=c
+                if transaction is not None:
+                    transaction[id(c)] = c
                 return c
 
 
@@ -430,18 +429,18 @@
 
 
             if not pool:
-                c=None
+                c = None
                 if version:
                     if self._version_pool_size > len(allocated) or force:
-                        c=self.klass(
-                            version=version,
-                            cache_size=self._version_cache_size)
+                        c = self.klass(version=version,
+                                       cache_size=self._version_cache_size,
+                                       mvcc=mvcc)
                         allocated.append(c)
                         pool.append(c)
                 elif self._pool_size > len(allocated) or force:
-                    c=self.klass(
-                        version=version,
-                        cache_size=self._cache_size)
+                    c = self.klass(version=version,
+                                   cache_size=self._cache_size,
+                                   mvcc=mvcc)
                     allocated.append(c)
                     pool.append(c)
 
@@ -456,7 +455,7 @@
                             pool_lock.release()
                     else: return
 
-            elif len(pool)==1:
+            elif len(pool) == 1:
                 # Taking last one, lock the pool
                 # Note that another thread might grab the lock
                 # before us, so we might actually block, however,
@@ -470,14 +469,15 @@
                     # but it could be higher due to a race condition.
                     pool_lock.release()
 
-            c=pool[-1]
+            c = pool[-1]
             del pool[-1]
             c._setDB(self)
             for pool, allocated in pooll:
                 for cc in pool:
                     cc._incrgc()
 
-            if transaction is not None: transaction[version]=c
+            if transaction is not None:
+                transaction[version] = c
             return c
 
         finally: self._r()
@@ -588,7 +588,8 @@
             d = {}
             for oid in storage.undo(id):
                 d[oid] = 1
-            self.invalidate(d)
+            # XXX I think we need to remove old undo to use mvcc
+            self.invalidate(None, d)
 
     def versionEmpty(self, version):
         return self._storage.versionEmpty(version)
@@ -616,13 +617,13 @@
 
     def commit(self, reallyme, t):
         dest=self._dest
-        oids = self._db._storage.commitVersion(self._version, dest, t)
+        tid, oids = self._db._storage.commitVersion(self._version, dest, t)
         oids = list2dict(oids)
-        self._db.invalidate(oids, version=dest)
+        self._db.invalidate(tid, oids, version=dest)
         if dest:
             # the code above just invalidated the dest version.
             # now we need to invalidate the source!
-            self._db.invalidate(oids, version=self._version)
+            self._db.invalidate(tid, oids, version=self._version)
 
 class AbortVersion(CommitVersion):
     """An object that will see to version abortion
@@ -631,9 +632,9 @@
     """
 
     def commit(self, reallyme, t):
-        version=self._version
-        oids = self._db._storage.abortVersion(version, t)
-        self._db.invalidate(list2dict(oids), version=version)
+        version = self._version
+        tid, oids = self._db._storage.abortVersion(version, t)
+        self._db.invalidate(tid, list2dict(oids), version=version)
 
 
 class TransactionalUndo(CommitVersion):
@@ -647,5 +648,5 @@
     # similarity of rhythm that I think it's justified.
 
     def commit(self, reallyme, t):
-        oids = self._db._storage.transactionalUndo(self._version, t)
-        self._db.invalidate(list2dict(oids))
+        tid, oids = self._db._storage.transactionalUndo(self._version, t)
+        self._db.invalidate(tid, list2dict(oids))


=== ZODB3/ZODB/Connection.py 1.104 => 1.105 ===
--- ZODB3/ZODB/Connection.py:1.104	Wed Dec 10 15:02:15 2003
+++ ZODB3/ZODB/Connection.py	Wed Dec 24 11:02:00 2003
@@ -15,9 +15,17 @@
 
 $Id$"""
 
+import logging
 import sys
 import threading
 from time import time
+from types import ClassType
+
+_marker = object()
+
+def myhasattr(obj, attr):
+    # builtin hasattr() swallows exceptions
+    return getattr(obj, attr, _marker) is not _marker
 
 from persistent import PickleCache
 from zLOG import LOG, ERROR, BLATHER, WARNING
@@ -56,16 +64,19 @@
 
     The Connection manages movement of objects in and out of object storage.
     """
-    _tmp=None
-    _debug_info=()
-    _opened=None
-    _reset_counter = 0
+    _tmp = None
+    _debug_info = ()
+    _opened = None
+    _code_timestamp = 0
     _transaction = None
 
     def __init__(self, version='', cache_size=400,
-                 cache_deactivate_after=60):
+                 cache_deactivate_after=60, mvcc=True):
         """Create a new Connection"""
-        self._version=version
+
+        self._log = logging.getLogger("zodb.conn")
+
+        self._version = version
         self._cache = cache = PickleCache(self, cache_size)
         if version:
             # Caches for versions end up empty if the version
@@ -97,6 +108,16 @@
         self._invalidated = d = {}
         self._invalid = d.has_key
         self._conflicts = {}
+        self._noncurrent = {}
+
+        # If MVCC is enabled, then _mvcc is True and _txn_time stores
+        # the upper bound on transactions visible to this connection.
+        # That is, all object revisions must be written before _txn_time.
+        # If it is None, then the current revisions are acceptable.
+        # If the connection is in a version, mvcc will be disabled, because
+        # loadBefore() only returns non-version data.
+        self._mvcc = mvcc and not version
+        self._txn_time = None
 
     def getTransaction(self):
         t = self._transaction
@@ -216,11 +237,12 @@
         # Call the close callbacks.
         if self.__onCloseCallbacks is not None:
             for f in self.__onCloseCallbacks:
-                try: f()
-                except:
-                    f=getattr(f, 'im_self', f)
-                    LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
-                        error=sys.exc_info())
+                try:
+                    f()
+                except: # except what?
+                    f = getattr(f, 'im_self', f)
+                    self._log.error("Close callback failed for %s", f,
+                                    sys.exc_info())
             self.__onCloseCallbacks = None
         self._storage = self._tmp = self.new_oid = self._opened = None
         self._debug_info = ()
@@ -303,8 +325,8 @@
         if tmp is None: return
         src=self._storage
 
-        LOG('ZODB', BLATHER,
-            'Commiting subtransaction of size %s' % src.getSize())
+        self._log.debug("Commiting subtransaction of size %s",
+                        src.getSize())
 
         self._storage=tmp
         self._tmp=None
@@ -363,7 +385,7 @@
     def isReadOnly(self):
         return self._storage.isReadOnly()
 
-    def invalidate(self, oids):
+    def invalidate(self, tid, oids):
         """Invalidate a set of oids.
 
         This marks the oid as invalid, but doesn't actually invalidate
@@ -372,6 +394,8 @@
         """
         self._inv_lock.acquire()
         try:
+            if self._txn_time is None:
+                self._txn_time = tid
             self._invalidated.update(oids)
         finally:
             self._inv_lock.release()
@@ -381,13 +405,15 @@
         try:
             self._cache.invalidate(self._invalidated)
             self._invalidated.clear()
+            self._txn_time = None
         finally:
             self._inv_lock.release()
         # Now is a good time to collect some garbage
         self._cache.incrgc()
 
     def modifiedInVersion(self, oid):
-        try: return self._db.modifiedInVersion(oid)
+        try:
+            return self._db.modifiedInVersion(oid)
         except KeyError:
             return self._version
 
@@ -411,54 +437,94 @@
         if self._storage is None:
             msg = ("Shouldn't load state for %s "
                    "when the connection is closed" % oid_repr(oid))
-            LOG('ZODB', ERROR, msg)
+            self._log.error(msg)
             raise RuntimeError(msg)
 
         try:
-            # Avoid reading data from a transaction that committed
-            # after the current transaction started, as that might
-            # lead to mixing of cached data from earlier transactions
-            # and new inconsistent data.
-            #
-            # Wait for check until after data is loaded from storage
-            # to avoid time-of-check to time-of-use race.
-            p, serial = self._storage.load(oid, self._version)
-            self._load_count = self._load_count + 1
-            invalid = self._is_invalidated(obj)
-            self._reader.setGhostState(obj, p)
-            obj._p_serial = serial
-            if invalid:
-                self._handle_independent(obj)
+            self._setstate(obj)
         except ConflictError:
             raise
         except:
-            LOG('ZODB', ERROR,
-                "Couldn't load state for %s" % oid_repr(oid),
-                error=sys.exc_info())
+            self._log.error("Couldn't load state for %s", oid_repr(oid),
+                            exc_info=sys.exc_info())
             raise
 
-    def _is_invalidated(self, obj):
-        # Helper method for setstate() covers three cases:
-        # returns false if obj is valid
-        # returns true if obj was invalidation, but is independent
-        # otherwise, raises ConflictError for invalidated objects
+    def _setstate(self, obj):
+        # Helper for setstate(), which provides logging of failures.
+
+        # The control flow is complicated here to avoid loading an
+        # object revision that we are sure we aren't going to use.  As
+        # a result, invalidation tests occur before and after the
+        # load.  We can only be sure about invalidations after the
+        # load.
+
+        # If an object has been invalidated, there are several cases
+        # to consider:
+        # 1. Check _p_independent()
+        # 2. Try MVCC
+        # 3. Raise ConflictError.
+
+        # Does anything actually use _p_independent()?  It would simplify
+        # the code if we could drop support for it.
+
+        # There is a harmless data race with self._invalidated.  A
+        # dict update could go on in another thread, but we don't care
+        # because we have to check again after the load anyway.
+        if (obj._p_oid in self._invalidated
+            and not myhasattr(obj, "_p_independent")):
+            # If the object has _p_independent(), we will handle it below.
+            if not (self._mvcc and self._setstate_noncurrent(obj)):
+                self.getTransaction().register(obj)
+                self._conflicts[obj._p_oid] = 1
+                raise ReadConflictError(object=obj)
+
+        p, serial = self._storage.load(obj._p_oid, self._version)
+        self._load_count += 1
+
         self._inv_lock.acquire()
         try:
-            if self._invalidated.has_key(obj._p_oid):
-                # Defer _p_independent() call until state is loaded.
-                ind = getattr(obj, "_p_independent", None)
-                if ind is not None:
-                    # Defer _p_independent() call until state is loaded.
-                    return 1
-                else:
-                    self.getTransaction().register(obj)
-                    self._conflicts[obj._p_oid] = 1
-                    raise ReadConflictError(object=obj)
-            else:
-                return 0
+            invalid = obj._p_oid in self._invalidated
         finally:
             self._inv_lock.release()
 
+        if invalid:
+            if myhasattr(obj, "_p_independent"):
+                # This call will raise a ReadConflictError if something
+                # goes wrong
+                self._handle_independent(obj)
+            elif not (self._mvcc and self._setstate_noncurrent(obj)):
+                self.getTransaction().register(obj)
+                self._conflicts[obj._p_oid] = 1
+                raise ReadConflictError(object=obj)
+
+        self._reader.setGhostState(obj, p)
+        obj._p_serial = serial
+
+    def _setstate_noncurrent(self, obj):
+        """Set state using non-current data.
+
+        Return True if state was available, False if not.
+        """
+        try:
+            # Load data that was current before the commit at txn_time.
+            t = self._storage.loadBefore(obj._p_oid, self._txn_time)
+        except KeyError:
+            return False
+        if t is None:
+            return False
+        data, start, end = t
+        # The non-current transaction must have been written before
+        # txn_time.  It must be current at txn_time, but could have
+        # been modified at txn_time.
+
+        # It's possible that end is None, if, e.g., the most recent
+        # invalidation was for version data.
+        assert start < self._txn_time <= end, \
+               (U64(start), U64(self._txn_time), U64(end))
+        self._noncurrent[obj._p_oid] = True
+        self._reader.setGhostState(obj, data)
+        obj._p_serial = start
+
     def _handle_independent(self, obj):
         # Helper method for setstate() handles possibly independent objects
         # Call _p_independent(), if it returns True, setstate() wins.
@@ -499,7 +565,7 @@
             obj._p_changed = 0
             obj._p_serial = serial
         except:
-            LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
+            self._log.error("setklassstate failed", exc_info=sys.exc_info())
             raise
 
     def tpc_abort(self, transaction):
@@ -590,11 +656,11 @@
             self._storage._creating[:0]=self._creating
             del self._creating[:]
         else:
-            def callback():
+            def callback(tid):
                 d = {}
                 for oid in self._modified:
                     d[oid] = 1
-                self._db.invalidate(d, self)
+                self._db.invalidate(tid, d, self)
             self._storage.tpc_finish(transaction, callback)
 
         self._conflicts.clear()


=== ZODB3/ZODB/BaseStorage.py 1.38 => 1.39 ===
--- ZODB3/ZODB/BaseStorage.py:1.38	Tue Dec 23 09:37:13 2003
+++ ZODB3/ZODB/BaseStorage.py	Wed Dec 24 11:02:00 2003
@@ -32,7 +32,6 @@
 
 class BaseStorage(UndoLogCompatible):
     _transaction=None # Transaction that is being committed
-    _serial=z64       # Transaction serial number
     _tstatus=' '      # Transaction status, used for copying data
     _is_read_only = 0
 
@@ -51,7 +50,7 @@
 
         t=time.time()
         t=self._ts=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
-        self._serial=`t`
+        self._tid = `t`
         if base is None:
             self._oid='\0\0\0\0\0\0\0\0'
         else:
@@ -60,16 +59,19 @@
     def abortVersion(self, src, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        return []
+        return self._tid, []
 
     def commitVersion(self, src, dest, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        return []
+        return self._tid, []
 
     def close(self):
         pass
 
+    def cleanup(self):
+        pass
+
     def sortKey(self):
         """Return a string that can be used to sort storage instances.
 
@@ -85,7 +87,7 @@
     def getSize(self):
         return len(self)*300 # WAG!
 
-    def history(self, oid, version, length=1):
+    def history(self, oid, version, length=1, filter=None):
         pass
 
     def modifiedInVersion(self, oid):
@@ -167,13 +169,13 @@
                 now = time.time()
                 t = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
                 self._ts = t = t.laterThan(self._ts)
-                self._serial = `t`
+                self._tid = `t`
             else:
                 self._ts = TimeStamp(tid)
-                self._serial = tid
+                self._tid = tid
 
             self._tstatus = status
-            self._begin(self._serial, user, desc, ext)
+            self._begin(self._tid, user, desc, ext)
         finally:
             self._lock_release()
 
@@ -203,10 +205,11 @@
                 return
             try:
                 if f is not None:
-                    f()
+                    f(self._tid)
                 u, d, e = self._ude
-                self._finish(self._serial, u, d, e)
+                self._finish(self._tid, u, d, e)
                 self._clear_temp()
+                return self._tid
             finally:
                 self._ude = None
                 self._transaction = None
@@ -250,6 +253,48 @@
         raise POSException.Unsupported, (
             "Retrieval of historical revisions is not supported")
 
+    def loadBefore(self, oid, tid):
+        """Return (data, start, end) of oid's newest revision before tid."""
+
+        # XXX Is it okay for loadBefore() to return current data?
+        # There doesn't seem to be a good reason to forbid it, even
+        # though the typical use of this method will never find
+        # current data.  But maybe we should call it loadByTid()?
+
+        n = 2  # number of history records to request; doubled on each pass
+        start_time = None  # serial of the newest record older than tid
+        end_time = None  # serial of the oldest record seen not older than tid
+        while start_time is None:
+            # The history() approach is a hack, because the dict
+            # returned by history() doesn't contain a tid.  It
+            # contains a serialno, which is often the same, but isn't
+            # required to be.  We'll pretend it is for now.
+
+            # A second problem is that history() doesn't say anything
+            # about the transaction status.  If it falls before
+            # the pack time, we can't honor the MVCC request.
+
+            # Note: history() returns the most recent record first.
+
+            # XXX The filter argument to history() only appears to be
+            # supported by FileStorage.  Perhaps it shouldn't be used.
+            L = self.history(oid, "", n, lambda d: not d["version"])
+            if not L:
+                return  # no history records for oid: result is None
+            for d in L:
+                if d["serial"] < tid:
+                    start_time = d["serial"]
+                    break
+                else:
+                    end_time = d["serial"]
+            if len(L) < n:  # fewer records than requested: stop widening
+                break
+            n *= 2
+        if start_time is None:
+            return None  # no record in the history is older than tid
+        data = self.loadSerial(oid, start_time)
+        return data, start_time, end_time
+
     def getExtensionMethods(self):
         """getExtensionMethods
 
@@ -314,7 +359,7 @@
                 oid=r.oid
                 if verbose: print oid_repr(oid), r.version, len(r.data)
                 if restoring:
-                    self.restore(oid, r.serial, r.data, r.version,
+                    self.restore(oid, r.tid, r.data, r.version,
                                  r.data_txn, transaction)
                 else:
                     pre=preget(oid, None)




More information about the Zope-Checkins mailing list