[Checkins] SVN: ZODB/branches/jim-dev/src/ZODB/ checkpoint

Jim Fulton jim at zope.com
Sun Dec 14 16:59:49 EST 2008


Log message for revision 94066:
  checkpoint

Changed:
  U   ZODB/branches/jim-dev/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/branches/jim-dev/src/ZODB/FileStorage/fspack.py
  U   ZODB/branches/jim-dev/src/ZODB/blob.py

-=-
Modified: ZODB/branches/jim-dev/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/FileStorage/FileStorage.py	2008-12-14 19:05:36 UTC (rev 94065)
+++ ZODB/branches/jim-dev/src/ZODB/FileStorage/FileStorage.py	2008-12-14 21:59:49 UTC (rev 94066)
@@ -210,8 +210,13 @@
                                         ZODB.interfaces.IBlobStorageRestoreable)
         else:
             self._blob_init_no_blobs()
-        
 
+    def copyTransactionsFrom(self, other):
+        if self.blob_dir:
+            return ZODB.blob.BlobStorageMixin.copyTransactionsFrom(self, other)
+        else:
+            return BaseStorage.BaseStorage.copyTransactionsFrom(self, other)
+
     def _initIndex(self, index, tindex):
         self._index=index
         self._tindex=tindex
@@ -1016,15 +1021,7 @@
         # Our default packer is built around the original packer.  We
         # simply adapt the old interface to the new.  We don't really
         # want to invest much in the old packer, at least for now.
-        p = FileStoragePacker(
-            storage._file.name,
-            stop,
-            storage._lock_acquire,
-            storage._lock_release,
-            storage._commit_lock_acquire,
-            storage._commit_lock_release,
-            storage.getSize(),
-            gc)
+        p = FileStoragePacker(storage, referencesf, stop, gc)
         opos = p.pack()
         if opos is None:
             return None
@@ -1078,13 +1075,21 @@
                 try:
                     if os.path.exists(oldpath):
                         os.remove(oldpath)
+                    if self.blob_dir and os.path.exists(self.blob_dir + ".old"):
+                        ZODB.blob.remove_committed_dir(self.blob_dir + ".old")
+                        
                     os.rename(self._file_name, oldpath)
+                    if self.blob_dir:
+                        os.rename(self.blob_dir, self.blob_dir + ".old")
                 except Exception:
                     self._file = open(self._file_name, 'r+b')
                     raise
 
                 # OK, we're beyond the point of no return
                 os.rename(self._file_name + '.pack', self._file_name)
+                if self.blob_dir:
+                    os.rename(self.blob_dir + '.pack', self.blob_dir)
+                    
                 self._file = open(self._file_name, 'r+b')
                 self._initIndex(index, self._tindex)
                 self._pos = opos

Modified: ZODB/branches/jim-dev/src/ZODB/FileStorage/fspack.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/FileStorage/fspack.py	2008-12-14 19:05:36 UTC (rev 94065)
+++ ZODB/branches/jim-dev/src/ZODB/FileStorage/fspack.py	2008-12-14 21:59:49 UTC (rev 94066)
@@ -24,16 +24,15 @@
 a backpointer after that time.
 """
 
-import os
-
-from ZODB.serialize import referencesf
+from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
+from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
 from ZODB.utils import p64, u64, z64
 
-from ZODB.fsIndex import fsIndex
-from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
-from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
+import logging
+import os
+import ZODB.blob
+import ZODB.fsIndex
 import ZODB.POSException
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -147,7 +146,7 @@
 
 class GC(FileStorageFormatter):
 
-    def __init__(self, file, eof, packtime, gc):
+    def __init__(self, file, eof, packtime, gc, referencesf):
         self._file = file
         self._name = file.name
         self.eof = eof
@@ -155,8 +154,10 @@
         self.gc = gc
         # packpos: position of first txn header after pack time
         self.packpos = None
-        self.oid2curpos = fsIndex() # maps oid to current data record position
 
+        # {oid -> current data record position}:
+        self.oid2curpos = ZODB.fsIndex.fsIndex()
+
         # The set of reachable revisions of each object.
         #
         # This set as managed using two data structures.  The first is
@@ -166,12 +167,14 @@
         # second is a dictionary mapping objects to lists of
         # positions; it is used to handle the same number of objects
         # for which we must keep multiple revisions.
-        self.reachable = fsIndex()
+        self.reachable = ZODB.fsIndex.fsIndex()
         self.reach_ex = {}
 
         # keep ltid for consistency checks during initial scan
         self.ltid = z64
 
+        self.referencesf = referencesf
+
     def isReachable(self, oid, pos):
         """Return 1 if revision of `oid` at `pos` is reachable."""
 
@@ -318,7 +321,7 @@
         while dh.back:
             dh = self._read_data_header(dh.back)
         if dh.plen:
-            return referencesf(self._file.read(dh.plen))
+            return self.referencesf(self._file.read(dh.plen))
         else:
             return []
 
@@ -331,7 +334,17 @@
     # current_size is the storage's _pos.  All valid data at the start
     # lives before that offset (there may be a checkpoint transaction in
     # progress after it).
-    def __init__(self, path, stop, la, lr, cla, clr, current_size, gc=True):
+    def __init__(self, storage, referencesf, stop, gc=True):
+        self._storage = storage
+        if storage.blob_dir:
+            self.blob_dir = storage.blob_dir+'.pack'
+            self.fshelper = ZODB.blob.FilesystemHelper(
+                self.blob_dir, storage.fshelper.layout_name)
+            self.fshelper.create()
+        else:
+            self.blob_dir = None
+            
+        path = storage._file.name
         self._name = path
         # We open our own handle on the storage so that much of pack can
         # proceed in parallel.  It's important to close this file at every
@@ -341,24 +354,24 @@
         self._path = path
         self._stop = stop
         self.locked = False
-        self.file_end = current_size
+        self.file_end = storage.getSize()
 
-        self.gc = GC(self._file, self.file_end, self._stop, gc)
+        self.gc = GC(self._file, self.file_end, self._stop, gc, referencesf)
 
         # The packer needs to acquire the parent's commit lock
         # during the copying stage, so the two sets of lock acquire
         # and release methods are passed to the constructor.
-        self._lock_acquire = la
-        self._lock_release = lr
-        self._commit_lock_acquire = cla
-        self._commit_lock_release = clr
+        self._lock_acquire = storage._lock_acquire
+        self._lock_release = storage._lock_release
+        self._commit_lock_acquire = storage._commit_lock_acquire
+        self._commit_lock_release = storage._commit_lock_release
 
         # The packer will use several indexes.
         # index: oid -> pos
         # tindex: oid -> pos, for current txn
         # oid2tid: not used by the packer
 
-        self.index = fsIndex()
+        self.index = ZODB.fsIndex.fsIndex()
         self.tindex = {}
         self.oid2tid = {}
         self.toid2tid = {}
@@ -464,18 +477,6 @@
 
         return pos, new_pos
 
-    def fetchBackpointer(self, oid, back):
-        """Return data and refs backpointer `back` to object `oid.
-
-        If `back` is 0 or ultimately resolves to 0, return None
-        and None.  In this case, the transaction undoes the object
-        creation.
-        """
-        if back == 0:
-            return None
-        data, tid = self._loadBackTxn(oid, back, 0)
-        return data
-
     def copyDataRecords(self, pos, th):
         """Copy any current data records between pos and tend.
 
@@ -517,8 +518,32 @@
             self.writePackedDataRecord(h, data, new_tpos)
             new_pos = self._tfile.tell()
 
+            if ZODB.blob.is_blob_record(data):
+                self.copyBlob(h.oid, h.tid)
+
         return new_tpos, pos
 
+    def fetchBackpointer(self, oid, back):
+        """Return the data for backpointer `back` to object `oid`.
+
+        If `back` is 0 or ultimately resolves to 0, return None.
+        In this case, the transaction undoes the object
+        creation.
+        """
+        if back == 0:
+            return None
+        data, tid = self._loadBackTxn(oid, back, 0)
+        return data
+
+    def copyBlob(self, oid, tid):
+        if not self.blob_dir:
+            return
+        self.fshelper.createPathForOID(oid)
+        ZODB.blob.link_or_copy(
+            self._storage.fshelper.getBlobFilename(oid, tid),
+            self.fshelper.getBlobFilename(oid, tid),
+            )
+
     def writePackedDataRecord(self, h, data, new_tpos):
         # Update the header to reflect current information, then write
         # it to the output file.
@@ -581,6 +606,9 @@
             self._copier.copy(h.oid, h.tid, data, prev_txn,
                               pos, self._tfile.tell())
 
+            if ZODB.blob.is_blob_record(data):
+                self.copyBlob(h.oid, h.tid)
+
         tlen = self._tfile.tell() - pos
         assert tlen == th.tlen
         self._tfile.write(p64(tlen))

Modified: ZODB/branches/jim-dev/src/ZODB/blob.py
===================================================================
--- ZODB/branches/jim-dev/src/ZODB/blob.py	2008-12-14 19:05:36 UTC (rev 94065)
+++ ZODB/branches/jim-dev/src/ZODB/blob.py	2008-12-14 21:59:49 UTC (rev 94066)
@@ -583,9 +583,6 @@
 class BlobStorageMixin(object):
     """A mix-in to help storages support blobssupport blobs."""
 
-    zope.interface.implements(ZODB.interfaces.IBlobStorage)
-
-    @non_overridable
     def _blob_init(self, blob_dir, layout='automatic'):
         # XXX Log warning if storage is ClientStorage
         self.fshelper = FilesystemHelper(blob_dir, layout)
@@ -597,7 +594,6 @@
         self.fshelper = NoBlobsFileSystemHelper()
         self.dirty_oids = []
 
-    @non_overridable
     def _blob_tpc_abort(self):
         """Blob cleanup to be called from subclass tpc_abort
         """
@@ -607,13 +603,11 @@
             if os.path.exists(clean):
                 remove_committed(clean)
 
-    @non_overridable
     def _blob_tpc_finish(self):
         """Blob cleanup to be called from subclass tpc_finish
         """
         self.dirty_oids = []
 
-    @non_overridable
     def copyTransactionsFrom(self, other):
         for trans in other.iterator():
             self.tpc_begin(trans, trans.tid, trans.status)
@@ -638,7 +632,6 @@
             self.tpc_vote(trans)
             self.tpc_finish(trans)
 
-    @non_overridable
     def loadBlob(self, oid, serial):
         """Return the filename where the blob file can be found.
         """
@@ -647,7 +640,6 @@
             raise POSKeyError("No blob file", oid, serial)
         return filename
 
-    @non_overridable
     def openCommittedBlobFile(self, oid, serial, blob=None):
         blob_filename = self.loadBlob(oid, serial)
         if blob is None:
@@ -655,7 +647,6 @@
         else:
             return BlobFile(blob_filename, 'r', blob)
 
-    @non_overridable
     def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
                     transaction):
         """Write blob data already committed in a separate database
@@ -665,7 +656,6 @@
 
         return self._tid
 
-    @non_overridable
     def _blob_storeblob(self, oid, serial, blobfilename):
         self._lock_acquire()
         try:
@@ -679,7 +669,6 @@
         finally:
             self._lock_release()
             
-    @non_overridable
     def storeBlob(self, oid, oldserial, data, blobfilename, version,
                   transaction):
         """Stores data that has a BLOB attached."""
@@ -689,12 +678,11 @@
 
         return self._tid
 
-    @non_overridable
     def temporaryDirectory(self):
         return self.fshelper.temp_dir
 
 
-class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
+class BlobStorage(SpecificationDecoratorBase):
     """A storage to support blobs."""
 
     zope.interface.implements(ZODB.interfaces.IBlobStorage)
@@ -704,6 +692,7 @@
     __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
                  '_blobs_pack_is_in_progress', )
 
+
     def __new__(self, base_directory, storage, layout='automatic'):
         return SpecificationDecoratorBase.__new__(self, storage)
 
@@ -870,6 +859,12 @@
         return undo_serial, keys
 
 
+for name, v in BlobStorageMixin.__dict__.items():
+    if isinstance(v, type(BlobStorageMixin.__dict__['storeBlob'])):
+        assert name not in BlobStorage.__dict__
+        setattr(BlobStorage, name, non_overridable(v))
+del name, v
+
 copied = logging.getLogger('ZODB.blob.copied').debug
 def rename_or_copy_blob(f1, f2, chmod=True):
     """Try to rename f1 to f2, fallback to copy.
@@ -908,9 +903,12 @@
                 filename = os.path.join(dirpath, filename)
                 remove_committed(filename)
         shutil.rmtree(path)
+
+    link_or_copy = shutil.copy
 else:
     remove_committed = os.remove
     remove_committed_dir = shutil.rmtree
+    link_or_copy = os.link
 
 
 def is_blob_record(record):



More information about the Checkins mailing list