[Checkins] SVN: relstorage/trunk/ Modified the pack code to pack one transaction at a time and release the commit lock frequently

Shane Hathaway shane at hathawaymix.org
Mon Mar 31 23:03:20 EDT 2008


Log message for revision 85051:
  Modified the pack code to pack one transaction at a time and
  release the commit lock frequently.  This should reduce how long
  large pack operations block concurrent writes.
  
  

Changed:
  U   relstorage/trunk/CHANGELOG.txt
  A   relstorage/trunk/notes/migrate-1.0.1.txt
  U   relstorage/trunk/relstorage/adapters/common.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  A   relstorage/trunk/relstorage/tests/packstresstest.py

-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt	2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/CHANGELOG.txt	2008-04-01 03:03:20 UTC (rev 85051)
@@ -11,7 +11,11 @@
 - Additions to the object_ref table are now periodically committed
   during pre_pack so that the work is not lost if pre_pack fails.
 
+- Modified the pack code to pack one transaction at a time and
+  release the commit lock frequently.  This should reduce how long
+  large pack operations block concurrent writes.
 
+
 RelStorage 1.0.1
 
 - The speedtest script failed if run on a test database that has no tables.

Added: relstorage/trunk/notes/migrate-1.0.1.txt
===================================================================
--- relstorage/trunk/notes/migrate-1.0.1.txt	                        (rev 0)
+++ relstorage/trunk/notes/migrate-1.0.1.txt	2008-04-01 03:03:20 UTC (rev 85051)
@@ -0,0 +1,9 @@
+
+Migrating from version 1.0.1
+
+Create a new index on the prev_tid column of the object_state table.
+Without this index, the new pack code will likely run very slowly.
+This statement works in all three supported databases (MySQL, Oracle, PostgreSQL):
+
+  CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
+

Modified: relstorage/trunk/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py	2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/common.py	2008-04-01 03:03:20 UTC (rev 85051)
@@ -16,6 +16,7 @@
 from ZODB.POSException import UndoError
 
 import logging
+import time
 
 log = logging.getLogger("relstorage.adapters.common")
 
@@ -87,6 +88,13 @@
             """,
 
         'reset_temp_undo': "DROP TABLE temp_undo",
+
+        'transaction_has_data': """
+            SELECT tid
+            FROM object_state
+            WHERE tid = %(tid)s
+            LIMIT 1
+            """,
     }
 
 
@@ -647,23 +655,27 @@
         cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
 
-    def pack(self, pack_tid):
+    def _release_commit_lock(self, cursor):
+        """Release the commit lock during packing"""
+        # no action needed
+        pass
+
+
+    def pack(self, pack_tid, max_batch_time=1.0, delay_time=1.0):
         """Pack.  Requires populated pack tables."""
 
         # Read committed mode is sufficient.
         conn, cursor = self.open()
         try:
             try:
-                log.info("pack: start, pack_tid = %d", pack_tid)
-
                 stmt = """
-                SELECT COUNT(1)
+                SELECT tid
                 FROM transaction
                 WHERE tid > 0
                     AND tid <= %(pack_tid)s
                 """
                 self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-                transaction_count = cursor.fetchone()[0]
+                tids = [tid for (tid,) in cursor]
 
                 stmt = """
                 SELECT COUNT(1)
@@ -684,91 +696,26 @@
                 log.info(
                     "pack: will pack %d transaction(s), delete %s object(s),"
                         " and trim %s old object(s)",
-                        transaction_count, delete_count, trim_count)
+                        len(tids), delete_count, trim_count)
 
-                # hold the commit lock for a moment to prevent deadlocks.
+                # Hold the commit lock while packing to prevent deadlocks.
+                # Pack in small batches of transactions in order to minimize
+                # the interruption of concurrent write operations.
+                expiration = time.time() + max_batch_time
                 self._hold_commit_lock(cursor)
+                for tid in tids:
+                    self._pack_transaction(cursor, pack_tid, tid)
+                    if time.time() > expiration:
+                        # commit the work done so far and release the
+                        # commit lock for a short time
+                        conn.commit()
+                        self._release_commit_lock(cursor)
+                        if delay_time > 0:
+                            log.debug('pack: sleeping %d seconds', delay_time)
+                            time.sleep(delay_time)
+                        self._hold_commit_lock(cursor)
+                        expiration = time.time() + max_batch_time
 
-                for table in ('object_ref', 'current_object', 'object_state'):
-
-                    if delete_count > 0:
-                        # Remove objects that are in pack_object and have keep
-                        # set to false.
-                        log.debug("pack: deleting objects from %s", table)
-                        stmt = """
-                        DELETE FROM %s
-                        WHERE zoid IN (
-                                SELECT zoid
-                                FROM pack_object
-                                WHERE keep = %%(FALSE)s
-                            )
-                        """ % table
-                        self._run_script_stmt(cursor, stmt)
-
-                    if trim_count > 0 and table != 'current_object':
-                        # Cut the history of objects in pack_object that
-                        # have keep set to true.
-                        log.debug("pack: trimming objects in %s", table)
-                        stmt = """
-                        DELETE FROM %s
-                        WHERE zoid IN (
-                                SELECT zoid
-                                FROM pack_object
-                                WHERE keep = %%(TRUE)s
-                            )
-                            AND tid < (
-                                SELECT keep_tid
-                                FROM pack_object
-                                WHERE zoid = %s.zoid
-                            )
-                        """ % (table, table)
-                        self._run_script_stmt(cursor, stmt)
-
-                log.debug("pack: terminating prev_tid chains")
-                stmt = """
-                UPDATE object_state SET prev_tid = 0
-                WHERE tid <= %(pack_tid)s
-                    AND prev_tid != 0
-                """
-                self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
-                # For each tid to be removed, delete the corresponding row in
-                # object_refs_added.
-                log.debug("pack: deleting from object_refs_added")
-                stmt = """
-                DELETE FROM object_refs_added
-                WHERE tid > 0
-                    AND tid <= %(pack_tid)s
-                    AND NOT EXISTS (
-                        SELECT 1
-                        FROM object_state
-                        WHERE tid = object_refs_added.tid
-                    )
-                """
-                self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
-                log.debug("pack: deleting transactions")
-                stmt = """
-                DELETE FROM transaction
-                WHERE tid > 0
-                    AND tid <= %(pack_tid)s
-                    AND NOT EXISTS (
-                        SELECT 1
-                        FROM object_state
-                        WHERE tid = transaction.tid
-                    )
-                """
-                self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
-                log.debug("pack: marking transactions as packed")
-                stmt = """
-                UPDATE transaction SET packed = %(TRUE)s
-                WHERE tid > 0
-                    AND tid <= %(pack_tid)s
-                    AND packed = %(FALSE)s
-                """
-                self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
-
                 log.debug("pack: clearing pack_object")
                 cursor.execute("DELETE FROM pack_object")
 
@@ -785,6 +732,79 @@
             self.close(conn, cursor)
 
 
+    def _pack_transaction(self, cursor, pack_tid, tid):
+        """Pack one transaction.  Requires populated pack tables."""
+        log.debug("pack: transaction %d: packing", tid)
+        deleted = 0
+        for table in ('object_ref', 'current_object', 'object_state'):
+            # Remove objects that are in pack_object and have keep
+            # set to false.
+            stmt = """
+            DELETE FROM %s
+            WHERE tid = %%(tid)s
+                AND zoid IN (
+                    SELECT zoid
+                    FROM pack_object
+                    WHERE keep = %%(FALSE)s
+                )
+            """ % table
+            self._run_script_stmt(cursor, stmt, {'tid': tid})
+            deleted += cursor.rowcount
+
+            if table != 'current_object':
+                # Cut the history of objects in pack_object that
+                # have keep set to true.
+                stmt = """
+                DELETE FROM %s
+                WHERE tid = %%(tid)s
+                    AND zoid IN (
+                        SELECT zoid
+                        FROM pack_object
+                        WHERE keep = %%(TRUE)s
+                    )
+                    AND tid < (
+                        SELECT keep_tid
+                        FROM pack_object
+                        WHERE zoid = %s.zoid
+                    )
+                """ % (table, table)
+                self._run_script_stmt(cursor, stmt, {'tid': tid})
+                deleted += cursor.rowcount
+
+        # Terminate prev_tid chains
+        stmt = """
+        UPDATE object_state SET prev_tid = 0
+        WHERE prev_tid = %(tid)s
+            AND tid <= %(pack_tid)s
+        """
+        self._run_script_stmt(cursor, stmt,
+            {'pack_tid': pack_tid, 'tid': tid})
+
+        # Find out whether the transaction can be removed
+        stmt = self._scripts['transaction_has_data']
+        self._run_script_stmt(cursor, stmt, {'tid': tid})
+        has_data = list(cursor)
+
+        if has_data:
+            stmt = """
+            UPDATE transaction SET packed = %(TRUE)s
+            WHERE tid = %(tid)s
+            """
+            self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+        else:
+            stmt = """
+            DELETE FROM object_refs_added
+            WHERE tid = %(tid)s;
+            DELETE FROM transaction
+            WHERE tid = %(tid)s
+            """
+            self._run_script(cursor, stmt, {'tid': tid})
+            deleted += cursor.rowcount
+
+        log.debug("pack: transaction %d: removed %d row(s)", tid, deleted)
+
+
     def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
         """Polls for new transactions.
 

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2008-04-01 03:03:20 UTC (rev 85051)
@@ -104,6 +104,7 @@
             CHECK (tid > 0)
         ) ENGINE = InnoDB;
         CREATE INDEX object_state_tid ON object_state (tid);
+        CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
 
         -- Pointers to the current object state
         CREATE TABLE current_object (
@@ -572,4 +573,13 @@
             raise StorageError("Unable to acquire commit lock")
 
 
+    def _release_commit_lock(self, cursor):
+        """Release the commit lock.  This is used during packing.
+
+        This overrides the method by the same name in common.Adapter.
+        """
+        stmt = "SELECT RELEASE_LOCK('relstorage.commit')"
+        cursor.execute(stmt)
+
+
     _poll_query = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2008-04-01 03:03:20 UTC (rev 85051)
@@ -61,6 +61,12 @@
         'create_temp_pack_visit': None,
         'create_temp_undo': None,
         'reset_temp_undo': "DELETE FROM temp_undo",
+
+        'transaction_has_data': """
+            SELECT DISTINCT tid
+            FROM object_state
+            WHERE tid = %(tid)s
+            """,
     }
 
     def __init__(self, user, password, dsn, twophase=False, arraysize=64):
@@ -133,6 +139,7 @@
             state       BLOB
         );
         CREATE INDEX object_state_tid ON object_state (tid);
+        CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
 
         -- Pointers to the current object state
         CREATE TABLE current_object (

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2008-04-01 01:51:37 UTC (rev 85050)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2008-04-01 03:03:20 UTC (rev 85051)
@@ -66,6 +66,7 @@
             state       BYTEA
         );
         CREATE INDEX object_state_tid ON object_state (tid);
+        CREATE INDEX object_state_prev_tid ON object_state (prev_tid);
 
         -- Pointers to the current object state
         CREATE TABLE current_object (

Added: relstorage/trunk/relstorage/tests/packstresstest.py
===================================================================
--- relstorage/trunk/relstorage/tests/packstresstest.py	                        (rev 0)
+++ relstorage/trunk/relstorage/tests/packstresstest.py	2008-04-01 03:03:20 UTC (rev 85051)
@@ -0,0 +1,51 @@
+
+import logging
+
+from ZODB.DB import DB
+from relstorage.adapters.postgresql import PostgreSQLAdapter
+from relstorage.relstorage import RelStorage
+import transaction
+from persistent.mapping import PersistentMapping
+import random
+
+logging.basicConfig()
+logging.getLogger().setLevel(logging.DEBUG)
+
+a = PostgreSQLAdapter(dsn="dbname='packtest'")
+s = RelStorage(a)
+d = DB(s)
+c = d.open()
+
+print 'size:'
+print d.getSize()
+
+if 0:
+    print 'initializing...'
+    container = PersistentMapping()
+    c.root()['container'] = container
+    container_size = 10000
+    for i in range(container_size):
+        container[i] = PersistentMapping()
+    transaction.commit()
+
+    print 'generating transactions...'
+    for trans in range(10000):
+        print trans
+        sources = (random.randint(0, container_size - 1) for j in range(100))
+        for source in sources:
+            obj = container[source]
+            obj[trans] = container[random.randint(0, container_size - 1)]
+        transaction.commit()
+
+    print 'size:'
+    print d.getSize()
+
+    import pdb; pdb.set_trace()
+
+print 'packing...'
+d.pack()
+
+print 'size:'
+print d.getSize()
+
+d.close()



More information about the Checkins mailing list