[Checkins] SVN: relstorage/trunk/ Divided the implementation of database adapters into many small

Shane Hathaway shane at hathawaymix.org
Thu Sep 24 14:06:38 EDT 2009


Log message for revision 104499:
  Divided the implementation of database adapters into many small
  objects, making the adapter code more modular.  Added interfaces
  that describe the duties of each part.
  
  (Without the new modularity, adding history-free storage would be a
  brain-busting combinatorial mess.)
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/interfaces.py
  U   relstorage/trunk/relstorage/adapters/mysql.py
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  U   relstorage/trunk/relstorage/adapters/txncontrol.py
  U   relstorage/trunk/relstorage/config.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py
  U   relstorage/trunk/relstorage/tests/speedtest.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/CHANGES.txt	2009-09-24 18:06:37 UTC (rev 104499)
@@ -2,6 +2,12 @@
 Unreleased
 ----------
 
+- Divided the implementation of database adapters into many small
+  objects, making the adapter code more modular.  Added interfaces
+  that describe the duties of each part.
+
+- Oracle: sped up restore operations by sending short blobs inline
+
 - PostgreSQL: use the documented ALTER SEQUENCE RESTART WITH
   statement instead of ALTER SEQUENCE START WITH.
 

Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/interfaces.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -16,7 +16,25 @@
 from zope.interface import Attribute
 from zope.interface import Interface
 
+class IRelStorageAdapter(Interface):
+    """A database adapter for RelStorage"""
+
+    connmanager = Attribute("An IConnectionManager")
+    dbiter = Attribute("An IDatabaseIterator")
+    keep_history = Attribute("True if this adapter supports undo")
+    locker = Attribute("An ILocker")
+    mover = Attribute("An IObjectMover")
+    oidallocator = Attribute("An IOIDAllocator")
+    packundo = Attribute("An IPackUndo")
+    poller = Attribute("An IPoller")
+    runner = Attribute("An IScriptRunner")
+    schema = Attribute("An ISchemaInstaller")
+    stats = Attribute("An IStats")
+    txncontrol = Attribute("An ITransactionControl")
+
+
 class IConnectionManager(Interface):
+    """Open and close database connections"""
 
     def open():
         """Open a database connection and return (conn, cursor)."""
@@ -43,6 +61,8 @@
     def restart_load(conn, cursor):
         """Reinitialize a connection for loading objects.
 
+        This gets called when polling the database, so it needs to be quick.
+
         Raise StorageError if the database has disconnected.
         """
 
@@ -60,6 +80,7 @@
 
 
 class IDatabaseIterator(Interface):
+    """Iterate over the available data in the database"""
 
     def iter_objects(cursor, tid):
         """Iterate over object states in a transaction.
@@ -92,6 +113,7 @@
 
 
 class ILocker(Interface):
+    """Acquire and release the commit and pack locks."""
 
     def hold_commit_lock(cursor, ensure_current=False):
         """Acquire the commit lock.
@@ -117,7 +139,7 @@
 
 
 class IObjectMover(Interface):
-    """Moves object states to/from the database and within the database"""
+    """Move object states to/from the database and within the database."""
 
     def get_current_tid(cursor, oid):
         """Returns the current integer tid for an object.
@@ -195,15 +217,17 @@
 
 
 class IOIDAllocator(Interface):
+    """Allocate OIDs and control future allocation"""
 
+    def new_oid(cursor):
+        """Return a new, unused OID."""
+
     def set_min_oid(cursor, oid):
         """Ensure the next OID is at least the given OID."""
 
-    def new_oid(cursor):
-        """Return a new, unused OID."""
 
-
 class IPackUndo(Interface):
+    """Perform pack and undo operations"""
 
     def verify_undoable(cursor, undo_tid):
         """Raise UndoError if it is not safe to undo the specified txn.
@@ -264,6 +288,7 @@
 
 
 class IPoller(Interface):
+    """Poll for new data"""
 
     def poll_invalidations(conn, cursor, prev_polled_tid, ignore_tid):
         """Polls for new transactions.
@@ -279,6 +304,7 @@
 
 
 class ISchemaInstaller(Interface):
+    """Install the schema in the database, clear it, or uninstall it"""
 
     def create(cursor):
         """Create the database tables, sequences, etc."""
@@ -294,7 +320,13 @@
 
 
 class IScriptRunner(Interface):
+    """Run database-agnostic SQL scripts.
 
+    Using an IScriptRunner is appropriate for batch operations and
+    uncommon operations that can be slow, but is not appropriate
+    for performance-critical code.
+    """
+
     script_vars = Attribute(
         """A mapping providing replacements for parts of scripts.
 
@@ -328,9 +360,23 @@
         stmt should use '%s' parameter format (not %(name)s).
         """
 
+    # Note: the Oracle implementation also provides run_lob_stmt, which
+    # is useful for reading LOBs from the database quickly.
 
+
 class ITransactionControl(Interface):
+    """Begin, commit, and abort transactions."""
 
+    def get_tid_and_time(cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        """
+
+    def add_transaction(cursor, tid, username, description, extension,
+            packed=False):
+        """Add a transaction."""
+
     def commit_phase1(conn, cursor, tid):
         """Begin a commit.  Returns the transaction name.
 
@@ -350,13 +396,3 @@
     def abort(conn, cursor, txn=None):
         """Abort the commit.  If txn is not None, phase 1 is also aborted."""
 
-    def get_tid_and_time(cursor):
-        """Returns the most recent tid and the current database time.
-
-        The database time is the number of seconds since the epoch.
-        """
-
-    def add_transaction(cursor, tid, username, description, extension,
-            packed=False):
-        """Add a transaction."""
-

Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/mysql.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -50,10 +50,12 @@
 
 import logging
 import MySQLdb
+from zope.interface import implements
 
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
 from relstorage.adapters.hpmover import HistoryPreservingObjectMover
+from relstorage.adapters.interfaces import IRelStorageAdapter
 from relstorage.adapters.locker import MySQLLocker
 from relstorage.adapters.oidallocator import MySQLOIDAllocator
 from relstorage.adapters.packundo import HistoryPreservingPackUndo
@@ -76,6 +78,7 @@
 
 class MySQLAdapter(object):
     """MySQL adapter for RelStorage."""
+    implements(IRelStorageAdapter)
 
     keep_history = True
 
@@ -115,64 +118,7 @@
             connmanager=self.connmanager,
             )
 
-        self.open = self.connmanager.open
-        self.close = self.connmanager.close
-        self.open_for_load = self.connmanager.open_for_load
-        self.restart_load = self.connmanager.restart_load
-        self.open_for_store = self.connmanager.open_for_store
-        self.restart_store = self.connmanager.restart_store
 
-        self.hold_commit_lock = self.locker.hold_commit_lock
-        self.release_commit_lock = self.locker.release_commit_lock
-        self.hold_pack_lock = self.locker.hold_pack_lock
-        self.release_pack_lock = self.locker.release_pack_lock
-
-        self.create_schema = self.schema.create
-        self.prepare_schema = self.schema.prepare
-        self.zap_all = self.schema.zap_all
-        self.drop_all = self.schema.drop_all
-
-        self.get_current_tid = self.mover.get_current_tid
-        self.load_current = self.mover.load_current
-        self.load_revision = self.mover.load_revision
-        self.exists = self.mover.exists
-        self.load_before = self.mover.load_before
-        self.get_object_tid_after = self.mover.get_object_tid_after
-        self.store_temp = self.mover.store_temp
-        self.replace_temp = self.mover.replace_temp
-        self.restore = self.mover.restore
-        self.detect_conflict = self.mover.detect_conflict
-        self.move_from_temp = self.mover.move_from_temp
-        self.update_current = self.mover.update_current
-
-        self.set_min_oid = self.oidallocator.set_min_oid
-        self.new_oid = self.oidallocator.new_oid
-
-        self.get_tid_and_time = self.txncontrol.get_tid_and_time
-        self.add_transaction = self.txncontrol.add_transaction
-        self.commit_phase1 = self.txncontrol.commit_phase1
-        self.commit_phase2 = self.txncontrol.commit_phase2
-        self.abort = self.txncontrol.abort
-
-        self.poll_invalidations = self.poller.poll_invalidations
-
-        self.fill_object_refs = self.packundo.fill_object_refs
-        self.open_for_pre_pack = self.packundo.open_for_pre_pack
-        self.choose_pack_transaction = self.packundo.choose_pack_transaction
-        self.pre_pack = self.packundo.pre_pack
-        self.pack = self.packundo.pack
-        self.verify_undoable = self.packundo.verify_undoable
-        self.undo = self.packundo.undo
-
-        self.iter_objects = self.dbiter.iter_objects
-        self.iter_transactions = self.dbiter.iter_transactions
-        self.iter_transactions_range = self.dbiter.iter_transactions_range
-        self.iter_object_history = self.dbiter.iter_object_history
-
-        self.get_object_count = self.stats.get_object_count
-        self.get_db_size = self.stats.get_db_size
-
-
 class MySQLdbConnectionManager(AbstractConnectionManager):
 
     isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -16,10 +16,12 @@
 import logging
 import cx_Oracle
 from ZODB.POSException import StorageError
+from zope.interface import implements
 
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
 from relstorage.adapters.hpmover import HistoryPreservingObjectMover
+from relstorage.adapters.interfaces import IRelStorageAdapter
 from relstorage.adapters.locker import OracleLocker
 from relstorage.adapters.oidallocator import OracleOIDAllocator
 from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
@@ -45,6 +47,7 @@
 
 class OracleAdapter(object):
     """Oracle adapter for RelStorage."""
+    implements(IRelStorageAdapter)
 
     keep_history = True
 
@@ -94,6 +97,7 @@
             )
         self.txncontrol = OracleTransactionControl(
             Binary=cx_Oracle.Binary,
+            twophase=bool(twophase),
             )
         self.poller = Poller(
             poll_query="SELECT MAX(tid) FROM transaction",
@@ -112,64 +116,7 @@
             connmanager=self.connmanager,
             )
 
-        self.open = self.connmanager.open
-        self.close = self.connmanager.close
-        self.open_for_load = self.connmanager.open_for_load
-        self.restart_load = self.connmanager.restart_load
-        self.open_for_store = self.connmanager.open_for_store
-        self.restart_store = self.connmanager.restart_store
 
-        self.hold_commit_lock = self.locker.hold_commit_lock
-        self.release_commit_lock = self.locker.release_commit_lock
-        self.hold_pack_lock = self.locker.hold_pack_lock
-        self.release_pack_lock = self.locker.release_pack_lock
-
-        self.create_schema = self.schema.create
-        self.prepare_schema = self.schema.prepare
-        self.zap_all = self.schema.zap_all
-        self.drop_all = self.schema.drop_all
-
-        self.get_current_tid = self.mover.get_current_tid
-        self.load_current = self.mover.load_current
-        self.load_revision = self.mover.load_revision
-        self.exists = self.mover.exists
-        self.load_before = self.mover.load_before
-        self.get_object_tid_after = self.mover.get_object_tid_after
-        self.store_temp = self.mover.store_temp
-        self.replace_temp = self.mover.replace_temp
-        self.restore = self.mover.restore
-        self.detect_conflict = self.mover.detect_conflict
-        self.move_from_temp = self.mover.move_from_temp
-        self.update_current = self.mover.update_current
-
-        self.set_min_oid = self.oidallocator.set_min_oid
-        self.new_oid = self.oidallocator.new_oid
-
-        self.get_tid_and_time = self.txncontrol.get_tid_and_time
-        self.add_transaction = self.txncontrol.add_transaction
-        self.commit_phase1 = self.txncontrol.commit_phase1
-        self.commit_phase2 = self.txncontrol.commit_phase2
-        self.abort = self.txncontrol.abort
-
-        self.poll_invalidations = self.poller.poll_invalidations
-
-        self.fill_object_refs = self.packundo.fill_object_refs
-        self.open_for_pre_pack = self.packundo.open_for_pre_pack
-        self.choose_pack_transaction = self.packundo.choose_pack_transaction
-        self.pre_pack = self.packundo.pre_pack
-        self.pack = self.packundo.pack
-        self.verify_undoable = self.packundo.verify_undoable
-        self.undo = self.packundo.undo
-
-        self.iter_objects = self.dbiter.iter_objects
-        self.iter_transactions = self.dbiter.iter_transactions
-        self.iter_transactions_range = self.dbiter.iter_transactions_range
-        self.iter_object_history = self.dbiter.iter_object_history
-
-        self.get_object_count = self.stats.get_object_count
-        self.get_db_size = self.stats.get_db_size
-
-
 class CXOracleScriptRunner(OracleScriptRunner):
 
     def __init__(self, use_inline_lobs):

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -16,10 +16,12 @@
 import logging
 import psycopg2
 import psycopg2.extensions
+from zope.interface import implements
 
 from relstorage.adapters.connmanager import AbstractConnectionManager
 from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
 from relstorage.adapters.hpmover import HistoryPreservingObjectMover
+from relstorage.adapters.interfaces import IRelStorageAdapter
 from relstorage.adapters.locker import PostgreSQLLocker
 from relstorage.adapters.oidallocator import PostgreSQLOIDAllocator
 from relstorage.adapters.packundo import HistoryPreservingPackUndo
@@ -44,6 +46,7 @@
 
 class PostgreSQLAdapter(object):
     """PostgreSQL adapter for RelStorage."""
+    implements(IRelStorageAdapter)
 
     keep_history = True
 
@@ -81,64 +84,7 @@
             connmanager=self.connmanager,
             )
 
-        self.open = self.connmanager.open
-        self.close = self.connmanager.close
-        self.open_for_load = self.connmanager.open_for_load
-        self.restart_load = self.connmanager.restart_load
-        self.open_for_store = self.connmanager.open_for_store
-        self.restart_store = self.connmanager.restart_store
 
-        self.hold_commit_lock = self.locker.hold_commit_lock
-        self.release_commit_lock = self.locker.release_commit_lock
-        self.hold_pack_lock = self.locker.hold_pack_lock
-        self.release_pack_lock = self.locker.release_pack_lock
-
-        self.create_schema = self.schema.create
-        self.prepare_schema = self.schema.prepare
-        self.zap_all = self.schema.zap_all
-        self.drop_all = self.schema.drop_all
-
-        self.get_current_tid = self.mover.get_current_tid
-        self.load_current = self.mover.load_current
-        self.load_revision = self.mover.load_revision
-        self.exists = self.mover.exists
-        self.load_before = self.mover.load_before
-        self.get_object_tid_after = self.mover.get_object_tid_after
-        self.store_temp = self.mover.store_temp
-        self.replace_temp = self.mover.replace_temp
-        self.restore = self.mover.restore
-        self.detect_conflict = self.mover.detect_conflict
-        self.move_from_temp = self.mover.move_from_temp
-        self.update_current = self.mover.update_current
-
-        self.set_min_oid = self.oidallocator.set_min_oid
-        self.new_oid = self.oidallocator.new_oid
-
-        self.get_tid_and_time = self.txncontrol.get_tid_and_time
-        self.add_transaction = self.txncontrol.add_transaction
-        self.commit_phase1 = self.txncontrol.commit_phase1
-        self.commit_phase2 = self.txncontrol.commit_phase2
-        self.abort = self.txncontrol.abort
-
-        self.poll_invalidations = self.poller.poll_invalidations
-
-        self.fill_object_refs = self.packundo.fill_object_refs
-        self.open_for_pre_pack = self.packundo.open_for_pre_pack
-        self.choose_pack_transaction = self.packundo.choose_pack_transaction
-        self.pre_pack = self.packundo.pre_pack
-        self.pack = self.packundo.pack
-        self.verify_undoable = self.packundo.verify_undoable
-        self.undo = self.packundo.undo
-
-        self.iter_objects = self.dbiter.iter_objects
-        self.iter_transactions = self.dbiter.iter_transactions
-        self.iter_transactions_range = self.dbiter.iter_transactions_range
-        self.iter_object_history = self.dbiter.iter_object_history
-
-        self.get_object_count = self.stats.get_object_count
-        self.get_db_size = self.stats.get_db_size
-
-
 class Psycopg2ConnectionManager(AbstractConnectionManager):
 
     isolation_read_committed = (

Modified: relstorage/trunk/relstorage/adapters/txncontrol.py
===================================================================
--- relstorage/trunk/relstorage/adapters/txncontrol.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/txncontrol.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -125,7 +125,7 @@
 class OracleTransactionControl(TransactionControl):
     implements(ITransactionControl)
 
-    def __init__(self, Binary, twophase=False):
+    def __init__(self, Binary, twophase):
         self.Binary = Binary
         self.twophase = twophase
 

Modified: relstorage/trunk/relstorage/config.py
===================================================================
--- relstorage/trunk/relstorage/config.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/config.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -22,7 +22,7 @@
     """Open a storage configured via ZConfig"""
     def open(self):
         config = self.config
-        adapter = config.adapter.open()
+        adapter = config.adapter.create()
         options = Options()
         for key in options.__dict__.keys():
             value = getattr(config, key, None)
@@ -33,20 +33,20 @@
 
 
 class PostgreSQLAdapterFactory(BaseConfig):
-    def open(self):
+    def create(self):
         from adapters.postgresql import PostgreSQLAdapter
         return PostgreSQLAdapter(self.config.dsn)
 
 
 class OracleAdapterFactory(BaseConfig):
-    def open(self):
+    def create(self):
         from adapters.oracle import OracleAdapter
         config = self.config
         return OracleAdapter(config.user, config.password, config.dsn)
 
 
 class MySQLAdapterFactory(BaseConfig):
-    def open(self):
+    def create(self):
         from adapters.mysql import MySQLAdapter
         options = {}
         for key in self.config.getSectionAttributes():

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/relstorage.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -95,7 +95,7 @@
         self._cache_client = None
 
         if create:
-            self._adapter.prepare_schema()
+            self._adapter.schema.prepare()
 
         # load_conn and load_cursor are open most of the time.
         self._load_conn = None
@@ -168,7 +168,7 @@
 
     def _open_load_connection(self):
         """Open the load connection to the database.  Return nothing."""
-        conn, cursor = self._adapter.open_for_load()
+        conn, cursor = self._adapter.connmanager.open_for_load()
         self._drop_load_connection()
         self._load_conn, self._load_cursor = conn, cursor
         self._load_transaction_open = True
@@ -177,7 +177,7 @@
         """Unconditionally drop the load connection"""
         conn, cursor = self._load_conn, self._load_cursor
         self._load_conn, self._load_cursor = None, None
-        self._adapter.close(conn, cursor)
+        self._adapter.connmanager.close(conn, cursor)
         self._load_transaction_open = False
 
     def _rollback_load_connection(self):
@@ -195,7 +195,7 @@
             self._open_load_connection()
         else:
             try:
-                self._adapter.restart_load(
+                self._adapter.connmanager.restart_load(
                     self._load_conn, self._load_cursor)
             except POSException.StorageError, e:
                 log.warning("Reconnecting load_conn: %s", e)
@@ -212,7 +212,7 @@
 
     def _open_store_connection(self):
         """Open the store connection to the database.  Return nothing."""
-        conn, cursor = self._adapter.open_for_store()
+        conn, cursor = self._adapter.connmanager.open_for_store()
         self._drop_store_connection()
         self._store_conn, self._store_cursor = conn, cursor
 
@@ -220,7 +220,7 @@
         """Unconditionally drop the store connection"""
         conn, cursor = self._store_conn, self._store_cursor
         self._store_conn, self._store_cursor = None, None
-        self._adapter.close(conn, cursor)
+        self._adapter.connmanager.close(conn, cursor)
 
     def _restart_store(self):
         """Restart the store connection, creating a new connection if needed"""
@@ -228,7 +228,7 @@
             self._open_store_connection()
         else:
             try:
-                self._adapter.restart_store(
+                self._adapter.connmanager.restart_store(
                     self._store_conn, self._store_cursor)
             except POSException.StorageError, e:
                 log.warning("Reconnecting store_conn: %s", e)
@@ -247,7 +247,7 @@
 
         Used by the test suite and the ZODBConvert script.
         """
-        self._adapter.zap_all()
+        self._adapter.schema.zap_all()
         self._rollback_load_connection()
         cache = self._cache_client
         if cache is not None:
@@ -289,11 +289,11 @@
         return other
 
     def __len__(self):
-        return self._adapter.get_object_count()
+        return self._adapter.stats.get_object_count()
 
     def getSize(self):
         """Return database size in bytes"""
-        return self._adapter.get_db_size()
+        return self._adapter.stats.get_db_size()
 
     def _log_keyerror(self, oid_int, reason):
         """Log just before raising KeyError in load().
@@ -305,7 +305,7 @@
         adapter = self._adapter
         logfunc = log.warning
         msg = ["Storage KeyError on oid %d: %s" % (oid_int, reason)]
-        rows = adapter.iter_transactions(cursor)
+        rows = adapter.dbiter.iter_transactions(cursor)
         row = None
         for row in rows:
             # just get the first row
@@ -320,7 +320,7 @@
 
         tids = []
         try:
-            rows = adapter.iter_object_history(cursor, oid_int)
+            rows = adapter.dbiter.iter_object_history(cursor, oid_int)
         except KeyError:
             # The object has no history, at least from the point of view
             # of the current database load connection.
@@ -350,14 +350,15 @@
                 self._restart_load()
             cursor = self._load_cursor
             if cache is None:
-                state, tid_int = self._adapter.load_current(cursor, oid_int)
+                state, tid_int = self._adapter.mover.load_current(
+                    cursor, oid_int)
             else:
                 # get tid_int from the cache or the database
                 cachekey = self._get_oid_cache_key(oid_int)
                 if cachekey:
                     tid_int = cache.get(cachekey)
                 if not cachekey or not tid_int:
-                    tid_int = self._adapter.get_current_tid(
+                    tid_int = self._adapter.mover.get_current_tid(
                         cursor, oid_int)
                     if cachekey and tid_int is not None:
                         cache.set(cachekey, tid_int)
@@ -369,7 +370,7 @@
                 cachekey = 'state:%d:%d' % (oid_int, tid_int)
                 state = cache.get(cachekey)
                 if not state:
-                    state = self._adapter.load_revision(
+                    state = self._adapter.mover.load_revision(
                         cursor, oid_int, tid_int)
                     if state:
                         state = str(state)
@@ -410,12 +411,12 @@
         try:
             if not self._load_transaction_open:
                 self._restart_load()
-            state = self._adapter.load_revision(
+            state = self._adapter.mover.load_revision(
                 self._load_cursor, oid_int, tid_int)
             if state is None and self._store_cursor is not None:
                 # Allow loading data from later transactions
                 # for conflict resolution.
-                state = self._adapter.load_revision(
+                state = self._adapter.mover.load_revision(
                     self._store_cursor, oid_int, tid_int)
         finally:
             self._lock_release()
@@ -444,13 +445,13 @@
                 if not self._load_transaction_open:
                     self._restart_load()
                 cursor = self._load_cursor
-            if not self._adapter.exists(cursor, u64(oid)):
+            if not self._adapter.mover.exists(cursor, u64(oid)):
                 raise POSKeyError(oid)
 
-            state, start_tid = self._adapter.load_before(
+            state, start_tid = self._adapter.mover.load_before(
                 cursor, oid_int, u64(tid))
             if start_tid is not None:
-                end_int = self._adapter.get_object_tid_after(
+                end_int = self._adapter.mover.get_object_tid_after(
                     cursor, oid_int, start_tid)
                 if end_int is not None:
                     end = p64(end_int)
@@ -491,7 +492,7 @@
         try:
             self._max_stored_oid = max(self._max_stored_oid, oid_int)
             # save the data in a temporary table
-            adapter.store_temp(cursor, oid_int, prev_tid_int, data)
+            adapter.mover.store_temp(cursor, oid_int, prev_tid_int, data)
             return None
         finally:
             self._lock_release()
@@ -521,7 +522,7 @@
         try:
             self._max_stored_oid = max(self._max_stored_oid, oid_int)
             # save the data.  Note that data can be None.
-            adapter.restore(cursor, oid_int, tid_int, data)
+            adapter.mover.restore(cursor, oid_int, tid_int, data)
         finally:
             self._lock_release()
 
@@ -556,10 +557,10 @@
                 # get the commit lock and add the transaction now
                 cursor = self._store_cursor
                 packed = (status == 'p')
-                adapter.hold_commit_lock(cursor, ensure_current=True)
+                adapter.locker.hold_commit_lock(cursor, ensure_current=True)
                 tid_int = u64(tid)
                 try:
-                    adapter.add_transaction(
+                    adapter.txncontrol.add_transaction(
                         cursor, tid_int, user, desc, ext, packed)
                 except:
                     self._drop_store_connection()
@@ -583,20 +584,20 @@
 
         adapter = self._adapter
         cursor = self._store_cursor
-        adapter.hold_commit_lock(cursor, ensure_current=True)
+        adapter.locker.hold_commit_lock(cursor, ensure_current=True)
         user, desc, ext = self._ude
 
         # Choose a transaction ID.
         # Base the transaction ID on the database time,
         # while ensuring that the tid of this transaction
         # is greater than any existing tid.
-        last_tid, now = adapter.get_tid_and_time(cursor)
+        last_tid, now = adapter.txncontrol.get_tid_and_time(cursor)
         stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
         stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
         tid = repr(stamp)
 
         tid_int = u64(tid)
-        adapter.add_transaction(cursor, tid_int, user, desc, ext)
+        adapter.txncontrol.add_transaction(cursor, tid_int, user, desc, ext)
         self._tid = tid
 
 
@@ -621,7 +622,7 @@
         # Try to resolve the conflicts.
         resolved = set()  # a set of OIDs
         while True:
-            conflict = adapter.detect_conflict(cursor)
+            conflict = adapter.mover.detect_conflict(cursor)
             if conflict is None:
                 break
 
@@ -638,14 +639,14 @@
             else:
                 # resolved
                 data = rdata
-                self._adapter.replace_temp(
+                self._adapter.mover.replace_temp(
                     cursor, oid_int, prev_tid_int, data)
                 resolved.add(oid)
 
         # Move the new states into the permanent table
         tid_int = u64(self._tid)
         serials = []
-        oid_ints = adapter.move_from_temp(cursor, tid_int)
+        oid_ints = adapter.mover.move_from_temp(cursor, tid_int)
         for oid_int in oid_ints:
             oid = p64(oid_int)
             if oid in resolved:
@@ -674,14 +675,15 @@
         conn = self._store_conn
 
         if self._max_stored_oid > self._max_new_oid:
-            self._adapter.set_min_oid(cursor, self._max_stored_oid + 1)
+            self._adapter.oidallocator.set_min_oid(
+                cursor, self._max_stored_oid + 1)
 
         self._prepare_tid()
         tid_int = u64(self._tid)
 
         serials = self._finish_store()
-        self._adapter.update_current(cursor, tid_int)
-        self._prepared_txn = self._adapter.commit_phase1(
+        self._adapter.mover.update_current(cursor, tid_int)
+        self._prepared_txn = self._adapter.txncontrol.commit_phase1(
             conn, cursor, tid_int)
 
         if self._txn_blobs:
@@ -722,9 +724,9 @@
             self._rollback_load_connection()
             txn = self._prepared_txn
             assert txn is not None
-            self._adapter.commit_phase2(
+            self._adapter.txncontrol.commit_phase2(
                 self._store_conn, self._store_cursor, txn)
-            self._adapter.release_commit_lock(self._store_cursor)
+            self._adapter.locker.release_commit_lock(self._store_cursor)
             cache = self._cache_client
             if cache is not None:
                 if cache.incr('commit_count') is None:
@@ -746,9 +748,9 @@
         try:
             self._rollback_load_connection()
             if self._store_cursor is not None:
-                self._adapter.abort(
+                self._adapter.txncontrol.abort(
                     self._store_conn, self._store_cursor, self._prepared_txn)
-                self._adapter.release_commit_lock(self._store_cursor)
+                self._adapter.locker.release_commit_lock(self._store_cursor)
             if self._txn_blobs:
                 for oid, filename in self._txn_blobs.iteritems():
                     if os.path.exists(filename):
@@ -774,7 +776,7 @@
             if cursor is None:
                 self._open_load_connection()
                 cursor = self._load_cursor
-            oid_int = self._adapter.new_oid(cursor)
+            oid_int = self._adapter.oidallocator.new_oid(cursor)
             self._max_new_oid = max(self._max_new_oid, oid_int)
             return p64(oid_int)
         finally:
@@ -801,9 +803,9 @@
 
         # use a private connection to ensure the most current results
         adapter = self._adapter
-        conn, cursor = adapter.open()
+        conn, cursor = adapter.connmanager.open()
         try:
-            rows = adapter.iter_transactions(cursor)
+            rows = adapter.dbiter.iter_transactions(cursor)
             i = 0
             res = []
             for tid_int, user, desc, ext in rows:
@@ -823,7 +825,7 @@
             return res
 
         finally:
-            adapter.close(conn, cursor)
+            adapter.connmanager.close(conn, cursor)
 
     def history(self, oid, version=None, size=1, filter=None):
         self._lock_acquire()
@@ -831,7 +833,8 @@
             cursor = self._load_cursor
             oid_int = u64(oid)
             try:
-                rows = self._adapter.iter_object_history(cursor, oid_int)
+                rows = self._adapter.dbiter.iter_object_history(
+                    cursor, oid_int)
             except KeyError:
                 raise POSKeyError(oid)
 
@@ -881,30 +884,31 @@
             cursor = self._store_cursor
             assert cursor is not None
 
-            adapter.hold_pack_lock(cursor)
+            adapter.locker.hold_pack_lock(cursor)
             try:
                 # Note that _prepare_tid acquires the commit lock.
                 # The commit lock must be acquired after the pack lock
                 # because the database adapters also acquire in that
                 # order during packing.
                 self._prepare_tid()
-                adapter.verify_undoable(cursor, undo_tid_int)
+                adapter.packundo.verify_undoable(cursor, undo_tid_int)
 
                 self_tid_int = u64(self._tid)
-                copied = adapter.undo(cursor, undo_tid_int, self_tid_int)
+                copied = adapter.packundo.undo(
+                    cursor, undo_tid_int, self_tid_int)
                 oids = [p64(oid_int) for oid_int, _ in copied]
 
                 # Update the current object pointers immediately, so that
                 # subsequent undo operations within this transaction will see
                 # the new current objects.
-                adapter.update_current(cursor, self_tid_int)
+                adapter.mover.update_current(cursor, self_tid_int)
 
                 if self.fshelper is not None:
                     self._copy_undone_blobs(copied)
 
                 return self._tid, oids
             finally:
-                adapter.release_pack_lock(cursor)
+                adapter.locker.release_pack_lock(cursor)
         finally:
             self._lock_release()
 
@@ -930,7 +934,7 @@
 
             self._add_blob_to_transaction(oid, new_fn)
 
-    def pack(self, t, referencesf, sleep=time.sleep):
+    def pack(self, t, referencesf, sleep=None):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
 
@@ -950,12 +954,13 @@
         # connections to do the actual work, allowing the adapter
         # to use special transaction modes for packing.
         adapter = self._adapter
-        lock_conn, lock_cursor = adapter.open()
+        lock_conn, lock_cursor = adapter.connmanager.open()
         try:
-            adapter.hold_pack_lock(lock_cursor)
+            adapter.locker.hold_pack_lock(lock_cursor)
             try:
                 # Find the latest commit before or at the pack time.
-                tid_int = adapter.choose_pack_transaction(pack_point_int)
+                tid_int = adapter.packundo.choose_pack_transaction(
+                    pack_point_int)
                 if tid_int is None:
                     log.debug("all transactions before %s have already "
                         "been packed", time.ctime(t))
@@ -971,7 +976,8 @@
                 # In pre_pack, the adapter fills tables with
                 # information about what to pack.  The adapter
                 # must not actually pack anything yet.
-                adapter.pre_pack(tid_int, get_references, self._options)
+                adapter.packundo.pre_pack(
+                    tid_int, get_references, self._options)
 
                 if self._options.pack_dry_run:
                     log.info("pack: dry run complete")
@@ -981,13 +987,13 @@
                         packed_func = self._after_pack
                     else:
                         packed_func = None
-                    adapter.pack(tid_int, self._options, sleep=sleep,
+                    adapter.packundo.pack(tid_int, self._options, sleep=sleep,
                         packed_func=packed_func)
             finally:
-                adapter.release_pack_lock(lock_cursor)
+                adapter.locker.release_pack_lock(lock_cursor)
         finally:
             lock_conn.rollback()
-            adapter.close(lock_conn, lock_cursor)
+            adapter.connmanager.close(lock_conn, lock_cursor)
         self.sync()
 
     def _after_pack(self, oid_int, tid_int):
@@ -1083,7 +1089,7 @@
                 ignore_tid = None
 
             # get a list of changed OIDs and the most recent tid
-            oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+            oid_ints, new_polled_tid = self._adapter.poller.poll_invalidations(
                 conn, cursor, self._prev_polled_tid, ignore_tid)
             self._prev_polled_tid = new_polled_tid
 
@@ -1247,7 +1253,7 @@
 
     def __init__(self, adapter, start, stop):
         self._adapter = adapter
-        self._conn, self._cursor = self._adapter.open_for_load()
+        self._conn, self._cursor = self._adapter.connmanager.open_for_load()
         self._closed = False
 
         if start is not None:
@@ -1260,12 +1266,12 @@
             stop_int = None
 
         # _transactions: [(tid, username, description, extension, packed)]
-        self._transactions = list(adapter.iter_transactions_range(
+        self._transactions = list(adapter.dbiter.iter_transactions_range(
             self._cursor, start_int, stop_int))
         self._index = 0
 
     def close(self):
-        self._adapter.close(self._conn, self._cursor)
+        self._adapter.connmanager.close(self._conn, self._cursor)
         self._closed = True
 
     def iterator(self):
@@ -1326,7 +1332,7 @@
         adapter = record._trans_iter._adapter
         tid_int = record._tid_int
         self.tid = record.tid
-        self._records = list(adapter.iter_objects(cursor, tid_int))
+        self._records = list(adapter.dbiter.iter_objects(cursor, tid_int))
         self._index = 0
 
     def __iter__(self):

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -74,8 +74,8 @@
     ):
 
     def checkDropAndPrepare(self):
-        self._storage._adapter.drop_all()
-        self._storage._adapter.prepare_schema()
+        self._storage._adapter.schema.drop_all()
+        self._storage._adapter.schema.prepare()
 
     def checkCrossConnectionInvalidation(self):
         # Verify connections see updated state at txn boundaries

Modified: relstorage/trunk/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py	2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/tests/speedtest.py	2009-09-24 18:06:37 UTC (rev 104499)
@@ -196,8 +196,8 @@
     def postgres_test(self):
         from relstorage.adapters.postgresql import PostgreSQLAdapter
         adapter = PostgreSQLAdapter('dbname=relstoragetest')
-        adapter.prepare_schema()
-        adapter.zap_all()
+        adapter.schema.prepare()
+        adapter.schema.zap_all()
         def make_storage():
             return RelStorage(adapter)
         return self.run_tests(make_storage)
@@ -207,8 +207,8 @@
         from relstorage.tests.testoracle import getOracleParams
         user, password, dsn = getOracleParams()
         adapter = OracleAdapter(user, password, dsn)
-        adapter.prepare_schema()
-        adapter.zap_all()
+        adapter.schema.prepare()
+        adapter.schema.zap_all()
         def make_storage():
             return RelStorage(adapter)
         return self.run_tests(make_storage)
@@ -216,8 +216,8 @@
     def mysql_test(self):
         from relstorage.adapters.mysql import MySQLAdapter
         adapter = MySQLAdapter(db='relstoragetest')
-        adapter.prepare_schema()
-        adapter.zap_all()
+        adapter.schema.prepare()
+        adapter.schema.zap_all()
         def make_storage():
             return RelStorage(adapter)
         return self.run_tests(make_storage)



More information about the checkins mailing list