[Zope-CVS] CVS: Products/AdaptableStorage/gateway_fs - FSConnection.py:1.2 FSDirectoryItems.py:1.2 TransactionalWrites.py:NONE

Shane Hathaway shane@zope.com
Tue, 3 Dec 2002 18:11:23 -0500


Update of /cvs-repository/Products/AdaptableStorage/gateway_fs
In directory cvs.zope.org:/tmp/cvs-serv22143/gateway_fs

Modified Files:
	FSConnection.py FSDirectoryItems.py 
Removed Files:
	TransactionalWrites.py 
Log Message:
Running AdaptableStorage with the latest Zope revealed some flaws.
Fixed them all.

- Consistent ordering of transaction participants now makes it impossible to
  add a jar to the transaction after the commit() method has begun.
  AdaptableStorage (and perhaps other projects like ZPatterns) relied on
  the ability to add a jar after commit has started.  This could lead to
  a deadlock.  Reworked ASStorage, FSConnection, and the tests to deal with
  this.

- Serials are now required to be hashable.  This makes serials, used to
  prevent conflicts, simpler and more robust.

- DBTab needs some kind of class it can call directly, so I added
  the small subclasses FSStorage and FSDatabase to Zope2FS.

- Restored the PersistentExtra patch.

- The directory items gateway wants to write data about its children, but
  sometimes its children aren't being written at the same time.  Added
  a "conditional" optional flag to FSConnection.writeSection(), allowing
  data to be written only if other data gets written.


=== Products/AdaptableStorage/gateway_fs/FSConnection.py 1.1 => 1.2 ===
--- Products/AdaptableStorage/gateway_fs/FSConnection.py:1.1	Wed Nov 27 13:37:05 2002
+++ Products/AdaptableStorage/gateway_fs/FSConnection.py	Tue Dec  3 18:10:52 2002
@@ -23,7 +23,6 @@
 
 from interfaces.public import IFSConnection
 from exceptions import FSWriteError
-from TransactionalWrites import getTransactionalWrites
 
 
 # Try to decipher this one ;-)
@@ -52,9 +51,15 @@
 
     def __init__(self, basepath):
         self.basepath = basepath
+        self._final = 0
+        # _pending holds the data to be written.
+        # _pending: { subpath string -> { section_name -> data } }
+        self._pending = {}
+        # _conditional holds data that will be written only if the
+        #  rest of the data for a subpath is in _pending.
+        # _conditional: { subpath string -> { section_name -> data } }
+        self._conditional = {}
 
-    def sortKey(self):
-        return self.basepath
 
     def expandPath(self, subpath, check_exists=0):
         if self.basepath:
@@ -65,7 +70,7 @@
             # unchanged.
             path = subpath
         if check_exists:
-            assert os.path.exists(path), path
+            assert os.path.exists(path), '%s does not exist' % path
         return path
 
 
@@ -78,7 +83,7 @@
         assert section_name != DATA_SECTION
 
 
-    def _write(self, subpath, section_name, data):
+    def _write(self, subpath, section_name, data, conditional=0):
         # XXX We should be checking for '..'
         path = self.expandPath(subpath)
         # Do some early checking.
@@ -87,12 +92,12 @@
             if not v:
                 raise FSWriteError(
                     "Can't get write access to %s" % subpath)
-        getTransactionalWrites(self).record(subpath, section_name, data)
+        self.record(subpath, section_name, data, conditional)
 
 
-    def writeSection(self, subpath, section_name, data):
+    def writeSection(self, subpath, section_name, data, conditional=0):
         self.checkSectionName(section_name)
-        self._write(subpath, section_name, data)
+        self._write(subpath, section_name, data, conditional)
 
 
     def writeNodeType(self, subpath, data):
@@ -103,8 +108,7 @@
             if (want_dir != (not not os.path.isdir(path))):
                 raise FSWriteError(
                     "Can't mix file and directory at %s" % subpath)
-        getTransactionalWrites(self).record(
-            subpath, NODE_TYPE_SECTION, data)
+        self.record(subpath, NODE_TYPE_SECTION, data, 0)
 
 
     def writeData(self, subpath, data):
@@ -221,9 +225,9 @@
             props_f.close()
 
 
-    def removeUnlinkedItems(self, path, items):
+    def removeUnlinkedItems(self, path, names):
         linked = {}
-        for name in items:
+        for name in names:
             linked[name] = 1
         existing = os.listdir(path)
         for fn in existing:
@@ -242,8 +246,10 @@
         non_containers = {}
         for subpath, sections in items:
             # type must be provided and must always be either 'd' or 'f'.
-            if not sections.has_key(NODE_TYPE_SECTION):
-                raise FSWriteError('node type not specified for %s' % subpath)
+            if (not sections.has_key(NODE_TYPE_SECTION)
+                or not sections.has_key(DATA_SECTION)):
+                raise FSWriteError(
+                    'Data or node type not specified for %s' % subpath)
             t = sections[NODE_TYPE_SECTION]
             if t not in 'df':
                 raise FSWriteError(
@@ -254,8 +260,77 @@
                     "Not a directory: %s" % dir)
             if t == 'f':
                 non_containers[subpath] = 1
+                if not isinstance(sections[DATA_SECTION], StringType):
+                    raise FSWriteError(
+                        'Data for a file must be a string at %s'
+                        % subpath)
             else:
                 if isinstance(sections[DATA_SECTION], StringType):
                     raise FSWriteError(
                         'Data for a directory must be a list or tuple at %s'
                         % subpath)
+
+    #
+    # ITPCConnection implementation
+    #
+
+    def sortKey(self):
+        return self.basepath
+
+    def getName(self):
+        return self.basepath
+
+    def record(self, subpath, section_name, data, conditional=0):
+        """Records data to be written at commit time"""
+        if conditional:
+            m = self._conditional
+        else:
+            m = self._pending
+        sections = m.get(subpath)
+        if sections is None:
+            sections = {}
+            m[subpath] = sections
+        if sections.has_key(section_name):
+            if sections[section_name] != data:
+                raise FSWriteError(
+                    'Conflicting data storage at %s (%s)' %
+                    (subpath, section_name))
+        else:
+            sections[section_name] = data
+
+    def begin(self):
+        pass
+
+    def vote(self):
+        """Do some early verification
+
+        This is done while the transaction can still be vetoed safely.
+        """
+        for subpath, sections in self._conditional.items():
+            if self._pending.get(subpath):
+                # Add the conditional data.
+                for section_name, data in sections.items():
+                    self.record(subpath, section_name, data)
+        self._conditional.clear()
+        items = self._pending.items()
+        items.sort()  # Ensure that base directories come first.
+        self.beforeWrite(items)
+        self._final = 1
+
+    def abort(self):
+        self._final = 0
+        self._pending.clear()
+        self._conditional.clear()
+
+    def finish(self):
+        if self._final:
+            self._final = 0
+            try:
+                items = self._pending.items()
+                items.sort()  # Ensure that base directories come first.
+                for subpath, sections in items:
+                    self.writeFinal(subpath, sections)
+            finally:
+                self._pending.clear()
+                self._conditional.clear()
+


=== Products/AdaptableStorage/gateway_fs/FSDirectoryItems.py 1.1 => 1.2 ===
--- Products/AdaptableStorage/gateway_fs/FSDirectoryItems.py:1.1	Wed Nov 27 13:37:05 2002
+++ Products/AdaptableStorage/gateway_fs/FSDirectoryItems.py	Tue Dec  3 18:10:52 2002
@@ -34,6 +34,7 @@
     def getSchema(self):
         return self.schema
 
+
     def load(self, object_mapper, key):
         c = self.fs_conn
         assert c.readNodeType(key) == 'd'
@@ -59,10 +60,10 @@
             res.append((name, classification))
             items = classification.items()
             items.sort()
-            serial.append((name, items))
+            serial.append((name, tuple(items)))
         res.sort()
         serial.sort()
-        return tuple(res), serial
+        return tuple(res), tuple(serial)
 
 
     def store(self, object_mapper, key, state):
@@ -75,14 +76,14 @@
             subkey = '%s/%s' % (key, name)
             items = classification.items()
             items.sort()
-            serial.append((name, items))
+            serial.append((name, tuple(items)))
             text = []
             for k, v in items:
                 text.append('%s=%s' % (k, v))
             text = '\n'.join(text)
-            c.writeSection(subkey, 'classification', text)
+            c.writeSection(subkey, 'classification', text, 1)
         names.sort()
         serial.sort()
         c.writeData(key, names)
-        return serial
+        return tuple(serial)
 

=== Removed File Products/AdaptableStorage/gateway_fs/TransactionalWrites.py ===