[Checkins] SVN: relstorage/trunk/ Various performance enhancements:

Shane Hathaway shane at hathawaymix.org
Wed Jan 30 01:55:52 EST 2008


Log message for revision 83311:
  Various performance enhancements:
  
  * Made strict two phase commit optional
  
  * After verifying that it is safe, switched from serializable to read-committed isolation in many places
  
  * Buffer new objects in a table rather than a file
  
  * In PostgreSQL, poll using a prepared statement instead of listen/notify
  
  

Changed:
  U   relstorage/trunk/CHANGELOG.txt
  U   relstorage/trunk/relstorage/adapters/oracle.py
  U   relstorage/trunk/relstorage/adapters/postgresql.py
  D   relstorage/trunk/relstorage/autotemp.py
  U   relstorage/trunk/relstorage/relstorage.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt	2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/CHANGELOG.txt	2008-01-30 06:55:51 UTC (rev 83311)
@@ -11,7 +11,30 @@
 - Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
   on svn.zope.org.)
 
+- Made two-phase commit optional in both Oracle and PostgreSQL.  Both
+  adapters use commit_lock in a way that makes a failure during the
+  second phase of the commit unlikely.
 
+- Switched most database transaction isolation levels from serializable
+  to read committed.  It turns out that commit_lock already provides
+  the serializability guarantees we need, so it is safe to take advantage
+  of the potential speed gains.  The one major exception is the load
+  connection, which requires an unchanging view of the database.
+
+- Stored objects are now buffered in a database table rather than a file.
+
+- Stopped using the LISTEN and NOTIFY statements in PostgreSQL since
+  they are not reliable enough.
+
+- Started using a prepared statement in PostgreSQL for getting the
+  newest transaction ID quickly.
+
+- Removed the code in the Oracle adapter for retrying connection attempts.
+  (It is better to just reconfigure Oracle.)
+
+
+
+
 PGStorage 0.4
 
 - Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut

Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/adapters/oracle.py	2008-01-30 06:55:51 UTC (rev 83311)
@@ -14,12 +14,9 @@
 """Oracle adapter for RelStorage."""
 
 import logging
-import os
 import re
-import socket
 import thread
 import time
-import traceback
 import cx_Oracle
 from ZODB.POSException import ConflictError, StorageError, UndoError
 
@@ -29,11 +26,10 @@
 class OracleAdapter(object):
     """Oracle adapter for RelStorage."""
 
-    def __init__(self, user, password, dsn,
-            arraysize=64, max_connect_attempts=3):
+    def __init__(self, user, password, dsn, twophase=False, arraysize=64):
         self._params = (user, password, dsn)
+        self._twophase = twophase
         self._arraysize = arraysize
-        self._max_connect_attempts = max_connect_attempts
 
     def create_schema(self, cursor):
         """Create the database tables."""
@@ -77,6 +73,14 @@
             FOREIGN KEY (zoid, tid) REFERENCES object_state
         );
 
+        -- States that will soon be stored
+        CREATE GLOBAL TEMPORARY TABLE temp_store (
+            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+            prev_tid    NUMBER(20) NOT NULL,
+            md5         CHAR(32),
+            state       BLOB
+        ) ON COMMIT DELETE ROWS;
+
         -- During packing, an exclusive lock is held on pack_lock.
         CREATE TABLE pack_lock (dummy CHAR);
 
@@ -202,25 +206,12 @@
             self.close(conn, cursor)
 
 
-    def open(self, transaction_mode="ISOLATION LEVEL SERIALIZABLE",
+    def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
             twophase=False):
         """Open a database connection and return (conn, cursor)."""
         try:
-            attempt = 1
-            while True:
-                try:
-                    kw = {'twophase': twophase}  #, 'threaded': True}
-                    conn = cx_Oracle.connect(*self._params, **kw)
-                    break
-                except cx_Oracle.DatabaseError, e:
-                    log.warning("Connection failed: %s", e)
-                    attempt += 1
-                    if attempt > self._max_connect_attempts:
-                        raise
-                    else:
-                        # Pause, then try to connect again
-                        time.sleep(2**(attempt - 1))
-
+            kw = {'twophase': twophase}  #, 'threaded': True}
+            conn = cx_Oracle.connect(*self._params, **kw)
             cursor = conn.cursor()
             cursor.arraysize = self._arraysize
             if transaction_mode:
@@ -228,7 +219,7 @@
             return conn, cursor
 
         except cx_Oracle.OperationalError:
-            log.debug("Unable to connect in %s", repr(self))
+            log.error("Unable to connect to DSN %s", self._params[2])
             raise
 
     def close(self, conn, cursor):
@@ -362,49 +353,60 @@
         else:
             return None
 
-    def get_object_tids(self, cursor, oids):
-        """Returns a map containing the current tid for each oid in a list.
-
-        OIDs that do not exist are not included.
-        """
-        # query in chunks to avoid running into a maximum query length
-        chunk_size = 512
-        res = {}
-        for i in xrange(0, len(oids), chunk_size):
-            chunk = oids[i : i + chunk_size]
-            oid_str = ','.join(str(oid) for oid in chunk)
-            stmt = """
-            SELECT zoid, tid FROM current_object WHERE zoid IN (%s)
-            """ % oid_str
-            cursor.execute(stmt)
-            res.update(dict(iter(cursor)))
-        return res
-
-    def open_for_commit(self, cursor=None):
+    def open_for_store(self):
         """Open and initialize a connection for storing objects.
 
         Returns (conn, cursor).
         """
-        if cursor is None:
+        if self._twophase:
             conn, cursor = self.open(transaction_mode=None, twophase=True)
+            try:
+                stmt = """
+                SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+                """
+                cursor.execute(stmt)
+                xid = str(cursor.fetchone()[0])
+                conn.begin(0, xid, '0')
+            except:
+                self.close(conn, cursor)
+                raise
         else:
-            conn = cursor.connection
-        # Use an xid that is unique to this Oracle connection, assuming
-        # the hostname is unique within the Oracle cluster.  The xid
-        # can have up to 64 bytes.
-        xid = '%s-%d-%d' % (socket.gethostname(), os.getpid(), id(conn))
-        conn.begin(0, xid, '0')
-        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+            conn, cursor = self.open()
         return conn, cursor
 
-    def restart_commit(self, cursor):
-        """Rollback the commit and start over.
+    def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+        """Store an object in the temporary table."""
+        cursor.setinputsizes(data=cx_Oracle.BLOB)
+        stmt = """
+        INSERT INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (:oid, :prev_tid, :md5sum, :data)
+        """
+        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+            md5sum=md5sum, data=cx_Oracle.Binary(data))
 
-        The cursor must be the type created by open_for_commit().
+    def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+        """Replace an object in the temporary table."""
+        cursor.setinputsizes(data=cx_Oracle.BLOB)
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = :prev_tid,
+            md5 = :md5sum,
+            state = :data
+        WHERE zoid = :oid
         """
-        cursor.connection.rollback()
-        self.open_for_commit(cursor)
+        cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+            md5sum=md5sum, data=cx_Oracle.Binary(data))
 
+    def start_commit(self, cursor):
+        """Prepare to commit."""
+        cursor.execute("SAVEPOINT start_commit")
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+    def restart_commit(self, cursor):
+        """Rollback the attempt to commit and start over."""
+        cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
     def _parse_dsinterval(self, s):
         """Convert an Oracle dsinterval (as a string) to a float."""
         mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
@@ -443,55 +445,67 @@
         except cx_Oracle.IntegrityError:
             raise ConflictError
 
-    def store(self, cursor, oid, tid, prev_tid, md5sum, data):
-        """Store an object.  May raise ConflictError."""
-        try:
-            cursor.setinputsizes(data=cx_Oracle.BLOB)
-            stmt = """
-            INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-            VALUES (:oid, :tid, :prev_tid, :md5sum, :data)
-            """
-            cursor.execute(stmt, oid=oid, tid=tid, prev_tid=prev_tid,
-                md5sum=md5sum, data=cx_Oracle.Binary(data))
-        except cx_Oracle.ProgrammingError, e:
-            # This can happen if another thread is currently packing
-            # and prev_tid refers to a transaction being packed.
-            if 'concurrent' in e.args[0]:
-                raise ConflictError(e)
-            else:
-                raise
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
 
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            temp_store.state
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        """
+        cursor.execute(stmt)
+        for oid, prev_tid, attempted_prev_tid, data in cursor:
+            return oid, prev_tid, attempted_prev_tid, data.read()
+        return None
+
+    def move_from_temp(self, cursor, tid):
+        """Moved the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
+        INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+        SELECT zoid, :tid, prev_tid, md5, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, tid=tid)
+
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
     def update_current(self, cursor, tid):
         """Update the current object pointers.
 
         tid is the integer tid of the transaction being committed.
         """
-        try:
-            # Insert objects created in this transaction into current_object.
-            stmt = """
-            INSERT INTO current_object (zoid, tid)
-            SELECT zoid, tid FROM object_state
+        # Insert objects created in this transaction into current_object.
+        stmt = """
+        INSERT INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = :1
+            AND prev_tid = 0
+        """
+        cursor.execute(stmt, (tid,))
+
+        # Change existing objects.
+        stmt = """
+        UPDATE current_object SET tid = :1
+        WHERE zoid IN (
+            SELECT zoid FROM object_state
             WHERE tid = :1
-                AND prev_tid = 0
-            """
-            cursor.execute(stmt, (tid,))
+                AND prev_tid != 0
+        )
+        """
+        cursor.execute(stmt, (tid,))
 
-            # Change existing objects.
-            stmt = """
-            UPDATE current_object SET tid = :1
-            WHERE zoid IN (
-                SELECT zoid FROM object_state
-                WHERE tid = :1
-                    AND prev_tid != 0
-            )
-            """
-            cursor.execute(stmt, (tid,))
-        except cx_Oracle.ProgrammingError, e:
-            if 'concurrent' in e.args[0]:
-                raise ConflictError(e)
-            else:
-                raise
-
     def commit_phase1(self, cursor, tid):
         """Begin a commit.  Returns the transaction name.
 
@@ -499,14 +513,9 @@
         meaning that if commit_phase2() would raise any error, the error
         should be raised in commit_phase1() instead.
         """
-        try:
+        if self._twophase:
             cursor.connection.prepare()
-            return '-'  # the transaction name is not important
-        except cx_Oracle.ProgrammingError, e:
-            if 'concurrent' in e.args[0]:
-                raise ConflictError(e)
-            else:
-                raise
+        return '-'
 
     def commit_phase2(self, cursor, txn):
         """Final transaction commit."""
@@ -866,7 +875,7 @@
         """Pack.  Requires populated pack tables."""
 
         # Read committed mode is sufficient.
-        conn, cursor = self.open('ISOLATION LEVEL READ COMMITTED')
+        conn, cursor = self.open()
         try:
             try:
 
@@ -967,8 +976,7 @@
             # find out the tid of the most recent transaction.
             stmt = "SELECT MAX(tid) FROM transaction"
             cursor.execute(stmt)
-            rows = cursor.fetchall()
-            new_polled_tid = rows[0][0]
+            new_polled_tid = list(cursor)[0][0]
 
             if prev_polled_tid is None:
                 # This is the first time the connection has polled.

Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/adapters/postgresql.py	2008-01-30 06:55:51 UTC (rev 83311)
@@ -32,8 +32,9 @@
 class PostgreSQLAdapter(object):
     """PostgreSQL adapter for RelStorage."""
 
-    def __init__(self, dsn=''):
+    def __init__(self, dsn='', twophase=False):
         self._dsn = dsn
+        self._twophase = twophase
 
     def create_schema(self, cursor):
         """Create the database tables."""
@@ -155,8 +156,11 @@
         try:
             try:
                 cursor.execute("""
-                TRUNCATE object_refs_added, object_ref, current_object,
-                    object_state, transaction;
+                DELETE FROM object_refs_added;
+                DELETE FROM object_ref;
+                DELETE FROM current_object;
+                DELETE FROM object_state;
+                DELETE FROM transaction;
                 -- Create a special transaction to represent object creation.
                 INSERT INTO transaction (tid, username, description)
                     VALUES (0, '', '');
@@ -171,7 +175,8 @@
             self.close(conn, cursor)
 
 
-    def open(self, isolation=psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE):
+    def open(self,
+            isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
         """Open a database connection and return (conn, cursor)."""
         try:
             conn = psycopg2.connect(self._dsn)
@@ -199,8 +204,16 @@
 
         Returns (conn, cursor).
         """
-        conn, cursor = self.open()
-        cursor.execute("LISTEN invalidate")
+        conn, cursor = self.open(
+            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+        stmt = """
+        PREPARE get_latest_tid AS
+        SELECT tid
+        FROM transaction
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+        cursor.execute(stmt)
         return conn, cursor
 
     def restart_load(self, cursor):
@@ -217,10 +230,8 @@
         """Returns the approximate size of the database in bytes"""
         conn, cursor = self.open()
         try:
-            cursor.execute("SELECT SUM(relpages) FROM pg_class")
-            # A relative page on postgres is 8K
-            relpages = cursor.fetchone()[0]
-            return relpages * 8 * 1024
+            cursor.execute("SELECT pg_database_size(current_database())")
+            return cursor.fetchone()[0]
         finally:
             self.close(conn, cursor)
 
@@ -315,52 +326,70 @@
         else:
             return None
 
-    def get_object_tids(self, cursor, oids):
-        """Returns a map containing the current tid for each oid in a list.
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
 
-        OIDs that do not exist are not included.
+        Returns (conn, cursor).
         """
-        # query in chunks to avoid running into a maximum query length
-        chunk_size = 512
-        res = {}
-        for i in xrange(0, len(oids), chunk_size):
-            chunk = oids[i : i + chunk_size]
-            oid_str = ','.join(str(oid) for oid in chunk)
-            stmt = """
-            SELECT zoid, tid FROM current_object WHERE zoid IN (%s)
-            """ % oid_str
+        conn, cursor = self.open()
+        try:
+            if self._twophase:
+                # PostgreSQL does not allow two phase transactions
+                # to use temporary tables. :-(
+                stmt = """
+                CREATE TABLE temp_store (
+                    zoid        BIGINT NOT NULL,
+                    prev_tid    BIGINT NOT NULL,
+                    md5         CHAR(32),
+                    state       BYTEA
+                );
+                CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+                """
+            else:
+                stmt = """
+                CREATE TEMPORARY TABLE temp_store (
+                    zoid        BIGINT NOT NULL,
+                    prev_tid    BIGINT NOT NULL,
+                    md5         CHAR(32),
+                    state       BYTEA
+                ) ON COMMIT DROP;
+                CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+                """
             cursor.execute(stmt)
-            res.update(dict(iter(cursor)))
-        return res
+            return conn, cursor
+        except:
+            self.close(conn, cursor)
+            raise
 
-    def open_for_commit(self):
-        """Open and initialize a connection for storing objects.
+    def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+        """Store an object."""
+        stmt = """
+        INSERT INTO temp_store (zoid, prev_tid, md5, state)
+        VALUES (%s, %s, %s, decode(%s, 'base64'))
+        """
+        cursor.execute(stmt, (oid, prev_tid, md5sum, encodestring(data)))
 
-        Returns (conn, cursor).
+    def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+        """Replace an object in the temporary table."""
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            md5 = %s,
+            state = decode(%s, 'base64')
+        WHERE zoid = %s
         """
-        # psycopg2 doesn't support prepared transactions, so
-        # tell psycopg2 to use the autocommit isolation level, but covertly
-        # switch to the serializable isolation level.
-        conn, cursor = self.open(
-            isolation=psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
-            )
-        cursor.execute("""
-        BEGIN ISOLATION LEVEL SERIALIZABLE;
-        LOCK commit_lock IN EXCLUSIVE MODE
-        """)
-        return conn, cursor
+        cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
 
+    def start_commit(self, cursor):
+        """Prepare to commit."""
+        cursor.execute("SAVEPOINT start_commit")
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
     def restart_commit(self, cursor):
-        """Rollback the commit and start over.
+        """Rollback the attempt to commit and start over."""
+        cursor.execute("ROLLBACK TO SAVEPOINT start_commit")
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
 
-        The cursor must be the type created by open_for_commit().
-        """
-        cursor.execute("""
-        ROLLBACK;
-        BEGIN ISOLATION LEVEL SERIALIZABLE;
-        LOCK commit_lock IN EXCLUSIVE MODE
-        """)
-
     def get_tid_and_time(self, cursor):
         """Returns the most recent tid and the current database time.
 
@@ -391,52 +420,67 @@
         except psycopg2.IntegrityError, e:
             raise ConflictError(e)
 
-    def store(self, cursor, oid, tid, prev_tid, md5sum, data):
-        """Store an object.  May raise ConflictError."""
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
         stmt = """
+        SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+            encode(temp_store.state, 'base64')
+        FROM temp_store
+            JOIN current_object ON (temp_store.zoid = current_object.zoid)
+        WHERE temp_store.prev_tid != current_object.tid
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+            return oid, prev_tid, attempted_prev_tid, decodestring(data)
+        return None
+
+    def move_from_temp(self, cursor, tid):
+        """Moved the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
         INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
-        VALUES (%s, %s, %s, %s, decode(%s, 'base64'))
+        SELECT zoid, %s, prev_tid, md5, state
+        FROM temp_store
         """
-        try:
-            cursor.execute(stmt, (
-                oid, tid, prev_tid, md5sum, encodestring(data)))
-        except psycopg2.ProgrammingError, e:
-            # This can happen if another thread is currently packing
-            # and prev_tid refers to a transaction being packed.
-            if 'concurrent update' in e.args[0]:
-                raise ConflictError(e)
-            else:
-                raise
+        cursor.execute(stmt, (tid,))
 
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
     def update_current(self, cursor, tid):
         """Update the current object pointers.
 
         tid is the integer tid of the transaction being committed.
         """
-        try:
-            cursor.execute("""
-            -- Insert objects created in this transaction into current_object.
-            INSERT INTO current_object (zoid, tid)
-            SELECT zoid, tid FROM object_state
+        cursor.execute("""
+        -- Insert objects created in this transaction into current_object.
+        INSERT INTO current_object (zoid, tid)
+        SELECT zoid, tid FROM object_state
+        WHERE tid = %(tid)s
+            AND prev_tid = 0;
+
+        -- Change existing objects.  To avoid deadlocks,
+        -- update in OID order.
+        UPDATE current_object SET tid = %(tid)s
+        WHERE zoid IN (
+            SELECT zoid FROM object_state
             WHERE tid = %(tid)s
-                AND prev_tid = 0;
+                AND prev_tid != 0
+            ORDER BY zoid
+        )
+        """, {'tid': tid})
 
-            -- Change existing objects.  To avoid deadlocks,
-            -- update in OID order.
-            UPDATE current_object SET tid = %(tid)s
-            WHERE zoid IN (
-                SELECT zoid FROM object_state
-                WHERE tid = %(tid)s
-                    AND prev_tid != 0
-                ORDER BY zoid
-            )
-            """, {'tid': tid})
-        except psycopg2.ProgrammingError, e:
-            if 'concurrent update' in e.args[0]:
-                raise ConflictError(e)
-            else:
-                raise
-
     def commit_phase1(self, cursor, tid):
         """Begin a commit.  Returns the transaction name.
 
@@ -444,27 +488,33 @@
         meaning that if commit_phase2() would raise any error, the error
         should be raised in commit_phase1() instead.
         """
-        try:
+        if self._twophase:
             txn = 'T%d' % tid
-            stmt = "NOTIFY invalidate; PREPARE TRANSACTION %s"
+            stmt = """
+            DROP TABLE temp_store;
+            PREPARE TRANSACTION %s
+            """
             cursor.execute(stmt, (txn,))
+            cursor.connection.set_isolation_level(
+                psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
             return txn
-        except psycopg2.ProgrammingError, e:
-            if 'concurrent update' in e.args[0]:
-                raise ConflictError(e)
-            else:
-                raise
+        else:
+            return '-'
 
     def commit_phase2(self, cursor, txn):
         """Final transaction commit."""
-        cursor.execute('COMMIT PREPARED %s', (txn,))
+        if self._twophase:
+            cursor.execute('COMMIT PREPARED %s', (txn,))
+        else:
+            cursor.connection.commit()
 
     def abort(self, cursor, txn=None):
         """Abort the commit.  If txn is not None, phase 1 is also aborted."""
-        if txn is not None:
-            cursor.execute('ROLLBACK PREPARED %s', (txn,))
+        if self._twophase:
+            if txn is not None:
+                cursor.execute('ROLLBACK PREPARED %s', (txn,))
         else:
-            cursor.execute('ROLLBACK')
+            cursor.connection.rollback()
 
     def new_oid(self, cursor):
         """Return a new, unused OID."""
@@ -827,8 +877,7 @@
         """Pack.  Requires populated pack tables."""
 
         # Read committed mode is sufficient.
-        conn, cursor = self.open(
-            isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+        conn, cursor = self.open()
         try:
             try:
 
@@ -926,20 +975,8 @@
         if the database has disconnected.
         """
         try:
-            if prev_polled_tid is not None:
-                if not cursor.isready():
-                    # No invalidate notifications arrived.
-                    return (), prev_polled_tid
-                del conn.notifies[:]
-
             # find out the tid of the most recent transaction.
-            stmt = """
-            SELECT tid
-            FROM transaction
-            ORDER BY tid desc
-            LIMIT 1
-            """
-            cursor.execute(stmt)
+            cursor.execute("EXECUTE get_latest_tid")
             # Expect the transaction table to always have at least one row.
             assert cursor.rowcount == 1
             new_polled_tid = cursor.fetchone()[0]

Deleted: relstorage/trunk/relstorage/autotemp.py
===================================================================
--- relstorage/trunk/relstorage/autotemp.py	2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/autotemp.py	2008-01-30 06:55:51 UTC (rev 83311)
@@ -1,43 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""A temporary file that switches from StringIO to TemporaryFile if needed.
-
-This could be a useful addition to Python's tempfile module.
-"""
-
-from cStringIO import StringIO
-import tempfile
-
-class AutoTemporaryFile:
-    """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
-    def __init__(self, threshold=10485760):
-        self._threshold = threshold
-        self._f = f = StringIO()
-        # delegate most methods
-        for m in ('read', 'readline', 'seek', 'tell', 'close'):
-            setattr(self, m, getattr(f, m))
-
-    def write(self, data):
-        threshold = self._threshold
-        if threshold > 0 and self.tell() + len(data) >= threshold:
-            # convert to TemporaryFile
-            f = tempfile.TemporaryFile()
-            f.write(self._f.getvalue())
-            f.seek(self.tell())
-            self._f = f
-            self._threshold = 0
-            # delegate all important methods
-            for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
-                setattr(self, m, getattr(f, m))
-        self._f.write(data)

Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/relstorage.py	2008-01-30 06:55:51 UTC (rev 83311)
@@ -28,8 +28,6 @@
 from ZODB import ConflictResolution, POSException
 from persistent.TimeStamp import TimeStamp
 
-from autotemp import AutoTemporaryFile
-
 log = logging.getLogger("relstorage")
 
 # Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
@@ -50,7 +48,10 @@
         self._name = name
         self._is_read_only = read_only
 
-        # load_conn and load_cursor are always open
+        if create:
+            self._adapter.prepare_schema()
+
+        # load_conn and load_cursor are usually open
         self._load_conn = None
         self._load_cursor = None
         self._load_started = False
@@ -59,21 +60,11 @@
         self._store_conn = None
         self._store_cursor = None
 
-        if create:
-            self._adapter.prepare_schema()
-
         BaseStorage.__init__(self, name)
 
         self._tid = None
         self._ltid = None
 
-        # _tbuf is a temporary file that contains pickles of data to be
-        # committed.  _pickler writes pickles to that file.  _stored_oids
-        # is a list of integer OIDs to be stored.
-        self._tbuf = None
-        self._pickler = None
-        self._stored_oids = None
-
         # _prepared_txn is the name of the transaction to commit in the
         # second phase.
         self._prepared_txn = None
@@ -256,21 +247,24 @@
         assert self._prepared_txn is None
         md5sum = md5.new(data).hexdigest()
 
+        adapter = self._adapter
+        cursor = self._store_cursor
+        assert cursor is not None
+        oid_int = u64(oid)
+        if serial:
+            prev_tid_int = u64(serial)
+        else:
+            prev_tid_int = 0
+
         self._lock_acquire()
         try:
-            # buffer the stores
-            if self._tbuf is None:
-                self._tbuf = AutoTemporaryFile()
-                self._pickler = cPickle.Pickler(
-                    self._tbuf, cPickle.HIGHEST_PROTOCOL)
-                self._stored_oids = []
-
-            self._pickler.dump((oid, serial, md5sum, data))
-            self._stored_oids.append(u64(oid))
+            # save the data in a temporary table
+            adapter.store_temp(cursor, oid_int, prev_tid_int, md5sum, data)
             return None
         finally:
             self._lock_release()
 
+
     def tpc_begin(self, transaction, tid=None, status=' '):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -294,11 +288,13 @@
             self._ude = user, desc, ext
             self._tstatus = status
 
+            adapter = self._adapter
+            conn, cursor = adapter.open_for_store()
+            self._store_conn, self._store_cursor = conn, cursor
+
             if tid is not None:
                 # get the commit lock and add the transaction now
-                adapter = self._adapter
-                conn, cursor = adapter.open_for_commit()
-                self._store_conn, self._store_cursor = conn, cursor
+                adapter.start_commit(cursor)
                 tid_int = u64(tid)
                 try:
                     adapter.add_transaction(cursor, tid_int, user, desc, ext)
@@ -323,12 +319,13 @@
             raise POSException.StorageError("No transaction in progress")
 
         adapter = self._adapter
-        conn, cursor = adapter.open_for_commit()
-        self._store_conn, self._store_cursor = conn, cursor
+        cursor = self._store_cursor
+        adapter.start_commit(cursor)
         user, desc, ext = self._ude
 
         attempt = 1
         while True:
+            # get the commit lock
             try:
                 # Choose a transaction ID.
                 # Base the transaction ID on the database time,
@@ -362,63 +359,53 @@
         # It is assumed that self._lock_acquire was called before this
         # method was called.
         self._prepared_txn = None
-        tbuf = self._tbuf
-        if tbuf is not None:
-            self._tbuf = None
-            self._pickler = None
-            self._stored_oids = None
-            tbuf.close()
 
 
-    def _send_stored(self):
-        """Send the buffered pickles to the database.
+    def _finish_store(self):
+        """Move stored objects from the temporary table to final storage.
 
         Returns a list of (oid, tid) to be received by
         Connection._handle_serial().
         """
+        assert self._tid is not None
         cursor = self._store_cursor
         adapter = self._adapter
-        tid_int = u64(self._tid)
-        self._pickler = None  # Don't allow any more store operations
-        self._tbuf.seek(0)
-        unpickler = cPickle.Unpickler(self._tbuf)
 
-        serials = []  # [(oid, serial)]
-        prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
-
+        # List conflicting changes.
+        # Try to resolve the conflicts.
+        resolved = set()  # a set of OIDs
         while True:
-            try:
-                oid, serial, md5sum, data = unpickler.load()
-            except EOFError:
+            conflict = adapter.detect_conflict(cursor)
+            if conflict is None:
                 break
-            oid_int = u64(oid)
-            if not serial:
-                serial = z64
 
-            prev_tid_int = prev_tids.get(oid_int)
-            if prev_tid_int is None:
-                prev_tid_int = 0
-                prev_tid = z64
+            oid_int, prev_tid_int, serial_int, data = conflict
+            oid = p64(oid_int)
+            prev_tid = p64(prev_tid_int)
+            serial = p64(serial_int)
+
+            rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
+            if rdata is None:
+                raise POSException.ConflictError(
+                    oid=oid, serials=(prev_tid, serial), data=data)
             else:
-                prev_tid = p64(prev_tid_int)
-                if prev_tid != serial:
-                    # conflict detected
-                    rdata = self.tryToResolveConflict(
-                        oid, prev_tid, serial, data)
-                    if rdata is None:
-                        raise POSException.ConflictError(
-                            oid=oid, serials=(prev_tid, serial), data=data)
-                    else:
-                        data = rdata
-                        md5sum = md5.new(data).hexdigest()
+                data = rdata
+                md5sum = md5.new(data).hexdigest()
+                self._adapter.replace_temp(
+                    cursor, oid_int, prev_tid_int, md5sum, data)
+                resolved.add(oid)
 
-            self._adapter.store(
-                cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
-
-            if prev_tid and serial != prev_tid:
-                serials.append((oid, ConflictResolution.ResolvedSerial))
+        # Move the data
+        tid_int = u64(self._tid)
+        serials = []
+        oid_ints = adapter.move_from_temp(cursor, tid_int)
+        for oid_int in oid_ints:
+            oid = p64(oid_int)
+            if oid in resolved:
+                serial = ConflictResolution.ResolvedSerial
             else:
-                serials.append((oid, self._tid))
+                serial = self._tid
+            serials.append((oid, serial))
 
         return serials
 
@@ -440,11 +427,7 @@
         cursor = self._store_cursor
         assert cursor is not None
 
-        if self._tbuf is not None:
-            serials = self._send_stored()
-        else:
-            serials = []
-
+        serials = self._finish_store()
         self._adapter.update_current(cursor, tid_int)
         self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
         # From the point of view of self._store_cursor,
@@ -700,7 +683,7 @@
     propagate_invalidations = False
 
     def __init__(self, parent, zodb_conn):
-        # self._conn = conn
+        # self._zodb_conn = zodb_conn
         RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
             read_only=parent._is_read_only, create=False)
         # _prev_polled_tid contains the tid at the previous poll

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2008-01-30 00:40:34 UTC (rev 83310)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2008-01-30 06:55:51 UTC (rev 83311)
@@ -189,6 +189,15 @@
         finally:
             self._storage = root_storage
 
+    def check16KObject(self):
+        # Store 16 * 1024 bytes in an object, then retrieve it
+        data = 'a 16 byte string' * 1024
+        oid = self._storage.new_oid()
+        self._dostoreNP(oid, data=data)
+        got, serialno = self._storage.load(oid, '')
+        self.assertEqual(len(got), len(data))
+        self.assertEqual(got, data)
+
     def check16MObject(self):
         # Store 16 * 1024 * 1024 bytes in an object, then retrieve it
         data = 'a 16 byte string' * (1024 * 1024)



More information about the Checkins mailing list