[Checkins] SVN: zc.FileStorage/branches/jim-dev/ All tests pass with new 3.9-compatible implementation.
Jim Fulton
jim at zope.com
Thu Nov 5 14:08:07 EST 2009
Log message for revision 105501:
All tests pass with new 3.9-compatible implementation.
Changed:
U zc.FileStorage/branches/jim-dev/buildout.cfg
U zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py
A zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt
U zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py
-=-
Modified: zc.FileStorage/branches/jim-dev/buildout.cfg
===================================================================
--- zc.FileStorage/branches/jim-dev/buildout.cfg 2009-11-05 18:53:57 UTC (rev 105500)
+++ zc.FileStorage/branches/jim-dev/buildout.cfg 2009-11-05 19:08:06 UTC (rev 105501)
@@ -9,27 +9,13 @@
[test]
recipe = zc.recipe.testrunner
-eggs = ZODB3 <3.9dev
- zc.FileStorage
+eggs = zc.FileStorage
initialization =
import os, tempfile, shutil
if os.path.exists('tmp'): shutil.rmtree('tmp')
os.mkdir('tmp')
tempfile.tempdir = os.path.abspath('tmp')
- import zc.FileStorage
-
- import ZODB.tests.VersionStorage, ZODB.tests.TransactionalUndoVersionStorage
- class C: pass
- ZODB.tests.VersionStorage.VersionStorage = C
- class C: pass
- ZODB.tests.TransactionalUndoVersionStorage.TransactionalUndoVersionStorage = C
- import ZODB.tests.testDB
- del ZODB.tests.testDB.DBTests.test_removeVersionPool
- del ZODB.tests.testDB.DBTests.test_removeVersionPool_while_connection_open
- import ZODB.tests.testZODB
- del ZODB.tests.testZODB.ZODBTests.checkVersionOnly
-
# There's no point in running the zeo tests, since zeo will run the
# server in a separate process that won't see the zc.FileStorage
# import.
Modified: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py
===================================================================
--- zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py 2009-11-05 18:53:57 UTC (rev 105500)
+++ zc.FileStorage/branches/jim-dev/src/zc/FileStorage/__init__.py 2009-11-05 19:08:06 UTC (rev 105501)
@@ -27,10 +27,15 @@
import ZODB.fsIndex
import ZODB.TimeStamp
+def packer(storage, referencesf, stop, gc):
+ return FileStoragePacker(storage, stop).pack()
+
class FileStoragePacker(FileStorageFormatter):
- def __init__(self, path, stop, la, lr, cla, clr, current_size):
- self._name = path
+ def __init__(self, storage, stop):
+ self.storage = storage
+ self._name = path = storage._file.name
+
# We open our own handle on the storage so that much of pack can
# proceed in parallel. It's important to close this file at every
# return point, else on Windows the caller won't be able to rename
@@ -39,16 +44,22 @@
self._stop = stop
self.locked = 0
- self.file_end = current_size
# The packer needs to acquire the parent's commit lock
# during the copying stage, so the two sets of lock acquire
# and release methods are passed to the constructor.
- self._lock_acquire = la
- self._lock_release = lr
- self._commit_lock_acquire = cla
- self._commit_lock_release = clr
+ self._lock_acquire = storage._lock_acquire
+ self._lock_release = storage._lock_release
+ self._commit_lock_acquire = storage._commit_lock_acquire
+ self._commit_lock_release = storage._commit_lock_release
+ self._lock_acquire()
+ try:
+ storage._file.seek(0, 2)
+ self.file_end = storage._file.tell()
+ finally:
+ self._lock_release()
+
self.ltid = z64
def pack(self):
@@ -59,6 +70,7 @@
stop = self._stop,
size = self.file_end,
syspath = sys.path,
+ blob_dir = self.storage.blob_dir,
))
for name in 'error', 'log':
name = self._name+'.pack'+name
@@ -71,7 +83,6 @@
close_fds=True,
)
-
proc.stdin.close()
out = proc.stdout.read()
if proc.wait():
@@ -91,7 +102,7 @@
os.remove(packindex_path)
os.remove(self._name+".packscript")
- output = OptionalSeekFile(self._name + ".pack", "r+b")
+ output = open(self._name + ".pack", "r+b")
output.seek(0, 2)
assert output.tell() == opos
self.copyRest(self.file_end, output, index)
@@ -100,17 +111,8 @@
pos = output.tell()
output.close()
- # Grrrrr. The caller wants these attrs
- self.index = index
- self.vindex = {}
- self.tindex = {}
- self.tvindex = {}
- self.oid2tid = {}
- self.toid2tid = {}
- self.toid2tid_delete = {}
+ return pos, index
- return pos
-
def copyRest(self, input_pos, output, index):
# Copy data records written since packing started.
@@ -128,7 +130,7 @@
# trailing 0 argument, and then on every platform except
# native Windows it was observed that we could read stale
# data from the tail end of the file.
- self._file = OptionalSeekFile(self._name, "rb", 0)
+ self._file = open(self._name, "rb", 0)
try:
try:
while 1:
@@ -151,7 +153,7 @@
def _copyNewTrans(self, input_pos, output, index,
acquire=None, release=None):
tindex = {}
- copier = PackCopier(output, index, {}, tindex, {})
+ copier = PackCopier(output, index, tindex)
th = self._read_txn_header(input_pos)
if release is not None:
release()
@@ -174,10 +176,7 @@
if h.back:
prev_txn = self.getTxnFromData(h.oid, h.back)
- if h.version:
- self.fail(pos, "Versions are not supported.")
-
- copier.copy(h.oid, h.tid, data, '', prev_txn,
+ copier.copy(h.oid, h.tid, data, prev_txn,
output_tpos, output.tell())
input_pos += h.recordlen()
@@ -207,10 +206,6 @@
data, tid = self._loadBackTxn(oid, back, 0)
return data
-# sys.modules['ZODB.FileStorage.FileStorage'
-# ].FileStoragePacker = FileStoragePacker
-# ZODB.FileStorage.FileStorage.supportsVersions = lambda self: False
-
class PackCopier(ZODB.FileStorage.fspack.PackCopier):
def _txn_find(self, tid, stop_at_pack):
@@ -247,7 +242,8 @@
logging.getLogger().addHandler(handler)
try:
- packer = zc.FileStorage.PackProcess(%(path)r, %(stop)r, %(size)r)
+ packer = zc.FileStorage.PackProcess(%(path)r, %(stop)r, %(size)r,
+ %(blob_dir)r)
packer.pack()
except Exception, v:
logging.exception('packing')
@@ -262,19 +258,21 @@
class PackProcess(FileStoragePacker):
- def __init__(self, path, stop, current_size):
+ def __init__(self, path, stop, current_size, blob_dir):
self._name = path
# We open our own handle on the storage so that much of pack can
# proceed in parallel. It's important to close this file at every
# return point, else on Windows the caller won't be able to rename
# or remove the storage file.
- # We set the buffer quite high (32MB) to try to reduce seeks
- # when the storage is disk is doing other io
+ if blob_dir:
+ self.pack_blobs = True
+ self.blob_removed = open(os.path.join(blob_dir, '.removed'), 'w')
+ else:
+ self.pack_blobs = False
+ self._file = open(path, "rb")
- self._file = OptionalSeekFile(path, "rb")
-
self._name = path
self._stop = stop
self.locked = 0
@@ -290,37 +288,19 @@
self._freecache(pos)
return FileStoragePacker._read_txn_header(self, pos, tid)
- def _log_memory(self): # only on linux, oh well
- status_path = "/proc/%s/status" % os.getpid()
- if not os.path.exists(status_path):
- return
- try:
- f = open(status_path)
- except IOError:
- return
-
- for line in f:
- for name in ('Peak', 'Size', 'RSS'):
- if line.startswith('Vm'+name):
- logging.info(line.strip())
-
-
def pack(self):
packed, index, packpos = self.buildPackIndex(self._stop, self.file_end)
logging.info('initial scan %s objects at %s', len(index), packpos)
- self._log_memory()
if packed:
# nothing to do
logging.info('done, nothing to do')
self._file.close()
return
- self._log_memory()
logging.info('copy to pack time')
- output = OptionalSeekFile(self._name + ".pack", "w+b")
- output._freecache = _freefunc(output)
+ output = open(self._name + ".pack", "w+b")
+ self._freecache = _freefunc(output)
index, new_pos = self.copyToPacktime(packpos, index, output)
- self._log_memory()
if new_pos == packpos:
# pack didn't free any data. there's no point in continuing.
self._file.close()
@@ -331,7 +311,6 @@
logging.info('copy from pack time')
self.copyFromPacktime(packpos, self.file_end, output, index)
- self._log_memory()
# Save the index so the parent process can use it as a starting point.
f = open(self._name + ".packindex", 'wb')
@@ -363,9 +342,12 @@
while pos < end:
dh = self._read_data_header(pos)
self.checkData(th, tpos, dh, pos)
- if dh.version:
- self.fail(pos, "Versions are not supported")
- index[dh.oid] = pos
+ if dh.plen or dh.back:
+ index[dh.oid] = pos
+ else:
+ # deleted
+ if dh.oid in index:
+ del index[dh.oid]
pos += dh.recordlen()
tlen = self._read_num(pos)
@@ -382,6 +364,8 @@
self._file.seek(0)
output.write(self._file.read(self._metadata_size))
new_index = ZODB.fsIndex.fsIndex()
+ pack_blobs = self.pack_blobs
+ is_blob_record = ZODB.blob.is_blob_record
while pos < packpos:
th = self._read_txn_header(pos)
@@ -392,6 +376,35 @@
h = self._read_data_header(pos)
if index.get(h.oid) != pos:
pos += h.recordlen()
+ if pack_blobs:
+ if h.plen:
+ data = self._file.read(h.plen)
+ else:
+ data = self.fetchDataViaBackpointer(h.oid, h.back)
+ if data and is_blob_record(data):
+ # We need to remove the blob record. Maybe we
+ # need to remove the oid, too.
+
+ # But first, we need to make sure the
+ # record we're looking at isn't a dup of
+ # the current record. There's a bug in ZEO
+ # blob support that causes duplicate data
+ # records.
+ rpos = index.get(h.oid)
+ is_dup = (rpos and
+ self._read_data_header(rpos).tid == h.tid)
+ if not is_dup:
+ # Note that we delete the revision.
+ # If rpos was None, then we could
+ # remove the oid too -- but what if
+ # another blob update somehow happened
+ # after the deletion? That shouldn't happen,
+ # but we can leave it to the cleanup
+ # code to take care of removing the
+ # directory for us.
+ self.blob_removed.write(
+ (h.oid+h.tid).encode('hex')+'\n')
+
continue
pos += h.recordlen()
@@ -437,17 +450,29 @@
output.write(tlen)
output.seek(new_pos)
- output._freecache(new_pos)
+ self._freecache(new_pos)
pos += 8
return new_index, new_pos
+ def fetchDataViaBackpointer(self, oid, back):
+ """Return the data for oid via backpointer back
+
+ If `back` is 0 or ultimately resolves to 0, return None.
+ In this case, the transaction undoes the object
+ creation.
+ """
+ if back == 0:
+ return None
+ data, tid = self._loadBackTxn(oid, back, 0)
+ return data
+
def copyFromPacktime(self, input_pos, file_end, output, index):
while input_pos < file_end:
input_pos = self._copyNewTrans(input_pos, output, index)
- output._freecache(output.tell())
+ self._freecache(output.tell())
return input_pos
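
The copyToPacktime changes above log each removed blob record to a
".removed" file in the blob directory, one hex-encoded oid+tid pair per
line (the blob_removed.write(...) call). A minimal sketch of how cleanup
code might parse that log (Python 2, matching the code above; the
function name and blob_dir argument are illustrative):

    import os

    def iter_removed(blob_dir):
        # Each line is (oid + tid).encode('hex'): an 8-byte oid followed
        # by an 8-byte tid, i.e. 32 hex digits plus a newline.
        for line in open(os.path.join(blob_dir, '.removed')):
            raw = line.strip().decode('hex')
            yield raw[:8], raw[8:]  # (oid, tid)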
Added: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt
===================================================================
--- zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt (rev 0)
+++ zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt 2009-11-05 19:08:06 UTC (rev 105501)
@@ -0,0 +1,144 @@
+Packing support for blob data
+=============================
+
+XXX Gaaa. This is a copy because the original assumed the storage
+packed with GC. zc.FileStorage only works with external gc. :/
+
+
+Set up:
+
+ >>> from ZODB.serialize import referencesf
+ >>> from ZODB.blob import Blob
+ >>> from ZODB import utils
+ >>> from ZODB.DB import DB
+ >>> import transaction
+
+A helper function to ensure a unique timestamp across multiple platforms:
+
+ >>> from ZODB.tests.testblob import new_time
+
+UNDOING
+=======
+
+We need a database with an undoing blob supporting storage:
+
+ >>> import ZODB.FileStorage, zc.FileStorage
+ >>> blob_storage = ZODB.FileStorage.FileStorage(
+ ... 'data.fs', blob_dir='data.blobs',
+ ... packer=zc.FileStorage.packer)
+ >>> database = DB(blob_storage)
+
+Create our root object:
+
+ >>> connection1 = database.open()
+ >>> root = connection1.root()
+
+Put some revisions of a blob object in our database and on the filesystem:
+
+ >>> import os
+ >>> tids = []
+ >>> times = []
+ >>> nothing = transaction.begin()
+ >>> times.append(new_time())
+ >>> blob = Blob()
+ >>> blob.open('w').write('this is blob data 0')
+ >>> root['blob'] = blob
+ >>> transaction.commit()
+ >>> tids.append(blob._p_serial)
+
+ >>> nothing = transaction.begin()
+ >>> times.append(new_time())
+ >>> root['blob'].open('w').write('this is blob data 1')
+ >>> transaction.commit()
+ >>> tids.append(blob._p_serial)
+
+ >>> nothing = transaction.begin()
+ >>> times.append(new_time())
+ >>> root['blob'].open('w').write('this is blob data 2')
+ >>> transaction.commit()
+ >>> tids.append(blob._p_serial)
+
+ >>> nothing = transaction.begin()
+ >>> times.append(new_time())
+ >>> root['blob'].open('w').write('this is blob data 3')
+ >>> transaction.commit()
+ >>> tids.append(blob._p_serial)
+
+ >>> nothing = transaction.begin()
+ >>> times.append(new_time())
+ >>> root['blob'].open('w').write('this is blob data 4')
+ >>> transaction.commit()
+ >>> tids.append(blob._p_serial)
+
+ >>> oid = root['blob']._p_oid
+ >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
+ >>> [ os.path.exists(x) for x in fns ]
+ [True, True, True, True, True]
+
+Do a pack to slightly before the first revision was written:
+
+ >>> packtime = times[0]
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [True, True, True, True, True]
+
+Do a pack to slightly before the second revision was written:
+
+ >>> packtime = times[1]
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [True, True, True, True, True]
+
+Do a pack to slightly before the third revision was written:
+
+ >>> packtime = times[2]
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [False, True, True, True, True]
+
+Do a pack to slightly before the fourth revision was written:
+
+ >>> packtime = times[3]
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [False, False, True, True, True]
+
+Do a pack to slightly before the fifth revision was written:
+
+ >>> packtime = times[4]
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [False, False, False, True, True]
+
+Do a pack to now:
+
+ >>> packtime = new_time()
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [False, False, False, False, True]
+
+Delete the object and do a pack; it should get rid of the most current
+revision as well as the entire directory:
+
+ >>> t = transaction.begin()
+ >>> oid, serial = root['blob']._p_oid, root['blob']._p_serial
+ >>> del root['blob']
+ >>> transaction.commit()
+
+ >>> t = transaction.begin()
+ >>> blob_storage.tpc_begin(t)
+ >>> blob_storage.deleteObject(oid, serial, t)
+ >>> blob_storage.tpc_vote(t)
+ >>> blob_storage.tpc_finish(t)
+
+ >>> packtime = new_time()
+ >>> blob_storage.pack(packtime, referencesf)
+ >>> [ os.path.exists(x) for x in fns ]
+ [False, False, False, False, False]
+ >>> os.path.exists(os.path.split(fns[0])[0])
+ False
+
+Clean up our blob directory and database:
+
+ >>> blob_storage.close()
Property changes on: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/blob_packing.txt
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py
===================================================================
--- zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py 2009-11-05 18:53:57 UTC (rev 105500)
+++ zc.FileStorage/branches/jim-dev/src/zc/FileStorage/tests.py 2009-11-05 19:08:06 UTC (rev 105501)
@@ -19,28 +19,32 @@
# tests affected by the lack of gc in pack.
##############################################################################
-
-import os
+import pickle
import unittest
-from zope.testing import doctest
+import zc.FileStorage
+import ZODB.blob
+import ZODB.tests.testblob
from ZODB.tests.testFileStorage import * # :-P
from ZODB.tests.PackableStorage import * # :-P
from ZODB.tests.TransactionalUndoStorage import * # :-P
-class NoGCFileStorageTests(FileStorageTests):
+from zope.testing import doctest, setupstack
+class ZCFileStorageTests(FileStorageTests):
+
+ blob_dir = None
+
def setUp(self):
- self.open(create=1)
- self.__gcpath = os.path.abspath('FileStorageTests.fs.packnogc')
- open(self.__gcpath, 'w')
+ self.open(create=1, packer=zc.FileStorage.packer,
+ blob_dir=self.blob_dir)
def tearDown(self):
self._storage.close()
self._storage.cleanup()
- os.remove(self.__gcpath)
+ if self.blob_dir:
+ ZODB.blob.remove_committed_dir(self.blob_dir)
-
def checkPackAllRevisions(self):
self._initroot()
eq = self.assertEqual
@@ -99,49 +103,22 @@
# The undo log contains only the most recent transaction
self.assertEqual(3, len(self._storage.undoLog()))
-
- def checkTransactionalUndoAfterPack(self):
- eq = self.assertEqual
- # Add a few object revisions
- oid = self._storage.new_oid()
- revid1 = self._dostore(oid, data=MinPO(51))
- snooze()
- packtime = time.time()
- snooze() # time.time() now distinct from packtime
- revid2 = self._dostore(oid, revid=revid1, data=MinPO(52))
- self._dostore(oid, revid=revid2, data=MinPO(53))
- # Now get the undo log
- info = self._storage.undoInfo()
- eq(len(info), 3)
- tid = info[0]['id']
- # Now pack just the initial revision of the object. We need the
- # second revision otherwise we won't be able to undo the third
- # revision!
- self._storage.pack(packtime, referencesf)
- # Make some basic assertions about the undo information now
- info2 = self._storage.undoInfo()
- eq(len(info2), 3)
- # And now attempt to undo the last transaction
- t = Transaction()
- self._storage.tpc_begin(t)
- tid, oids = self._storage.undo(tid, t)
- self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
- eq(len(oids), 1)
- eq(oids[0], oid)
- data, revid = self._storage.load(oid, '')
- # The object must now be at the second state
- eq(zodb_unpickle(data), MinPO(52))
- self._iterate()
-
-
def checkPackWithGCOnDestinationAfterRestore(self):
pass
def checkPackWithMultiDatabaseReferences(self):
pass
+class ZCFileStorageTestsWithBlobs(ZCFileStorageTests):
+
+ blob_dir = 'blobs'
+
def test_suite():
suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(NoGCFileStorageTests, "check"))
+ suite.addTest(unittest.makeSuite(ZCFileStorageTests, "check"))
+ suite.addTest(unittest.makeSuite(ZCFileStorageTestsWithBlobs, "check"))
+ suite.addTest(doctest.DocFileSuite(
+ 'blob_packing.txt',
+ setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown,
+ ))
return suite
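
For reference, wiring the new packer into a storage mirrors the doctest
setup in blob_packing.txt; a minimal sketch (assuming ZODB 3.9, whose
FileStorage accepts a packer argument called as
packer(storage, referencesf, stop, gc); the file names are illustrative):

    import ZODB.FileStorage
    import zc.FileStorage

    # ZODB 3.9 calls the hook as packer(storage, referencesf, stop, gc)
    # and expects (pos, index) back, which zc.FileStorage.packer returns.
    storage = ZODB.FileStorage.FileStorage(
        'data.fs', blob_dir='data.blobs',
        packer=zc.FileStorage.packer)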