[Checkins] SVN: relstorage/trunk/ Added blob support.

Shane Hathaway shane at hathawaymix.org
Sun Jul 12 03:49:01 EDT 2009


Log message for revision 101824:
  Added blob support.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/common.py
  U   relstorage/trunk/relstorage/relstorage.py
  A   relstorage/trunk/relstorage/tests/README.txt
  U   relstorage/trunk/relstorage/tests/testmysql.py
  U   relstorage/trunk/relstorage/tests/testpostgresql.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/CHANGES.txt	2009-07-12 07:49:01 UTC (rev 101824)
@@ -1,4 +1,14 @@
 
+Next Feature Release
+--------------------
+
+- Added support for a blob directory. No BlobStorage wrapper is needed.
+  Cluster nodes will need to use a shared filesystem such as NFS or
+  SMB/CIFS.
+
+- [TODO: add the blob_dir parameter to component.xml and README]
+
+
 Version 1.2.0b2 (2009-05-05)
 ----------------------------
 

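The net effect of the change is easiest to see from the user's side.
A minimal sketch of the intended usage, assuming a PostgreSQL backend
(the DSN and directory paths here are illustrative, not part of this
commit):

    from relstorage.adapters.postgresql import PostgreSQLAdapter
    from relstorage.relstorage import RelStorage
    from ZODB.blob import Blob
    from ZODB.DB import DB
    import transaction

    # The blob directory should be on a shared filesystem (NFS or
    # SMB/CIFS) when several cluster nodes share one database.
    adapter = PostgreSQLAdapter('dbname=zodb user=zodb')
    storage = RelStorage(adapter, blob_dir='/var/zodb/blobs')
    db = DB(storage)  # no BlobStorage wrapper needed

    conn = db.open()
    conn.root()['blob'] = b = Blob()
    f = b.open('w')  # file object for writing the blob data
    f.write('some binary data')
    f.close()
    transaction.commit()
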
Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py	2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/adapters/common.py	2009-07-12 07:49:01 UTC (rev 101824)
@@ -354,7 +354,8 @@
         Parameters: "undo_tid", the integer tid of the transaction to undo,
         and "self_tid", the integer tid of the current transaction.
 
-        Returns the list of OIDs undone.
+        Returns the states copied forward by the undo operation as a
+        list of (oid, old_tid).
         """
         stmt = self._scripts['create_temp_undo']
         if stmt:
@@ -387,7 +388,7 @@
         WHERE zoid IN (SELECT zoid FROM temp_undo)
             AND tid = %(self_tid)s;
 
-        -- Add new undo records.
+        -- Copy old states forward.
         INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
         SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
             prev.md5, prev.state
@@ -397,12 +398,12 @@
                 ON (prev.zoid = temp_undo.zoid
                     AND prev.tid = temp_undo.prev_tid);
 
-        -- List the changed OIDs.
-        SELECT zoid FROM temp_undo
+        -- List the copied states.
+        SELECT zoid, prev_tid FROM temp_undo
         """
         self._run_script(cursor, stmt,
             {'undo_tid': undo_tid, 'self_tid': self_tid})
-        res = [oid for (oid,) in cursor]
+        res = list(cursor)
 
         stmt = self._scripts['reset_temp_undo']
         if stmt:
@@ -747,7 +748,7 @@
         pass
 
 
-    def pack(self, pack_tid, options, sleep=time.sleep):
+    def pack(self, pack_tid, options, sleep=time.sleep, packed_func=None):
         """Pack.  Requires the information provided by pre_pack."""
 
         # Read committed mode is sufficient.
@@ -779,31 +780,26 @@
                 # Pack in small batches of transactions in order to minimize
                 # the interruption of concurrent write operations.
                 start = time.time()
+                packed_list = []
                 self._hold_commit_lock(cursor)
                 for tid, packed, has_removable in tid_rows:
                     self._pack_transaction(
-                        cursor, pack_tid, tid, packed, has_removable)
+                        cursor, pack_tid, tid, packed, has_removable,
+                        packed_list)
                     if time.time() >= start + options.pack_batch_timeout:
-                        # commit the work done so far and release the
-                        # commit lock for a short time
                         conn.commit()
+                        if packed_func is not None:
+                            for oid, tid in packed_list:
+                                packed_func(oid, tid)
+                        del packed_list[:]
                         self._release_commit_lock(cursor)
-                        # Add a delay based on the configured duty cycle.
-                        elapsed = time.time() - start
-                        if elapsed == 0.0:
-                            # Compensate for low timer resolution by
-                            # assuming that at least 10 ms elapsed.
-                            elapsed = 0.01
-                        duty_cycle = options.pack_duty_cycle
-                        if duty_cycle > 0.0 and duty_cycle < 1.0:
-                            delay = min(options.pack_max_delay,
-                                elapsed * (1.0 / duty_cycle - 1.0))
-                            if delay > 0:
-                                log.debug('pack: sleeping %.4g second(s)',
-                                    delay)
-                                sleep(delay)
+                        self._pause_pack(sleep, options, start)
                         self._hold_commit_lock(cursor)
                         start = time.time()
+                if packed_func is not None:
+                    for oid, tid in packed_list:
+                        packed_func(oid, tid)
+                packed_list = None
 
                 self._pack_cleanup(conn, cursor)
 
@@ -819,9 +815,23 @@
         finally:
             self.close(conn, cursor)
 
+    def _pause_pack(self, sleep, options, start):
+        """Pause packing to allow concurrent commits."""
+        elapsed = time.time() - start
+        if elapsed == 0.0:
+            # Compensate for low timer resolution by
+            # assuming that at least 10 ms elapsed.
+            elapsed = 0.01
+        duty_cycle = options.pack_duty_cycle
+        if duty_cycle > 0.0 and duty_cycle < 1.0:
+            delay = min(options.pack_max_delay,
+                elapsed * (1.0 / duty_cycle - 1.0))
+            if delay > 0:
+                log.debug('pack: sleeping %.4g second(s)', delay)
+                sleep(delay)
 
     def _pack_transaction(self, cursor, pack_tid, tid, packed,
-            has_removable):
+            has_removable, packed_list):
         """Pack one transaction.  Requires populated pack tables."""
         log.debug("pack: transaction %d: packing", tid)
         removed_objects = 0
@@ -845,6 +855,15 @@
             self._run_script_stmt(cursor, stmt,
                 {'pack_tid': pack_tid, 'tid': tid})
 
+            stmt = """
+            SELECT pack_state.zoid
+            FROM pack_state
+            WHERE pack_state.tid = %(tid)s
+            """
+            self._run_script_stmt(cursor, stmt, {'tid': tid})
+            for (oid,) in cursor:
+                packed_list.append((oid, tid))
+
         # Find out whether the transaction is empty
         stmt = self._scripts['transaction_has_data']
         self._run_script_stmt(cursor, stmt, {'tid': tid})

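The packed_func hook added to pack() gives callers a way to clean up
per-state external data; RelStorage uses it to delete blob files. A
sketch of the calling convention, where adapter, pack_tid, and options
stand for objects prepared by the usual pre_pack sequence:

    removed = []

    def report_packed(oid_int, tid_int):
        # Called once for each removed object state, after the
        # batch that removed it has been committed.
        removed.append((oid_int, tid_int))

    adapter.pack(pack_tid, options, packed_func=report_packed)
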
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/relstorage.py	2009-07-12 07:49:01 UTC (rev 101824)
@@ -16,18 +16,27 @@
 Stores pickles in the database.
 """
 
+from persistent.TimeStamp import TimeStamp
+from ZODB.BaseStorage import BaseStorage
+from ZODB.BaseStorage import DataRecord
+from ZODB.BaseStorage import TransactionRecord
+from ZODB import ConflictResolution
+from ZODB import POSException
+from ZODB.POSException import POSKeyError
+from ZODB.utils import p64
+from ZODB.utils import u64
+from zope.interface import implements
+from zope.interface import Interface
 import base64
 import cPickle
 import logging
 import os
+import sys
+import tempfile
 import time
 import weakref
-from ZODB.utils import p64, u64
-from ZODB.BaseStorage import BaseStorage, TransactionRecord, DataRecord
-from ZODB import ConflictResolution, POSException
-from persistent.TimeStamp import TimeStamp
-from zope.interface import Interface
-from zope.interface import implements
+import ZODB.blob
+import ZODB.interfaces
 
 try:
     from ZODB.interfaces import StorageStopIteration
@@ -37,12 +45,18 @@
         backwards-compatible exception.
         """
 
-try:
-    from ZODB.interfaces import IMVCCStorage
-except ImportError:
-    class IMVCCStorage(Interface):
-        """Stub for versions of ZODB that do not define IMVCCStorage.
-        """
+_relstorage_interfaces = []
+for name in (
+    'IStorage',
+    'IMVCCStorage',
+    'IStorageRestoreable',
+    'IStorageIteration',
+    'IStorageUndoable',
+    'IBlobStorage',
+    'IBlobStorageRestoreable',
+    ):
+    if hasattr(ZODB.interfaces, name):
+        _relstorage_interfaces.append(getattr(ZODB.interfaces, name))
 
 try:
     from hashlib import md5
@@ -63,10 +77,10 @@
 class RelStorage(BaseStorage,
                 ConflictResolution.ConflictResolvingStorage):
     """Storage to a relational database, based on invalidation polling"""
-    implements(IMVCCStorage)
+    implements(*_relstorage_interfaces)
 
     def __init__(self, adapter, name=None, create=True,
-            read_only=False, options=None, **kwoptions):
+            read_only=False, options=None, blob_dir=None, **kwoptions):
         if name is None:
             name = 'RelStorage on %s' % adapter.__class__.__name__
 
@@ -145,7 +159,20 @@
         # _poll_at is the time to poll regardless of commit_count
         self._poll_at = 0
 
+        # _txn_blobs: {oid->filename}; contains blob data for the
+        # currently uncommitted transaction.
+        self._txn_blobs = None
 
+        self._blob_dir = blob_dir
+        if blob_dir:
+            from ZODB.blob import FilesystemHelper
+            self.fshelper = FilesystemHelper(blob_dir)
+            if create:
+                self.fshelper.create()
+                self.fshelper.checkSecure()
+        else:
+            self.fshelper = None
+
     def _open_load_connection(self):
         """Open the load connection to the database.  Return nothing."""
         conn, cursor = self._adapter.open_for_load()
@@ -262,7 +289,7 @@
         """
         other = RelStorage(adapter=self._adapter, name=self._name,
             create=False, read_only=self._is_read_only,
-            options=self._options)
+            options=self._options, blob_dir=self._blob_dir)
         self._instances.append(weakref.ref(other))
         return other
 
@@ -337,7 +364,7 @@
                         cache.set(cachekey, tid_int)
                 if tid_int is None:
                     self._log_keyerror(oid_int, "no tid found(1)")
-                    raise KeyError(oid)
+                    raise POSKeyError(oid)
 
                 # get state from the cache or the database
                 cachekey = 'state:%d:%d' % (oid_int, tid_int)
@@ -358,11 +385,11 @@
                 # This can happen if something attempts to load
                 # an object whose creation has been undone.
                 self._log_keyerror(oid_int, "creation has been undone")
-                raise KeyError(oid)
+                raise POSKeyError(oid)
             return state, p64(tid_int)
         else:
             self._log_keyerror(oid_int, "no tid found(2)")
-            raise KeyError(oid)
+            raise POSKeyError(oid)
 
     def loadEx(self, oid, version):
         # Since we don't support versions, just tack the empty version
@@ -402,7 +429,7 @@
                 cache.set(cachekey, state)
             return state
         else:
-            raise KeyError(oid)
+            raise POSKeyError(oid)
 
     def loadBefore(self, oid, tid):
         """Return the most recent revision of oid before tid committed."""
@@ -419,7 +446,7 @@
                     self._restart_load()
                 cursor = self._load_cursor
             if not self._adapter.exists(cursor, u64(oid)):
-                raise KeyError(oid)
+                raise POSKeyError(oid)
 
             state, start_tid = self._adapter.load_before(
                 cursor, oid_int, u64(tid))
@@ -663,6 +690,15 @@
         self._adapter.update_current(cursor, tid_int)
         self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
 
+        if self._txn_blobs:
+            # We now have a transaction ID, so rename all the blobs
+            # accordingly.
+            for oid, sourcename in self._txn_blobs.items():
+                targetname = self.fshelper.getBlobFilename(oid, self._tid)
+                if sourcename != targetname:
+                    ZODB.blob.rename_or_copy_blob(sourcename, targetname)
+                    self._txn_blobs[oid] = targetname
+
         return serials
 
 
@@ -701,6 +737,7 @@
                 # initial commit_count.  Increment commit_count so that it
                 # doesn't matter who won.
                 cache.incr('commit_count')
+        self._txn_blobs = None
         self._prepared_txn = None
         self._ltid = self._tid
         self._tid = None
@@ -710,6 +747,16 @@
         self._rollback_load_connection()
         if self._store_cursor is not None:
             self._adapter.abort(self._store_cursor, self._prepared_txn)
+        if self._txn_blobs:
+            try:
+                for oid, filename in self._txn_blobs.iteritems():
+                    if os.path.exists(filename):
+                        ZODB.blob.remove_committed(filename)
+                        dirname = os.path.dirname(filename)
+                        if not os.listdir(dirname):
+                            ZODB.blob.remove_committed_dir(dirname)
+            finally:
+                self._txn_blobs = None
         self._prepared_txn = None
         self._tid = None
 
@@ -784,7 +831,7 @@
             try:
                 rows = self._adapter.iter_object_history(cursor, oid_int)
             except KeyError:
-                raise KeyError(oid)
+                raise POSKeyError(oid)
 
             res = []
             for tid_int, username, description, extension, length in rows:
@@ -842,21 +889,45 @@
                 adapter.verify_undoable(cursor, undo_tid_int)
 
                 self_tid_int = u64(self._tid)
-                oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
-                oids = [p64(oid_int) for oid_int in oid_ints]
+                copied = adapter.undo(cursor, undo_tid_int, self_tid_int)
+                oids = [p64(oid_int) for oid_int, _ in copied]
 
                 # Update the current object pointers immediately, so that
                 # subsequent undo operations within this transaction will see
                 # the new current objects.
                 adapter.update_current(cursor, self_tid_int)
 
+                if self.fshelper is not None:
+                    self._copy_undone_blobs(copied)
+
                 return self._tid, oids
             finally:
                 adapter.release_pack_lock(cursor)
         finally:
             self._lock_release()
 
+    def _copy_undone_blobs(self, copied):
+        """After an undo operation, copies the matching blobs forward.
 
+        The copied parameter is a list of (integer oid, integer tid).
+        """
+        for oid_int, old_tid_int in copied:
+            oid = p64(oid_int)
+            old_tid = p64(old_tid_int)
+            orig_fn = self.fshelper.getBlobFilename(oid, old_tid)
+            if not os.path.exists(orig_fn):
+                # not a blob
+                continue
+
+            new_fn = self.fshelper.getBlobFilename(oid, self._tid)
+            orig = open(orig_fn, 'rb')
+            new = open(new_fn, 'wb')
+            ZODB.utils.cp(orig, new)
+            orig.close()
+            new.close()
+
+            self._add_blob_to_transaction(oid, new_fn)
+
     def pack(self, t, referencesf, sleep=time.sleep):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -904,21 +975,40 @@
                     log.info("pack: dry run complete")
                 else:
                     # Now pack.
-                    adapter.pack(tid_int, self._options, sleep=sleep)
+                    if self.fshelper is not None:
+                        packed_func = self._after_pack
+                    else:
+                        packed_func = None
+                    adapter.pack(tid_int, self._options, sleep=sleep,
+                        packed_func=packed_func)
             finally:
                 adapter.release_pack_lock(lock_cursor)
         finally:
             lock_conn.rollback()
             adapter.close(lock_conn, lock_cursor)
 
+    def _after_pack(self, oid_int, tid_int):
+        """Called after an object state has been removed by packing.
 
+        Removes the corresponding blob file.
+        """
+        oid = p64(oid_int)
+        tid = p64(tid_int)
+        fn = self.fshelper.getBlobFilename(oid, tid)
+        if os.path.exists(fn):
+            ZODB.blob.remove_committed(fn)
+            dirname = os.path.dirname(fn)
+            if not os.listdir(dirname):
+                ZODB.blob.remove_committed_dir(dirname)
+
     def iterator(self, start=None, stop=None):
         return TransactionIterator(self._adapter, start, stop)
 
     def sync(self, force=True):
         """Updates to a current view of the database.
 
-        This is implemented by rolling back the transaction.
+        This is implemented by rolling back the relational database
+        transaction.
 
         If force is False and a poll interval has been set, this call
         is ignored. The poll_invalidations method will later choose to
@@ -1004,6 +1094,125 @@
         finally:
             self._lock_release()
 
+    def loadBlob(self, oid, serial):
+        """Return the filename of the Blob data for this OID and serial.
+
+        Returns a filename.
+
+        Raises POSKeyError if the blobfile cannot be found.
+        """
+        if self.fshelper is None:
+            raise POSException.Unsupported("No blob directory is configured.")
+
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        if os.path.exists(blob_filename):
+            return blob_filename
+        else:
+            raise POSKeyError("No blob file", oid, serial)
+
+    def openCommittedBlobFile(self, oid, serial, blob=None):
+        """Return a file for committed data for the given object id and serial
+
+        If a blob is provided, then a BlobFile object is returned,
+        otherwise, an ordinary file is returned.  In either case, the
+        file is opened for binary reading.
+
+        This method is used to allow storages that cache blob data to
+        make sure that data are available at least long enough for the
+        file to be opened.
+        """
+        blob_filename = self.loadBlob(oid, serial)
+        if blob is None:
+            return open(blob_filename, 'rb')
+        else:
+            return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+
+    def temporaryDirectory(self):
+        """Return a directory that should be used for uncommitted blob data.
+
+        If Blobs use this, then commits can be performed with a simple rename.
+        """
+        return self.fshelper.temp_dir
+
+    def storeBlob(self, oid, oldserial, data, blobfilename, version, txn):
+        """Stores data that has a BLOB attached.
+
+        The blobfilename argument names a file containing blob data.
+        The storage will take ownership of the file and will rename it
+        (or copy and remove it) immediately, or at transaction-commit
+        time.  The file must not be open.
+
+        The new serial is returned.
+        """
+        assert not version
+        self.store(oid, oldserial, data, '', txn)
+        self._store_blob_data(oid, oldserial, blobfilename)
+        return None
+
+    def restoreBlob(self, oid, serial, data, blobfilename, prev_txn, txn):
+        """Write blob data already committed in a separate database
+
+        See the restore and storeBlob methods.
+        """
+        self.restore(oid, serial, data, '', prev_txn, txn)
+        self._lock_acquire()
+        try:
+            self.fshelper.getPathForOID(oid, create=True)
+            targetname = self.fshelper.getBlobFilename(oid, serial)
+            ZODB.blob.rename_or_copy_blob(blobfilename, targetname)
+        finally:
+            self._lock_release()
+
+    def _store_blob_data(self, oid, oldserial, filename):
+        self.fshelper.getPathForOID(oid, create=True)
+        fd, target = self.fshelper.blob_mkstemp(oid, oldserial)
+        os.close(fd)
+        if sys.platform == 'win32':
+            # On windows, we can't rename to an existing file.  We'll
+            # use a slightly different file name. We keep the old one
+            # until we're done to avoid conflicts. Then remove the old name.
+            target += 'w'
+            ZODB.blob.rename_or_copy_blob(filename, target)
+            os.remove(target[:-1])
+        else:
+            ZODB.blob.rename_or_copy_blob(filename, target)
+
+        self._add_blob_to_transaction(oid, target)
+
+    def _add_blob_to_transaction(self, oid, filename):
+        if self._txn_blobs is None:
+            self._txn_blobs = {}
+        else:
+            old_filename = self._txn_blobs.get(oid)
+            if old_filename is not None and old_filename != filename:
+                ZODB.blob.remove_committed(old_filename)
+        self._txn_blobs[oid] = filename
+
+    def copyTransactionsFrom(self, other):
+        # copied from ZODB.blob.BlobStorageMixin
+        for trans in other.iterator():
+            self.tpc_begin(trans, trans.tid, trans.status)
+            for record in trans:
+                blobfilename = None
+                if ZODB.blob.is_blob_record(record.data):
+                    try:
+                        blobfilename = other.loadBlob(record.oid, record.tid)
+                    except POSKeyError:
+                        pass
+                if blobfilename is not None:
+                    fd, name = tempfile.mkstemp(
+                        suffix='.tmp', dir=self.fshelper.temp_dir)
+                    os.close(fd)
+                    ZODB.utils.cp(open(blobfilename, 'rb'), open(name, 'wb'))
+                    self.restoreBlob(record.oid, record.tid, record.data,
+                                     name, record.data_txn, trans)
+                else:
+                    self.restore(record.oid, record.tid, record.data,
+                                 '', record.data_txn, trans)
+
+            self.tpc_vote(trans)
+            self.tpc_finish(trans)
+
     # The propagate_invalidations flag implements the old
     # invalidation polling API and is not otherwise used. Set to a
     # false value, it tells the Connection not to propagate object

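Reading a blob back goes through the new IBlobStorage methods. A
small sketch, assuming a storage opened with a blob_dir as above; the
oid is illustrative:

    from ZODB.utils import p64

    oid = p64(1)
    state, serial = storage.load(oid, '')
    filename = storage.loadBlob(oid, serial)  # POSKeyError if missing
    f = storage.openCommittedBlobFile(oid, serial)
    data = f.read()
    f.close()
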
Added: relstorage/trunk/relstorage/tests/README.txt
===================================================================
--- relstorage/trunk/relstorage/tests/README.txt	                        (rev 0)
+++ relstorage/trunk/relstorage/tests/README.txt	2009-07-12 07:49:01 UTC (rev 101824)
@@ -0,0 +1,31 @@
+
+Running Tests
+=============
+
+To run these tests, you need to create a test user account and several
+databases. Use or adapt the SQL statements below to create the
+databases.
+
+PostgreSQL
+----------
+
+CREATE USER relstoragetest WITH PASSWORD 'relstoragetest';
+CREATE DATABASE relstoragetest OWNER relstoragetest;
+CREATE DATABASE relstoragetest2 OWNER relstoragetest;
+
+Also, add the following lines to the top of pg_hba.conf (entries are
+matched in order, so more general entries above these would win):
+
+local   relstoragetest  relstoragetest                md5
+local   relstoragetest2 relstoragetest                md5
+
+
+MySQL
+-----
+
+CREATE USER 'relstoragetest'@'localhost' IDENTIFIED BY 'relstoragetest';
+CREATE DATABASE relstoragetest;
+GRANT ALL ON relstoragetest.* TO 'relstoragetest'@'localhost';
+CREATE DATABASE relstoragetest2;
+GRANT ALL ON relstoragetest2.* TO 'relstoragetest'@'localhost';
+FLUSH PRIVILEGES;

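Before running the suites, it can help to confirm the accounts work.
A quick check using the drivers the adapters are built on (psycopg2
and MySQLdb); adjust if your setup differs:

    import psycopg2
    psycopg2.connect('dbname=relstoragetest user=relstoragetest'
        ' password=relstoragetest').close()

    import MySQLdb
    MySQLdb.connect(db='relstoragetest', user='relstoragetest',
        passwd='relstoragetest').close()
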
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py	2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/tests/testmysql.py	2009-07-12 07:49:01 UTC (rev 101824)
@@ -37,11 +37,42 @@
 class FileToMySQL(UseMySQLAdapter, reltestbase.FromFileStorage):
     pass
 
+db_names = {
+    'data': 'relstoragetest',
+    '1': 'relstoragetest',
+    '2': 'relstoragetest2',
+    'dest': 'relstoragetest2',
+    }
 
 def test_suite():
     suite = unittest.TestSuite()
     for klass in [MySQLTests, MySQLToFile, FileToMySQL]:
         suite.addTest(unittest.makeSuite(klass, "check"))
+
+    try:
+        from ZODB.tests.testblob import storage_reusable_suite
+    except ImportError:
+        # ZODB < 3.9
+        pass
+    else:
+        def create_storage(name, blob_dir):
+            from relstorage.relstorage import RelStorage
+            adapter = MySQLAdapter(
+                db=db_names[name],
+                user='relstoragetest',
+                passwd='relstoragetest',
+                )
+            storage = RelStorage(adapter, name=name, create=True,
+                blob_dir=blob_dir)
+            storage.zap_all()
+            return storage
+
+        suite.addTest(storage_reusable_suite(
+            'MySQL', create_storage,
+            test_blob_storage_recovery=True,
+            test_packing=True,
+            ))
+
     return suite
 
 if __name__=='__main__':

Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py	2009-07-12 07:42:07 UTC (rev 101823)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py	2009-07-12 07:49:01 UTC (rev 101824)
@@ -34,11 +34,40 @@
 class FileToPG(UsePostgreSQLAdapter, reltestbase.FromFileStorage):
     pass
 
+db_names = {
+    'data': 'relstoragetest',
+    '1': 'relstoragetest',
+    '2': 'relstoragetest2',
+    'dest': 'relstoragetest2',
+    }
 
 def test_suite():
     suite = unittest.TestSuite()
     for klass in [PostgreSQLTests, PGToFile, FileToPG]:
         suite.addTest(unittest.makeSuite(klass, "check"))
+
+    try:
+        from ZODB.tests.testblob import storage_reusable_suite
+    except ImportError:
+        # ZODB < 3.9
+        pass
+    else:
+        def create_storage(name, blob_dir):
+            from relstorage.relstorage import RelStorage
+            adapter = PostgreSQLAdapter(
+                'dbname=%s user=relstoragetest password=relstoragetest' %
+                db_names[name])
+            storage = RelStorage(adapter, name=name, create=True,
+                blob_dir=blob_dir)
+            storage.zap_all()
+            return storage
+
+        suite.addTest(storage_reusable_suite(
+            'PostgreSQL', create_storage,
+            test_blob_storage_recovery=True,
+            test_packing=True,
+            ))
+
     return suite
 
 if __name__=='__main__':

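Once the databases exist, either module can be run on its own, since
both define test_suite(); for example:

    import unittest
    from relstorage.tests import testpostgresql
    unittest.TextTestRunner().run(testpostgresql.test_suite())
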


More information about the Checkins mailing list