[Checkins] SVN: relstorage/trunk/ Added blob support.
Shane Hathaway
shane at hathawaymix.org
Sun Jul 12 03:49:01 EDT 2009
Log message for revision 101824:
Added blob support.
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/common.py
U relstorage/trunk/relstorage/relstorage.py
A relstorage/trunk/relstorage/tests/README.txt
U relstorage/trunk/relstorage/tests/testmysql.py
U relstorage/trunk/relstorage/tests/testpostgresql.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/CHANGES.txt 2009-07-12 07:49:01 UTC (rev 101824)
@@ -1,4 +1,14 @@
+Next Feature Release
+--------------------
+
+- Added support for a blob directory. No BlobStorage wrapper is needed.
+ Cluster nodes will need to use a shared filesystem such as NFS or
+ SMB/CIFS.
+
+- [TODO: add the blob_dir parameter to component.xml and README]
+
+
Version 1.2.0b2 (2009-05-05)
----------------------------
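[Editor's note: since the blob_dir parameter is not yet exposed through
component.xml (see the TODO above), the new option currently has to be passed
to the RelStorage constructor directly. A minimal sketch, not part of this
commit, assuming the PostgreSQL adapter lives at
relstorage.adapters.postgresql and using placeholder credentials and paths:

    # Hypothetical example: open a RelStorage with a blob directory.
    from relstorage.adapters.postgresql import PostgreSQLAdapter
    from relstorage.relstorage import RelStorage

    adapter = PostgreSQLAdapter(
        'dbname=zodb user=zodb password=secret')   # placeholder DSN
    storage = RelStorage(adapter, blob_dir='/var/zodb/blobs')
]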
Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/adapters/common.py 2009-07-12 07:49:01 UTC (rev 101824)
@@ -354,7 +354,8 @@
Parameters: "undo_tid", the integer tid of the transaction to undo,
and "self_tid", the integer tid of the current transaction.
- Returns the list of OIDs undone.
+ Returns the states copied forward by the undo operation as a
+ list of (oid, old_tid).
"""
stmt = self._scripts['create_temp_undo']
if stmt:
@@ -387,7 +388,7 @@
WHERE zoid IN (SELECT zoid FROM temp_undo)
AND tid = %(self_tid)s;
- -- Add new undo records.
+ -- Copy old states forward.
INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
prev.md5, prev.state
@@ -397,12 +398,12 @@
ON (prev.zoid = temp_undo.zoid
AND prev.tid = temp_undo.prev_tid);
- -- List the changed OIDs.
- SELECT zoid FROM temp_undo
+ -- List the copied states.
+ SELECT zoid, prev_tid FROM temp_undo
"""
self._run_script(cursor, stmt,
{'undo_tid': undo_tid, 'self_tid': self_tid})
- res = [oid for (oid,) in cursor]
+ res = list(cursor)
stmt = self._scripts['reset_temp_undo']
if stmt:
@@ -747,7 +748,7 @@
pass
- def pack(self, pack_tid, options, sleep=time.sleep):
+ def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
"""Pack. Requires the information provided by pre_pack."""
# Read committed mode is sufficient.
@@ -779,31 +780,26 @@
# Pack in small batches of transactions in order to minimize
# the interruption of concurrent write operations.
start = time.time()
+ packed_list = []
self._hold_commit_lock(cursor)
for tid, packed, has_removable in tid_rows:
self._pack_transaction(
- cursor, pack_tid, tid, packed, has_removable)
+ cursor, pack_tid, tid, packed, has_removable,
+ packed_list)
if time.time() >= start + options.pack_batch_timeout:
- # commit the work done so far and release the
- # commit lock for a short time
conn.commit()
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ del packed_list[:]
self._release_commit_lock(cursor)
- # Add a delay based on the configured duty cycle.
- elapsed = time.time() - start
- if elapsed == 0.0:
- # Compensate for low timer resolution by
- # assuming that at least 10 ms elapsed.
- elapsed = 0.01
- duty_cycle = options.pack_duty_cycle
- if duty_cycle > 0.0 and duty_cycle < 1.0:
- delay = min(options.pack_max_delay,
- elapsed * (1.0 / duty_cycle - 1.0))
- if delay > 0:
- log.debug('pack: sleeping %.4g second(s)',
- delay)
- sleep(delay)
+ self._pause_pack(sleep, options, start)
self._hold_commit_lock(cursor)
start = time.time()
+ if packed_func is not None:
+ for oid, tid in packed_list:
+ packed_func(oid, tid)
+ packed_list = None
self._pack_cleanup(conn, cursor)
@@ -819,9 +815,23 @@
finally:
self.close(conn, cursor)
+ def _pause_pack(self, sleep, options, start):
+ """Pause packing to allow concurrent commits."""
+ elapsed = time.time() - start
+ if elapsed == 0.0:
+ # Compensate for low timer resolution by
+ # assuming that at least 10 ms elapsed.
+ elapsed = 0.01
+ duty_cycle = options.pack_duty_cycle
+ if duty_cycle > 0.0 and duty_cycle < 1.0:
+ delay = min(options.pack_max_delay,
+ elapsed * (1.0 / duty_cycle - 1.0))
+ if delay > 0:
+ log.debug('pack: sleeping %.4g second(s)', delay)
+ sleep(delay)
def _pack_transaction(self, cursor, pack_tid, tid, packed,
- has_removable):
+ has_removable, packed_list):
"""Pack one transaction. Requires populated pack tables."""
log.debug("pack: transaction %d: packing", tid)
removed_objects = 0
@@ -845,6 +855,15 @@
self._run_script_stmt(cursor, stmt,
{'pack_tid': pack_tid, 'tid': tid})
+ stmt = """
+ SELECT pack_state.zoid
+ FROM pack_state
+ WHERE pack_state.tid = %(tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ for (oid,) in cursor:
+ packed_list.append((oid, tid))
+
# Find out whether the transaction is empty
stmt = self._scripts['transaction_has_data']
self._run_script_stmt(cursor, stmt, {'tid': tid})
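[Editor's note: the duty-cycle pause factored out into _pause_pack above is
easier to follow with concrete numbers: with pack_duty_cycle = 0.5 a batch
sleeps for as long as it ran, with 0.9 it sleeps for roughly a ninth of the
elapsed time, and the sleep is always capped by pack_max_delay. An
illustrative standalone sketch of the same arithmetic, not part of the commit:

    def pack_delay(elapsed, duty_cycle, max_delay):
        # Spend roughly duty_cycle of wall-clock time packing and the
        # rest sleeping, never sleeping longer than max_delay.
        if elapsed == 0.0:
            elapsed = 0.01  # compensate for coarse timer resolution
        if 0.0 < duty_cycle < 1.0:
            return min(max_delay, elapsed * (1.0 / duty_cycle - 1.0))
        return 0.0

    pack_delay(2.0, 0.5, 5.0)   # -> 2.0 seconds of sleep
    pack_delay(2.0, 0.9, 5.0)   # -> about 0.22 seconds
]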
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/relstorage.py 2009-07-12 07:49:01 UTC (rev 101824)
@@ -16,18 +16,26 @@
Stores pickles in the database.
"""
+from persistent.TimeStamp import TimeStamp
+from ZODB.BaseStorage import BaseStorage
+from ZODB.BaseStorage import DataRecord
+from ZODB.BaseStorage import TransactionRecord
+from ZODB import ConflictResolution
+from ZODB import POSException
+from ZODB.POSException import POSKeyError
+from ZODB.utils import p64
+from ZODB.utils import u64
+from zope.interface import implements
+from zope.interface import Interface
import base64
import cPickle
import logging
import os
+import sys
+import tempfile
import time
import weakref
-from ZODB.utils import p64, u64
-from ZODB.BaseStorage import BaseStorage, TransactionRecord, DataRecord
-from ZODB import ConflictResolution, POSException
-from persistent.TimeStamp import TimeStamp
-from zope.interface import Interface
-from zope.interface import implements
+import ZODB.interfaces
try:
from ZODB.interfaces import StorageStopIteration
@@ -37,12 +45,18 @@
backwards-compatible exception.
"""
-try:
- from ZODB.interfaces import IMVCCStorage
-except ImportError:
- class IMVCCStorage(Interface):
- """Stub for versions of ZODB that do not define IMVCCStorage.
- """
+_relstorage_interfaces = []
+for name in (
+ 'IStorage',
+ 'IMVCCStorage',
+ 'IStorageRestoreable',
+ 'IStorageIteration',
+ 'IStorageUndoable',
+ 'IBlobStorage',
+ 'IBlobStorageRestoreable',
+ ):
+ if hasattr(ZODB.interfaces, name):
+ _relstorage_interfaces.append(getattr(ZODB.interfaces, name))
try:
from hashlib import md5
@@ -63,10 +77,10 @@
class RelStorage(BaseStorage,
ConflictResolution.ConflictResolvingStorage):
"""Storage to a relational database, based on invalidation polling"""
- implements(IMVCCStorage)
+ implements(*_relstorage_interfaces)
def __init__(self, adapter, name=None, create=True,
- read_only=False, options=None, **kwoptions):
+ read_only=False, options=None, blob_dir=None, **kwoptions):
if name is None:
name = 'RelStorage on %s' % adapter.__class__.__name__
@@ -145,7 +159,20 @@
# _poll_at is the time to poll regardless of commit_count
self._poll_at = 0
+ # _txn_blobs: {oid->filename}; contains blob data for the
+ # currently uncommitted transaction.
+ self._txn_blobs = None
+ self._blob_dir = blob_dir
+ if blob_dir:
+ from ZODB.blob import FilesystemHelper
+ self.fshelper = FilesystemHelper(blob_dir)
+ if create:
+ self.fshelper.create()
+ self.fshelper.checkSecure()
+ else:
+ self.fshelper = None
+
def _open_load_connection(self):
"""Open the load connection to the database. Return nothing."""
conn, cursor = self._adapter.open_for_load()
@@ -262,7 +289,7 @@
"""
other = RelStorage(adapter=self._adapter, name=self._name,
create=False, read_only=self._is_read_only,
- options=self._options)
+ options=self._options, blob_dir=self._blob_dir)
self._instances.append(weakref.ref(other))
return other
@@ -337,7 +364,7 @@
cache.set(cachekey, tid_int)
if tid_int is None:
self._log_keyerror(oid_int, "no tid found(1)")
- raise KeyError(oid)
+ raise POSKeyError(oid)
# get state from the cache or the database
cachekey = 'state:%d:%d' % (oid_int, tid_int)
@@ -358,11 +385,11 @@
# This can happen if something attempts to load
# an object whose creation has been undone.
self._log_keyerror(oid_int, "creation has been undone")
- raise KeyError(oid)
+ raise POSKeyError(oid)
return state, p64(tid_int)
else:
self._log_keyerror(oid_int, "no tid found(2)")
- raise KeyError(oid)
+ raise POSKeyError(oid)
def loadEx(self, oid, version):
# Since we don't support versions, just tack the empty version
@@ -402,7 +429,7 @@
cache.set(cachekey, state)
return state
else:
- raise KeyError(oid)
+ raise POSKeyError(oid)
def loadBefore(self, oid, tid):
"""Return the most recent revision of oid before tid committed."""
@@ -419,7 +446,7 @@
self._restart_load()
cursor = self._load_cursor
if not self._adapter.exists(cursor, u64(oid)):
- raise KeyError(oid)
+ raise POSKeyError(oid)
state, start_tid = self._adapter.load_before(
cursor, oid_int, u64(tid))
@@ -663,6 +690,15 @@
self._adapter.update_current(cursor, tid_int)
self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+ if self._txn_blobs:
+ # We now have a transaction ID, so rename all the blobs
+ # accordingly.
+ for oid, sourcename in self._txn_blobs.items():
+ targetname = self.fshelper.getBlobFilename(oid, self._tid)
+ if sourcename != targetname:
+ ZODB.blob.rename_or_copy_blob(sourcename, targetname)
+ self._txn_blobs[oid] = targetname
+
return serials
@@ -701,6 +737,7 @@
# initial commit_count. Increment commit_count so that it
# doesn't matter who won.
cache.incr('commit_count')
+ self._txn_blobs = None
self._prepared_txn = None
self._ltid = self._tid
self._tid = None
@@ -710,6 +747,16 @@
self._rollback_load_connection()
if self._store_cursor is not None:
self._adapter.abort(self._store_cursor, self._prepared_txn)
+ if self._txn_blobs:
+ try:
+ for oid, filename in self._txn_blobs.iteritems():
+ if os.path.exists(filename):
+ ZODB.blob.remove_committed(filename)
+ dirname = os.path.dirname(filename)
+ if not os.listdir(dirname):
+ ZODB.blob.remove_committed_dir(dirname)
+ finally:
+ self._txn_blobs = None
self._prepared_txn = None
self._tid = None
@@ -784,7 +831,7 @@
try:
rows = self._adapter.iter_object_history(cursor, oid_int)
except KeyError:
- raise KeyError(oid)
+ raise POSKeyError(oid)
res = []
for tid_int, username, description, extension, length in rows:
@@ -842,21 +889,45 @@
adapter.verify_undoable(cursor, undo_tid_int)
self_tid_int = u64(self._tid)
- oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
- oids = [p64(oid_int) for oid_int in oid_ints]
+ copied = adapter.undo(cursor, undo_tid_int, self_tid_int)
+ oids = [p64(oid_int) for oid_int, _ in copied]
# Update the current object pointers immediately, so that
# subsequent undo operations within this transaction will see
# the new current objects.
adapter.update_current(cursor, self_tid_int)
+ if self.fshelper is not None:
+ self._copy_undone_blobs(copied)
+
return self._tid, oids
finally:
adapter.release_pack_lock(cursor)
finally:
self._lock_release()
+ def _copy_undone_blobs(self, copied):
+ """After an undo operation, copies the matching blobs forward.
+ The copied parameter is a list of (integer oid, integer tid).
+ """
+ for oid_int, old_tid_int in copied:
+ oid = p64(oid_int)
+ old_tid = p64(old_tid_int)
+ orig_fn = self.fshelper.getBlobFilename(oid, old_tid)
+ if not os.path.exists(orig_fn):
+ # not a blob
+ continue
+
+ new_fn = self.fshelper.getBlobFilename(oid, self._tid)
+ orig = open(orig_fn, 'rb')
+ new = open(new_fn, 'wb')
+ ZODB.utils.cp(orig, new)
+ orig.close()
+ new.close()
+
+ self._add_blob_to_transaction(oid, new_fn)
+
def pack(self, t, referencesf, sleep=time.sleep):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -904,21 +975,40 @@
log.info("pack: dry run complete")
else:
# Now pack.
- adapter.pack(tid_int, self._options, sleep=sleep)
+ if self.fshelper is not None:
+ packed_func = self._after_pack
+ else:
+ packed_func = None
+ adapter.pack(tid_int, self._options, sleep=sleep,
+ packed_func=packed_func)
finally:
adapter.release_pack_lock(lock_cursor)
finally:
lock_conn.rollback()
adapter.close(lock_conn, lock_cursor)
+ def _after_pack(self, oid_int, tid_int):
+ """Called after an object state has been removed by packing.
+ Removes the corresponding blob file.
+ """
+ oid = p64(oid_int)
+ tid = p64(tid_int)
+ fn = self.fshelper.getBlobFilename(oid, tid)
+ if os.path.exists(fn):
+ ZODB.blob.remove_committed(fn)
+ dirname = os.path.dirname(fn)
+ if not os.listdir(dirname):
+ ZODB.blob.remove_committed_dir(dirname)
+
def iterator(self, start=None, stop=None):
return TransactionIterator(self._adapter, start, stop)
def sync(self, force=True):
"""Updates to a current view of the database.
- This is implemented by rolling back the transaction.
+ This is implemented by rolling back the relational database
+ transaction.
If force is False and a poll interval has been set, this call
is ignored. The poll_invalidations method will later choose to
@@ -1004,6 +1094,125 @@
finally:
self._lock_release()
+ def loadBlob(self, oid, serial):
+ """Return the filename of the Blob data for this OID and serial.
+
+ Returns a filename.
+
+ Raises POSKeyError if the blobfile cannot be found.
+ """
+ if self.fshelper is None:
+ raise POSException.Unsupported("No blob directory is configured.")
+
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ if os.path.exists(blob_filename):
+ return blob_filename
+ else:
+ raise POSKeyError("No blob file", oid, serial)
+
+ def openCommittedBlobFile(self, oid, serial, blob=None):
+ """Return a file for committed data for the given object id and serial
+
+ If a blob is provided, then a BlobFile object is returned,
+ otherwise, an ordinary file is returned. In either case, the
+ file is opened for binary reading.
+
+ This method is used to allow storages that cache blob data to
+ make sure that data are available at least long enough for the
+ file to be opened.
+ """
+ blob_filename = self.loadBlob(oid, serial)
+ if blob is None:
+ return open(blob_filename, 'rb')
+ else:
+ return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+
+ def temporaryDirectory(self):
+ """Return a directory that should be used for uncommitted blob data.
+
+ If Blobs use this, then commits can be performed with a simple rename.
+ """
+ return self.fshelper.temp_dir
+
+ def storeBlob(self, oid, oldserial, data, blobfilename, version, txn):
+ """Stores data that has a BLOB attached.
+
+ The blobfilename argument names a file containing blob data.
+ The storage will take ownership of the file and will rename it
+ (or copy and remove it) immediately, or at transaction-commit
+ time. The file must not be open.
+
+ The new serial is returned.
+ """
+ assert not version
+ self.store(oid, oldserial, data, '', txn)
+ self._store_blob_data(oid, oldserial, blobfilename)
+ return None
+
+ def restoreBlob(self, oid, serial, data, blobfilename, prev_txn, txn):
+ """Write blob data already committed in a separate database
+
+ See the restore and storeBlob methods.
+ """
+ self.restore(oid, serial, data, '', prev_txn, txn)
+ self._lock_acquire()
+ try:
+ self.fshelper.getPathForOID(oid, create=True)
+ targetname = self.fshelper.getBlobFilename(oid, serial)
+ ZODB.blob.rename_or_copy_blob(blobfilename, targetname)
+ finally:
+ self._lock_release()
+
+ def _store_blob_data(self, oid, oldserial, filename):
+ self.fshelper.getPathForOID(oid, create=True)
+ fd, target = self.fshelper.blob_mkstemp(oid, oldserial)
+ os.close(fd)
+ if sys.platform == 'win32':
+ # On Windows, we can't rename to an existing file. We'll
+ # use a slightly different file name. We keep the old one
+ # until we're done to avoid conflicts. Then remove the old name.
+ target += 'w'
+ ZODB.blob.rename_or_copy_blob(filename, target)
+ os.remove(target[:-1])
+ else:
+ ZODB.blob.rename_or_copy_blob(filename, target)
+
+ self._add_blob_to_transaction(oid, target)
+
+ def _add_blob_to_transaction(self, oid, filename):
+ if self._txn_blobs is None:
+ self._txn_blobs = {}
+ else:
+ old_filename = self._txn_blobs.get(oid)
+ if old_filename is not None and old_filename != filename:
+ ZODB.blob.remove_committed(old_filename)
+ self._txn_blobs[oid] = filename
+
+ def copyTransactionsFrom(self, other):
+ # copied from ZODB.blob.BlobStorageMixin
+ for trans in other.iterator():
+ self.tpc_begin(trans, trans.tid, trans.status)
+ for record in trans:
+ blobfilename = None
+ if ZODB.blob.is_blob_record(record.data):
+ try:
+ blobfilename = other.loadBlob(record.oid, record.tid)
+ except POSKeyError:
+ pass
+ if blobfilename is not None:
+ fd, name = tempfile.mkstemp(
+ suffix='.tmp', dir=self.fshelper.temp_dir)
+ os.close(fd)
+ ZODB.utils.cp(open(blobfilename, 'rb'), open(name, 'wb'))
+ self.restoreBlob(record.oid, record.tid, record.data,
+ name, record.data_txn, trans)
+ else:
+ self.restore(record.oid, record.tid, record.data,
+ '', record.data_txn, trans)
+
+ self.tpc_vote(trans)
+ self.tpc_finish(trans)
+
# The propagate_invalidations flag implements the old
# invalidation polling API and is not otherwise used. Set to a
# false value, it tells the Connection not to propagate object
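[Editor's note: with the methods above, RelStorage exposes IBlobStorage
directly, so blob data can be used through an ordinary ZODB connection with no
BlobStorage wrapper. A minimal usage sketch (Python 2 era, matching the code
above); the storage is assumed to have been opened with a blob_dir as in the
earlier example, and the key and payload are placeholders:

    import transaction
    from ZODB.DB import DB
    from ZODB.blob import Blob

    db = DB(storage)                 # RelStorage opened with blob_dir
    conn = db.open()
    root = conn.root()
    root['photo'] = blob = Blob()
    f = blob.open('w')               # write uncommitted blob data
    f.write('...binary data...')
    f.close()
    transaction.commit()             # blob file is renamed into place

    data = root['photo'].open('r').read()   # served via loadBlob
    db.close()
]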
Added: relstorage/trunk/relstorage/tests/README.txt
===================================================================
--- relstorage/trunk/relstorage/tests/README.txt (rev 0)
+++ relstorage/trunk/relstorage/tests/README.txt 2009-07-12 07:49:01 UTC (rev 101824)
@@ -0,0 +1,31 @@
+
+Running Tests
+=============
+
+To run these tests, you need to create a test user account and several
+databases. Use or adapt the SQL statements below to create the
+databases.
+
+PostgreSQL
+----------
+
+CREATE USER relstoragetest WITH PASSWORD 'relstoragetest';
+CREATE DATABASE relstoragetest OWNER relstoragetest;
+CREATE DATABASE relstoragetest2 OWNER relstoragetest;
+
+Also, add the following lines to the top of pg_hba.conf (entries are
+matched top to bottom, so lines placed at the bottom may never match):
+
+local relstoragetest relstoragetest md5
+local relstoragetest2 relstoragetest md5
+
+
+MySQL
+-----
+
+CREATE USER 'relstoragetest'@'localhost' IDENTIFIED BY 'relstoragetest';
+CREATE DATABASE relstoragetest;
+GRANT ALL ON relstoragetest.* TO 'relstoragetest'@'localhost';
+CREATE DATABASE relstoragetest2;
+GRANT ALL ON relstoragetest2.* TO 'relstoragetest'@'localhost';
+FLUSH PRIVILEGES;
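[Editor's note: once the databases above exist, the suites can be run through
the test_suite() functions defined in the test modules (each module also has a
__main__ block). A sketch, assuming relstorage is importable from the current
environment:

    # Run the PostgreSQL test suite, including the new blob tests on
    # ZODB >= 3.9 (older ZODBs skip them, as in the diffs below).
    import unittest
    from relstorage.tests import testpostgresql
    unittest.TextTestRunner().run(testpostgresql.test_suite())
]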
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py 2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/tests/testmysql.py 2009-07-12 07:49:01 UTC (rev 101824)
@@ -37,11 +37,42 @@
class FileToMySQL(UseMySQLAdapter, reltestbase.FromFileStorage):
pass
+db_names = {
+ 'data': 'relstoragetest',
+ '1': 'relstoragetest',
+ '2': 'relstoragetest2',
+ 'dest': 'relstoragetest2',
+ }
def test_suite():
suite = unittest.TestSuite()
for klass in [MySQLTests, MySQLToFile, FileToMySQL]:
suite.addTest(unittest.makeSuite(klass, "check"))
+
+ try:
+ from ZODB.tests.testblob import storage_reusable_suite
+ except ImportError:
+ # ZODB < 3.9
+ pass
+ else:
+ def create_storage(name, blob_dir):
+ from relstorage.relstorage import RelStorage
+ adapter = MySQLAdapter(
+ db=db_names[name],
+ user='relstoragetest',
+ passwd='relstoragetest',
+ )
+ storage = RelStorage(adapter, name=name, create=True,
+ blob_dir=blob_dir)
+ storage.zap_all()
+ return storage
+
+ suite.addTest(storage_reusable_suite(
+ 'MySQL', create_storage,
+ test_blob_storage_recovery=True,
+ test_packing=True,
+ ))
+
return suite
if __name__=='__main__':
Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py 2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py 2009-07-12 07:49:01 UTC (rev 101824)
@@ -34,11 +34,40 @@
class FileToPG(UsePostgreSQLAdapter, reltestbase.FromFileStorage):
pass
+db_names = {
+ 'data': 'relstoragetest',
+ '1': 'relstoragetest',
+ '2': 'relstoragetest2',
+ 'dest': 'relstoragetest2',
+ }
def test_suite():
suite = unittest.TestSuite()
for klass in [PostgreSQLTests, PGToFile, FileToPG]:
suite.addTest(unittest.makeSuite(klass, "check"))
+
+ try:
+ from ZODB.tests.testblob import storage_reusable_suite
+ except ImportError:
+ # ZODB < 3.9
+ pass
+ else:
+ def create_storage(name, blob_dir):
+ from relstorage.relstorage import RelStorage
+ adapter = PostgreSQLAdapter(
+ 'dbname=%s user=relstoragetest password=relstoragetest' %
+ db_names[name])
+ storage = RelStorage(adapter, name=name, create=True,
+ blob_dir=blob_dir)
+ storage.zap_all()
+ return storage
+
+ suite.addTest(storage_reusable_suite(
+ 'PostgreSQL', create_storage,
+ test_blob_storage_recovery=True,
+ test_packing=True,
+ ))
+
return suite
if __name__=='__main__':