[Checkins] SVN: zope.fssync/trunk/src/zope/fssync/ Added ability for synchronizers to return callbacks from the load

Amos Latteier amos at latteier.com
Wed Mar 4 18:17:08 EST 2009


Log message for revision 97497:
  Added ability for synchronizers to return callbacks from the load 
  method. This allows for fix ups to be run later after all objects are 
  loaded. This is helpful when adding multiple objects at the same time 
  that depend on each other.
  

Changed:
  U   zope.fssync/trunk/src/zope/fssync/CHANGES.txt
  U   zope.fssync/trunk/src/zope/fssync/task.py
  U   zope.fssync/trunk/src/zope/fssync/tests/test_task.py

-=-
Modified: zope.fssync/trunk/src/zope/fssync/CHANGES.txt
===================================================================
--- zope.fssync/trunk/src/zope/fssync/CHANGES.txt	2009-03-04 21:24:00 UTC (rev 97496)
+++ zope.fssync/trunk/src/zope/fssync/CHANGES.txt	2009-03-04 23:17:08 UTC (rev 97497)
@@ -1,36 +1,47 @@
 Zope FSSync Changes
 
+  XXX
+
+    - Added support for empty directories in snarf format. Now
+      directories can be explicitly described by snarf.
+
+    - Synchronizers can now return callbacks from the load
+      method. This allows for fix ups to be run later. This is useful
+      when adding multiple objects at the same time that depend on
+      each other. Callbacks can in turn return callbacks.
+
+
   After Zope 3.4.0b1 (trunk only)
 
-    Refactoring of zope.fssync and zope.app.fssync into two clearly 
-    separated packages: 
+    Refactoring of zope.fssync and zope.app.fssync into two clearly
+    separated packages:
 
-    - zope.fssync contains now a Python API that has no critical dependencies 
-      on Zope, the ZODB, and the security machinery. 
+    - zope.fssync contains now a Python API that has no critical dependencies
+      on Zope, the ZODB, and the security machinery.
 
-    - zope.app.fssync contains a protected web-based API and special 
-      synchronizers for zope.app content types. 
+    - zope.app.fssync contains a protected web-based API and special
+      synchronizers for zope.app content types.
 
-    Other major changes are 
+    Other major changes are
 
-    - synchronizers (i.e. serialization/de-serialization adapters) are created 
-      by named utilities which use dotted class names as lookup keys 
+    - synchronizers (i.e. serialization/de-serialization adapters) are created
+      by named utilities which use dotted class names as lookup keys
 
     - added doctests
 
-    - support for large files 
+    - support for large files
 
-    - adapters for pickler, unpickler and handling of persistent pickle ids 
+    - adapters for pickler, unpickler and handling of persistent pickle ids
 
-    - binaries are no longer merged 
+    - binaries are no longer merged
 
-    - case-insensitive filesystems and repositories use disambiguated names on 
-      export and the original names on import 
+    - case-insensitive filesystems and repositories use disambiguated names on
+      export and the original names on import
 
-    - export and import of directly provided interfaces 
+    - export and import of directly provided interfaces
 
-    - direct export to archives/direct import from archives 
+    - direct export to archives/direct import from archives
 
     - addressed encoding problems on Mac OSX
 
-    
\ No newline at end of file
+

Modified: zope.fssync/trunk/src/zope/fssync/task.py
===================================================================
--- zope.fssync/trunk/src/zope/fssync/task.py	2009-03-04 21:24:00 UTC (rev 97496)
+++ zope.fssync/trunk/src/zope/fssync/task.py	2009-03-04 23:17:08 UTC (rev 97497)
@@ -35,7 +35,7 @@
 
 class SynchronizationError(Exception):
     pass
-    
+
 @zope.component.adapter(zope.interface.Interface)
 @zope.interface.implementer(interfaces.IEntryId)
 def EntryId(obj):
@@ -56,7 +56,7 @@
 class SyncTask(object):
     """Convenient base class for synchronization tasks."""
 
-    def __init__(self, getSynchronizer, 
+    def __init__(self, getSynchronizer,
                        repository,
                        context=None):
         self.getSynchronizer = getSynchronizer
@@ -83,7 +83,7 @@
 
     def serializableItems(self, items, dirpath):
         """Returns items which have synchronizer.
-        
+
         Returns a tuple of disambiguated name, original key, and synchronizer.
         """
         result = []
@@ -173,40 +173,57 @@
     """
 
     zope.interface.implements(interfaces.ICommit)
-    
+
     debug = False
-    
+
     def __init__(self, getSynchronizer, repository):
         super(Commit, self).__init__(getSynchronizer, repository)
         self.metadata = self.repository.getMetadata()
 
     def perform(self, container, name, fspath):
-        self.synchronize(container, name, fspath)
+        callbacks = []
+        add_callback = callbacks.append
+        self.synchronize(container, name, fspath, add_callback)
 
-    def synchronize(self, container, name, fspath):
+        # process callbacks
+        passes = 0
+        callbacks = [cb for cb in callbacks if cb is not None]
+        while passes < 10 and callbacks:
+            new_callbacks = []
+            for callback in callbacks:
+                new_callbacks.append(callback())
+            callbacks = [cb for cb in new_callbacks if cb is not None]
+            passes += 1
+
+        # fail if there are still callbacks after 10 passes. this
+        # suggests an infinite loop in callback creation.
+        if callbacks:
+            raise SynchronizationError(
+                'Too many synchronizer callback passes %s' % callbacks)
+
+    def synchronize(self, container, name, fspath, add_callback):
         """Synchronize an object or object tree from a repository.
 
         ``SynchronizationError`` is raised for errors that can't be
         corrected by a update operation, including invalid object
         names.
         """
-        
         self.context = container
         modifications = []
         if invalidName(name):
             raise SynchronizationError("invalid separator in name %r" % name)
-                
+
         if not name:
-            self.synchDirectory(container, fspath)
+            self.synchDirectory(container, fspath, add_callback)
         else:
             synchronizer = self.getSynchronizer(container)
             key = originalKey(fspath, name, self.metadata)
             try:
                 traverseKey(container, key)
             except:
-                self.synchNew(container, key, fspath)
+                self.synchNew(container, key, fspath, add_callback)
             else:
-                modified = self.synchOld(container, key, fspath)
+                modified = self.synchOld(container, key, fspath, add_callback)
                 if modified:
                     modifications.append(modified)
             # Now update extra and annotations
@@ -220,31 +237,32 @@
                 modified = synchronizer.setmetadata(metadata)
                 if modified:
                     modifications.append(modified)
-                        
+
                 extrapath = fsutil.getextra(fspath)
                 if self.repository.exists(extrapath):
                     extras = synchronizer.extras()
-                    extras = self.synchSpecials(extrapath, extras)
+                    extras = self.synchSpecials(extrapath, extras, add_callback)
                     modified = synchronizer.setextras(extras)
                     if modified:
                         modifications.append(modified)
-                
+
                 annpath = fsutil.getannotations(fspath)
                 if self.repository.exists(annpath):
                     annotations = synchronizer.annotations()
-                    annotations = self.synchSpecials(annpath, annotations)
+                    annotations = self.synchSpecials(annpath, annotations,
+                                                     add_callback)
                     modified = synchronizer.setannotations(annotations)
                     if modified:
                         modifications.append(modified)
-                        
+
             if modifications:
                 zope.event.notify(
                     zope.lifecycleevent.ObjectModifiedEvent(
-                        obj, 
+                        obj,
                         *modifications))
 
 
-    def synchSpecials(self, fspath, specials):
+    def synchSpecials(self, fspath, specials, add_callback):
         """Synchronize an extra or annotation mapping."""
         repository = self.repository
         md = self.metadata.getmanager(fspath)
@@ -253,7 +271,7 @@
         if interfaces.IDirectorySynchronizer.providedBy(synchronizer):
             for name, entry in entries.items():
                 path = self.repository.join(fspath, name)
-                self.synchronize(specials, name, path)
+                self.synchronize(specials, name, path, add_callback)
         else:
             if interfaces.IDefaultSynchronizer.providedBy(synchronizer):
                 fp = self.repository.readable(fspath)
@@ -262,12 +280,12 @@
                 fp.close()
             elif interfaces.IFileSynchronizer.providedBy(synchronizer):
                 fp = self.repository.readable(fspath)
-                synchronizer.load(fp)
+                add_callback(synchronizer.load(fp))
                 fp.close()
 
         return specials
 
-    def synchDirectory(self, container, fspath):
+    def synchDirectory(self, container, fspath, add_callback):
         """Helper to synchronize a directory."""
         adapter = self.getSynchronizer(container)
         nameset = {}
@@ -280,10 +298,10 @@
                 nameset[key] = self.repository.join(fspath, key)
         for name in self.metadata.getnames(fspath):
             nameset[name] = self.repository.join(fspath, name)
-            
+
         # Sort the list of keys for repeatability
         names_paths = nameset.items()
-            
+
         names_paths.sort()
         subdirs = []
         # Do the non-directories first.
@@ -293,31 +311,32 @@
             if self.repository.isdir(path):
                 subdirs.append((name, path))
             else:
-                self.synchronize(container, name, path)
+                self.synchronize(container, name, path, add_callback)
         # Now do the directories
         for name, path in subdirs:
-            self.synchronize(container, name, path)
+            self.synchronize(container, name, path, add_callback)
 
-    def synchNew(self, container, name, fspath):
+    def synchNew(self, container, name, fspath, add_callback):
         """Helper to synchronize a new object."""
         entry = self.metadata.getentry(fspath)
         if entry:
             # In rare cases (e.g. if the original name and replicated name
-            # differ and the replica has been deleted) we can get 
+            # differ and the replica has been deleted) we can get
             # something apparently new that is marked for deletion. Since the
             # names are provided by the synchronizer we must at least
             # inform the synchronizer.
             if entry.get("flag") == "removed":
                 self.deleteItem(container, name)
                 return
-            obj = self.createObject(container, name, entry, fspath)
+            obj = self.createObject(container, name, entry, fspath,
+                                    add_callback)
             synchronizer = self.getSynchronizer(obj)
             if interfaces.IDirectorySynchronizer.providedBy(synchronizer):
-                self.synchDirectory(obj, fspath)
+                self.synchDirectory(obj, fspath, add_callback)
 
-    def synchOld(self, container, name, fspath):
+    def synchOld(self, container, name, fspath, add_callback):
         """Helper to synchronize an existing object."""
-        
+
         modification = None
         entry = self.metadata.getentry(fspath)
         if entry.get("flag") == "removed":
@@ -331,11 +350,12 @@
         obj = traverseKey(container, key)
         synchronizer = self.getSynchronizer(obj)
         if interfaces.IDirectorySynchronizer.providedBy(synchronizer):
-            self.synchDirectory(obj, fspath)
+            self.synchDirectory(obj, fspath, add_callback)
         else:
             type = entry.get("type")
             if type and typeIdentifier(obj) != type:
-                self.createObject(container, key, entry, fspath, replace=True)
+                self.createObject(container, key, entry, fspath, add_callback,
+                                  replace=True)
             else:
                 original_fn = fsutil.getoriginal(fspath)
                 if self.repository.exists(original_fn):
@@ -353,7 +373,8 @@
                 if new:
                     if not entry.get("factory"):
                         # If there's no factory, we can't call load
-                        self.createObject(container, key, entry, fspath, True)
+                        self.createObject(container, key, entry, fspath,
+                                          add_callback, True)
                         obj = traverseKey(container, key)
                         modification = ObjectSynchronized()
                     else:
@@ -361,12 +382,13 @@
                         modified = not compare(fp, synchronizer)
                         if modified:
                             fp.seek(0)
-                            synchronizer.load(fp)
+                            add_callback(synchronizer.load(fp))
                             modification = ObjectSynchronized()
                         fp.close()
         return modification
 
-    def createObject(self, container, name, entry, fspath, replace=False):
+    def createObject(self, container, name, entry, fspath, add_callback,
+                     replace=False):
         """Helper to create a deserialized object."""
         factory_name = entry.get("factory")
         type = entry.get("type")
@@ -391,7 +413,7 @@
                 fp.close()
             elif interfaces.IFileSynchronizer.providedBy(synchronizer):
                 fp = self.repository.readable(fspath)
-                synchronizer.load(fp)
+                add_callback(synchronizer.load(fp))
                 fp.close()
         elif type:
             fp = self.repository.readable(fspath)
@@ -423,9 +445,9 @@
                     pickler = interfaces.IUnpickler(self.context)
                     obj = pickler.load(fp)
                 else:
-                    generator.load(obj, fp)
+                    add_callback(generator.load(obj, fp))
                 fp.close()
-        
+
         if not added:
             self.setItem(container, name, obj, replace)
         return obj
@@ -451,7 +473,7 @@
 
         Uses the synchronizer if possible.
         """
-        
+
         dir = self.getSynchronizer(container)
         if interfaces.IDirectorySynchronizer.providedBy(dir):
             del dir[key]
@@ -465,21 +487,40 @@
 
     def perform(self, container, name, fspath):
         """Checkin a new object tree.
-        
+
         Raises a ``SynchronizationError`` if the name already exists
         in the object database.
         """
-        
+        callbacks = []
+        add_callback = callbacks.append
+
         self.context = container    # use container as context of reference
         self.metadata.added()
         try:
             traverseKey(container, name)
         except:
-            self.synchronize(container, name, fspath)
+            self.synchronize(container, name, fspath, add_callback)
         else:
             raise SynchronizationError("object already exists %r" % name)
 
+        # process callbacks
+        passes = 0
+        callbacks = [cb for cb in callbacks if cb is not None]
+        while passes < 10 and callbacks:
+            new_callbacks = []
+            for callback in callbacks:
+                new_callbacks.append(callback())
+            callbacks = [cb for cb in new_callbacks if cb is not None]
+            passes += 1
 
+        # fail if there are still callbacks after 10 passes. this
+        # suggests an infinite loop in callback creation.
+        if callbacks:
+            raise SynchronizationError(
+                'Too many synchronizer callback passes %s' % callbacks)
+
+
+
 class Check(SyncTask):
     """Check that a repository is consistent with the object database.
     """
@@ -492,7 +533,7 @@
         self.metadata = repository.getMetadata()
         self.conflicts = []
         self.raise_on_conflicts = raise_on_conflicts
-        
+
     def errors(self):
         """Return a list of errors (conflicts).
 
@@ -553,7 +594,7 @@
                 extrapath = fsutil.getextra(fspath)
                 if extras and self.repository.exists(extrapath):
                     self.checkSpecials(extras, extrapath)
-                    
+
                 annotations = adapter.annotations()
                 annpath = fsutil.getannotations(fspath)
                 if annotations and self.repository.exists(annpath):
@@ -561,7 +602,7 @@
 
     def checkSpecials(self, container, fspath):
         """Helper to check a directory."""
-        
+
         nameset = {}
         for key in container:
             nameset[key] = 1
@@ -620,7 +661,7 @@
         else:
             if not self.repository.exists(fspath):
                 self.conflict(fspath)
-                
+
         synchronizer = self.getSynchronizer(container)
         key = originalKey(fspath, name, self.metadata)
         obj = traverseKey(container, key)
@@ -636,7 +677,7 @@
                 cmppath = oldfspath
             else:
                 cmppath = fspath
-                
+
             fp = self.repository.readable(cmppath)
             if not compare(fp, adapter):
                 self.conflict(fspath)
@@ -683,7 +724,7 @@
 
 def compare(readable, dumper):
     """Help function for the comparison of a readable and a synchronizer.
-        
+
     Simulates a writeable that raises an exception if the serializer
     dumps data which do not match the content of the readable.
     """
@@ -703,10 +744,10 @@
 
 
 class ComparePickles(object):
-    
+
     def __init__(self, context, pickler):
         self.context = context
         self.pickler = pickler
-        
+
     def dump(self, writeable):
         self.pickler.dump(self.context, writeable)

Modified: zope.fssync/trunk/src/zope/fssync/tests/test_task.py
===================================================================
--- zope.fssync/trunk/src/zope/fssync/tests/test_task.py	2009-03-04 21:24:00 UTC (rev 97496)
+++ zope.fssync/trunk/src/zope/fssync/tests/test_task.py	2009-03-04 23:17:08 UTC (rev 97497)
@@ -42,31 +42,31 @@
 from zope.fssync import interfaces
 from zope.fssync import repository
 from zope.fssync import pickle
-from zope.fssync import task 
+from zope.fssync import task
 
 def provideSynchronizer(klass, Synchronizer):
     zope.component.provideUtility(Synchronizer, interfaces.ISynchronizerFactory,
                                         name=synchronizer.dottedname(klass))
-        
+
 class Sample(object):
     pass
 
 class IPretendFile(zope.interface.Interface):
     pass
-   
+
 class PretendFile(object):
     zope.interface.implements(IPretendFile)
-    
+
     data = ''
     contentType = ''
-    
+
     def __init__(self, data, contentType):
         self.data = data
         self.contentType = contentType
 
 class IPretendContainer(zope.interface.Interface):
     pass
-    
+
 class PretendContainer(Location):
     zope.interface.implements(IPretendContainer, ITraversable, ITraverser)
 
@@ -125,7 +125,7 @@
         super(TestBase, self).setUp()
 
         # Set up serializer factory
-        zope.component.provideUtility(synchronizer.DefaultSynchronizer, 
+        zope.component.provideUtility(synchronizer.DefaultSynchronizer,
                                         interfaces.ISynchronizerFactory)
 
         zope.component.provideAdapter(pickle.XMLPickler)
@@ -186,7 +186,7 @@
     def create_committer(self):
         filesystem = repository.FileSystemRepository()
         return task.Commit(synchronizer.getSynchronizer, filesystem)
-        
+
     def test_set_item_without_serializer(self):
         committer = self.create_committer()
         container = {}
@@ -247,7 +247,7 @@
         name = "contentType"
         root = TestRoot()
         try:
-            self.create_object(container, name, {}, fspath) #, context=root)
+            self.create_object(container, name, {}, fspath, list().append) #, context=root)
         finally:
             os.remove(fspath)
         self.assertEqual(container.name, name)
@@ -259,7 +259,7 @@
         tfn = os.path.join(self.tempdir(), "foo")
         data = {"hello": "world"}
         self.writefile(dumps(data), tfn)
-        self.create_object_debug(container, "foo", entry, tfn)
+        self.create_object_debug(container, "foo", entry, tfn, list().append)
         self.assertEqual(container, {"foo": data})
 
     def test_create_object_factory_directory(self):
@@ -268,7 +268,7 @@
         entry = {"flag": "added", "factory": PCname}
         tfn = os.path.join(self.tempdir(), "foo")
         os.mkdir(tfn)
-        self.create_object(container, "foo", entry, tfn)
+        self.create_object(container, "foo", entry, tfn, list().append)
         self.assertEqual(container.keys(), ["foo"])
         self.assertEqual(container["foo"].__class__, PretendContainer)
 
@@ -278,7 +278,7 @@
         data = ["hello", "world"]
         tfn = os.path.join(self.tempdir(), "foo")
         self.writefile(dumps(data), tfn, "wb")
-        self.create_object(container, "foo", entry, tfn)
+        self.create_object(container, "foo", entry, tfn, list().append)
         self.assertEqual(container.items(), [("foo", ["hello", "world"])])
 
     def test_create_object_ifilefactory(self):
@@ -288,7 +288,7 @@
         data = "hello world"
         tfn = os.path.join(self.tempdir(), "foo")
         self.writefile(data, tfn, "wb")
-        self.create_object(container, "foo", entry, tfn)
+        self.create_object(container, "foo", entry, tfn, list().append)
         self.assertEqual(container.holding["foo"].__class__, PretendFile)
         self.assertEqual(container.holding["foo"].data, "hello world")
 
@@ -298,7 +298,7 @@
         entry = {"flag": "added"}
         tfn = os.path.join(self.tempdir(), "foo")
         os.mkdir(tfn)
-        self.create_object(container, "foo", entry, tfn)
+        self.create_object(container, "foo", entry, tfn, list().append)
         self.assertEqual(container.holding["foo"].__class__, PretendContainer)
 
 
@@ -615,7 +615,7 @@
 
     def verify_file_changed(self):
         self.assertEqual(self.child["foo"], self.newfoo)
-    
+
     def verify_file_removed(self):
         self.assertEqual(self.child.keys(), ["grandchild"])
 
@@ -651,12 +651,102 @@
         self.assertEqual(self.parent.keys(), [])
 
 
+class ExampleFile(object):
+    fixed_up1 = False
+    fixed_up2 = False
+
+    def __init__(self, data=''):
+        self.data = data
+
+class SynchronizerWithCB(synchronizer.FileSynchronizer):
+
+    def load(self, readable):
+        self.context.fixed_up1 = False
+        self.context.fixed_up2 = False
+        self.context.data = readable.read()
+        return self.callback1
+
+    def callback1(self):
+        self.context.fixed_up1 = True
+        return self.callback2
+
+    def callback2(self):
+        self.context.fixed_up2 = True
+
+class SynchronizerWithBadCB(SynchronizerWithCB):
+
+    def callback2(self):
+        return self.callback1
+
+
+class TestCallback(TestCheckClass):
+    """
+    Test that synchronizer callbacks work
+    """
+
+    def test_callback(self):
+        # set up a synchronizer that provides callbacks
+        zope.component.provideUtility(
+            SynchronizerWithCB, interfaces.ISynchronizerFactory,
+            name = synchronizer.dottedname(ExampleFile))
+
+        # add a file that uses a cb synchronizer to the repo
+        self.example_file = self.child['file.txt'] = ExampleFile()
+        self.file_path = os.path.join(self.childdir, 'file.txt')
+        entry = self.getentry(self.file_path)
+        entry["path"] = "/parent/child/file.txt"
+        entry["factory"] = "fake factory name"
+
+        # make sure that before the commit the file has not been fixed up
+        self.assertEqual(self.example_file.fixed_up1, False)
+        self.assertEqual(self.example_file.fixed_up2, False)
+
+        # update the file
+        self.writefile('new data', self.file_path)
+
+        # commit the changes
+        committer = task.Commit(synchronizer.getSynchronizer,
+                                self.checker.repository)
+        committer.perform(self.parent, "", self.parentdir)
+
+        # make sure that after the commit the fix ups have been done
+        self.assertEqual(self.example_file.data, 'new data')
+        self.assertEqual(self.example_file.fixed_up1, True)
+        self.assertEqual(self.example_file.fixed_up2, True)
+
+    def test_infinite_loop(self):
+        # set up a synchronizer that provides bad callbacks
+        zope.component.provideUtility(
+            SynchronizerWithBadCB, interfaces.ISynchronizerFactory,
+            name = synchronizer.dottedname(ExampleFile))
+
+        # add a file that uses a cb synchronizer to the repo
+        self.example_file = self.child['file.txt'] = ExampleFile()
+        self.file_path = os.path.join(self.childdir, 'file.txt')
+        entry = self.getentry(self.file_path)
+        entry["path"] = "/parent/child/file.txt"
+        entry["factory"] = "fake factory name"
+
+        # update the file
+        self.writefile('new data', self.file_path)
+
+        # commit the changes
+        committer = task.Commit(synchronizer.getSynchronizer,
+                                self.checker.repository)
+
+        # a synchronization error is raised
+        self.assertRaises(task.SynchronizationError, committer.perform,
+                          self.parent, "", self.parentdir)
+
+
+
 def test_suite():
     s = unittest.TestSuite()
     s.addTest(unittest.makeSuite(TestTaskModule))
     s.addTest(unittest.makeSuite(TestCommitClass))
     s.addTest(unittest.makeSuite(TestCheckClass))
     s.addTest(unittest.makeSuite(TestCheckAndCommit))
+    s.addTest(unittest.makeSuite(TestCallback))
     return s
 
 def test_main():



More information about the Checkins mailing list