[Checkins] SVN: relstorage/ Added relstorage
Shane Hathaway
shane at hathawaymix.org
Sat Jan 26 19:27:45 EST 2008
Log message for revision 83260:
Added relstorage
Changed:
A relstorage/
A relstorage/CHANGELOG.txt
A relstorage/README.txt
A relstorage/__init__.py
A relstorage/adapters/
A relstorage/adapters/__init__.py
A relstorage/adapters/oracle.py
A relstorage/adapters/postgresql.py
A relstorage/autotemp.py
A relstorage/component.xml
A relstorage/config.py
A relstorage/notes/
A relstorage/notes/oracle_notes.txt
A relstorage/notes/pack_policy.ods
A relstorage/poll-invalidation-1-zodb-3-7-1.patch
A relstorage/relstorage.py
A relstorage/tests/
A relstorage/tests/.cvsignore
A relstorage/tests/__init__.py
A relstorage/tests/comparison.ods
A relstorage/tests/reltestbase.py
A relstorage/tests/speedtest.py
A relstorage/tests/testoracle.py
A relstorage/tests/testpostgresql.py
-=-
Added: relstorage/CHANGELOG.txt
===================================================================
--- relstorage/CHANGELOG.txt (rev 0)
+++ relstorage/CHANGELOG.txt 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,75 @@
+
+relstorage 0.9
+
+- Renamed to reflect expanding database support.
+
+- Support for Oracle added.
+
+- Major overhaul with many scalability and reliability improvements,
+ particularly in the area of packing.
+
+- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
+ on svn.zope.org.)
+
+
+PGStorage 0.4
+
+- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
+ for invalidation polling.
+
+- Removed the commit_order code. The commit_order idea was intended to
+ allow concurrent commits, but that idea is a little too ambitious while
+ other more important ideas are being tested. Something like it may
+ come later.
+
+- Improved connection management: only one database connection is
+ held continuously open per storage instance.
+
+- Reconnect to the database automatically.
+
+- Removed test mode.
+
+- Switched from using a ZODB.Connection subclass to a ZODB patch. The
+ Connection class changes in subtle ways too often to subclass reliably;
+ a patch is much safer.
+
+- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
+
+- Fixed an undo bug. Symptom: attempting to examine the undo log revealed
+ broken pickles. Cause: the extension field was not being wrapped in
+ psycopg2.Binary upon insert. Solution: used psycopg2.Binary.
+ Unfortunately, this doesn't fix existing transactions people have
+ committed. If anyone has any data to keep, fixing the old transactions
+ should be easy.
+
+- Moved from a private CVS repository to Sourceforge.
+ See http://pgstorage.sourceforge.net . Also switched to the MIT license.
+
+- David Pratt added a basic getSize() implementation so that the Zope
+ management interface displays an estimate of the size of the database.
+
+- Turned PGStorage into a top-level package. Python generally makes
+ top-level packages easier to install.
+
+
+PGStorage 0.3
+
+- Made compatible with Zope 3, although an undo bug apparently remains.
+
+
+PGStorage 0.2
+
+- Fixed concurrent commits, which were generating deadlocks. Fixed by
+ adding a special table, "commit_lock", which is used for
+ synchronizing increments of commit_seq (but only at final commit.)
+ If you are upgrading from version 0.1, you need to change your
+ database using the 'psql' prompt:
+
+ create table commit_lock ();
+
+- Added speed tests and an OpenDocument spreadsheet comparing
+ FileStorage / ZEO with PGStorage. PGStorage wins at reading objects
+ and writing a lot of small transactions, while FileStorage / ZEO
+ wins at writing big transactions. Interestingly, they tie when
+ writing a RAM disk.
+
Added: relstorage/README.txt
===================================================================
--- relstorage/README.txt (rev 0)
+++ relstorage/README.txt 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,15 @@
+
+To make Zope store in RelStorage, patch ZODB/Connection.py using the
+provided patch, then add the following to zope.conf, modifying the
+adapter and params lines to fit your needs.
+
+
+%import relstorage
+<zodb_db main>
+ mount-point /
+ <relstorage>
+ adapter postgresql
+ params host=localhost dbname=zodb user=zope password=...
+ </relstorage>
+</zodb_db>
+
Added: relstorage/__init__.py
===================================================================
--- relstorage/__init__.py (rev 0)
+++ relstorage/__init__.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,22 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""relstorage package"""
+
+# perform a compatibility test
+from ZODB.Connection import Connection
+
+if not hasattr(Connection, '_poll_invalidations'):
+ raise ImportError('RelStorage requires the invalidation polling '
+ 'patch for ZODB.')
+del Connection
Added: relstorage/adapters/__init__.py
===================================================================
--- relstorage/adapters/__init__.py (rev 0)
+++ relstorage/adapters/__init__.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1 @@
+"""relstorage.adapters package"""
Added: relstorage/adapters/oracle.py
===================================================================
--- relstorage/adapters/oracle.py (rev 0)
+++ relstorage/adapters/oracle.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,1005 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Oracle adapter for RelStorage."""
+
+import logging
+import os
+import re
+import socket
+import thread
+import time
+import traceback
+import cx_Oracle
+from ZODB.POSException import ConflictError, StorageError, UndoError
+
+log = logging.getLogger("relstorage.oracle")
+
+
+class OracleAdapter(object):
+ """Oracle adapter for RelStorage."""
+
+ def __init__(self, user, password, dsn,
+ arraysize=64, max_connect_attempts=3):
+ self._params = (user, password, dsn)
+ self._arraysize = arraysize
+ self._max_connect_attempts = max_connect_attempts
+
+ def create_schema(self, cursor):
+ """Create the database tables."""
+ stmt = """
+ CREATE TABLE commit_lock (dummy CHAR);
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid NUMBER(20) NOT NULL PRIMARY KEY,
+ packed CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
+ username VARCHAR2(255) NOT NULL,
+ description VARCHAR2(4000) NOT NULL,
+ extension RAW(2000)
+ );
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions.
+ -- md5 and state can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid NUMBER(20) NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BLOB
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ tid NUMBER(20) NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+
+ -- During packing, an exclusive lock is held on pack_lock.
+ CREATE TABLE pack_lock (dummy CHAR);
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL,
+ to_zoid NUMBER(20) NOT NULL
+ );
+ CREATE INDEX object_ref_from ON object_ref (zoid);
+ CREATE INDEX object_ref_tid ON object_ref (tid);
+ CREATE INDEX object_ref_to ON object_ref (to_zoid);
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid NUMBER(20) NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is 'N',
+ -- the object and all its revisions will be removed.
+ -- If keep is 'Y', instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- If keep is 'Y' then the keep_tid field must also be set.
+ -- The keep_tid field specifies which revision to keep within
+ -- the list of packable transactions.
+ CREATE TABLE pack_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ keep CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
+ keep_tid NUMBER(20)
+ );
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+ """
+ self._run_script(cursor, stmt)
+
+
+ def _run_script(self, cursor, script, params=()):
+ """Execute a series of statements in the database."""
+ lines = []
+ for line in script.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('--'):
+ continue
+ if line.endswith(';'):
+ line = line[:-1]
+ lines.append(line)
+ stmt = '\n'.join(lines)
+ try:
+ cursor.execute(stmt, params)
+ except:
+ log.warning("script statement failed: %s", stmt)
+ raise
+ lines = []
+ else:
+ lines.append(line)
+ if lines:
+ try:
+ stmt = '\n'.join(lines)
+ cursor.execute(stmt, params)
+ except:
+ log.warning("script statement failed: %s", stmt)
+ raise
+
+
+ def prepare_schema(self):
+ """Create the database schema if it does not already exist."""
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("""
+ SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
+ """)
+ if not cursor.fetchall():
+ self.create_schema(cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def zap(self):
+ """Clear all data out of the database.
+
+ Used by the test suite.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ DROP SEQUENCE zoid_seq;
+ CREATE SEQUENCE zoid_seq;
+ """
+ self._run_script(cursor, stmt)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def open(self, transaction_mode="ISOLATION LEVEL SERIALIZABLE",
+ twophase=False):
+ """Open a database connection and return (conn, cursor)."""
+ try:
+ attempt = 1
+ while True:
+ try:
+ kw = {'twophase': twophase} #, 'threaded': True}
+ conn = cx_Oracle.connect(*self._params, **kw)
+ break
+ except cx_Oracle.DatabaseError, e:
+ log.warning("Connection failed: %s", e)
+ attempt += 1
+ if attempt > self._max_connect_attempts:
+ raise
+ else:
+ # Pause, then try to connect again
+ time.sleep(2**(attempt - 1))
+
+ cursor = conn.cursor()
+ cursor.arraysize = self._arraysize
+ if transaction_mode:
+ cursor.execute("SET TRANSACTION %s" % transaction_mode)
+ return conn, cursor
+
+ except cx_Oracle.OperationalError:
+ log.debug("Unable to connect in %s", repr(self))
+ raise
+
+ def close(self, conn, cursor):
+ """Close both a cursor and connection, ignoring certain errors."""
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except (cx_Oracle.InterfaceError,
+ cx_Oracle.OperationalError):
+ pass
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.open('READ ONLY')
+
+ def restart_load(self, cursor):
+ """After a rollback, reinitialize a connection for loading objects."""
+ cursor.execute("SET TRANSACTION READ ONLY")
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # The tests expect an exact number, but the code below generates
+ # an estimate, so this is disabled for now.
+ if True:
+ return 0
+ else:
+ conn, cursor = self.open('READ ONLY')
+ try:
+ cursor.execute("""
+ SELECT NUM_ROWS
+ FROM USER_TABLES
+ WHERE TABLE_NAME = 'CURRENT_OBJECT'
+ """)
+ res = cursor.fetchone()[0]
+ if res is None:
+ res = 0
+ else:
+ res = int(res)
+ return res
+ finally:
+ self.close(conn, cursor)
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ # May not be possible without access to the dba_* objects
+ return 0
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ cursor.execute("""
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = :1
+ """, (oid,))
+ for state, tid in cursor:
+ if state is not None:
+ state = state.read()
+ # else this object's creation has been undone
+ return state, tid
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT state
+ FROM object_state
+ WHERE zoid = :1
+ AND tid = :2
+ """, (oid, tid))
+ for (state,) in cursor:
+ if state is not None:
+ return state.read()
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
+ return len(list(cursor))
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid = (
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid < :tid
+ )
+ """
+ cursor.execute(stmt, {'oid': oid, 'tid': tid})
+ for state, tid in cursor:
+ if state is not None:
+ state = state.read()
+ # else this object's creation has been undone
+ return state, tid
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT MIN(tid)
+ FROM object_state
+ WHERE zoid = :1
+ AND tid > :2
+ """
+ cursor.execute(stmt, (oid, tid))
+ rows = cursor.fetchall()
+ if rows:
+ assert len(rows) == 1
+ return rows[0][0]
+ else:
+ return None
+
+ def get_object_tids(self, cursor, oids):
+ """Returns a map containing the current tid for each oid in a list.
+
+ OIDs that do not exist are not included.
+ """
+ # query in chunks to avoid running into a maximum query length
+ chunk_size = 512
+ res = {}
+ for i in xrange(0, len(oids), chunk_size):
+ chunk = oids[i : i + chunk_size]
+ oid_str = ','.join(str(oid) for oid in chunk)
+ stmt = """
+ SELECT zoid, tid FROM current_object WHERE zoid IN (%s)
+ """ % oid_str
+ cursor.execute(stmt)
+ res.update(dict(iter(cursor)))
+ return res
+
+ def open_for_commit(self, cursor=None):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ if cursor is None:
+ conn, cursor = self.open(transaction_mode=None, twophase=True)
+ else:
+ conn = cursor.connection
+ # Use an xid that is unique to this Oracle connection, assuming
+ # the hostname is unique within the Oracle cluster. The xid
+ # can have up to 64 bytes.
+ xid = '%s-%d-%d' % (socket.gethostname(), os.getpid(), id(conn))
+ conn.begin(0, xid, '0')
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ return conn, cursor
+
+ def restart_commit(self, cursor):
+ """Rollback the commit and start over.
+
+ The cursor must be the type created by open_for_commit().
+ """
+ cursor.connection.rollback()
+ self.open_for_commit(cursor)
+
+ def _parse_dsinterval(self, s):
+ """Convert an Oracle dsinterval (as a string) to a float."""
+ mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
+ if not mo:
+ raise ValueError(s)
+ day, hour, min, sec = [float(v) for v in mo.groups()]
+ return day * 86400 + hour * 3600 + min * 60 + sec
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
+ '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
+ FROM transaction
+ """)
+ tid, now = cursor.fetchone()
+ return tid, self._parse_dsinterval(now)
+
+ def add_transaction(self, cursor, tid, username, description, extension):
+ """Add a transaction.
+
+ Raises ConflictError if the given tid has already been used.
+ """
+ try:
+ stmt = """
+ INSERT INTO transaction
+ (tid, username, description, extension)
+ VALUES (:1, :2, :3, :4)
+ """
+ cursor.execute(stmt, (
+ tid, username or '-', description or '-',
+ cx_Oracle.Binary(extension)))
+ except cx_Oracle.IntegrityError:
+ raise ConflictError
+
+ def store(self, cursor, oid, tid, prev_tid, md5sum, data):
+ """Store an object. May raise ConflictError."""
+ try:
+ cursor.setinputsizes(data=cx_Oracle.BLOB)
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (:oid, :tid, :prev_tid, :md5sum, :data)
+ """
+ cursor.execute(stmt, oid=oid, tid=tid, prev_tid=prev_tid,
+ md5sum=md5sum, data=cx_Oracle.Binary(data))
+ except cx_Oracle.ProgrammingError, e:
+ # This can happen if another thread is currently packing
+ # and prev_tid refers to a transaction being packed.
+ if 'concurrent' in e.args[0]:
+ raise ConflictError(e)
+ else:
+ raise
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ try:
+ # Insert objects created in this transaction into current_object.
+ stmt = """
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = :1
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (tid,))
+
+ # Change existing objects.
+ stmt = """
+ UPDATE current_object SET tid = :1
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = :1
+ AND prev_tid != 0
+ )
+ """
+ cursor.execute(stmt, (tid,))
+ except cx_Oracle.ProgrammingError, e:
+ if 'concurrent' in e.args[0]:
+ raise ConflictError(e)
+ else:
+ raise
+
+ def commit_phase1(self, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ try:
+ cursor.connection.prepare()
+ return '-' # the transaction name is not important
+ except cx_Oracle.ProgrammingError, e:
+ if 'concurrent' in e.args[0]:
+ raise ConflictError(e)
+ else:
+ raise
+
+ def commit_phase2(self, cursor, txn):
+ """Final transaction commit."""
+ cursor.connection.commit()
+
+ def abort(self, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ cursor.connection.rollback()
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT zoid_seq.nextval FROM DUAL"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log.
+
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension
+ FROM transaction
+ WHERE packed = 'N'
+ AND tid != 0
+ ORDER BY tid desc
+ """
+ cursor.execute(stmt)
+ return iter(cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT 1 FROM current_object WHERE zoid = :1
+ """
+ cursor.execute(stmt, (oid,))
+ if not cursor.fetchall():
+ raise KeyError(oid)
+
+ stmt = """
+ SELECT tid, username, description, extension, LENGTH(state)
+ FROM transaction
+ JOIN object_state USING (tid)
+ WHERE zoid = :1
+ AND packed = 'N'
+ ORDER BY tid desc
+ """
+ cursor.execute(stmt, (oid,))
+ return iter(cursor)
+
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = """
+ LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
+ """
+ try:
+ cursor.execute(stmt)
+ except cx_Oracle.DatabaseError:
+ raise StorageError('A pack or undo operation is in progress')
+
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ stmt = """
+ SELECT 1 FROM transaction WHERE tid = :1 AND packed = 'N'
+ """
+ cursor.execute(stmt, (undo_tid,))
+ if not cursor.fetchall():
+ raise UndoError("Transaction not found or packed")
+
+ self.hold_pack_lock(cursor)
+
+ # Rule: we can undo an object if the object's state in the
+ # transaction to undo matches the object's current state.
+ # If any object in the transaction does not fit that rule,
+ # refuse to undo.
+ stmt = """
+ SELECT prev_os.zoid, current_object.tid
+ FROM object_state prev_os
+ JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+ JOIN current_object ON (cur_os.zoid = current_object.zoid
+ AND cur_os.tid = current_object.tid)
+ WHERE prev_os.tid = :1
+ AND cur_os.md5 != prev_os.md5
+ """
+ cursor.execute(stmt, (undo_tid,))
+ if cursor.fetchmany():
+ raise UndoError(
+ "Some data were modified by a later transaction")
+
+ # Rule: don't allow the creation of the root object to
+ # be undone. It's hard to get it back.
+ stmt = """
+ SELECT 1
+ FROM object_state
+ WHERE tid = :1
+ AND zoid = 0
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (undo_tid,))
+ if cursor.fetchall():
+ raise UndoError("Can't undo the creation of the root object")
+
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the list of OIDs undone.
+ """
+ # Update records produced by earlier undo operations
+ # within this transaction. Change the state, but not
+ # prev_tid, since prev_tid is still correct.
+ # Table names: 'undoing' refers to the transaction being
+ # undone and 'prev' refers to the object state identified
+ # by undoing.prev_tid.
+ stmt = """
+ UPDATE object_state SET (md5, state) = (
+ SELECT prev.md5, prev.state
+ FROM object_state undoing
+ LEFT JOIN object_state prev
+ ON (prev.zoid = undoing.zoid
+ AND prev.tid = undoing.prev_tid)
+ WHERE undoing.tid = :undo_tid
+ AND undoing.zoid = object_state.zoid
+ )
+ WHERE tid = :self_tid
+ AND zoid IN (
+ SELECT zoid FROM object_state WHERE tid = :undo_tid);
+
+ -- Add new undo records.
+
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT undoing.zoid, :self_tid, current_object.tid,
+ prev.md5, prev.state
+ FROM object_state undoing
+ JOIN current_object ON (current_object.zoid = undoing.zoid)
+ LEFT JOIN object_state prev
+ ON (prev.zoid = undoing.zoid
+ AND prev.tid = undoing.prev_tid)
+ WHERE undoing.tid = :undo_tid
+ AND undoing.zoid NOT IN (
+ SELECT zoid FROM object_state WHERE tid = :self_tid);
+ """
+ self._run_script(cursor, stmt,
+ {'undo_tid': undo_tid, 'self_tid': self_tid})
+
+ # List the changed OIDs.
+ stmt = "SELECT zoid FROM object_state WHERE tid = :1"
+ cursor.execute(stmt, (undo_tid,))
+ return [oid_int for (oid_int,) in cursor]
+
+
+ def choose_pack_transaction(self, cursor, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ stmt = """
+ SELECT MAX(tid)
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= :1
+ AND packed = 'N'
+ """
+ cursor.execute(stmt, (pack_point,))
+ rows = cursor.fetchall()
+ if not rows:
+ # Nothing needs to be packed.
+ return None
+ return rows[0][0]
+
+
+ def pre_pack(self, cursor, pack_tid, get_references):
+ """Decide what to pack.
+
+ tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+ """
+ # Fill object_ref with references from object states
+ # in transactions that will not be packed.
+ self._fill_nonpacked_refs(cursor, pack_tid, get_references)
+
+ args = {'pack_tid': pack_tid}
+
+ # Ensure the temporary pack_object table is clear.
+ cursor.execute("DELETE FROM pack_object")
+
+ # Fill the pack_object table with OIDs that either will be
+ # removed (if nothing references the OID) or whose history will
+ # be cut.
+ stmt = """
+ INSERT INTO pack_object (zoid, keep)
+ SELECT DISTINCT zoid, 'N'
+ FROM object_state
+ WHERE tid <= :pack_tid
+ """
+ cursor.execute(stmt, args)
+
+ # If the root object is in pack_object, keep it.
+ stmt = """
+ UPDATE pack_object SET keep = 'Y'
+ WHERE zoid = 0
+ """
+ cursor.execute(stmt)
+
+ # Keep objects that have been revised since pack_tid.
+ stmt = """
+ UPDATE pack_object SET keep = 'Y'
+ WHERE keep = 'N'
+ AND zoid IN (
+ SELECT zoid
+ FROM current_object
+ WHERE tid > :pack_tid
+ )
+ """
+ cursor.execute(stmt, args)
+
+ # Keep objects that are still referenced by object states in
+ # transactions that will not be packed.
+ stmt = """
+ UPDATE pack_object SET keep = 'Y'
+ WHERE keep = 'N'
+ AND zoid IN (
+ SELECT to_zoid
+ FROM object_ref
+ WHERE tid > :pack_tid
+ )
+ """
+ cursor.execute(stmt, args)
+
+ # Each of the packable objects to be kept might
+ # refer to other objects. If some of those references
+ # include objects currently set to be removed, keep
+ # those objects as well. Do this
+ # repeatedly until all references have been satisfied.
+ while True:
+
+ # Set keep_tid for all pack_object rows with keep = 'Y'.
+ # This must be done before _fill_pack_object_refs examines
+ # references.
+ stmt = """
+ UPDATE pack_object SET keep_tid = (
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = pack_object.zoid
+ AND tid > 0
+ AND tid <= :pack_tid
+ )
+ WHERE keep = 'Y' AND keep_tid IS NULL
+ """
+ cursor.execute(stmt, args)
+
+ self._fill_pack_object_refs(cursor, get_references)
+
+ stmt = """
+ UPDATE pack_object SET keep = 'Y'
+ WHERE keep = 'N'
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN pack_object parent ON (
+ object_ref.zoid = parent.zoid)
+ WHERE parent.keep = 'Y'
+ )
+ """
+ cursor.execute(stmt)
+ if not cursor.rowcount:
+ # No new references detected.
+ break
+
+
+ def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
+ """Fill object_ref for all transactions that will not be packed."""
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > :1
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = object_state.tid
+ )
+ """
+ cursor.execute(stmt, (pack_tid,))
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _fill_pack_object_refs(self, cursor, get_references):
+ """Fill object_ref for all pack_object rows that have keep_tid."""
+ stmt = """
+ SELECT DISTINCT keep_tid
+ FROM pack_object
+ WHERE keep_tid IS NOT NULL
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = keep_tid
+ )
+ """
+ cursor.execute(stmt)
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fills object_refs with all states for a transaction.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = :1
+ """
+ cursor.execute(stmt, (tid,))
+
+ to_add = [] # [(from_oid, tid, to_oid)]
+ for from_oid, state_file in cursor:
+ if state_file is not None:
+ state = state_file.read()
+ if state is not None:
+ to_oids = get_references(state)
+ for to_oid in to_oids:
+ to_add.append((from_oid, tid, to_oid))
+
+ if to_add:
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (:1, :2, :3)
+ """
+ cursor.executemany(stmt, to_add)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (:1)
+ """
+ cursor.execute(stmt, (tid,))
+
+
+ def pack(self, pack_tid):
+ """Pack. Requires populated pack tables."""
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.open('ISOLATION LEVEL READ COMMITTED')
+ try:
+ try:
+
+ for table in ('object_ref', 'current_object', 'object_state'):
+
+ # Remove objects that are in pack_object and have keep
+ # set to 'N'.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = 'N'
+ )
+ """ % table
+ cursor.execute(stmt)
+
+ if table != 'current_object':
+ # Cut the history of objects in pack_object that
+ # have keep set to 'Y'.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = 'Y'
+ )
+ AND tid < (
+ SELECT keep_tid
+ FROM pack_object
+ WHERE zoid = %s.zoid
+ )
+ """ % (table, table)
+ cursor.execute(stmt)
+
+ stmt = """
+ -- Terminate prev_tid chains
+ UPDATE object_state SET prev_tid = 0
+ WHERE tid <= :tid
+ AND prev_tid != 0;
+
+ -- For each tid to be removed, delete the corresponding row in
+ -- object_refs_added.
+ DELETE FROM object_refs_added
+ WHERE tid > 0
+ AND tid <= :tid
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = object_refs_added.tid
+ );
+
+ -- Delete transactions no longer used.
+ DELETE FROM transaction
+ WHERE tid > 0
+ AND tid <= :tid
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = transaction.tid
+ );
+
+ -- Mark the remaining packable transactions as packed
+ UPDATE transaction SET packed = 'Y'
+ WHERE tid > 0
+ AND tid <= :tid
+ AND packed = 'N'
+ """
+ self._run_script(cursor, stmt, {'tid': pack_tid})
+
+ # Clean up
+ cursor.execute("DELETE FROM pack_object")
+
+ except:
+ conn.rollback()
+ raise
+
+ else:
+ conn.commit()
+
+ finally:
+ self.close(conn, cursor)
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid). Raises StorageError
+ if the database has disconnected.
+ """
+ try:
+ # find out the tid of the most recent transaction.
+ stmt = "SELECT MAX(tid) FROM transaction"
+ cursor.execute(stmt)
+ rows = cursor.fetchall()
+ new_polled_tid = rows[0][0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ stmt = "SELECT 1 FROM transaction WHERE tid = :1"
+ cursor.execute(stmt, (prev_polled_tid,))
+ rows = cursor.fetchall()
+ if not rows:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ stmt = """
+ SELECT DISTINCT zoid
+ FROM object_state
+ JOIN transaction USING (tid)
+ WHERE tid > :1
+ """
+ if ignore_tid is not None:
+ stmt += " AND tid != %d" % ignore_tid
+ cursor.execute(stmt, (prev_polled_tid,))
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
+
+ except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
+ raise StorageError("database disconnected")
+
Added: relstorage/adapters/postgresql.py
===================================================================
--- relstorage/adapters/postgresql.py (rev 0)
+++ relstorage/adapters/postgresql.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,978 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""PostgreSQL adapter for RelStorage."""
+
+from base64 import decodestring, encodestring
+import logging
+import psycopg2, psycopg2.extensions
+from ZODB.POSException import ConflictError, StorageError, UndoError
+
+log = logging.getLogger("relstorage.postgresql")
+
+
+# Notes about adapters:
+#
+# An adapter must not hold a connection, cursor, or database state, because
+# RelStorage opens multiple concurrent connections using a single adapter
+# instance.
+# All OID and TID values are integers, not binary strings, except as noted.
+
+
+class PostgreSQLAdapter(object):
+ """PostgreSQL adapter for RelStorage."""
+
+ def __init__(self, dsn=''):
+ self._dsn = dsn
+
+ def create_schema(self, cursor):
+ """Create the database tables."""
+ stmt = """
+ CREATE TABLE commit_lock ();
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid BIGINT NOT NULL PRIMARY KEY,
+ packed BOOLEAN NOT NULL DEFAULT FALSE,
+ username VARCHAR(255) NOT NULL,
+ description TEXT NOT NULL,
+ extension BYTEA
+ );
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions. Note that md5 and state
+ -- can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid BIGINT NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BYTEA
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+
+ -- During packing, an exclusive lock is held on pack_lock.
+ CREATE TABLE pack_lock ();
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL
+ );
+ CREATE INDEX object_ref_from ON object_ref (zoid);
+ CREATE INDEX object_ref_tid ON object_ref (tid);
+ CREATE INDEX object_ref_to ON object_ref (to_zoid);
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is 'N',
+ -- the object and all its revisions will be removed.
+ -- If keep is 'Y', instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- If keep is 'Y' then the keep_tid field must also be set.
+ -- The keep_tid field specifies which revision to keep within
+ -- the list of packable transactions.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT
+ );
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+ """
+ cursor.execute(stmt)
+
+
+ def prepare_schema(self):
+ """Create the database schema if it does not already exist."""
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("""
+ SELECT tablename
+ FROM pg_tables
+ WHERE tablename = 'object_state'
+ """)
+ if not cursor.rowcount:
+ self.create_schema(cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def zap(self):
+ """Clear all data out of the database.
+
+ Used by the test suite.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("""
+ TRUNCATE object_refs_added, object_ref, current_object,
+ object_state, transaction;
+ -- Create a special transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, '', '');
+ ALTER SEQUENCE zoid_seq START WITH 1;
+ """)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def open(self, isolation=psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE):
+ """Open a database connection and return (conn, cursor)."""
+ try:
+ conn = psycopg2.connect(self._dsn)
+ conn.set_isolation_level(isolation)
+ cursor = conn.cursor()
+ cursor.arraysize = 64
+ except psycopg2.OperationalError:
+ log.debug("Unable to connect in %s", repr(self))
+ raise
+ return conn, cursor
+
+ def close(self, conn, cursor):
+ """Close a connection and cursor, ignoring certain errors.
+ """
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except (psycopg2.InterfaceError,
+ psycopg2.OperationalError):
+ pass
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open()
+ cursor.execute("LISTEN invalidate")
+ return conn, cursor
+
+ def restart_load(self, cursor):
+ """After a rollback, reinitialize a connection for loading objects."""
+ # No re-init necessary
+ pass
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # do later
+ return 0
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ conn, cursor = self.open()
+ try:
+ cursor.execute("SELECT SUM(relpages) FROM pg_class")
+ # A relative page on postgres is 8K
+ relpages = cursor.fetchone()[0]
+ return relpages * 8 * 1024
+ finally:
+ self.close(conn, cursor)
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # This object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64')
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state64,) = cursor.fetchone()
+ if state64 is not None:
+ return decodestring(state64)
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+ return cursor.rowcount
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid desc
+ LIMIT 1
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # The object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ def get_object_tids(self, cursor, oids):
+ """Returns a map containing the current tid for each oid in a list.
+
+ OIDs that do not exist are not included.
+ """
+ # query in chunks to avoid running into a maximum query length
+ chunk_size = 512
+ res = {}
+ for i in xrange(0, len(oids), chunk_size):
+ chunk = oids[i : i + chunk_size]
+ oid_str = ','.join(str(oid) for oid in chunk)
+ stmt = """
+ SELECT zoid, tid FROM current_object WHERE zoid IN (%s)
+ """ % oid_str
+ cursor.execute(stmt)
+ res.update(dict(iter(cursor)))
+ return res
+
+ def open_for_commit(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ # psycopg2 doesn't support prepared transactions, so
+ # tell psycopg2 to use the autocommit isolation level, but covertly
+ # switch to the serializable isolation level.
+ conn, cursor = self.open(
+ isolation=psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
+ )
+ cursor.execute("""
+ BEGIN ISOLATION LEVEL SERIALIZABLE;
+ LOCK commit_lock IN EXCLUSIVE MODE
+ """)
+ return conn, cursor
+
+ def restart_commit(self, cursor):
+ """Rollback the commit and start over.
+
+ The cursor must be the type created by open_for_commit().
+ """
+ cursor.execute("""
+ ROLLBACK;
+ BEGIN ISOLATION LEVEL SERIALIZABLE;
+ LOCK commit_lock IN EXCLUSIVE MODE
+ """)
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """)
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+
+ def add_transaction(self, cursor, tid, username, description, extension):
+ """Add a transaction.
+
+ Raises ConflictError if the given tid has already been used.
+ """
+ try:
+ stmt = """
+ INSERT INTO transaction
+ (tid, username, description, extension)
+ VALUES (%s, %s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (
+ tid, username, description, encodestring(extension)))
+ except psycopg2.IntegrityError, e:
+ raise ConflictError(e)
+
+ def store(self, cursor, oid, tid, prev_tid, md5sum, data):
+ """Store an object. May raise ConflictError."""
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, %s, decode(%s, 'base64'))
+ """
+ try:
+ cursor.execute(stmt, (
+ oid, tid, prev_tid, md5sum, encodestring(data)))
+ except psycopg2.ProgrammingError, e:
+ # This can happen if another thread is currently packing
+ # and prev_tid refers to a transaction being packed.
+ if 'concurrent update' in e.args[0]:
+ raise ConflictError(e)
+ else:
+ raise
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ try:
+ cursor.execute("""
+ -- Insert objects created in this transaction into current_object.
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid = 0;
+
+ -- Change existing objects. To avoid deadlocks,
+ -- update in OID order.
+ UPDATE current_object SET tid = %(tid)s
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid != 0
+ ORDER BY zoid
+ )
+ """, {'tid': tid})
+ except psycopg2.ProgrammingError, e:
+ if 'concurrent update' in e.args[0]:
+ raise ConflictError(e)
+ else:
+ raise
+
+ def commit_phase1(self, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ try:
+ txn = 'T%d' % tid
+ stmt = "NOTIFY invalidate; PREPARE TRANSACTION %s"
+ cursor.execute(stmt, (txn,))
+ return txn
+ except psycopg2.ProgrammingError, e:
+ if 'concurrent update' in e.args[0]:
+ raise ConflictError(e)
+ else:
+ raise
+
+ def commit_phase2(self, cursor, txn):
+ """Final transaction commit."""
+ cursor.execute('COMMIT PREPARED %s', (txn,))
+
+ def abort(self, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ if txn is not None:
+ cursor.execute('ROLLBACK PREPARED %s', (txn,))
+ else:
+ cursor.execute('ROLLBACK')
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT NEXTVAL('zoid_seq')"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log.
+
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension
+ FROM transaction
+ WHERE packed = FALSE
+ AND tid != 0
+ ORDER BY tid desc
+ """
+ cursor.execute(stmt)
+ return iter(cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT 1 FROM current_object WHERE zoid = %s
+ """
+ cursor.execute(stmt, (oid,))
+ if not cursor.rowcount:
+ raise KeyError(oid)
+
+ stmt = """
+ SELECT tid, username, description, extension, OCTET_LENGTH(state)
+ FROM transaction
+ JOIN object_state USING (tid)
+ WHERE zoid = %s
+ AND packed = FALSE
+ ORDER BY tid desc
+ """
+ cursor.execute(stmt, (oid,))
+ return iter(cursor)
+
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = """
+ LOCK pack_lock IN EXCLUSIVE MODE NOWAIT
+ """
+ try:
+ cursor.execute(stmt)
+ except psycopg2.DatabaseError:
+ raise StorageError('A pack or undo operation is in progress')
+
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ stmt = """
+ SELECT 1 FROM transaction WHERE tid = %s AND packed = FALSE
+ """
+ cursor.execute(stmt, (undo_tid,))
+ if not cursor.rowcount:
+ raise UndoError("Transaction not found or packed")
+
+ self.hold_pack_lock(cursor)
+
+ # Rule: we can undo an object if the object's state in the
+ # transaction to undo matches the object's current state.
+ # If any object in the transaction does not fit that rule,
+ # refuse to undo.
+ stmt = """
+ SELECT prev_os.zoid, current_object.tid
+ FROM object_state prev_os
+ JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+ JOIN current_object ON (cur_os.zoid = current_object.zoid
+ AND cur_os.tid = current_object.tid)
+ WHERE prev_os.tid = %s
+ AND cur_os.md5 != prev_os.md5
+ LIMIT 1
+ """
+ cursor.execute(stmt, (undo_tid,))
+ if cursor.rowcount:
+ raise UndoError(
+ "Some data were modified by a later transaction")
+
+ # Rule: don't allow the creation of the root object to
+ # be undone. It's hard to get it back.
+ stmt = """
+ SELECT 1
+ FROM object_state
+ WHERE tid = %s
+ AND zoid = 0
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (undo_tid,))
+ if cursor.rowcount:
+ raise UndoError("Can't undo the creation of the root object")
+
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the list of OIDs undone.
+ """
+ # Update records produced by earlier undo operations
+ # within this transaction. Change the state, but not
+ # prev_tid, since prev_tid is still correct.
+ # Table names: 'undoing' refers to the transaction being
+ # undone and 'prev' refers to the object state identified
+ # by undoing.prev_tid.
+ stmt = """
+ UPDATE object_state SET state = (
+ SELECT prev.state
+ FROM object_state undoing
+ LEFT JOIN object_state prev
+ ON (prev.zoid = undoing.zoid
+ AND prev.tid = undoing.prev_tid)
+ WHERE undoing.tid = %(undo_tid)s
+ AND undoing.zoid = object_state.zoid
+ ),
+ md5 = (
+ SELECT prev.md5
+ FROM object_state undoing
+ LEFT JOIN object_state prev
+ ON (prev.zoid = undoing.zoid
+ AND prev.tid = undoing.prev_tid)
+ WHERE undoing.tid = %(undo_tid)s
+ AND undoing.zoid = object_state.zoid
+ )
+ WHERE tid = %(self_tid)s
+ AND zoid IN (
+ SELECT zoid FROM object_state WHERE tid = %(undo_tid)s);
+
+ -- Add new undo records.
+
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT undoing.zoid, %(self_tid)s, current_object.tid,
+ prev.md5, prev.state
+ FROM object_state undoing
+ JOIN current_object USING (zoid)
+ LEFT JOIN object_state prev
+ ON (prev.zoid = undoing.zoid
+ AND prev.tid = undoing.prev_tid)
+ WHERE undoing.tid = %(undo_tid)s
+ AND undoing.zoid NOT IN (
+ SELECT zoid FROM object_state WHERE tid = %(self_tid)s);
+
+ -- List the changed OIDs.
+
+ SELECT zoid FROM object_state WHERE tid = %(undo_tid)s
+ """
+ cursor.execute(stmt, {'undo_tid': undo_tid, 'self_tid': self_tid})
+ return [oid_int for (oid_int,) in cursor]
+
+
+ def choose_pack_transaction(self, cursor, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ stmt = """
+ SELECT tid
+ FROM transaction
+ WHERE tid > 0 AND tid <= %s
+ AND packed = FALSE
+ ORDER BY tid desc
+ LIMIT 1
+ """
+ cursor.execute(stmt, (pack_point,))
+ if not cursor.rowcount:
+ # Nothing needs to be packed.
+ return None
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+
+
+ def pre_pack(self, cursor, pack_tid, get_references):
+ """Decide what to pack.
+
+ tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+ """
+ # Fill object_ref with references from object states
+ # in transactions that will not be packed.
+ self._fill_nonpacked_refs(cursor, pack_tid, get_references)
+
+ # Ensure the temporary pack_object table is clear.
+ cursor.execute("TRUNCATE pack_object")
+
+ args = {'pack_tid': pack_tid}
+
+ # Fill the pack_object table with OIDs that either will be
+ # removed (if nothing references the OID) or whose history will
+ # be cut.
+ stmt = """
+ INSERT INTO pack_object (zoid, keep)
+ SELECT DISTINCT zoid, false
+ FROM object_state
+ WHERE tid <= %(pack_tid)s
+ """
+ cursor.execute(stmt, args)
+
+ # If the root object is in pack_object, keep it.
+ stmt = """
+ UPDATE pack_object SET keep = true
+ WHERE zoid = 0
+ """
+ cursor.execute(stmt)
+
+ # Keep objects that have been revised since pack_tid.
+ stmt = """
+ UPDATE pack_object SET keep = true
+ WHERE keep = false
+ AND zoid IN (
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(pack_tid)s
+ )
+ """
+ cursor.execute(stmt, args)
+
+ # Keep objects that are still referenced by object states in
+ # transactions that will not be packed.
+ stmt = """
+ UPDATE pack_object SET keep = true
+ WHERE keep = false
+ AND zoid IN (
+ SELECT to_zoid
+ FROM object_ref
+ WHERE tid > %(pack_tid)s
+ )
+ """
+ cursor.execute(stmt, args)
+
+ # Each of the packable objects to be kept might
+ # refer to other objects. If some of those references
+ # include objects currently set to be removed, keep
+ # those objects as well. Do this
+ # repeatedly until all references have been satisfied.
+ while True:
+
+ # Set keep_tid for all pack_object rows with keep = 'Y'.
+ # This must be done before _fill_pack_object_refs examines
+ # references.
+ stmt = """
+ UPDATE pack_object SET keep_tid = (
+ SELECT tid
+ FROM object_state
+ WHERE zoid = pack_object.zoid
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ ORDER BY tid DESC
+ LIMIT 1
+ )
+ WHERE keep = true AND keep_tid IS NULL
+ """
+ cursor.execute(stmt, args)
+
+ self._fill_pack_object_refs(cursor, get_references)
+
+ stmt = """
+ UPDATE pack_object SET keep = true
+ WHERE keep = false
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN pack_object parent ON (
+ object_ref.zoid = parent.zoid)
+ WHERE parent.keep = true
+ )
+ """
+ cursor.execute(stmt)
+ if not cursor.rowcount:
+ # No new references detected.
+ break
+
+
+ def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
+ """Fill object_ref for all transactions that will not be packed."""
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > %s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = object_state.tid
+ )
+ """
+ cursor.execute(stmt, (pack_tid,))
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _fill_pack_object_refs(self, cursor, get_references):
+ """Fill object_ref for all pack_object rows that have keep_tid."""
+ stmt = """
+ SELECT DISTINCT keep_tid
+ FROM pack_object
+ WHERE keep_tid IS NOT NULL
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = keep_tid
+ )
+ """
+ cursor.execute(stmt)
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fills object_refs with all states for a transaction.
+ """
+ stmt = """
+ SELECT zoid, encode(state, 'base64')
+ FROM object_state
+ WHERE tid = %s
+ """
+ cursor.execute(stmt, (tid,))
+
+ to_add = [] # [(from_oid, tid, to_oid)]
+ for from_oid, state64 in cursor:
+ if state64 is not None:
+ state = decodestring(state64)
+ to_oids = get_references(state)
+ for to_oid in to_oids:
+ to_add.append((from_oid, tid, to_oid))
+
+ if to_add:
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (%s, %s, %s)
+ """
+ cursor.executemany(stmt, to_add)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (%s)
+ """
+ cursor.execute(stmt, (tid,))
+
+
+ def pack(self, pack_tid):
+ """Pack. Requires populated pack tables."""
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.open(
+ isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ try:
+ try:
+
+ for table in ('object_ref', 'current_object', 'object_state'):
+
+ # Remove objects that are in pack_object and have keep
+ # set to false.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = false
+ )
+ """ % table
+ cursor.execute(stmt)
+
+ if table != 'current_object':
+ # Cut the history of objects in pack_object that
+ # have keep set to true.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = true
+ )
+ AND tid < (
+ SELECT keep_tid
+ FROM pack_object
+ WHERE zoid = %s.zoid
+ )
+ """ % (table, table)
+ cursor.execute(stmt)
+
+ stmt = """
+ -- Terminate prev_tid chains
+ UPDATE object_state SET prev_tid = 0
+ WHERE tid <= %(tid)s
+ AND prev_tid != 0;
+
+ -- For each tid to be removed, delete the corresponding row in
+ -- object_refs_added.
+ DELETE FROM object_refs_added
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = object_refs_added.tid
+ );
+
+ -- Delete transactions no longer used.
+ DELETE FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = transaction.tid
+ );
+
+ -- Mark the remaining packable transactions as packed
+ UPDATE transaction SET packed = true
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = false
+ """
+ cursor.execute(stmt, {'tid': pack_tid})
+
+ # Clean up
+ cursor.execute("TRUNCATE pack_object")
+
+ except:
+ conn.rollback()
+ raise
+
+ else:
+ conn.commit()
+
+ finally:
+ self.close(conn, cursor)
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid). Raises StorageError
+ if the database has disconnected.
+ """
+ try:
+ if prev_polled_tid is not None:
+ if not cursor.isready():
+ # No invalidate notifications arrived.
+ return (), prev_polled_tid
+ del conn.notifies[:]
+
+ # find out the tid of the most recent transaction.
+ stmt = """
+ SELECT tid
+ FROM transaction
+ ORDER BY tid desc
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ # Expect the transaction table to always have at least one row.
+ assert cursor.rowcount == 1
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ stmt = "SELECT 1 FROM transaction WHERE tid = %s"
+ cursor.execute(stmt, (prev_polled_tid,))
+ if not cursor.rowcount:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ stmt = """
+ SELECT DISTINCT zoid
+ FROM object_state
+ JOIN transaction USING (tid)
+ WHERE tid > %s
+ """
+ if ignore_tid is not None:
+ stmt += " AND tid != %d" % ignore_tid
+ cursor.execute(stmt, (prev_polled_tid,))
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
+
+ except (psycopg2.OperationalError, psycopg2.InterfaceError):
+ raise StorageError("database disconnected")
+
Added: relstorage/autotemp.py
===================================================================
--- relstorage/autotemp.py (rev 0)
+++ relstorage/autotemp.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,43 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A temporary file that switches from StringIO to TemporaryFile if needed.
+
+This could be a useful addition to Python's tempfile module.
+"""
+
+from cStringIO import StringIO
+import tempfile
+
+class AutoTemporaryFile:
+ """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
+ def __init__(self, threshold=10485760):
+ self._threshold = threshold
+ self._f = f = StringIO()
+ # delegate most methods
+ for m in ('read', 'readline', 'seek', 'tell', 'close'):
+ setattr(self, m, getattr(f, m))
+
+ def write(self, data):
+ threshold = self._threshold
+ if threshold > 0 and self.tell() + len(data) >= threshold:
+ # convert to TemporaryFile
+ f = tempfile.TemporaryFile()
+ f.write(self._f.getvalue())
+ f.seek(self.tell())
+ self._f = f
+ self._threshold = 0
+ # delegate all important methods
+ for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
+ setattr(self, m, getattr(f, m))
+ self._f.write(data)
Added: relstorage/component.xml
===================================================================
--- relstorage/component.xml (rev 0)
+++ relstorage/component.xml 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+
+<!-- RelStorage configuration via ZConfig -->
+
+<component prefix="relstorage.config">
+
+ <import package="ZODB"/>
+ <abstracttype name="relstorage.adapter"/>
+
+ <sectiontype name="relstorage" implements="ZODB.storage"
+ datatype=".RelStorageFactory">
+ <section type="relstorage.adapter" name="*" attribute="adapter"/>
+ <key name="name" default="RelStorage"/>
+ <key name="create" datatype="boolean" default="true">
+ <description>
+ Flag that indicates whether the storage should be initialized if
+ it does not already exist.
+ </description>
+ </key>
+ <key name="read-only" datatype="boolean" default="false">
+ <description>
+ If true, only reads may be executed against the storage. Note
+ that the "pack" operation is not considered a write operation
+ and is still allowed on a read-only filestorage.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="postgresql" implements="relstorage.adapter"
+ datatype=".PostgreSQLAdapterFactory">
+ <key name="dsn" datatype="string" required="no" default="">
+ <description>
+ The PostgreSQL data source name. For example:
+
+ dsn dbname='template1' user='user' host='localhost' password='pass'
+
+ If dsn is omitted, the adapter will connect to a local database with
+ no password. Both the user and database name will match the
+ name of the owner of the current process.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="oracle" implements="relstorage.adapter"
+ datatype=".OracleAdapterFactory">
+ <key name="user" datatype="string" required="yes">
+ <description>
+ The Oracle account name
+ </description>
+ </key>
+ <key name="password" datatype="string" required="yes">
+ <description>
+ The Oracle account password
+ </description>
+ </key>
+ <key name="dsn" datatype="string" required="yes">
+ <description>
+ The Oracle data source name. The Oracle client library will
+ normally expect to find the DSN in /etc/oratab.
+ </description>
+ </key>
+ </sectiontype>
+
+</component>
Added: relstorage/config.py
===================================================================
--- relstorage/config.py (rev 0)
+++ relstorage/config.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,41 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZConfig directive implementations for binding RelStorage to Zope"""
+
+from ZODB.config import BaseConfig
+
+from relstorage import RelStorage
+
+
+class RelStorageFactory(BaseConfig):
+ """Open a storage configured via ZConfig"""
+ def open(self):
+ config = self.config
+ adapter = config.adapter.open()
+ return RelStorage(adapter, name=config.name, create=config.create,
+ read_only=config.read_only)
+
+
+class PostgreSQLAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.postgresql import PostgreSQLAdapter
+ return PostgreSQLAdapter(self.config.dsn)
+
+
+class OracleAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.oracle import OracleAdapter
+ config = self.config
+ return OracleAdapter(config.user, config.password, config.dsn)
+
Added: relstorage/notes/oracle_notes.txt
===================================================================
--- relstorage/notes/oracle_notes.txt (rev 0)
+++ relstorage/notes/oracle_notes.txt 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,9 @@
+
+Docs:
+ http://www.oracle.com/pls/db102/homepage
+
+
+Manually rollback an in-dispute transaction:
+ select local_tran_id, state from DBA_2PC_PENDING;
+ rollback force '$local_tran_id';
+
Added: relstorage/notes/pack_policy.ods
===================================================================
(Binary files differ)
Property changes on: relstorage/notes/pack_policy.ods
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: relstorage/poll-invalidation-1-zodb-3-7-1.patch
===================================================================
--- relstorage/poll-invalidation-1-zodb-3-7-1.patch (rev 0)
+++ relstorage/poll-invalidation-1-zodb-3-7-1.patch 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,94 @@
+diff -r 34747fbd09ec Connection.py
+--- a/Connection.py Tue Nov 20 21:57:31 2007 -0700
++++ b/Connection.py Fri Jan 11 21:19:00 2008 -0700
+@@ -75,8 +75,14 @@ class Connection(ExportImport, object):
+ """Create a new Connection."""
+
+ self._db = db
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
+ self._savepoint_storage = None
+
+ self.transaction_manager = self._synch = self._mvcc = None
+@@ -170,6 +176,12 @@ class Connection(ExportImport, object):
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ def add(self, obj):
+ """Add a new object 'obj' to the database and assign it an oid."""
+@@ -267,6 +279,11 @@ class Connection(ExportImport, object):
+ self.transaction_manager.unregisterSynch(self)
+ self._synch = None
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -295,6 +312,10 @@ class Connection(ExportImport, object):
+
+ def invalidate(self, tid, oids):
+ """Notify the Connection that transaction 'tid' invalidated oids."""
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
++
+ self._inv_lock.acquire()
+ try:
+ if self._txn_time is None:
+@@ -438,8 +459,23 @@ class Connection(ExportImport, object):
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+diff -r 34747fbd09ec DB.py
+--- a/DB.py Tue Nov 20 21:57:31 2007 -0700
++++ b/DB.py Wed Nov 28 18:33:12 2007 -0700
+@@ -260,6 +260,10 @@ class DB(object):
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Added: relstorage/relstorage.py
===================================================================
--- relstorage/relstorage.py (rev 0)
+++ relstorage/relstorage.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,804 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""The core of RelStorage, a ZODB storage for relational databases.
+
+Stores pickles in the database.
+"""
+
+import base64
+import cPickle
+import logging
+import md5
+import os
+import time
+import weakref
+from ZODB.utils import p64, u64, z64
+from ZODB.BaseStorage import BaseStorage
+from ZODB import ConflictResolution, POSException
+from persistent.TimeStamp import TimeStamp
+
+from autotemp import AutoTemporaryFile
+
+log = logging.getLogger("relstorage")
+
+# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
+# a failure revealed by the ZODB test suite. The test suite often fails
+# to call tpc_abort in the event of an error, leading to deadlocks.
+# The variable causes RelStorage to abort failed transactions
+# early rather than wait for an explicit abort.
+abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
+
+
+class RelStorage(BaseStorage,
+ ConflictResolution.ConflictResolvingStorage):
+ """Storage to a relational database, based on invalidation polling"""
+
+ def __init__(self, adapter, name='RelStorage', create=True,
+ read_only=False):
+ self._adapter = adapter
+ self._name = name
+ self._is_read_only = read_only
+
+ # load_conn and load_cursor are always open
+ self._load_conn = None
+ self._load_cursor = None
+ self._load_started = False
+ self._open_load_connection()
+ # store_conn and store_cursor are open only during commit
+ self._store_conn = None
+ self._store_cursor = None
+
+ if create:
+ self._adapter.prepare_schema()
+
+ BaseStorage.__init__(self, name)
+
+ self._tid = None
+ self._ltid = None
+
+ # _tbuf is a temporary file that contains pickles of data to be
+ # committed. _pickler writes pickles to that file. _stored_oids
+ # is a list of integer OIDs to be stored.
+ self._tbuf = None
+ self._pickler = None
+ self._stored_oids = None
+
+ # _prepared_txn is the name of the transaction to commit in the
+ # second phase.
+ self._prepared_txn = None
+
+ # _instances is a list of weak references to storage instances bound
+ # to the same database.
+ self._instances = []
+
+ # _closed is True after self.close() is called. Since close()
+ # can be called from another thread, access to self._closed should
+ # be inside a _lock_acquire()/_lock_release() block.
+ self._closed = False
+
+ def _open_load_connection(self):
+ """Open the load connection to the database. Return nothing."""
+ conn, cursor = self._adapter.open_for_load()
+ self._drop_load_connection()
+ self._load_conn, self._load_cursor = conn, cursor
+ self._load_started = True
+
+ def _drop_load_connection(self):
+ conn, cursor = self._load_conn, self._load_cursor
+ self._load_conn, self._load_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _drop_store_connection(self):
+ conn, cursor = self._store_conn, self._store_cursor
+ self._store_conn, self._store_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _rollback_load_connection(self):
+ if self._load_conn is not None:
+ self._load_started = False
+ self._load_conn.rollback()
+
+ def _start_load(self):
+ if self._load_cursor is None:
+ self._open_load_connection()
+ else:
+ self._adapter.restart_load(self._load_cursor)
+ self._load_started = True
+
+ def _zap(self):
+ """Clear all objects out of the database.
+
+ Used by the test suite.
+ """
+ self._adapter.zap()
+ self._rollback_load_connection()
+
+ def close(self):
+ """Close the connections to the database."""
+ self._lock_acquire()
+ try:
+ self._closed = True
+ self._drop_load_connection()
+ self._drop_store_connection()
+ for wref in self._instances:
+ instance = wref()
+ if instance is not None:
+ instance.close()
+ finally:
+ self._lock_release()
+
+ def bind_connection(self, zodb_conn):
+ """Get a connection-bound storage instance.
+
+ Connections have their own storage instances so that
+ the database can provide the MVCC semantics rather than ZODB.
+ """
+ res = BoundRelStorage(self, zodb_conn)
+ self._instances.append(weakref.ref(res))
+ return res
+
+ def connection_closing(self):
+ """Release resources."""
+ self._rollback_load_connection()
+
+ def __len__(self):
+ return self._adapter.get_object_count()
+
+ def getSize(self):
+ """Return database size in bytes"""
+ return self._adapter.get_db_size()
+
+ def load(self, oid, version):
+ self._lock_acquire()
+ try:
+ if not self._load_started:
+ self._start_load()
+ cursor = self._load_cursor
+ state, tid_int = self._adapter.load_current(cursor, u64(oid))
+ finally:
+ self._lock_release()
+ if tid_int is not None:
+ if state:
+ state = str(state)
+ if not state:
+ # This can happen if something attempts to load
+ # an object whose creation has been undone.
+ raise KeyError(oid)
+ return state, p64(tid_int)
+ else:
+ raise KeyError(oid)
+
+ def loadEx(self, oid, version):
+ # Since we don't support versions, just tack the empty version
+ # string onto load's result.
+ return self.load(oid, version) + ("",)
+
+ def loadSerial(self, oid, serial):
+ """Load a specific revision of an object"""
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_started:
+ self._start_load()
+ cursor = self._load_cursor
+ state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
+ if state is not None:
+ state = str(state)
+ if not state:
+ raise POSKeyError(oid)
+ return state
+ else:
+ raise KeyError(oid)
+ finally:
+ self._lock_release()
+
+ def loadBefore(self, oid, tid):
+ """Return the most recent revision of oid before tid committed."""
+ oid_int = u64(oid)
+
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_started:
+ self._start_load()
+ cursor = self._load_cursor
+ if not self._adapter.exists(cursor, u64(oid)):
+ raise KeyError(oid)
+
+ state, start_tid = self._adapter.load_before(
+ cursor, oid_int, u64(tid))
+ if start_tid is not None:
+ end_int = self._adapter.get_object_tid_after(
+ cursor, oid_int, start_tid)
+ if end_int is not None:
+ end = p64(end_int)
+ else:
+ end = None
+ if state is not None:
+ state = str(state)
+ return state, p64(start_tid), end
+ else:
+ return None
+ finally:
+ self._lock_release()
+
+
+ def store(self, oid, serial, data, version, transaction):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ # If self._prepared_txn is not None, that means something is
+ # attempting to store objects after the vote phase has finished.
+ # That should not happen, should it?
+ assert self._prepared_txn is None
+ md5sum = md5.new(data).hexdigest()
+
+ self._lock_acquire()
+ try:
+ # buffer the stores
+ if self._tbuf is None:
+ self._tbuf = AutoTemporaryFile()
+ self._pickler = cPickle.Pickler(
+ self._tbuf, cPickle.HIGHEST_PROTOCOL)
+ self._stored_oids = []
+
+ self._pickler.dump((oid, serial, md5sum, data))
+ self._stored_oids.append(u64(oid))
+ return None
+ finally:
+ self._lock_release()
+
+ def tpc_begin(self, transaction, tid=None, status=' '):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ if self._transaction is transaction:
+ return
+ self._lock_release()
+ self._commit_lock_acquire()
+ self._lock_acquire()
+ self._transaction = transaction
+ self._clear_temp()
+
+ user = transaction.user
+ desc = transaction.description
+ ext = transaction._extension
+ if ext:
+ ext = cPickle.dumps(ext, 1)
+ else:
+ ext = ""
+ self._ude = user, desc, ext
+ self._tstatus = status
+
+ if tid is not None:
+ # get the commit lock and add the transaction now
+ adapter = self._adapter
+ conn, cursor = adapter.open_for_commit()
+ self._store_conn, self._store_cursor = conn, cursor
+ tid_int = u64(tid)
+ try:
+ adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ except:
+ self._drop_store_connection()
+ raise
+ # else choose the tid later
+ self._tid = tid
+
+ finally:
+ self._lock_release()
+
+ def _prepare_tid(self):
+ """Choose a tid for the current transaction.
+
+ This should be done as late in the commit as possible, since
+ it must hold an exclusive commit lock.
+ """
+ if self._tid is not None:
+ return
+ if self._transaction is None:
+ raise POSException.StorageError("No transaction in progress")
+
+ adapter = self._adapter
+ conn, cursor = adapter.open_for_commit()
+ self._store_conn, self._store_cursor = conn, cursor
+ user, desc, ext = self._ude
+
+ attempt = 1
+ while True:
+ try:
+ # Choose a transaction ID.
+ # Base the transaction ID on the database time,
+ # while ensuring that the tid of this transaction
+ # is greater than any existing tid.
+ last_tid, now = adapter.get_tid_and_time(cursor)
+ stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+ stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+ tid = repr(stamp)
+
+ tid_int = u64(tid)
+ adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ self._tid = tid
+ break
+
+ except POSException.ConflictError:
+ if attempt < 3:
+ # A concurrent transaction claimed the tid.
+ # Rollback and try again.
+ adapter.restart_commit(cursor)
+ attempt += 1
+ else:
+ self._drop_store_connection()
+ raise
+ except:
+ self._drop_store_connection()
+ raise
+
+
+ def _clear_temp(self):
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ self._prepared_txn = None
+ tbuf = self._tbuf
+ if tbuf is not None:
+ self._tbuf = None
+ self._pickler = None
+ self._stored_oids = None
+ tbuf.close()
+
+
+ def _send_stored(self):
+ """Send the buffered pickles to the database.
+
+ Returns a list of (oid, tid) to be received by
+ Connection._handle_serial().
+ """
+ cursor = self._store_cursor
+ adapter = self._adapter
+ tid_int = u64(self._tid)
+ self._pickler = None # Don't allow any more store operations
+ self._tbuf.seek(0)
+ unpickler = cPickle.Unpickler(self._tbuf)
+
+ serials = [] # [(oid, serial)]
+ prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
+
+ while True:
+ try:
+ oid, serial, md5sum, data = unpickler.load()
+ except EOFError:
+ break
+ oid_int = u64(oid)
+ if not serial:
+ serial = z64
+
+ prev_tid_int = prev_tids.get(oid_int)
+ if prev_tid_int is None:
+ prev_tid_int = 0
+ prev_tid = z64
+ else:
+ prev_tid = p64(prev_tid_int)
+ if prev_tid != serial:
+ # conflict detected
+ rdata = self.tryToResolveConflict(
+ oid, prev_tid, serial, data)
+ if rdata is None:
+ raise POSException.ConflictError(
+ oid=oid, serials=(prev_tid, serial), data=data)
+ else:
+ data = rdata
+ md5sum = md5.new(data).hexdigest()
+
+ self._adapter.store(
+ cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
+
+ if prev_tid and serial != prev_tid:
+ serials.append((oid, ConflictResolution.ResolvedSerial))
+ else:
+ serials.append((oid, self._tid))
+
+ return serials
+
+
+ def _vote(self):
+ """Prepare the transaction for final commit."""
+ # This method initiates a two-phase commit process,
+ # saving the name of the prepared transaction in self._prepared_txn.
+
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+
+ if self._prepared_txn is not None:
+ # the vote phase has already completed
+ return
+
+ self._prepare_tid()
+ tid_int = u64(self._tid)
+ cursor = self._store_cursor
+ assert cursor is not None
+
+ if self._tbuf is not None:
+ serials = self._send_stored()
+ else:
+ serials = []
+
+ self._adapter.update_current(cursor, tid_int)
+ self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+ # From the point of view of self._store_cursor,
+ # it now looks like the transaction has been rolled back.
+
+ return serials
+
+
+ def tpc_vote(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ return self._vote()
+ except:
+ if abort_early:
+ # abort early to avoid lockups while running the
+ # somewhat brittle ZODB test suite
+ self.tpc_abort(transaction)
+ raise
+ finally:
+ self._lock_release()
+
+
+ def _finish(self, tid, user, desc, ext):
+ """Commit the transaction."""
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ assert self._tid is not None
+ self._rollback_load_connection()
+ txn = self._prepared_txn
+ assert txn is not None
+ self._adapter.commit_phase2(self._store_cursor, txn)
+ self._drop_store_connection()
+ self._prepared_txn = None
+ self._ltid = self._tid
+ self._tid = None
+
+ def _abort(self):
+ # the lock is held here
+ self._rollback_load_connection()
+ if self._store_cursor is not None:
+ self._adapter.abort(self._store_cursor, self._prepared_txn)
+ self._prepared_txn = None
+ self._drop_store_connection()
+ self._tid = None
+
+ def lastTransaction(self):
+ return self._ltid
+
+ def new_oid(self):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ if cursor is None:
+ self._open_load_connection()
+ cursor = self._load_cursor
+ oid_int = self._adapter.new_oid(cursor)
+ return p64(oid_int)
+ finally:
+ self._lock_release()
+
+ def supportsUndo(self):
+ return True
+
+ def supportsTransactionalUndo(self):
+ return True
+
+ def undoLog(self, first=0, last=-20, filter=None):
+ if last < 0:
+ last = first - last
+
+ # use a private connection to ensure the most current results
+ adapter = self._adapter
+ conn, cursor = adapter.open()
+ try:
+ rows = adapter.iter_transactions(cursor)
+ i = 0
+ res = []
+ for tid_int, user, desc, ext in rows:
+ tid = p64(tid_int)
+ d = {'id': base64.encodestring(tid)[:-1],
+ 'time': TimeStamp(tid).timeTime(),
+ 'user_name': user,
+ 'description': desc}
+ if ext:
+ ext = str(ext)
+ if ext:
+ d.update(cPickle.loads(ext))
+ if filter is None or filter(d):
+ if i >= first:
+ res.append(d)
+ i += 1
+ if i >= last:
+ break
+ return res
+
+ finally:
+ adapter.close(conn, cursor)
+
+ def history(self, oid, version=None, size=1, filter=None):
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ oid_int = u64(oid)
+ try:
+ rows = self._adapter.iter_object_history(cursor, oid_int)
+ except KeyError:
+ raise KeyError(oid)
+
+ res = []
+ for tid_int, username, description, extension, length in rows:
+ tid = p64(tid_int)
+ if extension:
+ d = loads(extension)
+ else:
+ d = {}
+ d.update({"time": TimeStamp(tid).timeTime(),
+ "user_name": username,
+ "description": description,
+ "tid": tid,
+ "version": '',
+ "size": length,
+ })
+ if filter is None or filter(d):
+ res.append(d)
+ if size is not None and len(res) >= size:
+ break
+ return res
+ finally:
+ self._lock_release()
+
+
+ def undo(self, transaction_id, transaction):
+ """Undo a transaction identified by transaction_id.
+
+ Do so by writing new data that reverses the action taken by
+ the transaction.
+ """
+
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+
+ undo_tid = base64.decodestring(transaction_id + '\n')
+ assert len(undo_tid) == 8
+ undo_tid_int = u64(undo_tid)
+
+ self._lock_acquire()
+ try:
+ self._prepare_tid()
+ self_tid_int = u64(self._tid)
+ cursor = self._store_cursor
+ assert cursor is not None
+ adapter = self._adapter
+
+ adapter.verify_undoable(cursor, undo_tid_int)
+ oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
+ oids = [p64(oid_int) for oid_int in oid_ints]
+
+ # Update the current object pointers immediately, so that
+ # subsequent undo operations within this transaction will see
+ # the new current objects.
+ adapter.update_current(cursor, self_tid_int)
+
+ return self._tid, oids
+
+ finally:
+ self._lock_release()
+
+
+ def pack(self, t, referencesf):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+
+ pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
+ pack_point_int = u64(pack_point)
+
+ def get_references(state):
+ """Return the set of OIDs the given state refers to."""
+ refs = set()
+ if state:
+ for oid in referencesf(str(state)):
+ refs.add(u64(oid))
+ return refs
+
+ # Use a private connection (lock_conn and lock_cursor) to
+ # hold the pack lock, while using a second private connection
+ # (conn and cursor) to decide exactly what to pack.
+ # Close the second connection. Then, with the lock still held,
+ # perform the pack in a third connection opened by the adapter.
+ # This structure is designed to maximize the scalability
+ # of packing and minimize conflicts with concurrent writes.
+ # A consequence of this structure is that the adapter must
+ # not choke on transactions that may have been added between
+ # pre_pack and pack.
+ adapter = self._adapter
+ lock_conn, lock_cursor = adapter.open()
+ try:
+ adapter.hold_pack_lock(lock_cursor)
+
+ conn, cursor = adapter.open()
+ try:
+ try:
+ # Find the latest commit before or at the pack time.
+ tid_int = adapter.choose_pack_transaction(
+ cursor, pack_point_int)
+ if tid_int is None:
+ # Nothing needs to be packed.
+ # TODO: log the fact that nothing needs to be packed.
+ return
+
+ # In pre_pack, the adapter fills tables with information
+ # about what to pack. The adapter should not actually
+ # pack anything yet.
+ adapter.pre_pack(cursor, tid_int, get_references)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ adapter.close(conn, cursor)
+
+ # Now pack. The adapter makes its own connection just for the
+ # pack operation, possibly using a special transaction mode
+ # and connection flags.
+ adapter.pack(tid_int)
+ self._after_pack()
+
+ finally:
+ lock_conn.rollback()
+ adapter.close(lock_conn, lock_cursor)
+
+
+ def _after_pack(self):
+ """Reset the transaction state after packing."""
+ # The tests depend on this.
+ self._rollback_load_connection()
+
+
+class BoundRelStorage(RelStorage):
+ """Storage to a database, bound to a particular ZODB.Connection."""
+
+ # The propagate_invalidations flag, set to a false value, tells
+ # the Connection not to propagate object invalidations across
+ # connections, since that ZODB feature is detrimental when the
+ # storage provides its own MVCC.
+ propagate_invalidations = False
+
+ def __init__(self, parent, zodb_conn):
+ # self._conn = conn
+ RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
+ read_only=parent._is_read_only, create=False)
+ # _prev_polled_tid contains the tid at the previous poll
+ self._prev_polled_tid = None
+ self._showed_disconnect = False
+
+ def poll_invalidations(self, retry=True):
+ """Looks for OIDs of objects that changed since _prev_polled_tid
+
+ Returns {oid: 1}, or None if all objects need to be invalidated
+ because prev_polled_tid is not in the database (presumably it
+ has been packed).
+ """
+ self._lock_acquire()
+ try:
+ if self._closed:
+ return {}
+ try:
+ self._rollback_load_connection()
+ self._start_load()
+ conn = self._load_conn
+ cursor = self._load_cursor
+
+ # Ignore changes made by the last transaction committed
+ # by this connection.
+ ignore_tid = None
+ if self._ltid is not None:
+ ignore_tid = u64(self._ltid)
+
+ # get a list of changed OIDs and the most recent tid
+ oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+ conn, cursor, self._prev_polled_tid, ignore_tid)
+ self._prev_polled_tid = new_polled_tid
+
+ if oid_ints is None:
+ oids = None
+ else:
+ oids = {}
+ for oid_int in oid_ints:
+ oids[p64(oid_int)] = 1
+ return oids
+ except POSException.StorageError:
+ # disconnected
+ if not retry:
+ raise
+ if not self._showed_disconnect:
+ log.warning("Lost connection in %s", repr(self))
+ self._showed_disconnect = True
+ self._open_load_connection()
+ log.info("Reconnected in %s", repr(self))
+ self._showed_disconnect = False
+ return self.poll_invalidations(retry=False)
+ finally:
+ self._lock_release()
+
+ def _after_pack(self):
+ # Disable transaction reset after packing. The connection
+ # should call sync() to see the new state.
+ pass
+
+
+# very basic test... ought to be moved or deleted.
+def test():
+ import transaction
+ import pprint
+ from ZODB.DB import DB
+ from persistent.mapping import PersistentMapping
+ from relstorage.adapters.postgresql import PostgreSQLAdapter
+
+ adapter = PostgreSQLAdapter(params='dbname=relstoragetest')
+ storage = RelStorage(adapter)
+ db = DB(storage)
+
+ if True:
+ for i in range(100):
+ c = db.open()
+ c.root()['foo'] = PersistentMapping()
+ transaction.get().note('added %d' % i)
+ transaction.commit()
+ c.close()
+ print 'wrote', i
+
+ # undo 2 transactions, then redo them and undo the first again.
+ for i in range(2):
+ log = db.undoLog()
+ db.undo(log[0]['id'])
+ db.undo(log[1]['id'])
+ transaction.get().note('undone! (%d)' % i)
+ transaction.commit()
+ print 'undid', i
+
+ pprint.pprint(db.undoLog())
+ db.pack(time.time() - 0.1)
+ pprint.pprint(db.undoLog())
+ db.close()
+
+
+if __name__ == '__main__':
+ import logging
+ logging.basicConfig()
+ test()
Added: relstorage/tests/.cvsignore
===================================================================
--- relstorage/tests/.cvsignore (rev 0)
+++ relstorage/tests/.cvsignore 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1 @@
+*.pyc
Added: relstorage/tests/__init__.py
===================================================================
--- relstorage/tests/__init__.py (rev 0)
+++ relstorage/tests/__init__.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1 @@
+"""relstorage.tests package"""
Added: relstorage/tests/comparison.ods
===================================================================
(Binary files differ)
Property changes on: relstorage/tests/comparison.ods
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Added: relstorage/tests/reltestbase.py
===================================================================
--- relstorage/tests/reltestbase.py (rev 0)
+++ relstorage/tests/reltestbase.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,199 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A foundation for relstorage adapter tests"""
+
+import unittest
+from relstorage.relstorage import RelStorage
+
+from ZODB.DB import DB
+from persistent.mapping import PersistentMapping
+import transaction
+
+from ZODB.tests import StorageTestBase, BasicStorage, \
+ TransactionalUndoStorage, PackableStorage, \
+ Synchronization, ConflictResolution, HistoryStorage, \
+ RevisionStorage, PersistentStorage, \
+ MTStorage, ReadOnlyStorage
+
+from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
+
+
+class BaseRelStorageTests(StorageTestBase.StorageTestBase):
+
+ def make_adapter(self):
+ # abstract method
+ raise NotImplementedError
+
+ def open(self, **kwargs):
+ adapter = self.make_adapter()
+ self._storage = RelStorage(adapter, **kwargs)
+
+ def setUp(self):
+ self.open(create=1)
+ self._storage._zap()
+
+ def tearDown(self):
+ self._storage.close()
+ self._storage.cleanup()
+
+
+class RelStorageTests(
+ BaseRelStorageTests,
+ BasicStorage.BasicStorage,
+ TransactionalUndoStorage.TransactionalUndoStorage,
+ RevisionStorage.RevisionStorage,
+ PackableStorage.PackableStorage,
+ PackableStorage.PackableUndoStorage,
+ Synchronization.SynchronizedStorage,
+ ConflictResolution.ConflictResolvingStorage,
+ HistoryStorage.HistoryStorage,
+ PersistentStorage.PersistentStorage,
+ MTStorage.MTStorage,
+ ReadOnlyStorage.ReadOnlyStorage
+ ):
+
+ def checkCrossConnectionInvalidation(self):
+ # Verify connections see updated state at txn boundaries
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['myobj'] = 'yes'
+ c2 = db.open()
+ r2 = c2.root()
+ self.assert_('myobj' not in r2)
+
+ storage = c1._storage
+ t = transaction.Transaction()
+ t.description = 'invalidation test'
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ self.assert_('myobj' not in r2)
+ c2.sync()
+ self.assert_('myobj' in r2)
+ self.assert_(r2['myobj'] == 'yes')
+ finally:
+ db.close()
+
+ def checkCrossConnectionIsolation(self):
+ # Verify MVCC isolates connections
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = PersistentMapping()
+ r1['gamma'] = PersistentMapping()
+ transaction.commit()
+
+ # Open a second connection but don't load root['alpha'] yet
+ c2 = db.open()
+ r2 = c2.root()
+
+ r1['alpha']['beta'] = 'yes'
+
+ storage = c1._storage
+ t = transaction.Transaction()
+ t.description = 'isolation test 1'
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ # The second connection will now load root['alpha'], but due to
+ # MVCC, it should continue to see the old state.
+ self.assert_(r2['alpha']._p_changed is None) # A ghost
+ self.assert_(not r2['alpha'])
+ self.assert_(r2['alpha']._p_changed == 0)
+
+ # make root['alpha'] visible to the second connection
+ c2.sync()
+
+ # Now it should be in sync
+ self.assert_(r2['alpha']._p_changed is None) # A ghost
+ self.assert_(r2['alpha'])
+ self.assert_(r2['alpha']._p_changed == 0)
+ self.assert_(r2['alpha']['beta'] == 'yes')
+
+ # Repeat the test with root['gamma']
+ r1['gamma']['delta'] = 'yes'
+
+ storage = c1._storage
+ t = transaction.Transaction()
+ t.description = 'isolation test 2'
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ # The second connection will now load root[3], but due to MVCC,
+ # it should continue to see the old state.
+ self.assert_(r2['gamma']._p_changed is None) # A ghost
+ self.assert_(not r2['gamma'])
+ self.assert_(r2['gamma']._p_changed == 0)
+
+ # make root[3] visible to the second connection
+ c2.sync()
+
+ # Now it should be in sync
+ self.assert_(r2['gamma']._p_changed is None) # A ghost
+ self.assert_(r2['gamma'])
+ self.assert_(r2['gamma']._p_changed == 0)
+ self.assert_(r2['gamma']['delta'] == 'yes')
+ finally:
+ db.close()
+
+ def checkResolveConflictBetweenConnections(self):
+ # Verify that conflict resolution works between storage instances
+ # bound to connections.
+ obj = ConflictResolution.PCounter()
+ obj.inc()
+
+ oid = self._storage.new_oid()
+
+ revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
+
+ storage1 = self._storage.bind_connection(None)
+ storage1.load(oid, '')
+ storage2 = self._storage.bind_connection(None)
+ storage2.load(oid, '')
+
+ obj.inc()
+ obj.inc()
+ # The effect of committing two transactions with the same
+ # pickle is to commit two different transactions relative to
+ # revid1 that add two to _value.
+ root_storage = self._storage
+ try:
+ self._storage = storage1
+ revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+ self._storage = storage2
+ revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+
+ data, serialno = self._storage.load(oid, '')
+ inst = zodb_unpickle(data)
+ self.assertEqual(inst._value, 5)
+ finally:
+ self._storage = root_storage
+
+ def check16MObject(self):
+ # Store 16 * 1024 * 1024 bytes in an object, then retrieve it
+ data = 'a 16 byte string' * (1024 * 1024)
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid, data=data)
+ got, serialno = self._storage.load(oid, '')
+ self.assertEqual(len(got), len(data))
+ self.assertEqual(got, data)
Added: relstorage/tests/speedtest.py
===================================================================
--- relstorage/tests/speedtest.py (rev 0)
+++ relstorage/tests/speedtest.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,358 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Compare the speed of RelStorage with FileStorage + ZEO.
+
+Splits into many processes to avoid contention over the global
+interpreter lock.
+"""
+
+import cPickle
+import logging
+import os
+import shutil
+import signal
+import sys
+import tempfile
+import time
+import traceback
+
+import transaction
+from BTrees.IOBTree import IOBTree
+from ZODB.DB import DB
+from ZODB.Connection import Connection
+
+from relstorage.relstorage import RelStorage
+from relstorage.adapters.postgresql import PostgreSQLAdapter
+from relstorage.adapters.oracle import OracleAdapter
+from relstorage.tests.testoracle import getOracleParams
+
+debug = False
+txn_count = 10
+object_counts = [10000] # [1, 100, 10000]
+concurrency_levels = range(1, 16, 2)
+contenders = [
+ ('ZEO + FileStorage', 'zeofs_test'),
+ ('PostgreSQLAdapter', 'postgres_test'),
+ ('OracleAdapter', 'oracle_test'),
+ ]
+repetitions = 3
+max_attempts = 20
+
+
+class ChildProcessError(Exception):
+ """A child process failed"""
+
+
+def run_in_child(wait, func, *args, **kw):
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ try:
+ logging.basicConfig()
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ func(*args, **kw)
+ except:
+ traceback.print_exc()
+ os._exit(1)
+ finally:
+ os._exit(0)
+ elif wait:
+ pid_again, code = os.waitpid(pid, 0)
+ if code:
+ raise ChildProcessError(
+ "process running %r failed with exit code %d" % (func, code))
+ return pid
+
+
+class SpeedTest:
+
+ def __init__(self, concurrency, objects_per_txn):
+ self.concurrency = concurrency
+ self.data_to_store = dict((n, 1) for n in range(objects_per_txn))
+
+ def populate(self, make_storage):
+ # initialize the database
+ storage = make_storage()
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+ root['speedtest'] = t = IOBTree()
+ for i in range(self.concurrency):
+ t[i] = IOBTree()
+ transaction.commit()
+ conn.close()
+ db.close()
+ if debug:
+ print >> sys.stderr, 'Populated storage.'
+
+ def write_test(self, storage, n):
+ db = DB(storage)
+ start = time.time()
+ for i in range(txn_count):
+ conn = db.open()
+ root = conn.root()
+ myobj = root['speedtest'][n]
+ myobj[i] = IOBTree(self.data_to_store)
+ transaction.commit()
+ conn.close()
+ end = time.time()
+ db.close()
+ return end - start
+
+ def read_test(self, storage, n):
+ db = DB(storage)
+ start = time.time()
+ for i in range(txn_count):
+ conn = db.open()
+ root = conn.root()
+ got = len(list(root['speedtest'][n][i]))
+ expect = len(self.data_to_store)
+ if got != expect:
+ raise AssertionError
+ conn.close()
+ end = time.time()
+ db.close()
+ return end - start
+
+ def run_tests(self, make_storage):
+ """Run a write and read test.
+
+ Returns the mean time per write transaction and
+ the mean time per read transaction.
+ """
+ run_in_child(True, self.populate, make_storage)
+ r = range(self.concurrency)
+
+ def write(n):
+ return self.write_test(make_storage(), n)
+ def read(n):
+ return self.read_test(make_storage(), n)
+
+ write_times = distribute(write, r)
+ read_times = distribute(read, r)
+ count = float(self.concurrency * txn_count)
+ return (sum(write_times) / count, sum(read_times) / count)
+
+ def run_zeo_server(self, store_fn, sock_fn):
+ from ZODB.FileStorage import FileStorage
+ from ZEO.StorageServer import StorageServer
+
+ fs = FileStorage(store_fn, create=True)
+ ss = StorageServer(sock_fn, {'1': fs})
+
+ import ThreadedAsync.LoopCallback
+ ThreadedAsync.LoopCallback.loop()
+
+ def start_zeo_server(self, store_fn, sock_fn):
+ pid = run_in_child(False, self.run_zeo_server, store_fn, sock_fn)
+ # parent
+ if debug:
+ sys.stderr.write('Waiting for ZEO server to start...')
+ while not os.path.exists(sock_fn):
+ if debug:
+ sys.stderr.write('.')
+ sys.stderr.flush()
+ time.sleep(0.1)
+ if debug:
+ sys.stderr.write(' started.\n')
+ sys.stderr.flush()
+ return pid
+
+ def zeofs_test(self):
+ dir = tempfile.mkdtemp()
+ try:
+ store_fn = os.path.join(dir, 'storage')
+ sock_fn = os.path.join(dir, 'sock')
+ zeo_pid = self.start_zeo_server(store_fn, sock_fn)
+ try:
+ def make_storage():
+ from ZEO.ClientStorage import ClientStorage
+ return ClientStorage(sock_fn)
+ return self.run_tests(make_storage)
+ finally:
+ os.kill(zeo_pid, signal.SIGTERM)
+ finally:
+ shutil.rmtree(dir)
+
+ def postgres_test(self):
+ adapter = PostgreSQLAdapter()
+ adapter.zap()
+ def make_storage():
+ return RelStorage(adapter)
+ return self.run_tests(make_storage)
+
+ def oracle_test(self):
+ user, password, dsn = getOracleParams()
+ adapter = OracleAdapter(user, password, dsn)
+ adapter.zap()
+ def make_storage():
+ return RelStorage(adapter)
+ return self.run_tests(make_storage)
+
+
+def distribute(func, param_iter):
+ """Call a function in separate processes concurrently.
+
+ param_iter is an iterator that provides the parameter for each
+ function call. The parameter is passed as the single argument.
+ The results of calling the function are appended to a list, which
+ is returned once all functions have returned. If any function
+ raises an error, the error is re-raised in the caller.
+ """
+ dir = tempfile.mkdtemp()
+ try:
+ waiting = set() # set of child process IDs
+ for param in param_iter:
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ logging.basicConfig()
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ fn = os.path.join(dir, str(os.getpid()))
+ try:
+ res = 1, func(param)
+ except:
+ traceback.print_exc()
+ res = 0, sys.exc_info()[:2]
+ f = open(fn, 'wb')
+ try:
+ cPickle.dump(res, f)
+ finally:
+ f.close()
+ finally:
+ os._exit(0)
+ else:
+ # parent
+ waiting.add(pid)
+ results = []
+ try:
+ while waiting:
+ for pid in list(waiting):
+ pid_again, code = os.waitpid(pid, os.WNOHANG)
+ if not pid_again:
+ continue
+ waiting.remove(pid)
+ if code:
+ raise ChildProcessError(
+ "A process failed with exit code %d" % code)
+ else:
+ fn = os.path.join(dir, str(pid))
+ f = open(fn, 'rb')
+ try:
+ ok, value = cPickle.load(f)
+ if ok:
+ results.append(value)
+ else:
+ raise ChildProcessError(
+ "a child process raised an error: "
+ "%s: %s" % tuple(value))
+ finally:
+ f.close()
+ time.sleep(0.1)
+ return results
+ finally:
+ # kill the remaining processes
+ for pid in waiting:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError:
+ pass
+ finally:
+ shutil.rmtree(dir)
+
+
+def main():
+
+ # results: {(objects_per_txn, concurrency, contender, direction): [time]}}
+ results = {}
+ for objects_per_txn in object_counts:
+ for concurrency in concurrency_levels:
+ for contender_name, method_name in contenders:
+ for direction in (0, 1):
+ key = (objects_per_txn, concurrency,
+ contender_name, direction)
+ results[key] = []
+
+ try:
+ for objects_per_txn in object_counts:
+ for concurrency in concurrency_levels:
+ test = SpeedTest(concurrency, objects_per_txn)
+ for contender_name, method_name in contenders:
+ print >> sys.stderr, (
+ 'Testing %s with objects_per_txn=%d and concurrency=%d'
+ % (contender_name, objects_per_txn, concurrency))
+ method = getattr(test, method_name)
+ key = (objects_per_txn, concurrency, contender_name)
+
+ for rep in range(repetitions):
+ for attempt in range(max_attempts):
+ msg = ' Running %d/%d...' % (rep + 1, repetitions)
+ if attempt > 0:
+ msg += ' (attempt %d)' % (attempt + 1)
+ print >> sys.stderr, msg,
+ try:
+ w, r = method()
+ except ChildProcessError:
+ if attempt >= max_attempts - 1:
+ raise
+ else:
+ break
+ msg = 'write %5.3fs, read %5.3fs' % (w, r)
+ print >> sys.stderr, msg
+ results[key + (0,)].append(w)
+ results[key + (1,)].append(r)
+
+ # The finally clause causes test results to print even if the tests
+ # stop early.
+ finally:
+ # show the results in CSV format
+ print >> sys.stderr, (
+ 'Average time per transaction in seconds. Best of 3.')
+
+ for objects_per_txn in object_counts:
+ print '** Results with objects_per_txn=%d **' % objects_per_txn
+
+ line = ['"Concurrency"']
+ for contender_name, func in contenders:
+ for direction in (0, 1):
+ dir_text = ['write', 'read'][direction]
+ line.append('%s - %s' % (contender_name, dir_text))
+ print ', '.join(line)
+
+ for concurrency in concurrency_levels:
+ line = [str(concurrency)]
+
+ for contender_name, method_name in contenders:
+ for direction in (0, 1):
+ key = (objects_per_txn, concurrency,
+ contender_name, direction)
+ lst = results[key]
+ if lst:
+ t = min(lst)
+ line.append('%5.3f' % t)
+ else:
+ line.append('?')
+
+ print ', '.join(line)
+ print
+
+
+if __name__ == '__main__':
+ main()
Added: relstorage/tests/testoracle.py
===================================================================
--- relstorage/tests/testoracle.py (rev 0)
+++ relstorage/tests/testoracle.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,54 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests of relstorage.adapters.oracle"""
+
+import logging
+import os
+import re
+import unittest
+
+from reltestbase import RelStorageTests
+from relstorage.adapters.oracle import OracleAdapter
+
+
+def getOracleParams():
+ # Expect an environment variable that specifies how to connect.
+ # A more secure way of providing the password would be nice,
+ # if anyone wants to tackle it.
+ connect_string = os.environ.get('ORACLE_CONNECT')
+ if not connect_string:
+ raise KeyError("An ORACLE_CONNECT environment variable is "
+ "required to run OracleTests")
+ mo = re.match('([^/]+)/([^@]+)@(.*)', connect_string)
+ if mo is None:
+ raise KeyError("The ORACLE_CONNECT environment variable must "
+ "be of the form 'user/password at dsn'")
+ user, password, dsn = mo.groups()
+ return user, password, dsn
+
+
+class OracleTests(RelStorageTests):
+ def make_adapter(self):
+ user, password, dsn = getOracleParams()
+ return OracleAdapter(user, password, dsn)
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(OracleTests, "check"))
+ return suite
+
+if __name__=='__main__':
+ logging.basicConfig()
+ unittest.main(defaultTest="test_suite")
+
Added: relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/tests/testpostgresql.py (rev 0)
+++ relstorage/tests/testpostgresql.py 2008-01-27 00:27:45 UTC (rev 83260)
@@ -0,0 +1,35 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests of relstorage.adapters.postgresql"""
+
+import logging
+import unittest
+
+from reltestbase import RelStorageTests
+from relstorage.adapters.postgresql import PostgreSQLAdapter
+
+
+class PostgreSQLTests(RelStorageTests):
+ def make_adapter(self):
+ return PostgreSQLAdapter()
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(PostgreSQLTests, "check"))
+ return suite
+
+if __name__=='__main__':
+ logging.basicConfig()
+ unittest.main(defaultTest="test_suite")
+
More information about the Checkins
mailing list