[Checkins] SVN: relstorage/branches/ Added a 1.1 branch for pack optimizations

Shane Hathaway shane at hathawaymix.org
Tue Apr 8 03:58:59 EDT 2008


Log message for revision 85160:
  Added a 1.1 branch for pack optimizations
  

Changed:
  A   relstorage/branches/
  A   relstorage/branches/1.1/
  U   relstorage/branches/1.1/notes/migrate-1.0.1.txt
  U   relstorage/branches/1.1/relstorage/adapters/common.py
  U   relstorage/branches/1.1/relstorage/adapters/postgresql.py
  U   relstorage/branches/1.1/relstorage/tests/packstresstest.py

-=-
Copied: relstorage/branches/1.1 (from rev 85056, relstorage/trunk)

Modified: relstorage/branches/1.1/notes/migrate-1.0.1.txt
===================================================================
--- relstorage/trunk/notes/migrate-1.0.1.txt	2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/notes/migrate-1.0.1.txt	2008-04-08 07:58:59 UTC (rev 85160)
@@ -1,17 +1,32 @@
 
 Migrating from version 1.0.1
 
-Create a new index on the object_state table.  The new iterative pack code
-will operate slowly without this index.  The following statement works in
-all 3 supported databases:
+PostgreSQL:
 
     CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
 
-Also, for PostgreSQL only, apply this pack optimization:
-
     DROP INDEX pack_object_keep_zoid;
     CREATE INDEX pack_object_keep_false ON pack_object (zoid)
         WHERE keep = false;
     CREATE INDEX pack_object_keep_true ON pack_object (zoid, keep_tid)
         WHERE keep = true;
 
+    ALTER TABLE transaction ADD COLUMN empty BOOLEAN NOT NULL DEFAULT FALSE;
+
+    CREATE INDEX current_object_tid ON current_object (tid);
+
+    ALTER TABLE object_ref PRIMARY KEY (tid, zoid, to_zoid);
+    DROP INDEX object_ref_from;
+    DROP INDEX object_ref_tid;
+    DROP INDEX object_ref_to;
+
+    CREATE TABLE pack_state (
+        tid         BIGINT NOT NULL,
+        zoid        BIGINT NOT NULL,
+        PRIMARY KEY (tid, zoid)
+    );
+
+    CREATE TABLE pack_state_tid (
+        tid         BIGINT NOT NULL PRIMARY KEY
+    );
+

Modified: relstorage/branches/1.1/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py	2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/relstorage/adapters/common.py	2008-04-08 07:58:59 UTC (rev 85160)
@@ -42,6 +42,7 @@
         'TRUE':         'TRUE',
         'FALSE':        'FALSE',
         'OCTET_LENGTH': 'OCTET_LENGTH',
+        'TRUNCATE':     'TRUNCATE',
         'oid':          '%(oid)s',
         'tid':          '%(tid)s',
         'pack_tid':     '%(pack_tid)s',
@@ -403,6 +404,53 @@
                     log.info("pre_pack: start without gc")
                     self._pre_pack_without_gc(
                         conn, cursor, pack_tid)
+                conn.commit()
+
+                log.info("pre_pack: enumerating states to pack")
+                stmt = "%(TRUNCATE)s pack_state"
+                self._run_script_stmt(cursor, stmt)
+                to_remove = 0
+
+                if gc:
+                    stmt = """
+                    INSERT INTO pack_state (tid, zoid)
+                    SELECT tid, zoid
+                    FROM object_state
+                        JOIN pack_object USING (zoid)
+                    WHERE keep = %(FALSE)s
+                        AND tid > 0
+                        AND tid <= %(pack_tid)s
+                    """
+                    self._run_script_stmt(cursor, stmt, {'pack_tid':
+                        pack_tid})
+                    to_remove += cursor.rowcount
+
+                stmt = """
+                INSERT INTO pack_state (tid, zoid)
+                SELECT tid, zoid
+                FROM object_state
+                    JOIN pack_object USING (zoid)
+                WHERE keep = %(TRUE)s
+                    AND tid > 0
+                    AND tid != keep_tid
+                    AND tid <= %(pack_tid)s
+                """
+                self._run_script_stmt(cursor, stmt, {'pack_tid':pack_tid})
+                to_remove += cursor.rowcount
+
+                log.info("pre_pack: enumerating transactions to pack")
+                stmt = "%(TRUNCATE)s pack_state_tid"
+                self._run_script_stmt(cursor, stmt)
+                stmt = """
+                INSERT INTO pack_state_tid (tid)
+                SELECT DISTINCT tid
+                FROM pack_state
+                """
+                cursor.execute(stmt)
+
+                log.info("pre_pack: will remove %d object state(s)",
+                    to_remove)
+
             except:
                 log.exception("pre_pack: failed")
                 conn.rollback()
@@ -425,7 +473,7 @@
         log.debug("pre_pack: populating pack_object")
         subselect = self._scripts['select_keep_tid']
         stmt = """
-        DELETE FROM pack_object;
+        %(TRUNCATE)s pack_object;
 
         INSERT INTO pack_object (zoid, keep)
         SELECT DISTINCT zoid, %(TRUE)s
@@ -450,7 +498,7 @@
         # removed (if nothing references the OID) or whose history will
         # be cut.
         stmt = """
-        DELETE FROM pack_object;
+        %(TRUNCATE)s pack_object;
 
         INSERT INTO pack_object (zoid, keep)
         SELECT DISTINCT zoid, %(FALSE)s
@@ -503,7 +551,7 @@
             # references.
             subselect = self._scripts['select_keep_tid']
             stmt = """
-            DELETE FROM temp_pack_visit;
+            %(TRUNCATE)s temp_pack_visit;
 
             INSERT INTO temp_pack_visit (zoid)
             SELECT zoid
@@ -537,7 +585,7 @@
             self._run_script_stmt(cursor, stmt)
             found_count = cursor.rowcount
 
-            log.info("pre_pack: found %d more referenced object(s) in "
+            log.debug("pre_pack: found %d more referenced object(s) in "
                 "pass %d", found_count, pass_num)
             if not found_count:
                 # No new references detected.
@@ -640,7 +688,7 @@
         """Fill object_refs with all states for multiple transactions."""
         if tids:
             added = 0
-            log.info("pre_pack: examining all references from objects in %d "
+            log.info("pre_pack: discovering references from objects in %d "
                 "transaction(s)" % len(tids))
             for tid in tids:
                 added += self._add_refs_for_tid(cursor, tid, get_references)
@@ -661,7 +709,8 @@
         pass
 
 
-    def pack(self, pack_tid, batch_timeout=2.0, min_delay=2.0, max_delay=15.0):
+    def pack(self, pack_tid, batch_timeout=5.0, min_delay=5.0,
+            max_delay=20.0):
         """Pack.  Requires populated pack tables."""
 
         # Read committed mode is sufficient.
@@ -669,45 +718,30 @@
         try:
             try:
                 stmt = """
-                SELECT tid
+                SELECT transaction.tid,
+                    CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
+                    CASE WHEN pack_state_tid.tid IS NOT NULL THEN 1 ELSE 0 END
                 FROM transaction
-                WHERE tid > 0
-                    AND tid <= %(pack_tid)s
+                    LEFT JOIN pack_state_tid USING (tid)
+                WHERE transaction.tid > 0
+                    AND transaction.tid <= %(pack_tid)s
+                    AND (packed = %(FALSE)s OR pack_state_tid.tid IS NOT NULL)
                 """
                 self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-                tids = [tid for (tid,) in cursor]
-                tids.sort()  # oldest first
+                tid_rows = list(cursor)
+                tid_rows.sort()  # oldest first
 
-                stmt = """
-                SELECT COUNT(1)
-                FROM pack_object
-                WHERE keep = %(FALSE)s
-                """
-                self._run_script_stmt(cursor, stmt)
-                delete_count = cursor.fetchone()[0]
+                log.info("pack: will pack %d transaction(s)", len(tid_rows))
 
-                stmt = """
-                SELECT COUNT(1)
-                FROM pack_object
-                WHERE keep = %(TRUE)s
-                """
-                self._run_script_stmt(cursor, stmt)
-                trim_count = cursor.fetchone()[0]
-
-                log.info(
-                    "pack: will pack %d transaction(s), delete %s object(s),"
-                        " and trim %s old object(s)",
-                        len(tids), delete_count, trim_count)
-
                 # Hold the commit lock while packing to prevent deadlocks.
                 # Pack in small batches of transactions in order to minimize
                 # the interruption of concurrent write operations.
                 start = time.time()
                 self._hold_commit_lock(cursor)
-                for tid in tids:
-                    self._pack_transaction(cursor, pack_tid, tid)
-                    elapsed = time.time() - start
-                    if elapsed >= batch_timeout:
+                for tid, packed, has_states_to_remove in tid_rows:
+                    self._pack_transaction(
+                        cursor, pack_tid, tid, packed, has_states_to_remove)
+                    if time.time() >= start + batch_timeout:
                         # commit the work done so far and release the
                         # commit lock for a short time
                         conn.commit()
@@ -715,6 +749,7 @@
                         # Add a delay that matches the amount of time
                         # spent with the commit lock held, within limits.
                         # This targets a 50% duty cycle.
+                        elapsed = time.time() - start
                         delay = max(min_delay, min(max_delay, elapsed))
                         if delay > 0:
                             log.debug('pack: sleeping %.4g second(s)', delay)
@@ -722,8 +757,7 @@
                         self._hold_commit_lock(cursor)
                         start = time.time()
 
-                log.debug("pack: clearing pack_object")
-                cursor.execute("DELETE FROM pack_object")
+                self._pack_cleanup(conn, cursor)
 
             except:
                 log.exception("pack: failed")
@@ -738,76 +772,103 @@
             self.close(conn, cursor)
 
 
-    def _pack_transaction(self, cursor, pack_tid, tid):
+    def _pack_transaction(self, cursor, pack_tid, tid, packed,
+            has_states_to_remove):
         """Pack one transaction.  Requires populated pack tables."""
         log.debug("pack: transaction %d: packing", tid)
-        deleted = 0
-        for _table in ('object_ref', 'current_object', 'object_state'):
-            # Remove objects that are in pack_object and have keep
-            # set to false.
-            stmt = """
-            DELETE FROM _table
-            WHERE tid = %(tid)s
-                AND zoid IN (
-                    SELECT zoid
-                    FROM pack_object
-                    WHERE keep = %(FALSE)s
-                )
-            """.replace('_table', _table)
-            self._run_script_stmt(cursor, stmt, {'tid': tid})
-            deleted += cursor.rowcount
+        counters = {}
 
-            if _table != 'current_object':
-                # Cut the history of objects in pack_object that
-                # have keep set to true.
+        if has_states_to_remove:
+            for _table in ('current_object', 'object_state'):
                 stmt = """
                 DELETE FROM _table
                 WHERE tid = %(tid)s
                     AND zoid IN (
-                        SELECT zoid
-                        FROM pack_object
-                        WHERE keep = %(TRUE)s
-                            AND keep_tid != %(tid)s
+                        SELECT pack_state.zoid
+                        FROM pack_state
+                        WHERE pack_state.tid = %(tid)s
                     )
                 """.replace('_table', _table)
                 self._run_script_stmt(cursor, stmt, {'tid': tid})
-                deleted += cursor.rowcount
+                counters[_table] = cursor.rowcount
 
-        # Terminate prev_tid chains
-        stmt = """
-        UPDATE object_state SET prev_tid = 0
-        WHERE prev_tid = %(tid)s
-            AND tid <= %(pack_tid)s
-        """
-        self._run_script_stmt(cursor, stmt,
-            {'pack_tid': pack_tid, 'tid': tid})
+            # Terminate prev_tid chains
+            stmt = """
+            UPDATE object_state SET prev_tid = 0
+            WHERE prev_tid = %(tid)s
+                AND tid <= %(pack_tid)s
+            """
+            self._run_script_stmt(cursor, stmt,
+                {'pack_tid': pack_tid, 'tid': tid})
 
-        # Find out whether the transaction can be removed
+        # Find out whether the transaction is empty
         stmt = self._scripts['transaction_has_data']
         self._run_script_stmt(cursor, stmt, {'tid': tid})
-        has_data = list(cursor)
+        empty = not list(cursor)
 
-        if has_data:
-            stmt = """
-            UPDATE transaction SET packed = %(TRUE)s
-            WHERE tid = %(tid)s
-                AND packed = %(FALSE)s
-            """
-            self._run_script_stmt(cursor, stmt, {'tid': tid})
-
+        # mark the transaction packed and possibly empty
+        if empty:
+            clause = 'empty = %(TRUE)s'
+            state = 'empty'
         else:
-            stmt = """
-            DELETE FROM object_refs_added
-            WHERE tid = %(tid)s;
-            DELETE FROM transaction
-            WHERE tid = %(tid)s
-            """
-            self._run_script(cursor, stmt, {'tid': tid})
-            deleted += cursor.rowcount
+            clause = 'empty = %(FALSE)s'
+            state = 'not empty'
+        stmt = "UPDATE transaction SET packed = %(TRUE)s, " + clause
+        stmt += " WHERE tid = %(tid)s"
+        self._run_script_stmt(cursor, stmt, {'tid': tid})
 
-        log.debug("pack: transaction %d: removed %d row(s)", tid, deleted)
+        log.debug(
+            "pack: transaction %d (%s): removed %d object(s) and %d state(s)",
+            tid, state,
+            counters.get('current_object', 0),
+            counters.get('object_state', 0))
 
 
+    def _pack_cleanup(self, conn, cursor):
+        """Remove unneeded table rows after packing"""
+        # commit the work done so far
+        conn.commit()
+        self._release_commit_lock(cursor)
+        self._hold_commit_lock(cursor)
+        log.info("pack: removing empty packed transactions")
+        stmt = """
+        DELETE FROM transaction
+        WHERE packed = %(TRUE)s
+            AND empty = %(TRUE)s
+        """
+        self._run_script_stmt(cursor, stmt)
+
+        # perform cleanup that does not require the commit lock
+        conn.commit()
+        self._release_commit_lock(cursor)
+
+        log.debug("pack: clearing temporary pack state")
+        for _table in ('pack_object', 'pack_state', 'pack_state_tid'):
+            stmt = '%(TRUNCATE)s ' + _table
+            self._run_script_stmt(cursor, stmt)
+
+        log.debug("pack: removing unused object references")
+        stmt = """
+        DELETE FROM object_ref
+        WHERE tid IN (
+            SELECT tid
+            FROM transaction
+            WHERE empty = %(TRUE)s
+            )
+        """
+        self._run_script_stmt(cursor, stmt)
+
+        stmt = """
+        DELETE FROM object_refs_added
+        WHERE tid IN (
+            SELECT tid
+            FROM transaction
+            WHERE empty = %(TRUE)s
+            )
+        """
+        self._run_script_stmt(cursor, stmt)
+
+
     def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
         """Polls for new transactions.
 

Modified: relstorage/branches/1.1/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/relstorage/adapters/postgresql.py	2008-04-08 07:58:59 UTC (rev 85160)
@@ -41,6 +41,7 @@
         CREATE TABLE transaction (
             tid         BIGINT NOT NULL PRIMARY KEY,
             packed      BOOLEAN NOT NULL DEFAULT FALSE,
+            empty       BOOLEAN NOT NULL DEFAULT FALSE,
             username    BYTEA NOT NULL,
             description BYTEA NOT NULL,
             extension   BYTEA
@@ -74,6 +75,7 @@
             tid         BIGINT NOT NULL,
             FOREIGN KEY (zoid, tid) REFERENCES object_state
         );
+        CREATE INDEX current_object_tid ON current_object (tid);
 
         -- During packing, an exclusive lock is held on pack_lock.
         CREATE TABLE pack_lock ();
@@ -87,11 +89,9 @@
         CREATE TABLE object_ref (
             zoid        BIGINT NOT NULL,
             tid         BIGINT NOT NULL,
-            to_zoid     BIGINT NOT NULL
+            to_zoid     BIGINT NOT NULL,
+            PRIMARY KEY (tid, zoid, to_zoid)
         );
-        CREATE INDEX object_ref_from ON object_ref (zoid);
-        CREATE INDEX object_ref_tid ON object_ref (tid);
-        CREATE INDEX object_ref_to ON object_ref (to_zoid);
 
         -- The object_refs_added table tracks whether object_refs has
         -- been populated for all states in a given transaction.
@@ -122,6 +122,19 @@
             WHERE keep = false;
         CREATE INDEX pack_object_keep_true ON pack_object (zoid, keep_tid)
             WHERE keep = true;
+
+        -- Temporary state during packing: the list of object states to pack.
+        CREATE TABLE pack_state (
+            tid         BIGINT NOT NULL,
+            zoid        BIGINT NOT NULL,
+            PRIMARY KEY (tid, zoid)
+        );
+
+        -- Temporary state during packing: the list of transactions that
+        -- have at least one object state to pack.
+        CREATE TABLE pack_state_tid (
+            tid         BIGINT NOT NULL PRIMARY KEY
+        );
         """
         cursor.execute(stmt)
 

Modified: relstorage/branches/1.1/relstorage/tests/packstresstest.py
===================================================================
--- relstorage/trunk/relstorage/tests/packstresstest.py	2008-04-01 07:00:46 UTC (rev 85056)
+++ relstorage/branches/1.1/relstorage/tests/packstresstest.py	2008-04-08 07:58:59 UTC (rev 85160)
@@ -40,8 +40,6 @@
     print 'size:'
     print d.getSize()
 
-    import pdb; pdb.set_trace()
-
 print 'packing...'
 d.pack()
 



More information about the Checkins mailing list