[Checkins] SVN: relstorage/trunk/relstorage/adapters/ Added IObjectMover and turned loadstore into a history-preserving

Shane Hathaway shane at hathawaymix.org
Thu Sep 24 01:24:49 EDT 2009

Log message for revision 104469:
  Added IObjectMover and turned loadstore into a history-preserving
  implementation of IObjectMover.

  A   relstorage/trunk/relstorage/adapters/hpmover.py
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  D   relstorage/trunk/relstorage/adapters/loadstore.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/adapters/schema.py

Added: relstorage/trunk/relstorage/adapters/hpmover.py
--- relstorage/trunk/relstorage/adapters/hpmover.py	                        (rev 0)
+++ relstorage/trunk/relstorage/adapters/hpmover.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -0,0 +1,721 @@
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+"""History preserving IObjectMover implementation.
+"""
+from base64 import decodestring
+from base64 import encodestring
+from relstorage.adapters.interfaces import IObjectMover
+from ZODB.POSException import StorageError
+from zope.interface import implements
+try:
+    from hashlib import md5
+except ImportError:
+    from md5 import new as md5
+def compute_md5sum(data):
+    """Return the hexadecimal MD5 digest of data, or None if data is None."""
+    if data is not None:
+        return md5(data).hexdigest()
+    else:
+        # George Bailey object: there is no state to hash.  (Elsewhere in
+        # this file a None state marks an object whose creation was undone.)
+        return None
+def for_databases(*database_names):
+    """Decorator factory that tags a function with database names.
+    The returned decorator stores the given names as a tuple in the
+    function's _for_databases attribute and returns the function itself.
+    """
+    def decorate(f):
+        f._for_databases = database_names
+        return f
+    return decorate
+class HistoryPreservingObjectMover(object):
+    implements(IObjectMover)
+    _method_names = (
+        'get_current_tid',
+        'load_current',
+        'load_revision',
+        'exists',
+        'load_before',
+        'get_object_tid_after',
+        'on_store_opened',
+        'store_temp',
+        'replace_temp',
+        'restore',
+        'detect_conflict',
+        'move_from_temp',
+        'update_current',
+        )
+    def __init__(self, database_name, runner=None,
+            Binary=None, inputsize_BLOB=None, inputsize_BINARY=None):
+        """Bind the implementations for one database to the public names.
+        database_name is the method-name prefix to dispatch on; this
+        class defines 'postgresql', 'mysql' and 'oracle' variants.
+        runner, Binary, inputsize_BLOB and inputsize_BINARY are stored
+        as attributes for use by the database-specific methods.
+        Raises AttributeError if a variant named
+        '<database_name>_<method_name>' is missing.
+        """
+        # The inputsize parameters are for Oracle only.
+        self.database_name = database_name
+        self.runner = runner
+        self.Binary = Binary
+        self.inputsize_BLOB = inputsize_BLOB
+        self.inputsize_BINARY = inputsize_BINARY
+        # Expose each '<database_name>_<method_name>' attribute under the
+        # plain '<method_name>'.  The attribute may be None (see
+        # oracle_on_store_opened), in which case None is what gets bound.
+        for method_name in self._method_names:
+            method = getattr(self, '%s_%s' % (database_name, method_name))
+            setattr(self, method_name, method)
+    def generic_get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+        oid is an integer.  Returns None if object does not exist.
+        """
+        stmt = """
+        SELECT tid
+        FROM current_object
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (oid,))
+        for (tid,) in cursor:
+            return tid
+        return None
+    postgresql_get_current_tid = generic_get_current_tid
+    mysql_get_current_tid = generic_get_current_tid
+    def oracle_get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+        oid is an integer.  Returns None if object does not exist.
+        """
+        stmt = """
+        SELECT tid
+        FROM current_object
+        WHERE zoid = :1
+        """
+        cursor.execute(stmt, (oid,))
+        for (tid,) in cursor:
+            return tid
+        return None
+    def postgresql_load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        stmt = """
+        SELECT encode(state, 'base64'), tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            state64, tid = cursor.fetchone()
+            if state64 is not None:
+                state = decodestring(state64)
+            else:
+                # This object's creation has been undone
+                state = None
+            return state, tid
+        else:
+            return None, None
+    def mysql_load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        stmt = """
+        SELECT state, tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()
+        else:
+            return None, None
+    def oracle_load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        stmt = """
+        SELECT state, tid
+        FROM current_object
+            JOIN object_state USING(zoid, tid)
+        WHERE zoid = :1
+        """
+        return self.runner.run_lob_stmt(
+            cursor, stmt, (oid,), default=(None, None))
+    def postgresql_load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+        Returns None if no such state exists.
+        """
+        stmt = """
+        SELECT encode(state, 'base64')
+        FROM object_state
+        WHERE zoid = %s
+            AND tid = %s
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            (state64,) = cursor.fetchone()
+            if state64 is not None:
+                return decodestring(state64)
+        return None
+    def mysql_load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+        Returns None if no such state exists.
+        """
+        stmt = """
+        SELECT state
+        FROM object_state
+        WHERE zoid = %s
+            AND tid = %s
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            (state,) = cursor.fetchone()
+            return state
+        return None
+    def oracle_load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object on a particular transaction.
+        Returns None if no such state exists.
+        """
+        stmt = """
+        SELECT state
+        FROM object_state
+        WHERE zoid = :1
+            AND tid = :2
+        """
+        (state,) = self.runner.run_lob_stmt(
+            cursor, stmt, (oid, tid), default=(None,))
+        return state
+    def generic_exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
+        cursor.execute(stmt, (oid,))
+        for row in cursor:
+            return True
+        return False
+    postgresql_exists = generic_exists
+    mysql_exists = generic_exists
+    def oracle_exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        stmt = "SELECT 1 FROM current_object WHERE zoid = :1"
+        cursor.execute(stmt, (oid,))
+        for row in cursor:
+            return True
+        return False
+    def postgresql_load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+        Returns (None, None) if no earlier state exists.
+        """
+        stmt = """
+        SELECT encode(state, 'base64'), tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid < %s
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            state64, tid = cursor.fetchone()
+            if state64 is not None:
+                state = decodestring(state64)
+            else:
+                # The object's creation has been undone
+                state = None
+            return state, tid
+        else:
+            return None, None
+    def mysql_load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+        Returns (None, None) if no earlier state exists.
+        """
+        stmt = """
+        SELECT state, tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid < %s
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()
+        else:
+            return None, None
+    def oracle_load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+        Returns (None, None) if no earlier state exists.
+        """
+        stmt = """
+        SELECT state, tid
+        FROM object_state
+        WHERE zoid = :oid
+            AND tid = (
+                SELECT MAX(tid)
+                FROM object_state
+                WHERE zoid = :oid
+                    AND tid < :tid
+            )
+        """
+        return self.runner.run_lob_stmt(
+            cursor, stmt, {'oid': oid, 'tid': tid}, default=(None, None))
+    def generic_get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid > %s
+        ORDER BY tid
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        else:
+            return None
+    postgresql_get_object_tid_after = generic_get_object_tid_after
+    mysql_get_object_tid_after = generic_get_object_tid_after
+    def oracle_get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT MIN(tid)
+        FROM object_state
+        WHERE zoid = :1
+            AND tid > :2
+        """
+        cursor.execute(stmt, (oid, tid))
+        rows = cursor.fetchall()
+        if rows:
+            assert len(rows) == 1
+            return rows[0][0]
+        else:
+            return None
+    def postgresql_on_store_opened(self, cursor, restart=False):
+        """Create the temporary table for storing objects"""
+        stmt = """
+        CREATE TEMPORARY TABLE temp_store (
+            zoid        BIGINT NOT NULL,
+            prev_tid    BIGINT NOT NULL,
+            md5         CHAR(32),
+            state       BYTEA
+        ) ON COMMIT DROP;
+        CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+        """
+        cursor.execute(stmt)
+    def mysql_on_store_opened(self, cursor, restart=False):
+        """Create the temporary table for storing objects"""
+        if restart:
+            stmt = """
+            DROP TEMPORARY TABLE IF EXISTS temp_store
+            """
+            cursor.execute(stmt)
+        stmt = """
+        CREATE TEMPORARY TABLE temp_store (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            prev_tid    BIGINT NOT NULL,
+            md5         CHAR(32),
+            state       LONGBLOB
+        ) ENGINE MyISAM
+        """
+        cursor.execute(stmt)
+    # no store connection initialization needed for Oracle
+    oracle_on_store_opened = None
+    def postgresql_store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        stmt = """
+        DELETE FROM temp_store WHERE zoid = %s;
+        INSERT INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (%s, %s, %s, decode(%s, 'base64'))
+        """
+        cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
+    def mysql_store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        stmt = """
+        REPLACE INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (%s, %s, %s, %s)
+        """
+        cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
+    def oracle_store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        md5sum = compute_md5sum(data)
+        cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
+        if len(data) <= 2000:
+            # Send data inline for speed.  Oracle docs say maximum size
+            # of a RAW is 2000 bytes.  inputsize_BINARY corresponds with RAW.
+            cursor.setinputsizes(rawdata=self.inputsize_BINARY)
+            stmt = """
+            INSERT INTO temp_store (zoid, prev_tid, md5, state)
+            VALUES (:oid, :prev_tid, :md5sum, :rawdata)
+            """
+            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+                md5sum=md5sum, rawdata=data)
+        else:
+            # Send data as a BLOB
+            cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+            stmt = """
+            INSERT INTO temp_store (zoid, prev_tid, md5, state)
+            VALUES (:oid, :prev_tid, :md5sum, :blobdata)
+            """
+            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+                md5sum=md5sum, blobdata=data)
+    def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+        This happens after conflict resolution.
+        """
+        md5sum = compute_md5sum(data)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = decode(%s, 'base64')
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+    def mysql_replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+        This happens after conflict resolution.
+        """
+        md5sum = compute_md5sum(data)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = %s
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
+    def oracle_replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+        This happens after conflict resolution.
+        """
+        md5sum = compute_md5sum(data)
+        cursor.setinputsizes(data=self.inputsize_BLOB)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = :prev_tid,
+            md5 = :md5sum,
+            state = :data
+        WHERE zoid = :oid
+        """
+        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+            md5sum=md5sum, data=self.Binary(data))
+    def postgresql_restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+        Used for copying transactions into this database.
+        """
+        md5sum = compute_md5sum(data)
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        VALUES (%s, %s,
+            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+            %s, decode(%s, 'base64'))
+        """
+        if data is not None:
+            data = encodestring(data)
+        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+    def mysql_restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+        Used for copying transactions into this database.
+        """
+        md5sum = compute_md5sum(data)
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        VALUES (%s, %s,
+            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+            %s, %s)
+        """
+        if data is not None:
+            data = self.Binary(data)
+        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+    def oracle_restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+        Used for copying transactions into this database.
+        oid and tid are integers.  data is the object state, or None
+        for an object that has no state; None is bound as-is, like the
+        PostgreSQL and MySQL variants do.
+        """
+        md5sum = compute_md5sum(data)
+        # Test for None/empty before calling len(): compute_md5sum() and
+        # the other restore variants accept data=None, and len(None)
+        # would raise TypeError.
+        if not data or len(data) <= 2000:
+            # Send data inline for speed.  Oracle docs say maximum size
+            # of a RAW is 2000 bytes.  inputsize_BINARY corresponds with RAW.
+            cursor.setinputsizes(rawdata=self.inputsize_BINARY)
+            stmt = """
+            INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+            VALUES (:oid, :tid,
+              COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+              :md5sum, :rawdata)
+            """
+            cursor.execute(stmt, oid=oid, tid=tid,
+                md5sum=md5sum, rawdata=data)
+        else:
+            # Send data as a BLOB
+            cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+            stmt = """
+            INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+            VALUES (:oid, :tid,
+              COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+              :md5sum, :blobdata)
+            """
+            cursor.execute(stmt, oid=oid, tid=tid,
+                md5sum=md5sum, blobdata=data)
+    def postgresql_detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            encode(temp_store.state, 'base64')
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+            return oid, prev_tid, attempted_prev_tid, decodestring(data)
+        return None
+    def mysql_detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        # Lock in share mode to ensure the data being read is up to date.
+        # NOTE(review): the statement below contains no LOCK IN SHARE MODE
+        # clause, so the comment above does not match the query as
+        # written -- confirm which of the two is intended.
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            temp_store.state
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            return cursor.fetchone()
+        return None
+    def oracle_detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            temp_store.state
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        """
+        return self.runner.run_lob_stmt(cursor, stmt)
+    def generic_move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+        Returns the list of oids stored.
+        """
+        # Copy every row of temp_store into object_state, stamped with tid.
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, %s, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, (tid,))
+        # Report the oids that were just copied.
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+    postgresql_move_from_temp = generic_move_from_temp
+    mysql_move_from_temp = generic_move_from_temp
+    def oracle_move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+        Returns the list of oids stored.
+        """
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, :tid, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, tid=tid)
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+    def postgresql_update_current(self, cursor, tid):
+        """Update the current object pointers.
+        tid is the integer tid of the transaction being committed.
+        """
+        cursor.execute("""
+        -- Insert objects created in this transaction into current_object.
+        INSERT INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = %(tid)s
+            AND prev_tid = 0;
+        -- Change existing objects.  To avoid deadlocks,
+        -- update in OID order.
+        UPDATE current_object SET tid = %(tid)s
+        WHERE zoid IN (
+            SELECT zoid FROM object_state
+            WHERE tid = %(tid)s
+                AND prev_tid != 0
+            ORDER BY zoid
+        )
+        """, {'tid': tid})
+    def mysql_update_current(self, cursor, tid):
+        """Update the current object pointers.
+        tid is the integer tid of the transaction being committed.
+        """
+        cursor.execute("""
+        REPLACE INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = %s
+        """, (tid,))
+    def oracle_update_current(self, cursor, tid):
+        """Update the current object pointers.
+        tid is the integer tid of the transaction being committed.
+        """
+        # Insert objects created in this transaction into current_object.
+        stmt = """
+        INSERT INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = :1
+            AND prev_tid = 0
+        """
+        cursor.execute(stmt, (tid,))
+        # Change existing objects.
+        stmt = """
+        UPDATE current_object SET tid = :1
+        WHERE zoid IN (
+            SELECT zoid FROM object_state
+            WHERE tid = :1
+                AND prev_tid != 0
+        )
+        """
+        cursor.execute(stmt, (tid,))

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
--- relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-24 03:54:59 UTC (rev 104468)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -86,7 +86,7 @@
         """Iterate over an object's history.
         Raises KeyError if the object does not exist.
-        Yields (tid, username, description, extension, pickle_size)
+        Yields (tid, username, description, extension, state_size)
         for each modification.
@@ -116,6 +116,84 @@
         """Release the pack lock."""
+class IObjectMover(Interface):
+    """Moves object states to/from the database and within the database"""
+    def get_current_tid(cursor, oid):
+        """Returns the current integer tid for an object.
+        oid is an integer.  Returns None if object does not exist.
+        """
+    def load_current(cursor, oid):
+        """Returns the current state and integer tid for an object.
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+    def load_revision(cursor, oid, tid):
+        """Returns the state for an object on a particular transaction.
+        Returns None if no such state exists.
+        """
+    def exists(cursor, oid):
+        """Returns a true value if the given object exists."""
+    def load_before(cursor, oid, tid):
+        """Returns the state and tid of an object before transaction tid.
+        Returns (None, None) if no earlier state exists.
+        """
+    def get_object_tid_after(cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+        Returns None if no later state exists.
+        """
+    def on_store_opened(cursor, restart=False):
+        """Create the temporary table for storing objects.
+        This method may be None, meaning no store connection
+        initialization is required.
+        """
+    def store_temp(cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+    def replace_temp(cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table.
+        This happens after conflict resolution.
+        """
+    def restore(cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+        Used for copying transactions into this database.
+        """
+    def detect_conflict(cursor):
+        """Find one conflict in the data about to be committed.
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+    def move_from_temp(cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+        Returns the list of oids stored.
+        """
+    def update_current(cursor, tid):
+        """Update the current object pointers.
+        tid is the integer tid of the transaction being committed.
+        """
 class IOIDAllocator(Interface):
     def set_min_oid(cursor, oid):
@@ -164,8 +242,8 @@
         pack_tid specifies the most recent transaction to pack.
-        get_references is a function that accepts a pickled state and
-        returns a set of OIDs that state refers to.
+        get_references is a function that accepts a stored object state
+        and returns a set of OIDs that state refers to.
         options is an instance of relstorage.Options.
         In particular, the options.pack_gc flag indicates whether
@@ -175,9 +253,9 @@
     def pack(pack_tid, options, sleep=None, packed_func=None):
         """Pack.  Requires the information provided by pre_pack.
-        packed_func, if provided, will be called for every object
-        packed, just after it is removed.  The function must accept
-        two parameters, oid and tid (64 bit integers).
+        packed_func, if provided, will be called for every object state
+        packed, just after the object is removed. The function must
+        accept two parameters, oid and tid (64 bit integers).
         The sleep function defaults to time.sleep(). It can be
         overridden to do something else instead of sleep during pauses

Deleted: relstorage/trunk/relstorage/adapters/loadstore.py
--- relstorage/trunk/relstorage/adapters/loadstore.py	2009-09-24 03:54:59 UTC (rev 104468)
+++ relstorage/trunk/relstorage/adapters/loadstore.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -1,679 +0,0 @@
-# Copyright (c) 2009 Zope Foundation and Contributors.
-# All Rights Reserved.
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-"""Object load/store implementations.
-from base64 import decodestring
-from base64 import encodestring
-from ZODB.POSException import StorageError
-import time
-    from hashlib import md5
-except ImportError:
-    from md5 import new as md5
-def compute_md5sum(data):
-    if data is not None:
-        return md5(data).hexdigest()
-    else:
-        # George Bailey object
-        return None
-class HistoryPreservingPostgreSQLLoadStore(object):
-    def get_current_tid(self, cursor, oid):
-        """Returns the current integer tid for an object.
-        oid is an integer.  Returns None if object does not exist.
-        """
-        stmt = """
-        SELECT tid
-        FROM current_object
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (oid,))
-        for (tid,) in cursor:
-            return tid
-        return None
-    def load_current(self, cursor, oid):
-        """Returns the current pickle and integer tid for an object.
-        oid is an integer.  Returns (None, None) if object does not exist.
-        """
-        cursor.execute("""
-        SELECT encode(state, 'base64'), tid
-        FROM current_object
-            JOIN object_state USING(zoid, tid)
-        WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            state64, tid = cursor.fetchone()
-            if state64 is not None:
-                state = decodestring(state64)
-            else:
-                # This object's creation has been undone
-                state = None
-            return state, tid
-        else:
-            return None, None
-    def load_revision(self, cursor, oid, tid):
-        """Returns the pickle for an object on a particular transaction.
-        Returns None if no such state exists.
-        """
-        cursor.execute("""
-        SELECT encode(state, 'base64')
-        FROM object_state
-        WHERE zoid = %s
-            AND tid = %s
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            (state64,) = cursor.fetchone()
-            if state64 is not None:
-                return decodestring(state64)
-        return None
-    def exists(self, cursor, oid):
-        """Returns a true value if the given object exists."""
-        stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
-        cursor.execute(stmt, (oid,))
-        for row in cursor:
-            return True
-        return False
-    def load_before(self, cursor, oid, tid):
-        """Returns the pickle and tid of an object before transaction tid.
-        Returns (None, None) if no earlier state exists.
-        """
-        cursor.execute("""
-        SELECT encode(state, 'base64'), tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid < %s
-        ORDER BY tid DESC
-        LIMIT 1
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            state64, tid = cursor.fetchone()
-            if state64 is not None:
-                state = decodestring(state64)
-            else:
-                # The object's creation has been undone
-                state = None
-            return state, tid
-        else:
-            return None, None
-    def get_object_tid_after(self, cursor, oid, tid):
-        """Returns the tid of the next change after an object revision.
-        Returns None if no later state exists.
-        """
-        stmt = """
-        SELECT tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid > %s
-        ORDER BY tid
-        LIMIT 1
-        """
-        cursor.execute(stmt, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
-        else:
-            return None
-    def on_store_opened(self, cursor, restart=False):
-        """Create the temporary table for storing objects"""
-        stmt = """
-        CREATE TEMPORARY TABLE temp_store (
-            zoid        BIGINT NOT NULL,
-            prev_tid    BIGINT NOT NULL,
-            md5         CHAR(32),
-            state       BYTEA
-        ) ON COMMIT DROP;
-        CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
-        """
-        cursor.execute(stmt)
-    def store_temp(self, cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
-        md5sum = compute_md5sum(data)
-        stmt = """
-        DELETE FROM temp_store WHERE zoid = %s;
-        INSERT INTO temp_store (zoid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, decode(%s, 'base64'))
-        """
-        cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
-    def replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table."""
-        md5sum = compute_md5sum(data)
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = %s,
-            md5 = %s,
-            state = decode(%s, 'base64')
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-    def restore(self, cursor, oid, tid, data):
-        """Store an object directly, without conflict detection.
-        Used for copying transactions into this database.
-        """
-        md5sum = compute_md5sum(data)
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (%s, %s,
-            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
-            %s, decode(%s, 'base64'))
-        """
-        if data is not None:
-            data = encodestring(data)
-        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-    def detect_conflict(self, cursor):
-        """Find one conflict in the data about to be committed.
-        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
-        attempted_data).  If there is no conflict, returns None.
-        """
-        stmt = """
-        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
-            encode(temp_store.state, 'base64')
-        FROM temp_store
-            JOIN current_object ON (temp_store.zoid = current_object.zoid)
-        WHERE temp_store.prev_tid != current_object.tid
-        LIMIT 1
-        """
-        cursor.execute(stmt)
-        if cursor.rowcount:
-            oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
-            return oid, prev_tid, attempted_prev_tid, decodestring(data)
-        return None
-    def move_from_temp(self, cursor, tid):
-        """Moved the temporarily stored objects to permanent storage.
-        Returns the list of oids stored.
-        """
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT zoid, %s, prev_tid, md5, state
-        FROM temp_store
-        """
-        cursor.execute(stmt, (tid,))
-        stmt = """
-        SELECT zoid FROM temp_store
-        """
-        cursor.execute(stmt)
-        return [oid for (oid,) in cursor]
-    def update_current(self, cursor, tid):
-        """Update the current object pointers.
-        tid is the integer tid of the transaction being committed.
-        """
-        cursor.execute("""
-        -- Insert objects created in this transaction into current_object.
-        INSERT INTO current_object (zoid, tid)
-        SELECT zoid, tid FROM object_state
-        WHERE tid = %(tid)s
-            AND prev_tid = 0;
-        -- Change existing objects.  To avoid deadlocks,
-        -- update in OID order.
-        UPDATE current_object SET tid = %(tid)s
-        WHERE zoid IN (
-            SELECT zoid FROM object_state
-            WHERE tid = %(tid)s
-                AND prev_tid != 0
-            ORDER BY zoid
-        )
-        """, {'tid': tid})
-class HistoryPreservingMySQLLoadStore(object):
-    def __init__(self, Binary):
-        self.Binary = Binary
-    def get_current_tid(self, cursor, oid):
-        """Returns the current integer tid for an object.
-        oid is an integer.  Returns None if object does not exist.
-        """
-        stmt = """
-        SELECT tid
-        FROM current_object
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (oid,))
-        for (tid,) in cursor:
-            return tid
-        return None
-    def load_current(self, cursor, oid):
-        """Returns the current pickle and integer tid for an object.
-        oid is an integer.  Returns (None, None) if object does not exist.
-        """
-        cursor.execute("""
-        SELECT state, tid
-        FROM current_object
-            JOIN object_state USING(zoid, tid)
-        WHERE zoid = %s
-        """, (oid,))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()
-        else:
-            return None, None
-    def load_revision(self, cursor, oid, tid):
-        """Returns the pickle for an object on a particular transaction.
-        Returns None if no such state exists.
-        """
-        cursor.execute("""
-        SELECT state
-        FROM object_state
-        WHERE zoid = %s
-            AND tid = %s
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            (state,) = cursor.fetchone()
-            return state
-        return None
-    def exists(self, cursor, oid):
-        """Returns a true value if the given object exists."""
-        stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
-        cursor.execute(stmt, (oid,))
-        for row in cursor:
-            return True
-        return False
-    def load_before(self, cursor, oid, tid):
-        """Returns the pickle and tid of an object before transaction tid.
-        Returns (None, None) if no earlier state exists.
-        """
-        cursor.execute("""
-        SELECT state, tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid < %s
-        ORDER BY tid DESC
-        LIMIT 1
-        """, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()
-        else:
-            return None, None
-    def get_object_tid_after(self, cursor, oid, tid):
-        """Returns the tid of the next change after an object revision.
-        Returns None if no later state exists.
-        """
-        stmt = """
-        SELECT tid
-        FROM object_state
-        WHERE zoid = %s
-            AND tid > %s
-        ORDER BY tid
-        LIMIT 1
-        """
-        cursor.execute(stmt, (oid, tid))
-        if cursor.rowcount:
-            assert cursor.rowcount == 1
-            return cursor.fetchone()[0]
-        else:
-            return None
-    def on_store_opened(self, cursor, restart=False):
-        """Create the temporary table for storing objects"""
-        if restart:
-            stmt = """
-            DROP TEMPORARY TABLE IF EXISTS temp_store
-            """
-            cursor.execute(stmt)
-        stmt = """
-        CREATE TEMPORARY TABLE temp_store (
-            zoid        BIGINT NOT NULL PRIMARY KEY,
-            prev_tid    BIGINT NOT NULL,
-            md5         CHAR(32),
-            state       LONGBLOB
-        ) ENGINE MyISAM
-        """
-        cursor.execute(stmt)
-    def store_temp(self, cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
-        md5sum = compute_md5sum(data)
-        stmt = """
-        REPLACE INTO temp_store (zoid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, %s)
-        """
-        cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
-    def replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table."""
-        md5sum = compute_md5sum(data)
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = %s,
-            md5 = %s,
-            state = %s
-        WHERE zoid = %s
-        """
-        cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
-    def restore(self, cursor, oid, tid, data):
-        """Store an object directly, without conflict detection.
-        Used for copying transactions into this database.
-        """
-        md5sum = compute_md5sum(data)
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (%s, %s,
-            COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
-            %s, %s)
-        """
-        if data is not None:
-            data = self.Binary(data)
-        cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-    def detect_conflict(self, cursor):
-        """Find one conflict in the data about to be committed.
-        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
-        attempted_data).  If there is no conflict, returns None.
-        """
-        # Lock in share mode to ensure the data being read is up to date.
-        stmt = """
-        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
-            temp_store.state
-        FROM temp_store
-            JOIN current_object ON (temp_store.zoid = current_object.zoid)
-        WHERE temp_store.prev_tid != current_object.tid
-        LIMIT 1
-        """
-        cursor.execute(stmt)
-        if cursor.rowcount:
-            return cursor.fetchone()
-        return None
-    def move_from_temp(self, cursor, tid):
-        """Moved the temporarily stored objects to permanent storage.
-        Returns the list of oids stored.
-        """
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT zoid, %s, prev_tid, md5, state
-        FROM temp_store
-        """
-        cursor.execute(stmt, (tid,))
-        stmt = """
-        SELECT zoid FROM temp_store
-        """
-        cursor.execute(stmt)
-        return [oid for (oid,) in cursor]
-    def update_current(self, cursor, tid):
-        """Update the current object pointers.
-        tid is the integer tid of the transaction being committed.
-        """
-        cursor.execute("""
-        REPLACE INTO current_object (zoid, tid)
-        SELECT zoid, tid FROM object_state
-        WHERE tid = %s
-        """, (tid,))
-class HistoryPreservingOracleLoadStore(object):
-    def __init__(self, runner, Binary, inputsize_BLOB, inputsize_BINARY):
-        self.runner = runner
-        self.Binary = Binary
-        self.inputsize_BLOB = inputsize_BLOB
-        self.inputsize_BINARY = inputsize_BINARY
-    def get_current_tid(self, cursor, oid):
-        """Returns the current integer tid for an object.
-        oid is an integer.  Returns None if object does not exist.
-        """
-        stmt = """
-        SELECT tid
-        FROM current_object
-        WHERE zoid = :1
-        """
-        cursor.execute(stmt, (oid,))
-        for (tid,) in cursor:
-            return tid
-        return None
-    def load_current(self, cursor, oid):
-        """Returns the current pickle and integer tid for an object.
-        oid is an integer.  Returns (None, None) if object does not exist.
-        """
-        stmt = """
-        SELECT state, tid
-        FROM current_object
-            JOIN object_state USING(zoid, tid)
-        WHERE zoid = :1
-        """
-        return self.runner.run_lob_stmt(
-            cursor, stmt, (oid,), default=(None, None))
-    def load_revision(self, cursor, oid, tid):
-        """Returns the pickle for an object on a particular transaction.
-        Returns None if no such state exists.
-        """
-        stmt = """
-        SELECT state
-        FROM object_state
-        WHERE zoid = :1
-            AND tid = :2
-        """
-        (state,) = self.runner.run_lob_stmt(
-            cursor, stmt, (oid, tid), default=(None,))
-        return state
-    def exists(self, cursor, oid):
-        """Returns a true value if the given object exists."""
-        stmt = "SELECT 1 FROM current_object WHERE zoid = :1"
-        cursor.execute(stmt, (oid,))
-        for row in cursor:
-            return True
-        return False
-    def load_before(self, cursor, oid, tid):
-        """Returns the pickle and tid of an object before transaction tid.
-        Returns (None, None) if no earlier state exists.
-        """
-        stmt = """
-        SELECT state, tid
-        FROM object_state
-        WHERE zoid = :oid
-            AND tid = (
-                SELECT MAX(tid)
-                FROM object_state
-                WHERE zoid = :oid
-                    AND tid < :tid
-            )
-        """
-        return self.runner.run_lob_stmt(
-            cursor, stmt, {'oid': oid, 'tid': tid}, default=(None, None))
-    def get_object_tid_after(self, cursor, oid, tid):
-        """Returns the tid of the next change after an object revision.
-        Returns None if no later state exists.
-        """
-        stmt = """
-        SELECT MIN(tid)
-        FROM object_state
-        WHERE zoid = :1
-            AND tid > :2
-        """
-        cursor.execute(stmt, (oid, tid))
-        rows = cursor.fetchall()
-        if rows:
-            assert len(rows) == 1
-            return rows[0][0]
-        else:
-            return None
-    on_store_opened = None  # no store connection initialization needed
-    def store_temp(self, cursor, oid, prev_tid, data):
-        """Store an object in the temporary table."""
-        md5sum = compute_md5sum(data)
-        cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
-        if len(data) <= 2000:
-            # Send data inline for speed.  Oracle docs say maximum size
-            # of a RAW is 2000 bytes.  inputsize_BINARY corresponds with RAW.
-            cursor.setinputsizes(rawdata=self.inputsize_BINARY)
-            stmt = """
-            INSERT INTO temp_store (zoid, prev_tid, md5, state)
-            VALUES (:oid, :prev_tid, :md5sum, :rawdata)
-            """
-            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-                md5sum=md5sum, rawdata=data)
-        else:
-            # Send data as a BLOB
-            cursor.setinputsizes(blobdata=self.inputsize_BLOB)
-            stmt = """
-            INSERT INTO temp_store (zoid, prev_tid, md5, state)
-            VALUES (:oid, :prev_tid, :md5sum, :blobdata)
-            """
-            cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-                md5sum=md5sum, blobdata=data)
-    def replace_temp(self, cursor, oid, prev_tid, data):
-        """Replace an object in the temporary table."""
-        md5sum = compute_md5sum(data)
-        cursor.setinputsizes(data=self.inputsize_BLOB)
-        stmt = """
-        UPDATE temp_store SET
-            prev_tid = :prev_tid,
-            md5 = :md5sum,
-            state = :data
-        WHERE zoid = :oid
-        """
-        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
-            md5sum=md5sum, data=self.Binary(data))
-    def restore(self, cursor, oid, tid, data):
-        """Store an object directly, without conflict detection.
-        Used for copying transactions into this database.
-        """
-        md5sum = compute_md5sum(data)
-        cursor.setinputsizes(data=self.inputsize_BLOB)
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (:oid, :tid,
-            COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
-            :md5sum, :data)
-        """
-        if data is not None:
-            data = self.Binary(data)
-        cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
-    def detect_conflict(self, cursor):
-        """Find one conflict in the data about to be committed.
-        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
-        attempted_data).  If there is no conflict, returns None.
-        """
-        stmt = """
-        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
-            temp_store.state
-        FROM temp_store
-            JOIN current_object ON (temp_store.zoid = current_object.zoid)
-        WHERE temp_store.prev_tid != current_object.tid
-        """
-        return self.runner.run_lob_stmt(cursor, stmt)
-    def move_from_temp(self, cursor, tid):
-        """Move the temporarily stored objects to permanent storage.
-        Returns the list of oids stored.
-        """
-        stmt = """
-        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        SELECT zoid, :tid, prev_tid, md5, state
-        FROM temp_store
-        """
-        cursor.execute(stmt, tid=tid)
-        stmt = """
-        SELECT zoid FROM temp_store
-        """
-        cursor.execute(stmt)
-        return [oid for (oid,) in cursor]
-    def update_current(self, cursor, tid):
-        """Update the current object pointers.
-        tid is the integer tid of the transaction being committed.
-        """
-        # Insert objects created in this transaction into current_object.
-        stmt = """
-        INSERT INTO current_object (zoid, tid)
-        SELECT zoid, tid FROM object_state
-        WHERE tid = :1
-            AND prev_tid = 0
-        """
-        cursor.execute(stmt, (tid,))
-        # Change existing objects.
-        stmt = """
-        UPDATE current_object SET tid = :1
-        WHERE zoid IN (
-            SELECT zoid FROM object_state
-            WHERE tid = :1
-                AND prev_tid != 0
-        )
-        """
-        cursor.execute(stmt, (tid,))

Modified: relstorage/trunk/relstorage/adapters/mysql.py
--- relstorage/trunk/relstorage/adapters/mysql.py	2009-09-24 03:54:59 UTC (rev 104468)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -53,7 +53,7 @@
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
-from relstorage.adapters.loadstore import HistoryPreservingMySQLLoadStore
+from relstorage.adapters.hpmover import HistoryPreservingObjectMover
 from relstorage.adapters.locker import MySQLLocker
 from relstorage.adapters.oidallocator import MySQLOIDAllocator
 from relstorage.adapters.packundo import HistoryPreservingPackUndo
@@ -88,11 +88,13 @@
-        self.loadstore = HistoryPreservingMySQLLoadStore(
+        self.mover = HistoryPreservingObjectMover(
+            database_name='mysql',
+            runner=self.runner,
+        self.connmanager.set_on_store_opened(self.mover.on_store_opened)
         self.oidallocator = MySQLOIDAllocator()
-        self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
         self.txncontrol = MySQLTransactionControl(
@@ -130,18 +132,18 @@
         self.zap_all = self.schema.zap_all
         self.drop_all = self.schema.drop_all
-        self.get_current_tid = self.loadstore.get_current_tid
-        self.load_current = self.loadstore.load_current
-        self.load_revision = self.loadstore.load_revision
-        self.exists = self.loadstore.exists
-        self.load_before = self.loadstore.load_before
-        self.get_object_tid_after = self.loadstore.get_object_tid_after
-        self.store_temp = self.loadstore.store_temp
-        self.replace_temp = self.loadstore.replace_temp
-        self.restore = self.loadstore.restore
-        self.detect_conflict = self.loadstore.detect_conflict
-        self.move_from_temp = self.loadstore.move_from_temp
-        self.update_current = self.loadstore.update_current
+        self.get_current_tid = self.mover.get_current_tid
+        self.load_current = self.mover.load_current
+        self.load_revision = self.mover.load_revision
+        self.exists = self.mover.exists
+        self.load_before = self.mover.load_before
+        self.get_object_tid_after = self.mover.get_object_tid_after
+        self.store_temp = self.mover.store_temp
+        self.replace_temp = self.mover.replace_temp
+        self.restore = self.mover.restore
+        self.detect_conflict = self.mover.detect_conflict
+        self.move_from_temp = self.mover.move_from_temp
+        self.update_current = self.mover.update_current
         self.set_min_oid = self.oidallocator.set_min_oid
         self.new_oid = self.oidallocator.new_oid

Modified: relstorage/trunk/relstorage/adapters/oracle.py
--- relstorage/trunk/relstorage/adapters/oracle.py	2009-09-24 03:54:59 UTC (rev 104468)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -19,7 +19,7 @@
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
-from relstorage.adapters.loadstore import HistoryPreservingOracleLoadStore
+from relstorage.adapters.hpmover import HistoryPreservingObjectMover
 from relstorage.adapters.locker import OracleLocker
 from relstorage.adapters.oidallocator import OracleOIDAllocator
 from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
@@ -81,16 +81,17 @@
-        self.loadstore = HistoryPreservingOracleLoadStore(
+        self.mover = HistoryPreservingObjectMover(
+            database_name='oracle',
+        self.connmanager.set_on_store_opened(self.mover.on_store_opened)
         self.oidallocator = OracleOIDAllocator(
-        self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
         self.txncontrol = OracleTransactionControl(
@@ -128,18 +129,18 @@
         self.zap_all = self.schema.zap_all
         self.drop_all = self.schema.drop_all
-        self.get_current_tid = self.loadstore.get_current_tid
-        self.load_current = self.loadstore.load_current
-        self.load_revision = self.loadstore.load_revision
-        self.exists = self.loadstore.exists
-        self.load_before = self.loadstore.load_before
-        self.get_object_tid_after = self.loadstore.get_object_tid_after
-        self.store_temp = self.loadstore.store_temp
-        self.replace_temp = self.loadstore.replace_temp
-        self.restore = self.loadstore.restore
-        self.detect_conflict = self.loadstore.detect_conflict
-        self.move_from_temp = self.loadstore.move_from_temp
-        self.update_current = self.loadstore.update_current
+        self.get_current_tid = self.mover.get_current_tid
+        self.load_current = self.mover.load_current
+        self.load_revision = self.mover.load_revision
+        self.exists = self.mover.exists
+        self.load_before = self.mover.load_before
+        self.get_object_tid_after = self.mover.get_object_tid_after
+        self.store_temp = self.mover.store_temp
+        self.replace_temp = self.mover.replace_temp
+        self.restore = self.mover.restore
+        self.detect_conflict = self.mover.detect_conflict
+        self.move_from_temp = self.mover.move_from_temp
+        self.update_current = self.mover.update_current
         self.set_min_oid = self.oidallocator.set_min_oid
         self.new_oid = self.oidallocator.new_oid

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
--- relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-24 03:54:59 UTC (rev 104468)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -19,7 +19,7 @@
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
-from relstorage.adapters.loadstore import HistoryPreservingPostgreSQLLoadStore
+from relstorage.adapters.hpmover import HistoryPreservingObjectMover
 from relstorage.adapters.locker import PostgreSQLLocker
 from relstorage.adapters.oidallocator import PostgreSQLOIDAllocator
 from relstorage.adapters.packundo import HistoryPreservingPackUndo
@@ -57,9 +57,12 @@
-        self.loadstore = HistoryPreservingPostgreSQLLoadStore()
+        self.mover = HistoryPreservingObjectMover(
+            database_name='postgresql',
+            runner=self.runner,
+            )
+        self.connmanager.set_on_store_opened(self.mover.on_store_opened)
         self.oidallocator = PostgreSQLOIDAllocator()
-        self.connmanager.set_on_store_opened(self.loadstore.on_store_opened)
         self.txncontrol = PostgreSQLTransactionControl()
         self.poller = Poller(
             poll_query="EXECUTE get_latest_tid",
@@ -95,18 +98,18 @@
         self.zap_all = self.schema.zap_all
         self.drop_all = self.schema.drop_all
-        self.get_current_tid = self.loadstore.get_current_tid
-        self.load_current = self.loadstore.load_current
-        self.load_revision = self.loadstore.load_revision
-        self.exists = self.loadstore.exists
-        self.load_before = self.loadstore.load_before
-        self.get_object_tid_after = self.loadstore.get_object_tid_after
-        self.store_temp = self.loadstore.store_temp
-        self.replace_temp = self.loadstore.replace_temp
-        self.restore = self.loadstore.restore
-        self.detect_conflict = self.loadstore.detect_conflict
-        self.move_from_temp = self.loadstore.move_from_temp
-        self.update_current = self.loadstore.update_current
+        self.get_current_tid = self.mover.get_current_tid
+        self.load_current = self.mover.load_current
+        self.load_revision = self.mover.load_revision
+        self.exists = self.mover.exists
+        self.load_before = self.mover.load_before
+        self.get_object_tid_after = self.mover.get_object_tid_after
+        self.store_temp = self.mover.store_temp
+        self.replace_temp = self.mover.replace_temp
+        self.restore = self.mover.restore
+        self.detect_conflict = self.mover.detect_conflict
+        self.move_from_temp = self.mover.move_from_temp
+        self.update_current = self.mover.update_current
         self.set_min_oid = self.oidallocator.set_min_oid
         self.new_oid = self.oidallocator.new_oid

Modified: relstorage/trunk/relstorage/adapters/schema.py
--- relstorage/trunk/relstorage/adapters/schema.py	2009-09-24 03:54:59 UTC (rev 104468)
+++ relstorage/trunk/relstorage/adapters/schema.py	2009-09-24 05:24:48 UTC (rev 104469)
@@ -418,6 +418,8 @@
             todo = list(self.all_tables)
             for table in todo:
+                if table.startswith('temp_'):
+                    continue
                 if table in existent:
                     cursor.execute("DELETE FROM %s" % table)
             script = filter_script(self.init_script, self.database_name)

More information about the checkins mailing list