[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - connection.py:1.9 scanner.py:1.5 storage.py:1.11

Shane Hathaway shane at zope.com
Sat Feb 28 15:06:59 EST 2004


Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv30293/lib/apelib/zodb3

Modified Files:
	connection.py scanner.py storage.py 
Log Message:
Merged ape-fs-oid-branch.

Ape now uses arbitrary OIDs on the filesystem, rather than using paths 
as OIDs.  This solves problems with moving and replacing objects and 
further unifies SQL and filesystem databases.



=== Products/Ape/lib/apelib/zodb3/connection.py 1.8 => 1.9 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.8	Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/connection.py	Sat Feb 28 15:06:28 2004
@@ -139,6 +139,7 @@
         obj._p_oid=oid
         obj._p_jar=self
         obj._p_changed=None
+        self.setSerial(obj, serial)
 
         self._cache[oid] = obj
         
@@ -261,8 +262,10 @@
             ext_refs = event.external
             if ext_refs:
                 for (ext_oid, ext_ref) in ext_refs:
-                    if self.getSerial(ext_ref) == HASH0:
-                        # New object
+                    assert ext_oid
+                    assert ext_ref is not None
+                    if self._cache.get(ext_oid, None) is not ext_ref:
+                        # New object or a bad reference
                         if ext_ref._p_jar is not None:
                             if ext_ref._p_jar is not self:
                                 raise InvalidObjectReference, (
@@ -293,14 +296,6 @@
             # response, just in case the response contains the
             # serial number for a newly created object
             try: cache[oid] = obj
-            except ValueError:
-                # "Cannot re-register an object under a different
-                # oid".  This can happen when the user is working on
-                # the filesystem and creates an object with an ID that
-                # was used recently.  Try to fix it by minimizing
-                # the cache and trying again.
-                cache.minimize()
-                cache[oid] = obj
             except:
                 if aq_base(obj) is not obj:
                     # Yuck, someone tried to store a wrapper.  Try to


=== Products/Ape/lib/apelib/zodb3/scanner.py 1.4 => 1.5 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.4	Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/scanner.py	Sat Feb 28 15:06:28 2004
@@ -157,7 +157,6 @@
     def __init__(self):
         self.current = OOBTree()  # OOBTree({ oid -> {source->state} })
         self.future = {}          # { oid -> ({source->state}, atime) }
-        self.uncommitted = {}     # { tid -> {oid->{source->state}} }
         self.lock = allocate_lock()
         self.storage = None
 
@@ -230,17 +229,6 @@
             self.lock.release()
 
 
-    def afterStore(self, oid, tid, sources):
-        """Called by the storage after an object is stored (but not committed.)
-        """
-        self.lock.acquire()
-        try:
-            t = self.uncommitted.setdefault(tid, {})
-            t[oid] = sources
-        finally:
-            self.lock.release()
-
-
     def scan(self):
         """Scan sources, returning the OIDs of changed objects.
         """
@@ -291,53 +279,9 @@
                 'Future sources cache size: %d objects.' % len(self.future))
 
 
-    def afterCommit(self, tid):
-        """Commits information recorded by setUncommittedSources().
-        """
-        self.lock.acquire()
-        try:
-            if not self.uncommitted.has_key(tid):
-                return
-            t = self.uncommitted[tid]
-            del self.uncommitted[tid]
-        finally:
-            self.lock.release()
-        # Update the sources with new states for the committed OIDs.
-        to_scan = {}        # { repo -> { source -> state } }
-        for oid, sources in t.items():
-            if sources:
-                for source, state in sources.items():
-                    repo, location = source
-                    to_scan.setdefault(repo, {})[source] = state
-        changes = {}
-        for repo, d in to_scan.items():
-            c = repo.poll(d)
-            if c:
-                changes.update(c)
-        self.lock.acquire()
-        try:
-            now = time()
-            for oid, sources in t.items():
-                new_sources = {}
-                if sources:
-                    for source, state in sources.items():
-                        state = changes.get(source, state)
-                        new_sources[source] = state
-                if self.current.has_key(oid):
-                    self.current[oid] = new_sources
-                else:
-                    self.future[oid] = (new_sources, now)
-        finally:
-            self.lock.release()
-
-
-    def afterAbort(self, tid):
-        """Aborts information recorded by setUncommittedSources().
+    def afterCommit(self, oid, sources):
+        """Records changes to sources after commit..
         """
-        self.lock.acquire()
-        try:
-            if self.uncommitted.has_key(tid):
-                del self.uncommitted[tid]
-        finally:
-            self.lock.release()
-
+        self.current[oid] = sources
+        if self.future.has_key(oid):
+            del self.future[oid]


=== Products/Ape/lib/apelib/zodb3/storage.py 1.10 => 1.11 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.10	Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/storage.py	Sat Feb 28 15:06:28 2004
@@ -52,6 +52,9 @@
             name = 'ApeStorage: ' + ', '.join(names)
         self._ltid = None
         self.scanner = None
+        self.changed = {}  # {tid: {oid: 1}}
+        if DEBUG:
+            self._loaded_hashes = {}  # {oid: hash}
         BaseStorage.BaseStorage.__init__(self, name)
 
     def __len__(self):
@@ -78,8 +81,6 @@
         if h == HASH0:
             # Avoid the special zero hash.
             h = HASH1
-        if DEBUG:
-            print '64-bit hash of %r is %r' % (value, h)
         return h
 
     def load(self, oid, version):
@@ -96,7 +97,7 @@
             data = file.getvalue()
             h = self.hash64(hash_value)
             if DEBUG:
-                print 'loaded', `oid`, `h`
+                self._loaded_hashes[oid] = hash_value
             if self.scanner is not None:
                 sources = event.mapper.gateway.getPollSources(event)
                 self.scanner.afterLoad(oid, sources)
@@ -118,8 +119,6 @@
             # First detect conflicts.
             # The "h64" argument, if its value is not 0,
             # was previously generated by hash64().
-            if DEBUG:
-                print 'storing', `oid`, `h64`
             if h64 == HASH0:
                 # Writing a new object.
                 is_new = True
@@ -130,9 +129,15 @@
                 event, old_c, old_state, old_hash = self._gwio.load(oid)
                 old_h64 = self.hash64(old_hash)
                 if h64 != old_h64:
+                    h = None
+                    if DEBUG:
+                        h = self._loaded_hashes.get(oid)
+                    if h is None:
+                        h = h64
+                        old_hash = old_h64
                     raise POSException.ConflictError(
                         "Storing %s based on old data.  %s != %s." % (
-                        repr(oid), repr(h64), repr(old_h64)))
+                        repr(oid), repr(h), repr(old_hash)))
 
             # Now unpickle and store the data.
             file = StringIO(data)
@@ -142,14 +147,16 @@
             event, new_hash = self._gwio.store(
                 oid, classification, state, is_new)
             new_h64 = self.hash64(new_hash)
-            if self.scanner is not None:
-                sources = event.mapper.gateway.getPollSources(event)
-                self.scanner.afterStore(oid, self._serial, sources)
+
+            # Remember that this OID changed (for scanning)
+            t = self.changed.get(self._serial)
+            if t is None:
+                t = {}
+                self.changed[self._serial] = t
+            t[oid] = 1
         finally:
             self._lock_release()
 
-        if DEBUG:
-            print 'stored', `oid`, `h64`, `new_h64`
         return new_h64
 
     def getPollSources(self, oid):
@@ -171,8 +178,8 @@
     def _abort(self):
         for c in self._conn_list:
             c.abort()
-        if self.scanner is not None:
-            self.scanner.afterAbort(self._serial)
+        if self.changed.has_key(self._serial):
+            del self.changed[self._serial]
 
     def _begin(self, tid, u, d, e):
         for c in self._conn_list:
@@ -182,8 +189,13 @@
         for c in self._conn_list:
             c.finish()
         self._ltid = self._serial
-        if self.scanner is not None:
-            self.scanner.afterCommit(self._serial)
+        if self.changed.has_key(self._serial):
+            oids = self.changed[self._serial]
+            del self.changed[self._serial]
+            if self.scanner:
+                for oid in oids:
+                    sources = self._gwio.getPollSources(oid)
+                    self.scanner.afterCommit(oid, sources)
 
     def _vote(self):
         for c in self._conn_list:
@@ -201,4 +213,3 @@
         for c in self._conn_list:
             c.close()
         self.conf_resource.release(self)
-




More information about the Zope-CVS mailing list