[Checkins] SVN: zc.FileStorage/branches/jim-dev/ All tests pass with new 3.9-compatible implementation.

Jim Fulton jim at zope.com
Thu Nov 5 14:08:07 EST 2009


Log message for revision 105501:
  All tests pass with new 3.9-compatible implementation.
  

Changed:
  U   zc.FileStorage/branches/jim-dev/buildout.cfg
  U   zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py
  A   zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt
  U   zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py

-=-
Modified: zc.FileStorage/branches/jim-dev/buildout.cfg
===================================================================
--- zc.FileStorage/branches/jim-dev/buildout.cfg	2009-11-05 18:53:57 UTC (rev 105500)
+++ zc.FileStorage/branches/jim-dev/buildout.cfg	2009-11-05 19:08:06 UTC (rev 105501)
@@ -9,27 +9,13 @@
 
 [test]
 recipe = zc.recipe.testrunner
-eggs = ZODB3 <3.9dev
-       zc.FileStorage
+eggs = zc.FileStorage
 initialization =
   import os, tempfile, shutil
   if os.path.exists('tmp'): shutil.rmtree('tmp')
   os.mkdir('tmp')
   tempfile.tempdir = os.path.abspath('tmp')
 
-  import zc.FileStorage
-
-  import ZODB.tests.VersionStorage, ZODB.tests.TransactionalUndoVersionStorage
-  class C: pass
-  ZODB.tests.VersionStorage.VersionStorage = C
-  class C: pass
-  ZODB.tests.TransactionalUndoVersionStorage.TransactionalUndoVersionStorage = C
-  import ZODB.tests.testDB
-  del ZODB.tests.testDB.DBTests.test_removeVersionPool
-  del ZODB.tests.testDB.DBTests.test_removeVersionPool_while_connection_open
-  import ZODB.tests.testZODB
-  del ZODB.tests.testZODB.ZODBTests.checkVersionOnly
-
 # There's no point in running the zeo tests, since zeo will run the
 # server in a separate process that won't see the zc.FileStorage
 # import.
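
With the ZODB3 <3.9dev pin and the test monkey-patching gone, the test
runner now exercises ZODB 3.9, which accepts a pack implementation
directly. A minimal sketch of that wiring, mirroring the setup used in
blob_packing.txt below:

    import ZODB.FileStorage
    import zc.FileStorage

    # ZODB 3.9's FileStorage takes the packer as a constructor
    # argument, so no monkey-patching is needed; zc.FileStorage.packer
    # does the actual pack work in a subprocess.
    storage = ZODB.FileStorage.FileStorage(
        'data.fs', blob_dir='data.blobs',
        packer=zc.FileStorage.packer)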

Modified: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py	2009-11-05 18:53:57 UTC (rev 105500)
+++ zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py	2009-11-05 19:08:06 UTC (rev 105501)
@@ -27,10 +27,15 @@
 import ZODB.fsIndex
 import ZODB.TimeStamp
 
+def packer(storage, referencesf, stop, gc):
+    return FileStoragePacker(storage, stop).pack()
+
 class FileStoragePacker(FileStorageFormatter):
 
-    def __init__(self, path, stop, la, lr, cla, clr, current_size):
-        self._name = path
+    def __init__(self, storage, stop):
+        self.storage = storage
+        self._name = path = storage._file.name
+
         # We open our own handle on the storage so that much of pack can
         # proceed in parallel.  It's important to close this file at every
         # return point, else on Windows the caller won't be able to rename
@@ -39,16 +44,22 @@
 
         self._stop = stop
         self.locked = 0
-        self.file_end = current_size
 
         # The packer needs to acquire the parent's commit lock
         # during the copying stage, so the two sets of lock acquire
         # and release methods are passed to the constructor.
-        self._lock_acquire = la
-        self._lock_release = lr
-        self._commit_lock_acquire = cla
-        self._commit_lock_release = clr
+        self._lock_acquire = storage._lock_acquire
+        self._lock_release = storage._lock_release
+        self._commit_lock_acquire = storage._commit_lock_acquire
+        self._commit_lock_release = storage._commit_lock_release
 
+        self._lock_acquire()
+        try:
+            storage._file.seek(0, 2)
+            self.file_end = storage._file.tell()
+        finally:
+            self._lock_release()
+
         self.ltid = z64
 
     def pack(self):
@@ -59,6 +70,7 @@
             stop = self._stop,
             size = self.file_end,
             syspath = sys.path,
+            blob_dir = self.storage.blob_dir,
             ))
         for name in 'error', 'log':
             name = self._name+'.pack'+name
@@ -71,7 +83,6 @@
             close_fds=True,
             )
 
-
         proc.stdin.close()
         out = proc.stdout.read()
         if proc.wait():
@@ -91,7 +102,7 @@
         os.remove(packindex_path)
         os.remove(self._name+".packscript")
 
-        output = OptionalSeekFile(self._name + ".pack", "r+b")
+        output = open(self._name + ".pack", "r+b")
         output.seek(0, 2)
         assert output.tell() == opos
         self.copyRest(self.file_end, output, index)
@@ -100,17 +111,8 @@
         pos = output.tell()
         output.close()
 
-        # Grrrrr. The caller wants these attrs
-        self.index = index
-        self.vindex = {}
-        self.tindex = {}
-        self.tvindex = {}
-        self.oid2tid = {}
-        self.toid2tid = {}
-        self.toid2tid_delete = {}
+        return pos, index
 
-        return pos
-
     def copyRest(self, input_pos, output, index):
         # Copy data records written since packing started.
 
@@ -128,7 +130,7 @@
         # trailing 0 argument, and then on every platform except
         # native Windows it was observed that we could read stale
         # data from the tail end of the file.
-        self._file = OptionalSeekFile(self._name, "rb", 0)
+        self._file = open(self._name, "rb", 0)
         try:
             try:
                 while 1:
@@ -151,7 +153,7 @@
     def _copyNewTrans(self, input_pos, output, index,
                       acquire=None, release=None):
         tindex = {}
-        copier = PackCopier(output, index, {}, tindex, {})
+        copier = PackCopier(output, index, tindex)
         th = self._read_txn_header(input_pos)
         if release is not None:
             release()
@@ -174,10 +176,7 @@
                 if h.back:
                     prev_txn = self.getTxnFromData(h.oid, h.back)
 
-            if h.version:
-                self.fail(pos, "Versions are not supported.")
-
-            copier.copy(h.oid, h.tid, data, '', prev_txn,
+            copier.copy(h.oid, h.tid, data, prev_txn,
                         output_tpos, output.tell())
 
             input_pos += h.recordlen()
@@ -207,10 +206,6 @@
         data, tid = self._loadBackTxn(oid, back, 0)
         return data
 
-# sys.modules['ZODB.FileStorage.FileStorage'
-#             ].FileStoragePacker = FileStoragePacker
-# ZODB.FileStorage.FileStorage.supportsVersions = lambda self: False
-
 class PackCopier(ZODB.FileStorage.fspack.PackCopier):
 
     def _txn_find(self, tid, stop_at_pack):
@@ -247,7 +242,8 @@
 logging.getLogger().addHandler(handler)
 
 try:
-    packer = zc.FileStorage.PackProcess(%(path)r, %(stop)r, %(size)r)
+    packer = zc.FileStorage.PackProcess(%(path)r, %(stop)r, %(size)r,
+                                        %(blob_dir)r)
     packer.pack()
 except Exception, v:
     logging.exception('packing')
@@ -262,19 +258,21 @@
 
 class PackProcess(FileStoragePacker):
 
-    def __init__(self, path, stop, current_size):
+    def __init__(self, path, stop, current_size, blob_dir):
         self._name = path
         # We open our own handle on the storage so that much of pack can
         # proceed in parallel.  It's important to close this file at every
         # return point, else on Windows the caller won't be able to rename
         # or remove the storage file.
 
-        # We set the buffer quite high (32MB) to try to reduce seeks
-        # when the storage is disk is doing other io
+        if blob_dir:
+            self.pack_blobs = True
+            self.blob_removed = open(os.path.join(blob_dir, '.removed'), 'w')
+        else:
+            self.pack_blobs = False
 
+        self._file = open(path, "rb")
 
-        self._file = OptionalSeekFile(path, "rb")
-
         self._name = path
         self._stop = stop
         self.locked = 0
@@ -290,37 +288,19 @@
         self._freecache(pos)
         return FileStoragePacker._read_txn_header(self, pos, tid)
 
-    def _log_memory(self): # only on linux, oh well
-        status_path = "/proc/%s/status" % os.getpid()
-        if not os.path.exists(status_path):
-            return
-        try:
-            f = open(status_path)
-        except IOError:
-            return
-
-        for line in f:
-            for name in ('Peak', 'Size', 'RSS'):
-                if line.startswith('Vm'+name):
-                    logging.info(line.strip())
-
-
     def pack(self):
         packed, index, packpos = self.buildPackIndex(self._stop, self.file_end)
         logging.info('initial scan %s objects at %s', len(index), packpos)
-        self._log_memory()
         if packed:
             # nothing to do
             logging.info('done, nothing to do')
             self._file.close()
             return
 
-        self._log_memory()
         logging.info('copy to pack time')
-        output = OptionalSeekFile(self._name + ".pack", "w+b")
-        output._freecache = _freefunc(output)
+        output = open(self._name + ".pack", "w+b")
+        self._freecache = _freefunc(output)
         index, new_pos = self.copyToPacktime(packpos, index, output)
-        self._log_memory()
         if new_pos == packpos:
             # pack didn't free any data.  there's no point in continuing.
             self._file.close()
@@ -331,7 +311,6 @@
 
         logging.info('copy from pack time')
         self.copyFromPacktime(packpos, self.file_end, output, index)
-        self._log_memory()
 
         # Save the index so the parent process can use it as a starting point.
         f = open(self._name + ".packindex", 'wb')
@@ -363,9 +342,12 @@
             while pos < end:
                 dh = self._read_data_header(pos)
                 self.checkData(th, tpos, dh, pos)
-                if dh.version:
-                    self.fail(pos, "Versions are not supported")
-                index[dh.oid] = pos
+                if dh.plen or dh.back:
+                    index[dh.oid] = pos
+                else:
+                    # deleted
+                    if dh.oid in index:
+                        del index[dh.oid]
                 pos += dh.recordlen()
 
             tlen = self._read_num(pos)
@@ -382,6 +364,8 @@
         self._file.seek(0)
         output.write(self._file.read(self._metadata_size))
         new_index = ZODB.fsIndex.fsIndex()
+        pack_blobs = self.pack_blobs
+        is_blob_record = ZODB.blob.is_blob_record
 
         while pos < packpos:
             th = self._read_txn_header(pos)
@@ -392,6 +376,35 @@
                 h = self._read_data_header(pos)
                 if index.get(h.oid) != pos:
                     pos += h.recordlen()
+                    if pack_blobs:
+                        if h.plen:
+                            data = self._file.read(h.plen)
+                        else:
+                            data = self.fetchDataViaBackpointer(h.oid, h.back)
+                        if data and is_blob_record(data):
+                            # We need to remove the blob record. Maybe we
+                            # need to remove the oid, too.
+
+                            # But first, we need to make sure the
+                            # record we're looking at isn't a dup of
+                            # the current record. There's a bug in ZEO
+                            # blob support that causes duplicate data
+                            # records.
+                            rpos = index.get(h.oid)
+                            is_dup = (rpos and
+                                      self._read_data_header(rpos).tid == h.tid)
+                            if not is_dup:
+                                # Note that we delete the revision.
+                                # If rpos was None, then we could
+                                # remove the oid.  What if, somehow,
+                                # another blob update happened after
+                                # the deletion?  This shouldn't happen,
+                                # but we can leave it to the cleanup
+                                # code to take care of removing the
+                                # directory for us.
+                                self.blob_removed.write(
+                                    (h.oid+h.tid).encode('hex')+'\n')
+
                     continue
 
                 pos += h.recordlen()
@@ -437,17 +450,29 @@
                     output.write(tlen)
                     output.seek(new_pos)
 
-                output._freecache(new_pos)
+                self._freecache(new_pos)
 
 
             pos += 8
 
         return new_index, new_pos
 
+    def fetchDataViaBackpointer(self, oid, back):
+        """Return the data for oid via backpointer back
+
+        If `back` is 0 or ultimately resolves to 0, return None.
+        In this case, the transaction undoes the object
+        creation.
+        """
+        if back == 0:
+            return None
+        data, tid = self._loadBackTxn(oid, back, 0)
+        return data
+
     def copyFromPacktime(self, input_pos, file_end, output, index):
         while input_pos < file_end:
             input_pos = self._copyNewTrans(input_pos, output, index)
-            output._freecache(output.tell())
+            self._freecache(output.tell())
         return input_pos
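
The '.removed' log written above (one blob_removed.write call per
dropped blob revision) contains one hex-encoded oid+tid pair per line.
A minimal sketch of how cleanup code might parse it, assuming Python 2
as used throughout this package (the function name is mine, not part
of the package):

    import os

    def iter_removed(blob_dir):
        # Each line is (oid + tid).encode('hex'): 32 hex digits that
        # decode to an 8-byte oid followed by the 8-byte tid of the
        # removed blob revision.
        for line in open(os.path.join(blob_dir, '.removed')):
            raw = line.strip().decode('hex')
            yield raw[:8], raw[8:]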
 
 

Added: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt
===================================================================
--- zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt	                        (rev 0)
+++ zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt	2009-11-05 19:08:06 UTC (rev 105501)
@@ -0,0 +1,144 @@
+Packing support for blob data
+=============================
+
+XXX Gaaa. This is a copy because the original assumed the storage
+packed with GC.  zc.FileStorage only works with external gc. :/
+
+
+Set up:
+
+    >>> from ZODB.serialize import referencesf
+    >>> from ZODB.blob import Blob
+    >>> from ZODB import utils
+    >>> from ZODB.DB import DB
+    >>> import transaction
+
+A helper method to assure a unique timestamp across multiple platforms:
+
+    >>> from ZODB.tests.testblob import new_time
+
+UNDOING
+=======
+
+We need a database with an undoing blob supporting storage:
+
+    >>> import ZODB.FileStorage, zc.FileStorage
+    >>> blob_storage = ZODB.FileStorage.FileStorage(
+    ...     'data.fs', blob_dir='data.blobs',
+    ...     packer=zc.FileStorage.packer)
+    >>> database = DB(blob_storage)
+
+Create our root object:
+
+    >>> connection1 = database.open()
+    >>> root = connection1.root()
+
+Put some revisions of a blob object in our database and on the filesystem:
+
+    >>> import os
+    >>> tids = []
+    >>> times = []
+    >>> nothing = transaction.begin()
+    >>> times.append(new_time())
+    >>> blob = Blob()
+    >>> blob.open('w').write('this is blob data 0')
+    >>> root['blob'] = blob
+    >>> transaction.commit()
+    >>> tids.append(blob._p_serial)
+
+    >>> nothing = transaction.begin()
+    >>> times.append(new_time())
+    >>> root['blob'].open('w').write('this is blob data 1')
+    >>> transaction.commit()
+    >>> tids.append(blob._p_serial)
+
+    >>> nothing = transaction.begin()
+    >>> times.append(new_time())
+    >>> root['blob'].open('w').write('this is blob data 2')
+    >>> transaction.commit()
+    >>> tids.append(blob._p_serial)
+
+    >>> nothing = transaction.begin()
+    >>> times.append(new_time())
+    >>> root['blob'].open('w').write('this is blob data 3')
+    >>> transaction.commit()
+    >>> tids.append(blob._p_serial)
+
+    >>> nothing = transaction.begin()
+    >>> times.append(new_time())
+    >>> root['blob'].open('w').write('this is blob data 4')
+    >>> transaction.commit()
+    >>> tids.append(blob._p_serial)
+
+    >>> oid = root['blob']._p_oid
+    >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
+    >>> [ os.path.exists(x) for x in fns ]
+    [True, True, True, True, True]
+
+Do a pack to slightly before the first revision was written:
+
+    >>> packtime = times[0]
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [True, True, True, True, True]
+
+Do a pack to slightly before the second revision was written:
+
+    >>> packtime = times[1]
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [True, True, True, True, True]
+
+Do a pack to slightly before the third revision was written:
+
+    >>> packtime = times[2]
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [False, True, True, True, True]
+
+Do a pack to slightly before the fourth revision was written:
+
+    >>> packtime = times[3]
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [False, False, True, True, True]
+
+Do a pack to slightly before the fifth revision was written:
+
+    >>> packtime = times[4]
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [False, False, False, True, True]
+
+Do a pack to now:
+
+    >>> packtime = new_time()
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [False, False, False, False, True]
+
+Delete the object and do a pack; it should get rid of the most current
+revision as well as the entire directory:
+
+
+    >>> t = transaction.begin()
+    >>> oid, serial = root['blob']._p_oid, root['blob']._p_serial
+    >>> del root['blob']
+    >>> transaction.commit()
+
+    >>> t = transaction.begin()
+    >>> blob_storage.tpc_begin(t)
+    >>> blob_storage.deleteObject(oid, serial, t)
+    >>> blob_storage.tpc_vote(t)
+    >>> blob_storage.tpc_finish(t)
+
+    >>> packtime = new_time()
+    >>> blob_storage.pack(packtime, referencesf)
+    >>> [ os.path.exists(x) for x in fns ]
+    [False, False, False, False, False]
+    >>> os.path.exists(os.path.split(fns[0])[0])
+    False
+
+Clean up our blob directory and database:
+
+    >>> blob_storage.close()


Property changes on: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt
___________________________________________________________________
Added: svn:eol-style
   + native
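
Because zc.FileStorage packs without GC, garbage (including the blob
above) has to be deleted externally, which the doctest does through
ZODB 3.9's deleteObject API. The same pattern as a reusable helper
(the helper name is mine, not part of the package):

    import transaction

    def external_delete(storage, oid, serial):
        # Mirrors the doctest's tpc_* sequence: write a deletion record
        # so the next pack can drop the object and its blob directory.
        t = transaction.begin()
        storage.tpc_begin(t)
        storage.deleteObject(oid, serial, t)
        storage.tpc_vote(t)
        storage.tpc_finish(t)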

Modified: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py
===================================================================
--- zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py	2009-11-05 18:53:57 UTC (rev 105500)
+++ zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py	2009-11-05 19:08:06 UTC (rev 105501)
@@ -19,28 +19,32 @@
 # tests affected by the lack of gc in pack.
 ##############################################################################
 
-
-import os
+import pickle
 import unittest
-from zope.testing import doctest
+import zc.FileStorage
+import ZODB.blob
+import ZODB.tests.testblob
 
 from ZODB.tests.testFileStorage import * # :-P
 from ZODB.tests.PackableStorage import * # :-P
 from ZODB.tests.TransactionalUndoStorage import * # :-P
 
-class NoGCFileStorageTests(FileStorageTests):
+from zope.testing import doctest, setupstack
 
+class ZCFileStorageTests(FileStorageTests):
+
+    blob_dir = None
+
     def setUp(self):
-        self.open(create=1)
-        self.__gcpath = os.path.abspath('FileStorageTests.fs.packnogc')
-        open(self.__gcpath, 'w')
+        self.open(create=1, packer=zc.FileStorage.packer,
+                  blob_dir=self.blob_dir)
 
     def tearDown(self):
         self._storage.close()
         self._storage.cleanup()
-        os.remove(self.__gcpath)
+        if self.blob_dir:
+            ZODB.blob.remove_committed_dir(self.blob_dir)
 
-
     def checkPackAllRevisions(self):
         self._initroot()
         eq = self.assertEqual
@@ -99,49 +103,22 @@
         # The undo log contains only the most recent transaction
         self.assertEqual(3, len(self._storage.undoLog()))
 
-
-    def checkTransactionalUndoAfterPack(self):
-        eq = self.assertEqual
-        # Add a few object revisions
-        oid = self._storage.new_oid()
-        revid1 = self._dostore(oid, data=MinPO(51))
-        snooze()
-        packtime = time.time()
-        snooze()                # time.time() now distinct from packtime
-        revid2 = self._dostore(oid, revid=revid1, data=MinPO(52))
-        self._dostore(oid, revid=revid2, data=MinPO(53))
-        # Now get the undo log
-        info = self._storage.undoInfo()
-        eq(len(info), 3)
-        tid = info[0]['id']
-        # Now pack just the initial revision of the object.  We need the
-        # second revision otherwise we won't be able to undo the third
-        # revision!
-        self._storage.pack(packtime, referencesf)
-        # Make some basic assertions about the undo information now
-        info2 = self._storage.undoInfo()
-        eq(len(info2), 3)
-        # And now attempt to undo the last transaction
-        t = Transaction()
-        self._storage.tpc_begin(t)
-        tid, oids = self._storage.undo(tid, t)
-        self._storage.tpc_vote(t)
-        self._storage.tpc_finish(t)
-        eq(len(oids), 1)
-        eq(oids[0], oid)
-        data, revid = self._storage.load(oid, '')
-        # The object must now be at the second state
-        eq(zodb_unpickle(data), MinPO(52))
-        self._iterate()
-
-    
     def checkPackWithGCOnDestinationAfterRestore(self):
         pass
 
     def checkPackWithMultiDatabaseReferences(self):
         pass
 
+class ZCFileStorageTestsWithBlobs(ZCFileStorageTests):
+
+    blob_dir = 'blobs'
+
 def test_suite():
     suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(NoGCFileStorageTests, "check"))
+    suite.addTest(unittest.makeSuite(ZCFileStorageTests, "check"))
+    suite.addTest(unittest.makeSuite(ZCFileStorageTestsWithBlobs, "check"))
+    suite.addTest(doctest.DocFileSuite(
+        'blob_packing.txt',
+        setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown,
+        ))
     return suite


