[Checkins] SVN: relstorage/branches/1.4.0-fastimport/ Initial changes to add a `--single-transaction` parameter to zodbconvert.

David Blewett davidb at sixfeetup.com
Mon Jul 19 14:52:13 EDT 2010


Log message for revision 114854:
  Initial changes to add a `--single-transaction` parameter to zodbconvert.
  

Changed:
  U   relstorage/branches/1.4.0-fastimport/README.txt
  U   relstorage/branches/1.4.0-fastimport/relstorage/storage.py
  U   relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py

-=-
Modified: relstorage/branches/1.4.0-fastimport/README.txt
===================================================================
--- relstorage/branches/1.4.0-fastimport/README.txt	2010-07-19 18:20:38 UTC (rev 114853)
+++ relstorage/branches/1.4.0-fastimport/README.txt	2010-07-19 18:52:13 UTC (rev 114854)
@@ -302,7 +302,12 @@
     Opens both storages and analyzes what would be copied, but does not
     actually copy.
 
+  ``--single-transaction``
+    Import into the destination in a single transaction, instead of
+    committing one destination transaction per source transaction. This
+    option can significantly speed up conversion times on PostgreSQL.
 
+
 Migrating to a new version of RelStorage
 ----------------------------------------
 

Modified: relstorage/branches/1.4.0-fastimport/relstorage/storage.py
===================================================================
--- relstorage/branches/1.4.0-fastimport/relstorage/storage.py	2010-07-19 18:20:38 UTC (rev 114853)
+++ relstorage/branches/1.4.0-fastimport/relstorage/storage.py	2010-07-19 18:52:13 UTC (rev 114854)
@@ -38,6 +38,7 @@
 import tempfile
 import threading
 import time
+import transaction
 import weakref
 import ZODB.interfaces
 
@@ -79,8 +80,10 @@
     """Storage to a relational database, based on invalidation polling"""
     implements(*_relstorage_interfaces)
 
-    _transaction=None # Transaction that is being committed
-    _tstatus=' '      # Transaction status, used for copying data
+    _transaction=None   # Transaction that is being committed
+    _transactions=None  # List of pending sub-transactions when running
+                        #  a single-transaction import
+    _tstatus=' '        # Transaction status, used for copying data
     _is_read_only = False
 
     # load_conn and load_cursor are open most of the time.
@@ -578,14 +581,20 @@
             self._lock_release()
 
 
-    def restore(self, oid, serial, data, version, prev_txn, transaction):
+    def restore(self, oid, serial, data, version, prev_txn, transaction,
+                transaction_position=None):
         # Like store(), but used for importing transactions.  See the
         # comments in FileStorage.restore().  The prev_txn optimization
         # is not used.
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
+        if self._transactions:
+            if transaction_position is None or \
+               transaction.tid != self._transactions[transaction_position]:
+                raise POSException.StorageTransactionError(self, transaction)
+        else:
+            if transaction is not self._transaction:
+                raise POSException.StorageTransactionError(self, transaction)
         if version:
             raise POSException.Unsupported("Versions aren't supported")
 
@@ -624,14 +633,6 @@
             self._clear_temp()
             self._transaction = transaction
 
-            user = str(transaction.user)
-            desc = str(transaction.description)
-            ext = transaction._extension
-            if ext:
-                ext = cPickle.dumps(ext, 1)
-            else:
-                ext = ""
-            self._ude = user, desc, ext
             self._tstatus = status
 
             self._restart_store()
@@ -642,19 +643,11 @@
 
             if tid is not None:
                 # hold the commit lock and add the transaction now
-                cursor = self._store_cursor
-                packed = (status == 'p')
-                adapter.locker.hold_commit_lock(cursor, ensure_current=True)
-                tid_int = u64(tid)
-                try:
-                    adapter.txncontrol.add_transaction(
-                        cursor, tid_int, user, desc, ext, packed)
-                except:
-                    self._drop_store_connection()
-                    raise
+                self._ude = self._add_transaction(transaction)
+            else:
+                self._ude = self._get_ude(transaction)
             # else choose the tid later
             self._tid = tid
-
         finally:
             self._lock_release()
 
@@ -1361,10 +1354,63 @@
                 ZODB.blob.remove_committed(old_filename)
         self._txn_blobs[oid] = filename
 
-    def copyTransactionsFrom(self, other):
+    def _get_ude(self, txn):
+        user = str(txn.user)
+        desc = str(txn.description)
+        ext = txn._extension
+        if ext:
+            ext = cPickle.dumps(ext, 1)
+        else:
+            ext = ""
+        return user, desc, ext
+
+    def _add_transaction(self, txn, hold_lock=True):
+        """Function to add a transaction. Assumes locks have already been taken.
+        This function is used both by tpc_begin and when restoring multiple source
+        transactions in a single destination transaction. When called in the latter
+        context, we want to avoid any modifications to external variables, etc.
+        """
+        user, desc, ext = self._get_ude(txn)
+        if not user:
+            user = '""'
+        if not desc:
+            desc = '""'
+        adapter = self._adapter
+        cursor = self._store_cursor
+        packed = (txn.status == 'p')
+        if hold_lock:
+            adapter.locker.hold_commit_lock(cursor, ensure_current=True)
+        tid_int = u64(txn.tid)
+        try:
+            adapter.txncontrol.add_transaction(
+                cursor, tid_int, user, desc, ext, packed)
+        except:
+            if hold_lock:
+                self._drop_store_connection()
+                raise
+        return user, desc, ext
+
+    def copyTransactionsFrom(self, other, single_transaction=False):
+        begin_time = time.time()
+        if single_transaction:
+            master_txn = transaction.Transaction()
+            master_txn.tid = p64(1)
+            master_txn.description = 'zodbconvert run on: %s' % time.strftime('%Y-%m-%d.%H:%M:%S')
+            self.tpc_begin(master_txn, master_txn.tid)
+        txnum = 0
+        tx_size = 0
+        self._transactions = []
+        for trans in other.iterator():
+            self._transactions.append(trans.tid)
+            if single_transaction:
+                self._add_transaction(trans, hold_lock=False)
+
+        num_txns = len(self._transactions)
         # adapted from ZODB.blob.BlobStorageMixin
         for trans in other.iterator():
-            self.tpc_begin(trans, trans.tid, trans.status)
+            if not single_transaction:
+                self.tpc_begin(trans, trans.tid, trans.status)
+            num_txn_records = 0
             for record in trans:
                 blobfilename = None
                 if self.fshelper is not None:
@@ -1383,11 +1429,33 @@
                                      name, record.data_txn, trans)
                 else:
                     self.restore(record.oid, record.tid, record.data,
-                                 '', record.data_txn, trans)
+                                 '', record.data_txn, trans, txnum)
+                num_txn_records += 1
+                tx_size += len(record.data)
+            txnum += 1
+            tx_end = time.time()
+            pct_complete = (txnum/float(num_txns))*100
+            if pct_complete < 10:
+                pct_complete = '  %1.2f%%' % pct_complete
+            elif pct_complete < 100:
+                pct_complete = ' %1.2f%%' % pct_complete
+            rate = (tx_size/float(1024*1024)) / (tx_end - begin_time)
+            #if single_transaction:
+            #    self._batcher.flush()
+            #else:
+            if not single_transaction:
+                self.tpc_vote(trans)
+                self.tpc_finish(trans)
+            #write("Restored tid %d,%5d records | %1.3fmB/s (%6d/%6d, %.2f%%)\n" %
+            #      (u64(trans.tid), num_txn_records, rate, txnum, num_txns, pct_complete))
+            log.info("Restored tid %d,%5d records | %1.3f MB/s (%6d/%6d,%s)",
+                     u64(trans.tid), num_txn_records, rate, txnum, num_txns, pct_complete)
+        if single_transaction:
+            self.tpc_vote(master_txn)
+            self.tpc_finish(master_txn)
+            self._transactions = None
+        return txnum, tx_size, tx_end - begin_time
 
-            self.tpc_vote(trans)
-            self.tpc_finish(trans)
-
     # The propagate_invalidations flag implements the old
     # invalidation polling API and is not otherwise used. Set to a
     # false value, it tells the Connection not to propagate object

Modified: relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py
===================================================================
--- relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py	2010-07-19 18:20:38 UTC (rev 114853)
+++ relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py	2010-07-19 18:52:13 UTC (rev 114854)
@@ -17,8 +17,10 @@
 See README.txt for details.
 """
 
+import logging
 import optparse
 from persistent.TimeStamp import TimeStamp
+from relstorage.storage import RelStorage
 from StringIO import StringIO
 import sys
 import ZConfig
@@ -35,6 +37,8 @@
 </schema>
 """
 
+log = logging.getLogger("relstorage.zodbconvert")
+logging.basicConfig(level=logging.INFO)
 
 def storage_has_data(storage):
     i = storage.iterator()
@@ -59,29 +63,41 @@
     parser.add_option(
         "--clear", dest="clear", action="store_true",
         help="Clear the contents of the destination storage before copying")
-    parser.set_defaults(dry_run=False, clear=False)
+    parser.add_option(
+        "--single-transaction", dest="single_transaction", action="store_true",
+        help="Convert the source into the destination in a single transaction")
+    parser.add_option(
+        "--batch-size", dest="batch_size", type="int", action="store",
+        help="Batch size to use when converting")
+    parser.set_defaults(dry_run=False, clear=False, single_transaction=False, batch_size=250)
     options, args = parser.parse_args(argv[1:])
 
     if len(args) != 1:
         parser.error("The name of one configuration file is required.")
 
+
     schema = ZConfig.loadSchemaFile(StringIO(schema_xml))
     config, handler = ZConfig.loadConfig(schema, args[0])
     source = config.source.open()
     destination = config.destination.open()
+    destination._batcher_row_limit = options.batch_size
 
-    write("Storages opened successfully.\n")
+    #write("Storages opened successfully.\n")
+    log.info("Storages opened successfully.")
 
     if options.dry_run:
-        write("Dry run mode: not changing the destination.\n")
+        #write("Dry run mode: not changing the destination.\n")
+        log.info("Dry run mode: not changing the destination.")
         if storage_has_data(destination):
-            write("Warning: the destination storage has data\n")
+            #write("Warning: the destination storage has data\n")
+            log.warning("The destination storage has data.")
         count = 0
         for txn in source.iterator():
             write('%s user=%s description=%s\n' % (
                 TimeStamp(txn.tid), txn.user, txn.description))
             count += 1
-        write("Would copy %d transactions.\n" % count)
+        #write("Would copy %d transactions.\n" % count)
+        log.info("Would copy %d transactions.", count)
 
     else:
         if options.clear:
@@ -96,12 +112,25 @@
             msg = "Error: the destination storage has data.  Try --clear."
             sys.exit(msg)
 
-        destination.copyTransactionsFrom(source)
+        copy_args = [source]
+        if issubclass(destination.__class__, RelStorage):
+            copy_args.append(options.single_transaction)
+        num_txns, size, elapsed = destination.copyTransactionsFrom(*copy_args)
 
-        source.close()
+        try:
+            source.close()
+        except IOError:
+            #We don't mind if the source throws errors like:
+            #ERROR:ZODB.FileStorage:Error saving index on close()
+            #IOError: [Errno 13] Permission denied: '/path/to/db'
+            pass
         destination.close()
 
-        write('All transactions copied successfully.\n')
+        rate = (size/float(1024*1024)) / elapsed
+        #write('All %d transactions copied successfully in %4.1f minutes at %1.3fmB/s.\n' %
+        #      (num_txns, elapsed/60, rate))
+        log.info('All %d transactions copied successfully in %4.1f minutes at %1.3fmB/s.',
+                 num_txns, elapsed/60, rate)
 
 
 if __name__ == '__main__':



More information about the checkins mailing list