[Checkins] SVN: relstorage/ moved to trunk

Shane Hathaway shane at hathawaymix.org
Sat Jan 26 19:36:01 EST 2008


Log message for revision 83261:
  moved to trunk

Changed:
  D   relstorage/CHANGELOG.txt
  D   relstorage/README.txt
  D   relstorage/__init__.py
  D   relstorage/adapters/
  D   relstorage/autotemp.py
  D   relstorage/component.xml
  D   relstorage/config.py
  D   relstorage/notes/
  D   relstorage/poll-invalidation-1-zodb-3-7-1.patch
  D   relstorage/relstorage.py
  D   relstorage/tests/
  A   relstorage/trunk/
  A   relstorage/trunk/CHANGELOG.txt
  A   relstorage/trunk/README.txt
  A   relstorage/trunk/notes/
  A   relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch
  A   relstorage/trunk/relstorage/
  A   relstorage/trunk/relstorage/__init__.py
  A   relstorage/trunk/relstorage/adapters/
  A   relstorage/trunk/relstorage/autotemp.py
  A   relstorage/trunk/relstorage/component.xml
  A   relstorage/trunk/relstorage/config.py
  A   relstorage/trunk/relstorage/relstorage.py
  A   relstorage/trunk/relstorage/tests/

-=-
Deleted: relstorage/CHANGELOG.txt
===================================================================
--- relstorage/CHANGELOG.txt	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/CHANGELOG.txt	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,75 +0,0 @@
-
-relstorage 0.9
-
-- Renamed to reflect expanding database support.
-
-- Support for Oracle added.
-
-- Major overhaul with many scalability and reliability improvements,
-  particularly in the area of packing.
-
-- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
-  on svn.zope.org.)
-
-
-PGStorage 0.4
-
-- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
-  for invalidation polling.
-
-- Removed the commit_order code.  The commit_order idea was intended to
-  allow concurrent commits, but that idea is a little too ambitious while
-  other more important ideas are being tested.  Something like it may
-  come later.
-
-- Improved connection management: only one database connection is
-  held continuously open per storage instance.
-
-- Reconnect to the database automatically.
-
-- Removed test mode.
-
-- Switched from using a ZODB.Connection subclass to a ZODB patch.  The
-  Connection class changes in subtle ways too often to subclass reliably;
-  a patch is much safer.
-
-- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
-
-- Fixed an undo bug.  Symptom: attempting to examine the undo log revealed
-  broken pickles.  Cause: the extension field was not being wrapped in
-  psycopg2.Binary upon insert.  Solution: used psycopg2.Binary.
-  Unfortunately, this doesn't fix existing transactions people have
-  committed.  If anyone has any data to keep, fixing the old transactions
-  should be easy.
-
-- Moved from a private CVS repository to Sourceforge.
-  See http://pgstorage.sourceforge.net .  Also switched to the MIT license.
-
-- David Pratt added a basic getSize() implementation so that the Zope
-  management interface displays an estimate of the size of the database.
-
-- Turned PGStorage into a top-level package.  Python generally makes
-  top-level packages easier to install.
-
-
-PGStorage 0.3
-
-- Made compatible with Zope 3, although an undo bug apparently remains.
-
-
-PGStorage 0.2
-
-- Fixed concurrent commits, which were generating deadlocks.  Fixed by
-  adding a special table, "commit_lock", which is used for
-  synchronizing increments of commit_seq (but only at final commit.)
-  If you are upgrading from version 0.1, you need to change your
-  database using the 'psql' prompt:
-
-    create table commit_lock ();
-
-- Added speed tests and an OpenDocument spreadsheet comparing
-  FileStorage / ZEO with PGStorage.  PGStorage wins at reading objects
-  and writing a lot of small transactions, while FileStorage / ZEO
-  wins at writing big transactions.  Interestingly, they tie when
-  writing a RAM disk.
-

Deleted: relstorage/README.txt
===================================================================
--- relstorage/README.txt	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/README.txt	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,15 +0,0 @@
-
-To make Zope store in RelStorage, patch ZODB/Connection.py using the
-provided patch, then add the following to zope.conf, modifying the
-adapter and params lines to fit your needs.
-
-
-%import relstorage
-<zodb_db main>
-  mount-point /
-  <relstorage>
-    adapter postgresql
-    params host=localhost dbname=zodb user=zope password=...
-  </relstorage>
-</zodb_db>
-
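For Oracle, the component.xml in this same checkin defines an adapter section with required user, password, and dsn keys. A zope.conf fragment following that schema might look like the sketch below; the account name and DSN are placeholders, and the nested-section form mirrors how config.py opens `config.adapter`:

```
%import relstorage
<zodb_db main>
  mount-point /
  <relstorage>
    <oracle>
      user zodb
      password ...
      dsn ORADB
    </oracle>
  </relstorage>
</zodb_db>
```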

Deleted: relstorage/__init__.py
===================================================================
--- relstorage/__init__.py	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/__init__.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,22 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""relstorage package"""
-
-# perform a compatibility test
-from ZODB.Connection import Connection
-
-if not hasattr(Connection, '_poll_invalidations'):
-    raise ImportError('RelStorage requires the invalidation polling '
-        'patch for ZODB.')
-del Connection
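The import-time probe above is a general pattern: check the patched class for the required attribute and fail fast with a descriptive ImportError. A minimal self-contained sketch of the same idea; `require_method` and both demo classes are illustrative names, not RelStorage API:

```python
# Sketch of the import-time feature probe used by relstorage/__init__.py:
# look for the attribute the patch adds and fail fast with a helpful
# ImportError if it is missing.

def require_method(cls, name, hint):
    """Raise ImportError unless cls defines an attribute called name."""
    if not hasattr(cls, name):
        raise ImportError(hint)

class PatchedConnection:
    # Stands in for a ZODB Connection with the polling patch applied.
    def _poll_invalidations(self):
        pass

class PlainConnection:
    # Stands in for an unpatched ZODB Connection.
    pass

require_method(PatchedConnection, '_poll_invalidations',
               'RelStorage requires the invalidation polling patch for ZODB.')
try:
    require_method(PlainConnection, '_poll_invalidations',
                   'RelStorage requires the invalidation polling patch for ZODB.')
except ImportError as exc:
    print('refused to import:', exc)
```

Failing at import time, rather than on first use, gives the administrator an immediate and unambiguous signal that the ZODB patch was not applied.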

Deleted: relstorage/autotemp.py
===================================================================
--- relstorage/autotemp.py	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/autotemp.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,43 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""A temporary file that switches from StringIO to TemporaryFile if needed.
-
-This could be a useful addition to Python's tempfile module.
-"""
-
-from cStringIO import StringIO
-import tempfile
-
-class AutoTemporaryFile:
-    """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
-    def __init__(self, threshold=10485760):
-        self._threshold = threshold
-        self._f = f = StringIO()
-        # delegate most methods
-        for m in ('read', 'readline', 'seek', 'tell', 'close'):
-            setattr(self, m, getattr(f, m))
-
-    def write(self, data):
-        threshold = self._threshold
-        if threshold > 0 and self.tell() + len(data) >= threshold:
-            # convert to TemporaryFile
-            f = tempfile.TemporaryFile()
-            f.write(self._f.getvalue())
-            f.seek(self.tell())
-            self._f = f
-            self._threshold = 0
-            # delegate all important methods
-            for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
-                setattr(self, m, getattr(f, m))
-        self._f.write(data)
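As the docstring anticipates, later versions of Python's tempfile module grew exactly this behaviour as `tempfile.SpooledTemporaryFile`: it buffers in memory and rolls over to a real temporary file once more than `max_size` bytes are written. A short demonstration; note that `_rolled` is a CPython implementation detail, used here only to observe the switch:

```python
import tempfile

# Buffers in memory until more than max_size bytes have been written,
# then transparently rolls over to a disk-backed temporary file --
# the same spill behaviour AutoTemporaryFile implements above.
buf = tempfile.SpooledTemporaryFile(max_size=8)
buf.write(b'small')
print(buf._rolled)   # False: still an in-memory buffer

buf.write(b' but big enough to spill')
print(buf._rolled)   # True: contents moved to a real temporary file

buf.seek(0)
print(buf.read())    # the full payload survives the rollover
buf.close()
```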

Deleted: relstorage/component.xml
===================================================================
--- relstorage/component.xml	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/component.xml	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,64 +0,0 @@
-<?xml version="1.0"?>
-
-<!-- RelStorage configuration via ZConfig -->
-
-<component prefix="relstorage.config">
-
-  <import package="ZODB"/>
-  <abstracttype name="relstorage.adapter"/>
-
-  <sectiontype name="relstorage" implements="ZODB.storage"
-      datatype=".RelStorageFactory">
-    <section type="relstorage.adapter" name="*" attribute="adapter"/>
-    <key name="name" default="RelStorage"/>
-    <key name="create" datatype="boolean" default="true">
-      <description>
-        Flag that indicates whether the storage should be initialized if
-        it does not already exist.
-      </description>
-    </key>
-    <key name="read-only" datatype="boolean" default="false">
-      <description>
-        If true, only reads may be executed against the storage.  Note
-        that the "pack" operation is not considered a write operation
-        and is still allowed on a read-only filestorage.
-      </description>
-    </key>
-  </sectiontype>
-
-  <sectiontype name="postgresql" implements="relstorage.adapter"
-    datatype=".PostgreSQLAdapterFactory">
-    <key name="dsn" datatype="string" required="no" default="">
-      <description>
-        The PostgreSQL data source name.  For example:
-
-        dsn dbname='template1' user='user' host='localhost' password='pass'
-
-        If dsn is omitted, the adapter will connect to a local database with
-        no password.  Both the user and database name will match the
-        name of the owner of the current process.
-      </description>
-    </key>
-  </sectiontype>
-
-  <sectiontype name="oracle" implements="relstorage.adapter"
-    datatype=".OracleAdapterFactory">
-    <key name="user" datatype="string" required="yes">
-      <description>
-        The Oracle account name
-      </description>
-    </key>
-    <key name="password" datatype="string" required="yes">
-      <description>
-        The Oracle account password
-      </description>
-    </key>
-    <key name="dsn" datatype="string" required="yes">
-      <description>
-        The Oracle data source name.  The Oracle client library will
-        normally expect to find the DSN in /etc/oratab.
-      </description>
-    </key>
-  </sectiontype>
-
-</component>

Deleted: relstorage/config.py
===================================================================
--- relstorage/config.py	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/config.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,41 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""ZConfig directive implementations for binding RelStorage to Zope"""
-
-from ZODB.config import BaseConfig
-
-from relstorage import RelStorage
-
-
-class RelStorageFactory(BaseConfig):
-    """Open a storage configured via ZConfig"""
-    def open(self):
-        config = self.config
-        adapter = config.adapter.open()
-        return RelStorage(adapter, name=config.name, create=config.create,
-            read_only=config.read_only)
-
-
-class PostgreSQLAdapterFactory(BaseConfig):
-    def open(self):
-        from adapters.postgresql import PostgreSQLAdapter
-        return PostgreSQLAdapter(self.config.dsn)
-
-
-class OracleAdapterFactory(BaseConfig):
-    def open(self):
-        from adapters.oracle import OracleAdapter
-        config = self.config
-        return OracleAdapter(config.user, config.password, config.dsn)
-
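Both adapter factories above defer importing their database driver until open() is called, so merely loading the configuration module never requires every adapter's dependencies to be installed. The pattern can be sketched in isolation; `AdapterFactory` and the stdlib stand-in are illustrative only:

```python
# Sketch of the deferred-import factory pattern used by the adapter
# factories above: the driver module is imported only when open() is
# called, not when this module is loaded.

class AdapterFactory:
    def __init__(self, module_name, class_name):
        self.module_name = module_name
        self.class_name = class_name

    def open(self, *args):
        # The import happens here, at open() time.
        module = __import__(self.module_name, fromlist=[self.class_name])
        return getattr(module, self.class_name)(*args)

# Demo with a stdlib class standing in for a database adapter:
factory = AdapterFactory('collections', 'OrderedDict')
adapter = factory.open()
print(type(adapter).__name__)  # OrderedDict
```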

Deleted: relstorage/poll-invalidation-1-zodb-3-7-1.patch
===================================================================
--- relstorage/poll-invalidation-1-zodb-3-7-1.patch	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/poll-invalidation-1-zodb-3-7-1.patch	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,94 +0,0 @@
-diff -r 34747fbd09ec Connection.py
---- a/Connection.py	Tue Nov 20 21:57:31 2007 -0700
-+++ b/Connection.py	Fri Jan 11 21:19:00 2008 -0700
-@@ -75,8 +75,14 @@ class Connection(ExportImport, object):
-         """Create a new Connection."""
- 
-         self._db = db
--        self._normal_storage = self._storage = db._storage
--        self.new_oid = db._storage.new_oid
-+        storage = db._storage
-+        m = getattr(storage, 'bind_connection', None)
-+        if m is not None:
-+            # Use a storage instance bound to this connection.
-+            storage = m(self)
-+
-+        self._normal_storage = self._storage = storage
-+        self.new_oid = storage.new_oid
-         self._savepoint_storage = None
- 
-         self.transaction_manager = self._synch = self._mvcc = None
-@@ -170,6 +176,12 @@ class Connection(ExportImport, object):
-         # Multi-database support
-         self.connections = {self._db.database_name: self}
- 
-+        # Allow the storage to decide whether invalidations should
-+        # propagate between connections.  If the storage provides MVCC
-+        # semantics, it is better to not propagate invalidations between
-+        # connections.
-+        self._propagate_invalidations = getattr(
-+            self._storage, 'propagate_invalidations', True)
- 
-     def add(self, obj):
-         """Add a new object 'obj' to the database and assign it an oid."""
-@@ -267,6 +279,11 @@ class Connection(ExportImport, object):
-             self.transaction_manager.unregisterSynch(self)
-             self._synch = None
- 
-+        # If the storage wants to know, tell it this connection is closing.
-+        m = getattr(self._storage, 'connection_closing', None)
-+        if m is not None:
-+            m()
-+
-         if primary:
-             for connection in self.connections.values():
-                 if connection is not self:
-@@ -295,6 +312,10 @@ class Connection(ExportImport, object):
- 
-     def invalidate(self, tid, oids):
-         """Notify the Connection that transaction 'tid' invalidated oids."""
-+        if not self._propagate_invalidations:
-+            # The storage disabled inter-connection invalidation.
-+            return
-+
-         self._inv_lock.acquire()
-         try:
-             if self._txn_time is None:
-@@ -438,8 +459,23 @@ class Connection(ExportImport, object):
-         self._registered_objects = []
-         self._creating.clear()
- 
-+    def _poll_invalidations(self):
-+        """Poll and process object invalidations provided by the storage.
-+        """
-+        m = getattr(self._storage, 'poll_invalidations', None)
-+        if m is not None:
-+            # Poll the storage for invalidations.
-+            invalidated = m()
-+            if invalidated is None:
-+                # special value: the transaction is so old that
-+                # we need to flush the whole cache.
-+                self._cache.invalidate(self._cache.cache_data.keys())
-+            elif invalidated:
-+                self._cache.invalidate(invalidated)
-+
-     # Process pending invalidations.
-     def _flush_invalidations(self):
-+        self._poll_invalidations()
-         self._inv_lock.acquire()
-         try:
-             # Non-ghostifiable objects may need to read when they are
-diff -r 34747fbd09ec DB.py
---- a/DB.py	Tue Nov 20 21:57:31 2007 -0700
-+++ b/DB.py	Wed Nov 28 18:33:12 2007 -0700
-@@ -260,6 +260,10 @@ class DB(object):
-             storage.store(z64, None, file.getvalue(), '', t)
-             storage.tpc_vote(t)
-             storage.tpc_finish(t)
-+        if hasattr(storage, 'connection_closing'):
-+            # Let the storage release whatever resources it used for loading
-+            # the root object.
-+            storage.connection_closing()
- 
-         # Multi-database setup.
-         if databases is None:
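Throughout the patch, Connection probes its storage with getattr() and only calls the hooks the storage actually provides (bind_connection, connection_closing, poll_invalidations, propagate_invalidations), so unpatched storages keep working unchanged. The optional-hook protocol can be sketched on its own; the class names here are illustrative:

```python
# Sketch of the optional-hook protocol the patch introduces: probe the
# storage for a hook with getattr() and fall back to the old behaviour
# when the hook is absent.

class MVCCStorage:
    # A storage that supplies its own invalidation polling.
    propagate_invalidations = False

    def poll_invalidations(self):
        return [1, 7]              # oids invalidated since the last poll

class PlainStorage:
    pass                           # provides none of the optional hooks

def flush_invalidations(storage):
    m = getattr(storage, 'poll_invalidations', None)
    if m is None:
        return []                  # storage has no polling hook
    invalidated = m()
    if invalidated is None:
        return ['ALL']             # special value: flush the whole cache
    return invalidated

print(flush_invalidations(MVCCStorage()))                        # [1, 7]
print(flush_invalidations(PlainStorage()))                       # []
print(getattr(PlainStorage(), 'propagate_invalidations', True))  # True
```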

Deleted: relstorage/relstorage.py
===================================================================
--- relstorage/relstorage.py	2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/relstorage.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,804 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""The core of RelStorage, a ZODB storage for relational databases.
-
-Stores pickles in the database.
-"""
-
-import base64
-import cPickle
-import logging
-import md5
-import os
-import time
-import weakref
-from ZODB.utils import p64, u64, z64
-from ZODB.BaseStorage import BaseStorage
-from ZODB import ConflictResolution, POSException
-from persistent.TimeStamp import TimeStamp
-
-from autotemp import AutoTemporaryFile
-
-log = logging.getLogger("relstorage")
-
-# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
-# a failure revealed by the ZODB test suite.  The test suite often fails
-# to call tpc_abort in the event of an error, leading to deadlocks.
-# The variable causes RelStorage to abort failed transactions
-# early rather than wait for an explicit abort.
-abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
-
-
-class RelStorage(BaseStorage,
-                ConflictResolution.ConflictResolvingStorage):
-    """Storage to a relational database, based on invalidation polling"""
-
-    def __init__(self, adapter, name='RelStorage', create=True,
-            read_only=False):
-        self._adapter = adapter
-        self._name = name
-        self._is_read_only = read_only
-
-        # load_conn and load_cursor are always open
-        self._load_conn = None
-        self._load_cursor = None
-        self._load_started = False
-        self._open_load_connection()
-        # store_conn and store_cursor are open only during commit
-        self._store_conn = None
-        self._store_cursor = None
-
-        if create:
-            self._adapter.prepare_schema()
-
-        BaseStorage.__init__(self, name)
-
-        self._tid = None
-        self._ltid = None
-
-        # _tbuf is a temporary file that contains pickles of data to be
-        # committed.  _pickler writes pickles to that file.  _stored_oids
-        # is a list of integer OIDs to be stored.
-        self._tbuf = None
-        self._pickler = None
-        self._stored_oids = None
-
-        # _prepared_txn is the name of the transaction to commit in the
-        # second phase.
-        self._prepared_txn = None
-
-        # _instances is a list of weak references to storage instances bound
-        # to the same database.
-        self._instances = []
-
-        # _closed is True after self.close() is called.  Since close()
-        # can be called from another thread, access to self._closed should
-        # be inside a _lock_acquire()/_lock_release() block.
-        self._closed = False
-
-    def _open_load_connection(self):
-        """Open the load connection to the database.  Return nothing."""
-        conn, cursor = self._adapter.open_for_load()
-        self._drop_load_connection()
-        self._load_conn, self._load_cursor = conn, cursor
-        self._load_started = True
-
-    def _drop_load_connection(self):
-        conn, cursor = self._load_conn, self._load_cursor
-        self._load_conn, self._load_cursor = None, None
-        self._adapter.close(conn, cursor)
-
-    def _drop_store_connection(self):
-        conn, cursor = self._store_conn, self._store_cursor
-        self._store_conn, self._store_cursor = None, None
-        self._adapter.close(conn, cursor)
-
-    def _rollback_load_connection(self):
-        if self._load_conn is not None:
-            self._load_started = False
-            self._load_conn.rollback()
-
-    def _start_load(self):
-        if self._load_cursor is None:
-            self._open_load_connection()
-        else:
-            self._adapter.restart_load(self._load_cursor)
-            self._load_started = True
-
-    def _zap(self):
-        """Clear all objects out of the database.
-
-        Used by the test suite.
-        """
-        self._adapter.zap()
-        self._rollback_load_connection()
-
-    def close(self):
-        """Close the connections to the database."""
-        self._lock_acquire()
-        try:
-            self._closed = True
-            self._drop_load_connection()
-            self._drop_store_connection()
-            for wref in self._instances:
-                instance = wref()
-                if instance is not None:
-                    instance.close()
-        finally:
-            self._lock_release()
-
-    def bind_connection(self, zodb_conn):
-        """Get a connection-bound storage instance.
-
-        Connections have their own storage instances so that
-        the database can provide the MVCC semantics rather than ZODB.
-        """
-        res = BoundRelStorage(self, zodb_conn)
-        self._instances.append(weakref.ref(res))
-        return res
-
-    def connection_closing(self):
-        """Release resources."""
-        self._rollback_load_connection()
-
-    def __len__(self):
-        return self._adapter.get_object_count()
-
-    def getSize(self):
-        """Return database size in bytes"""
-        return self._adapter.get_db_size()
-
-    def load(self, oid, version):
-        self._lock_acquire()
-        try:
-            if not self._load_started:
-                self._start_load()
-            cursor = self._load_cursor
-            state, tid_int = self._adapter.load_current(cursor, u64(oid))
-        finally:
-            self._lock_release()
-        if tid_int is not None:
-            if state:
-                state = str(state)
-            if not state:
-                # This can happen if something attempts to load
-                # an object whose creation has been undone.
-                raise KeyError(oid)
-            return state, p64(tid_int)
-        else:
-            raise KeyError(oid)
-
-    def loadEx(self, oid, version):
-        # Since we don't support versions, just tack the empty version
-        # string onto load's result.
-        return self.load(oid, version) + ("",)
-
-    def loadSerial(self, oid, serial):
-        """Load a specific revision of an object"""
-        self._lock_acquire()
-        try:
-            if self._store_cursor is not None:
-                # Allow loading data from later transactions
-                # for conflict resolution.
-                cursor = self._store_cursor
-            else:
-                if not self._load_started:
-                    self._start_load()
-                cursor = self._load_cursor
-            state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
-            if state is not None:
-                state = str(state)
-                if not state:
-                    raise POSException.POSKeyError(oid)
-                return state
-            else:
-                raise KeyError(oid)
-        finally:
-            self._lock_release()
-
-    def loadBefore(self, oid, tid):
-        """Return the most recent revision of oid before tid committed."""
-        oid_int = u64(oid)
-
-        self._lock_acquire()
-        try:
-            if self._store_cursor is not None:
-                # Allow loading data from later transactions
-                # for conflict resolution.
-                cursor = self._store_cursor
-            else:
-                if not self._load_started:
-                    self._start_load()
-                cursor = self._load_cursor
-            if not self._adapter.exists(cursor, u64(oid)):
-                raise KeyError(oid)
-
-            state, start_tid = self._adapter.load_before(
-                cursor, oid_int, u64(tid))
-            if start_tid is not None:
-                end_int = self._adapter.get_object_tid_after(
-                    cursor, oid_int, start_tid)
-                if end_int is not None:
-                    end = p64(end_int)
-                else:
-                    end = None
-                if state is not None:
-                    state = str(state)
-                return state, p64(start_tid), end
-            else:
-                return None
-        finally:
-            self._lock_release()
-
-
-    def store(self, oid, serial, data, version, transaction):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
-        if version:
-            raise POSException.Unsupported("Versions aren't supported")
-
-        # If self._prepared_txn is not None, that means something is
-        # attempting to store objects after the vote phase has finished.
-        # That should not happen, should it?
-        assert self._prepared_txn is None
-        md5sum = md5.new(data).hexdigest()
-
-        self._lock_acquire()
-        try:
-            # buffer the stores
-            if self._tbuf is None:
-                self._tbuf = AutoTemporaryFile()
-                self._pickler = cPickle.Pickler(
-                    self._tbuf, cPickle.HIGHEST_PROTOCOL)
-                self._stored_oids = []
-
-            self._pickler.dump((oid, serial, md5sum, data))
-            self._stored_oids.append(u64(oid))
-            return None
-        finally:
-            self._lock_release()
-
-    def tpc_begin(self, transaction, tid=None, status=' '):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        self._lock_acquire()
-        try:
-            if self._transaction is transaction:
-                return
-            self._lock_release()
-            self._commit_lock_acquire()
-            self._lock_acquire()
-            self._transaction = transaction
-            self._clear_temp()
-
-            user = transaction.user
-            desc = transaction.description
-            ext = transaction._extension
-            if ext:
-                ext = cPickle.dumps(ext, 1)
-            else:
-                ext = ""
-            self._ude = user, desc, ext
-            self._tstatus = status
-
-            if tid is not None:
-                # get the commit lock and add the transaction now
-                adapter = self._adapter
-                conn, cursor = adapter.open_for_commit()
-                self._store_conn, self._store_cursor = conn, cursor
-                tid_int = u64(tid)
-                try:
-                    adapter.add_transaction(cursor, tid_int, user, desc, ext)
-                except:
-                    self._drop_store_connection()
-                    raise
-            # else choose the tid later
-            self._tid = tid
-
-        finally:
-            self._lock_release()
-
-    def _prepare_tid(self):
-        """Choose a tid for the current transaction.
-
-        This should be done as late in the commit as possible, since
-        it must hold an exclusive commit lock.
-        """
-        if self._tid is not None:
-            return
-        if self._transaction is None:
-            raise POSException.StorageError("No transaction in progress")
-
-        adapter = self._adapter
-        conn, cursor = adapter.open_for_commit()
-        self._store_conn, self._store_cursor = conn, cursor
-        user, desc, ext = self._ude
-
-        attempt = 1
-        while True:
-            try:
-                # Choose a transaction ID.
-                # Base the transaction ID on the database time,
-                # while ensuring that the tid of this transaction
-                # is greater than any existing tid.
-                last_tid, now = adapter.get_tid_and_time(cursor)
-                stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
-                stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
-                tid = repr(stamp)
-
-                tid_int = u64(tid)
-                adapter.add_transaction(cursor, tid_int, user, desc, ext)
-                self._tid = tid
-                break
-
-            except POSException.ConflictError:
-                if attempt < 3:
-                    # A concurrent transaction claimed the tid.
-                    # Rollback and try again.
-                    adapter.restart_commit(cursor)
-                    attempt += 1
-                else:
-                    self._drop_store_connection()
-                    raise
-            except:
-                self._drop_store_connection()
-                raise
-
-
-    def _clear_temp(self):
-        # It is assumed that self._lock_acquire was called before this
-        # method was called.
-        self._prepared_txn = None
-        tbuf = self._tbuf
-        if tbuf is not None:
-            self._tbuf = None
-            self._pickler = None
-            self._stored_oids = None
-            tbuf.close()
-
-
-    def _send_stored(self):
-        """Send the buffered pickles to the database.
-
-        Returns a list of (oid, tid) to be received by
-        Connection._handle_serial().
-        """
-        cursor = self._store_cursor
-        adapter = self._adapter
-        tid_int = u64(self._tid)
-        self._pickler = None  # Don't allow any more store operations
-        self._tbuf.seek(0)
-        unpickler = cPickle.Unpickler(self._tbuf)
-
-        serials = []  # [(oid, serial)]
-        prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
-
-        while True:
-            try:
-                oid, serial, md5sum, data = unpickler.load()
-            except EOFError:
-                break
-            oid_int = u64(oid)
-            if not serial:
-                serial = z64
-
-            prev_tid_int = prev_tids.get(oid_int)
-            if prev_tid_int is None:
-                prev_tid_int = 0
-                prev_tid = z64
-            else:
-                prev_tid = p64(prev_tid_int)
-                if prev_tid != serial:
-                    # conflict detected
-                    rdata = self.tryToResolveConflict(
-                        oid, prev_tid, serial, data)
-                    if rdata is None:
-                        raise POSException.ConflictError(
-                            oid=oid, serials=(prev_tid, serial), data=data)
-                    else:
-                        data = rdata
-                        md5sum = md5.new(data).hexdigest()
-
-            self._adapter.store(
-                cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
-
-            if prev_tid and serial != prev_tid:
-                serials.append((oid, ConflictResolution.ResolvedSerial))
-            else:
-                serials.append((oid, self._tid))
-
-        return serials
-
-
-    def _vote(self):
-        """Prepare the transaction for final commit."""
-        # This method initiates a two-phase commit process,
-        # saving the name of the prepared transaction in self._prepared_txn.
-
-        # It is assumed that self._lock_acquire was called before this
-        # method was called.
-
-        if self._prepared_txn is not None:
-            # the vote phase has already completed
-            return
-
-        self._prepare_tid()
-        tid_int = u64(self._tid)
-        cursor = self._store_cursor
-        assert cursor is not None
-
-        if self._tbuf is not None:
-            serials = self._send_stored()
-        else:
-            serials = []
-
-        self._adapter.update_current(cursor, tid_int)
-        self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
-        # From the point of view of self._store_cursor,
-        # it now looks like the transaction has been rolled back.
-
-        return serials
-
-
-    def tpc_vote(self, transaction):
-        self._lock_acquire()
-        try:
-            if transaction is not self._transaction:
-                return
-            try:
-                return self._vote()
-            except:
-                if abort_early:
-                    # abort early to avoid lockups while running the
-                    # somewhat brittle ZODB test suite
-                    self.tpc_abort(transaction)
-                raise
-        finally:
-            self._lock_release()
-
-
-    def _finish(self, tid, user, desc, ext):
-        """Commit the transaction."""
-        # It is assumed that self._lock_acquire was called before this
-        # method was called.
-        assert self._tid is not None
-        self._rollback_load_connection()
-        txn = self._prepared_txn
-        assert txn is not None
-        self._adapter.commit_phase2(self._store_cursor, txn)
-        self._drop_store_connection()
-        self._prepared_txn = None
-        self._ltid = self._tid
-        self._tid = None
-
-    def _abort(self):
-        # the lock is held here
-        self._rollback_load_connection()
-        if self._store_cursor is not None:
-            self._adapter.abort(self._store_cursor, self._prepared_txn)
-        self._prepared_txn = None
-        self._drop_store_connection()
-        self._tid = None
-
-    def lastTransaction(self):
-        return self._ltid
-
-    def new_oid(self):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        self._lock_acquire()
-        try:
-            cursor = self._load_cursor
-            if cursor is None:
-                self._open_load_connection()
-                cursor = self._load_cursor
-            oid_int = self._adapter.new_oid(cursor)
-            return p64(oid_int)
-        finally:
-            self._lock_release()
-
-    def supportsUndo(self):
-        return True
-
-    def supportsTransactionalUndo(self):
-        return True
-
-    def undoLog(self, first=0, last=-20, filter=None):
-        if last < 0:
-            last = first - last
-
-        # use a private connection to ensure the most current results
-        adapter = self._adapter
-        conn, cursor = adapter.open()
-        try:
-            rows = adapter.iter_transactions(cursor)
-            i = 0
-            res = []
-            for tid_int, user, desc, ext in rows:
-                tid = p64(tid_int)
-                d = {'id': base64.encodestring(tid)[:-1],
-                     'time': TimeStamp(tid).timeTime(),
-                     'user_name': user,
-                     'description': desc}
-                if ext:
-                    ext = str(ext)
-                if ext:
-                    d.update(cPickle.loads(ext))
-                if filter is None or filter(d):
-                    if i >= first:
-                        res.append(d)
-                    i += 1
-                    if i >= last:
-                        break
-            return res
-
-        finally:
-            adapter.close(conn, cursor)
-
-    def history(self, oid, version=None, size=1, filter=None):
-        self._lock_acquire()
-        try:
-            cursor = self._load_cursor
-            oid_int = u64(oid)
-            try:
-                rows = self._adapter.iter_object_history(cursor, oid_int)
-            except KeyError:
-                raise KeyError(oid)
-
-            res = []
-            for tid_int, username, description, extension, length in rows:
-                tid = p64(tid_int)
-                if extension:
-                    d = loads(extension)
-                else:
-                    d = {}
-                d.update({"time": TimeStamp(tid).timeTime(),
-                          "user_name": username,
-                          "description": description,
-                          "tid": tid,
-                          "version": '',
-                          "size": length,
-                          })
-                if filter is None or filter(d):
-                    res.append(d)
-                    if size is not None and len(res) >= size:
-                        break
-            return res
-        finally:
-            self._lock_release()
-
-
-    def undo(self, transaction_id, transaction):
-        """Undo a transaction identified by transaction_id.
-
-        Do so by writing new data that reverses the action taken by
-        the transaction.
-        """
-
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
-
-        undo_tid = base64.decodestring(transaction_id + '\n')
-        assert len(undo_tid) == 8
-        undo_tid_int = u64(undo_tid)
-
-        self._lock_acquire()
-        try:
-            self._prepare_tid()
-            self_tid_int = u64(self._tid)
-            cursor = self._store_cursor
-            assert cursor is not None
-            adapter = self._adapter
-
-            adapter.verify_undoable(cursor, undo_tid_int)
-            oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
-            oids = [p64(oid_int) for oid_int in oid_ints]
-
-            # Update the current object pointers immediately, so that
-            # subsequent undo operations within this transaction will see
-            # the new current objects.
-            adapter.update_current(cursor, self_tid_int)
-
-            return self._tid, oids
-
-        finally:
-            self._lock_release()
-
-
-    def pack(self, t, referencesf):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-
-        pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
-        pack_point_int = u64(pack_point)
-
-        def get_references(state):
-            """Return the set of OIDs the given state refers to."""
-            refs = set()
-            if state:
-                for oid in referencesf(str(state)):
-                    refs.add(u64(oid))
-            return refs
-
-        # Use a private connection (lock_conn and lock_cursor) to
-        # hold the pack lock, while using a second private connection
-        # (conn and cursor) to decide exactly what to pack.
-        # Close the second connection.  Then, with the lock still held,
-        # perform the pack in a third connection opened by the adapter.
-        # This structure is designed to maximize the scalability
-        # of packing and minimize conflicts with concurrent writes.
-        # A consequence of this structure is that the adapter must
-        # not choke on transactions that may have been added between
-        # pre_pack and pack.
-        adapter = self._adapter
-        lock_conn, lock_cursor = adapter.open()
-        try:
-            adapter.hold_pack_lock(lock_cursor)
-
-            conn, cursor = adapter.open()
-            try:
-                try:
-                    # Find the latest commit before or at the pack time.
-                    tid_int = adapter.choose_pack_transaction(
-                        cursor, pack_point_int)
-                    if tid_int is None:
-                        # Nothing needs to be packed.
-                        # TODO: log the fact that nothing needs to be packed.
-                        return
-
-                    # In pre_pack, the adapter fills tables with information
-                    # about what to pack.  The adapter should not actually
-                    # pack anything yet.
-                    adapter.pre_pack(cursor, tid_int, get_references)
-                except:
-                    conn.rollback()
-                    raise
-                else:
-                    conn.commit()
-            finally:
-                adapter.close(conn, cursor)
-
-            # Now pack.  The adapter makes its own connection just for the
-            # pack operation, possibly using a special transaction mode
-            # and connection flags.
-            adapter.pack(tid_int)
-            self._after_pack()
-
-        finally:
-            lock_conn.rollback()
-            adapter.close(lock_conn, lock_cursor)
-
-
-    def _after_pack(self):
-        """Reset the transaction state after packing."""
-        # The tests depend on this.
-        self._rollback_load_connection()
-
-
-class BoundRelStorage(RelStorage):
-    """Storage to a database, bound to a particular ZODB.Connection."""
-
-    # The propagate_invalidations flag, set to a false value, tells
-    # the Connection not to propagate object invalidations across
-    # connections, since that ZODB feature is detrimental when the
-    # storage provides its own MVCC.
-    propagate_invalidations = False
-
-    def __init__(self, parent, zodb_conn):
-        # self._conn = conn
-        RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
-            read_only=parent._is_read_only, create=False)
-        # _prev_polled_tid contains the tid at the previous poll
-        self._prev_polled_tid = None
-        self._showed_disconnect = False
-
-    def poll_invalidations(self, retry=True):
-        """Looks for OIDs of objects that changed since _prev_polled_tid
-
-        Returns {oid: 1}, or None if all objects need to be invalidated
-        because prev_polled_tid is not in the database (presumably it
-        has been packed).
-        """
-        self._lock_acquire()
-        try:
-            if self._closed:
-                return {}
-            try:
-                self._rollback_load_connection()
-                self._start_load()
-                conn = self._load_conn
-                cursor = self._load_cursor
-
-                # Ignore changes made by the last transaction committed
-                # by this connection.
-                ignore_tid = None
-                if self._ltid is not None:
-                    ignore_tid = u64(self._ltid)
-
-                # get a list of changed OIDs and the most recent tid
-                oid_ints, new_polled_tid = self._adapter.poll_invalidations(
-                    conn, cursor, self._prev_polled_tid, ignore_tid)
-                self._prev_polled_tid = new_polled_tid
-
-                if oid_ints is None:
-                    oids = None
-                else:
-                    oids = {}
-                    for oid_int in oid_ints:
-                        oids[p64(oid_int)] = 1
-                return oids
-            except POSException.StorageError:
-                # disconnected
-                if not retry:
-                    raise
-                if not self._showed_disconnect:
-                    log.warning("Lost connection in %s", repr(self))
-                    self._showed_disconnect = True
-                self._open_load_connection()
-                log.info("Reconnected in %s", repr(self))
-                self._showed_disconnect = False
-                return self.poll_invalidations(retry=False)
-        finally:
-            self._lock_release()
-
-    def _after_pack(self):
-        # Disable transaction reset after packing.  The connection
-        # should call sync() to see the new state.
-        pass
-
-
-# very basic test... ought to be moved or deleted.
-def test():
-    import transaction
-    import pprint
-    from ZODB.DB import DB
-    from persistent.mapping import PersistentMapping
-    from relstorage.adapters.postgresql import PostgreSQLAdapter
-
-    adapter = PostgreSQLAdapter(params='dbname=relstoragetest')
-    storage = RelStorage(adapter)
-    db = DB(storage)
-
-    if True:
-        for i in range(100):
-            c = db.open()
-            c.root()['foo'] = PersistentMapping()
-            transaction.get().note('added %d' % i)
-            transaction.commit()
-            c.close()
-            print 'wrote', i
-
-        # undo 2 transactions, then redo them and undo the first again.
-        for i in range(2):
-            log = db.undoLog()
-            db.undo(log[0]['id'])
-            db.undo(log[1]['id'])
-            transaction.get().note('undone! (%d)' % i)
-            transaction.commit()
-            print 'undid', i
-
-    pprint.pprint(db.undoLog())
-    db.pack(time.time() - 0.1)
-    pprint.pprint(db.undoLog())
-    db.close()
-
-
-if __name__ == '__main__':
-    import logging
-    logging.basicConfig()
-    test()

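The `_send_stored` method in the file deleted above (and recreated under trunk) detects write conflicts by comparing the previous tid recorded in the database against the serial the client based its change on, falling back to ZODB conflict resolution on a mismatch. A hedged sketch of that check, with `resolve` standing in for `tryToResolveConflict` (all names here are illustrative, not RelStorage API):

```python
# Minimal sketch of the serial-comparison conflict check in _send_stored.
# resolve(oid, committed_tid, expected_tid, data) returns merged state
# or None if the conflict cannot be resolved.

class ConflictError(Exception):
    pass

def check_conflict(oid, committed_tid, expected_tid, data, resolve):
    """Return the state to store, resolving a write conflict if possible.

    committed_tid is the tid of the last committed revision (None for a
    new object); expected_tid is the revision the client saw.  A mismatch
    means another transaction committed in between.
    """
    if committed_tid is None or committed_tid == expected_tid:
        return data  # first write, or the client saw the latest state
    resolved = resolve(oid, committed_tid, expected_tid, data)
    if resolved is None:
        raise ConflictError(oid)
    return resolved
```

In the storage above, a successful resolution is additionally reported back to the client by returning `ConflictResolution.ResolvedSerial` instead of the new tid.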
Copied: relstorage/trunk/CHANGELOG.txt (from rev 83260, relstorage/CHANGELOG.txt)
===================================================================
--- relstorage/trunk/CHANGELOG.txt	                        (rev 0)
+++ relstorage/trunk/CHANGELOG.txt	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,75 @@
+
+relstorage 0.9
+
+- Renamed to reflect expanding database support.
+
+- Support for Oracle added.
+
+- Major overhaul with many scalability and reliability improvements,
+  particularly in the area of packing.
+
+- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
+  on svn.zope.org.)
+
+
+PGStorage 0.4
+
+- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
+  for invalidation polling.
+
+- Removed the commit_order code.  The commit_order idea was intended to
+  allow concurrent commits, but that idea is a little too ambitious while
+  other more important ideas are being tested.  Something like it may
+  come later.
+
+- Improved connection management: only one database connection is
+  held continuously open per storage instance.
+
+- Reconnect to the database automatically.
+
+- Removed test mode.
+
+- Switched from using a ZODB.Connection subclass to a ZODB patch.  The
+  Connection class changes in subtle ways too often to subclass reliably;
+  a patch is much safer.
+
+- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
+
+- Fixed an undo bug.  Symptom: attempting to examine the undo log revealed
+  broken pickles.  Cause: the extension field was not being wrapped in
+  psycopg2.Binary upon insert.  Solution: used psycopg2.Binary.
+  Unfortunately, this doesn't fix existing transactions people have
+  committed.  If anyone has any data to keep, fixing the old transactions
+  should be easy.
+
+- Moved from a private CVS repository to Sourceforge.
+  See http://pgstorage.sourceforge.net .  Also switched to the MIT license.
+
+- David Pratt added a basic getSize() implementation so that the Zope
+  management interface displays an estimate of the size of the database.
+
+- Turned PGStorage into a top-level package.  Python generally makes
+  top-level packages easier to install.
+
+
+PGStorage 0.3
+
+- Made compatible with Zope 3, although an undo bug apparently remains.
+
+
+PGStorage 0.2
+
+- Fixed concurrent commits, which were generating deadlocks.  Fixed by
+  adding a special table, "commit_lock", which is used for
+  synchronizing increments of commit_seq (but only at final commit.)
+  If you are upgrading from version 0.1, you need to change your
+  database using the 'psql' prompt:
+
+    create table commit_lock ();
+
+- Added speed tests and an OpenDocument spreadsheet comparing
+  FileStorage / ZEO with PGStorage.  PGStorage wins at reading objects
+  and writing a lot of small transactions, while FileStorage / ZEO
+  wins at writing big transactions.  Interestingly, they tie when
+  writing a RAM disk.
+

Copied: relstorage/trunk/README.txt (from rev 83260, relstorage/README.txt)
===================================================================
--- relstorage/trunk/README.txt	                        (rev 0)
+++ relstorage/trunk/README.txt	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,15 @@
+
+To make Zope store in RelStorage, patch ZODB/Connection.py using the
+provided patch, then add the following to zope.conf, modifying the
+adapter and params lines to fit your needs.
+
+
+%import relstorage
+<zodb_db main>
+  mount-point /
+  <relstorage>
+    adapter postgresql
+    params host=localhost dbname=zodb user=zope password=...
+  </relstorage>
+</zodb_db>
+

Copied: relstorage/trunk/notes (from rev 83260, relstorage/notes)

Copied: relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch (from rev 83260, relstorage/poll-invalidation-1-zodb-3-7-1.patch)
===================================================================
--- relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch	                        (rev 0)
+++ relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,94 @@
+diff -r 34747fbd09ec Connection.py
+--- a/Connection.py	Tue Nov 20 21:57:31 2007 -0700
++++ b/Connection.py	Fri Jan 11 21:19:00 2008 -0700
+@@ -75,8 +75,14 @@ class Connection(ExportImport, object):
+         """Create a new Connection."""
+ 
+         self._db = db
+-        self._normal_storage = self._storage = db._storage
+-        self.new_oid = db._storage.new_oid
++        storage = db._storage
++        m = getattr(storage, 'bind_connection', None)
++        if m is not None:
++            # Use a storage instance bound to this connection.
++            storage = m(self)
++
++        self._normal_storage = self._storage = storage
++        self.new_oid = storage.new_oid
+         self._savepoint_storage = None
+ 
+         self.transaction_manager = self._synch = self._mvcc = None
+@@ -170,6 +176,12 @@ class Connection(ExportImport, object):
+         # Multi-database support
+         self.connections = {self._db.database_name: self}
+ 
++        # Allow the storage to decide whether invalidations should
++        # propagate between connections.  If the storage provides MVCC
++        # semantics, it is better to not propagate invalidations between
++        # connections.
++        self._propagate_invalidations = getattr(
++            self._storage, 'propagate_invalidations', True)
+ 
+     def add(self, obj):
+         """Add a new object 'obj' to the database and assign it an oid."""
+@@ -267,6 +279,11 @@ class Connection(ExportImport, object):
+             self.transaction_manager.unregisterSynch(self)
+             self._synch = None
+ 
++        # If the storage wants to know, tell it this connection is closing.
++        m = getattr(self._storage, 'connection_closing', None)
++        if m is not None:
++            m()
++
+         if primary:
+             for connection in self.connections.values():
+                 if connection is not self:
+@@ -295,6 +312,10 @@ class Connection(ExportImport, object):
+ 
+     def invalidate(self, tid, oids):
+         """Notify the Connection that transaction 'tid' invalidated oids."""
++        if not self._propagate_invalidations:
++            # The storage disabled inter-connection invalidation.
++            return
++
+         self._inv_lock.acquire()
+         try:
+             if self._txn_time is None:
+@@ -438,8 +459,23 @@ class Connection(ExportImport, object):
+         self._registered_objects = []
+         self._creating.clear()
+ 
++    def _poll_invalidations(self):
++        """Poll and process object invalidations provided by the storage.
++        """
++        m = getattr(self._storage, 'poll_invalidations', None)
++        if m is not None:
++            # Poll the storage for invalidations.
++            invalidated = m()
++            if invalidated is None:
++                # special value: the transaction is so old that
++                # we need to flush the whole cache.
++                self._cache.invalidate(self._cache.cache_data.keys())
++            elif invalidated:
++                self._cache.invalidate(invalidated)
++
+     # Process pending invalidations.
+     def _flush_invalidations(self):
++        self._poll_invalidations()
+         self._inv_lock.acquire()
+         try:
+             # Non-ghostifiable objects may need to read when they are
+diff -r 34747fbd09ec DB.py
+--- a/DB.py	Tue Nov 20 21:57:31 2007 -0700
++++ b/DB.py	Wed Nov 28 18:33:12 2007 -0700
+@@ -260,6 +260,10 @@ class DB(object):
+             storage.store(z64, None, file.getvalue(), '', t)
+             storage.tpc_vote(t)
+             storage.tpc_finish(t)
++        if hasattr(storage, 'connection_closing'):
++            # Let the storage release whatever resources it used for loading
++            # the root object.
++            storage.connection_closing()
+ 
+         # Multi-database setup.
+         if databases is None:

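The patch above wires RelStorage into ZODB through optional, duck-typed hooks: `Connection` probes the storage with `getattr()` for `bind_connection`, `connection_closing`, and `poll_invalidations`, and falls back to the stock behavior when a hook is absent, so unpatched storages keep working. A minimal sketch of that pattern, mirroring `_poll_invalidations` (class and function names are illustrative):

```python
# Sketch of the optional-hook pattern: probe with getattr(), keep the
# legacy path when the storage lacks the hook.

def flush_invalidations(storage, cache):
    poll = getattr(storage, 'poll_invalidations', None)
    if poll is None:
        return  # plain storage: no polling, legacy path unchanged
    invalidated = poll()
    if invalidated is None:
        cache.clear()  # special value: flush the whole cache
    elif invalidated:
        cache.invalidate(invalidated)

class FakeCache:
    """Stand-in for the Connection's pickle cache."""
    def __init__(self):
        self.flushed = False
        self.invalidated = []
    def clear(self):
        self.flushed = True
    def invalidate(self, oids):
        self.invalidated.extend(oids)

class PollingStorage:
    """A storage that supports the polling hook."""
    def __init__(self, result):
        self._result = result
    def poll_invalidations(self):
        return self._result
```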
Copied: relstorage/trunk/relstorage/__init__.py (from rev 83260, relstorage/__init__.py)
===================================================================
--- relstorage/trunk/relstorage/__init__.py	                        (rev 0)
+++ relstorage/trunk/relstorage/__init__.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,22 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""relstorage package"""
+
+# perform a compatibility test
+from ZODB.Connection import Connection
+
+if not hasattr(Connection, '_poll_invalidations'):
+    raise ImportError('RelStorage requires the invalidation polling '
+        'patch for ZODB.')
+del Connection

Copied: relstorage/trunk/relstorage/adapters (from rev 83260, relstorage/adapters)

Copied: relstorage/trunk/relstorage/autotemp.py (from rev 83260, relstorage/autotemp.py)
===================================================================
--- relstorage/trunk/relstorage/autotemp.py	                        (rev 0)
+++ relstorage/trunk/relstorage/autotemp.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,43 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A temporary file that switches from StringIO to TemporaryFile if needed.
+
+This could be a useful addition to Python's tempfile module.
+"""
+
+from cStringIO import StringIO
+import tempfile
+
+class AutoTemporaryFile:
+    """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
+    def __init__(self, threshold=10485760):
+        self._threshold = threshold
+        self._f = f = StringIO()
+        # delegate most methods
+        for m in ('read', 'readline', 'seek', 'tell', 'close'):
+            setattr(self, m, getattr(f, m))
+
+    def write(self, data):
+        threshold = self._threshold
+        if threshold > 0 and self.tell() + len(data) >= threshold:
+            # convert to TemporaryFile
+            f = tempfile.TemporaryFile()
+            f.write(self._f.getvalue())
+            f.seek(self.tell())
+            self._f = f
+            self._threshold = 0
+            # delegate all important methods
+            for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
+                setattr(self, m, getattr(f, m))
+        self._f.write(data)

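`AutoTemporaryFile` implements the same idea Python later shipped in the standard library as `tempfile.SpooledTemporaryFile` (new in Python 2.6): buffer writes in memory until a size threshold, then transparently spill to a real temporary file. A usage sketch with the same 10 MB default threshold as above:

```python
import tempfile

# Data stays in memory until max_size bytes are buffered, then the
# buffer rolls over to a disk-backed temporary file automatically.
buf = tempfile.SpooledTemporaryFile(max_size=10 * 1024 * 1024)
buf.write(b'pickled state')
buf.seek(0)
data = buf.read()
buf.close()
```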
Copied: relstorage/trunk/relstorage/component.xml (from rev 83260, relstorage/component.xml)
===================================================================
--- relstorage/trunk/relstorage/component.xml	                        (rev 0)
+++ relstorage/trunk/relstorage/component.xml	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+
+<!-- RelStorage configuration via ZConfig -->
+
+<component prefix="relstorage.config">
+
+  <import package="ZODB"/>
+  <abstracttype name="relstorage.adapter"/>
+
+  <sectiontype name="relstorage" implements="ZODB.storage"
+      datatype=".RelStorageFactory">
+    <section type="relstorage.adapter" name="*" attribute="adapter"/>
+    <key name="name" default="RelStorage"/>
+    <key name="create" datatype="boolean" default="true">
+      <description>
+        Flag that indicates whether the storage should be initialized if
+        it does not already exist.
+      </description>
+    </key>
+    <key name="read-only" datatype="boolean" default="false">
+      <description>
+        If true, only reads may be executed against the storage.  Note
+        that the "pack" operation is not considered a write operation
+        and is still allowed on a read-only storage.
+      </description>
+    </key>
+  </sectiontype>
+
+  <sectiontype name="postgresql" implements="relstorage.adapter"
+    datatype=".PostgreSQLAdapterFactory">
+    <key name="dsn" datatype="string" required="no" default="">
+      <description>
+        The PostgreSQL data source name.  For example:
+
+        dsn dbname='template1' user='user' host='localhost' password='pass'
+
+        If dsn is omitted, the adapter will connect to a local database with
+        no password.  Both the user and database name will match the
+        name of the owner of the current process.
+      </description>
+    </key>
+  </sectiontype>
+
+  <sectiontype name="oracle" implements="relstorage.adapter"
+    datatype=".OracleAdapterFactory">
+    <key name="user" datatype="string" required="yes">
+      <description>
+        The Oracle account name
+      </description>
+    </key>
+    <key name="password" datatype="string" required="yes">
+      <description>
+        The Oracle account password
+      </description>
+    </key>
+    <key name="dsn" datatype="string" required="yes">
+      <description>
+        The Oracle data source name.  The Oracle client library will
+        normally expect to find the DSN in /etc/oratab.
+      </description>
+    </key>
+  </sectiontype>
+
+</component>

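Note that in this schema the adapter is a nested section of `<relstorage>`, unlike the older key-style example in the README above. Following the section types defined here, an Oracle setup in zope.conf might look like this (user, password, and dsn values are placeholders):

```text
%import relstorage
<zodb_db main>
  mount-point /
  <relstorage>
    <oracle>
      user zodb
      password ...
      dsn ORADB
    </oracle>
  </relstorage>
</zodb_db>
```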
Copied: relstorage/trunk/relstorage/config.py (from rev 83260, relstorage/config.py)
===================================================================
--- relstorage/trunk/relstorage/config.py	                        (rev 0)
+++ relstorage/trunk/relstorage/config.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,41 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZConfig directive implementations for binding RelStorage to Zope"""
+
+from ZODB.config import BaseConfig
+
+from relstorage import RelStorage
+
+
+class RelStorageFactory(BaseConfig):
+    """Open a storage configured via ZConfig"""
+    def open(self):
+        config = self.config
+        adapter = config.adapter.open()
+        return RelStorage(adapter, name=config.name, create=config.create,
+            read_only=config.read_only)
+
+
+class PostgreSQLAdapterFactory(BaseConfig):
+    def open(self):
+        from adapters.postgresql import PostgreSQLAdapter
+        return PostgreSQLAdapter(self.config.dsn)
+
+
+class OracleAdapterFactory(BaseConfig):
+    def open(self):
+        from adapters.oracle import OracleAdapter
+        config = self.config
+        return OracleAdapter(config.user, config.password, config.dsn)
+

Copied: relstorage/trunk/relstorage/relstorage.py (from rev 83260, relstorage/relstorage.py)
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	                        (rev 0)
+++ relstorage/trunk/relstorage/relstorage.py	2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,804 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""The core of RelStorage, a ZODB storage for relational databases.
+
+Stores pickles in the database.
+"""
+
+import base64
+import cPickle
+import logging
+import md5
+import os
+import time
+import weakref
+from ZODB.utils import p64, u64, z64
+from ZODB.BaseStorage import BaseStorage
+from ZODB import ConflictResolution, POSException
+from persistent.TimeStamp import TimeStamp
+
+from autotemp import AutoTemporaryFile
+
+log = logging.getLogger("relstorage")
+
+# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
+# a failure revealed by the ZODB test suite.  The test suite often fails
+# to call tpc_abort in the event of an error, leading to deadlocks.
+# The variable causes RelStorage to abort failed transactions
+# early rather than wait for an explicit abort.
+abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
+
+
+class RelStorage(BaseStorage,
+                ConflictResolution.ConflictResolvingStorage):
+    """Storage to a relational database, based on invalidation polling"""
+
+    def __init__(self, adapter, name='RelStorage', create=True,
+            read_only=False):
+        self._adapter = adapter
+        self._name = name
+        self._is_read_only = read_only
+
+        # load_conn and load_cursor are always open
+        self._load_conn = None
+        self._load_cursor = None
+        self._load_started = False
+        self._open_load_connection()
+        # store_conn and store_cursor are open only during commit
+        self._store_conn = None
+        self._store_cursor = None
+
+        if create:
+            self._adapter.prepare_schema()
+
+        BaseStorage.__init__(self, name)
+
+        self._tid = None
+        self._ltid = None
+
+        # _tbuf is a temporary file that contains pickles of data to be
+        # committed.  _pickler writes pickles to that file.  _stored_oids
+        # is a list of integer OIDs to be stored.
+        self._tbuf = None
+        self._pickler = None
+        self._stored_oids = None
+
+        # _prepared_txn is the name of the transaction to commit in the
+        # second phase.
+        self._prepared_txn = None
+
+        # _instances is a list of weak references to storage instances bound
+        # to the same database.
+        self._instances = []
+
+        # _closed is True after self.close() is called.  Since close()
+        # can be called from another thread, access to self._closed should
+        # be inside a _lock_acquire()/_lock_release() block.
+        self._closed = False
+
+    def _open_load_connection(self):
+        """Open the load connection to the database.  Return nothing."""
+        conn, cursor = self._adapter.open_for_load()
+        self._drop_load_connection()
+        self._load_conn, self._load_cursor = conn, cursor
+        self._load_started = True
+
+    def _drop_load_connection(self):
+        conn, cursor = self._load_conn, self._load_cursor
+        self._load_conn, self._load_cursor = None, None
+        self._adapter.close(conn, cursor)
+
+    def _drop_store_connection(self):
+        conn, cursor = self._store_conn, self._store_cursor
+        self._store_conn, self._store_cursor = None, None
+        self._adapter.close(conn, cursor)
+
+    def _rollback_load_connection(self):
+        if self._load_conn is not None:
+            self._load_started = False
+            self._load_conn.rollback()
+
+    def _start_load(self):
+        if self._load_cursor is None:
+            self._open_load_connection()
+        else:
+            self._adapter.restart_load(self._load_cursor)
+            self._load_started = True
+
+    def _zap(self):
+        """Clear all objects out of the database.
+
+        Used by the test suite.
+        """
+        self._adapter.zap()
+        self._rollback_load_connection()
+
+    def close(self):
+        """Close the connections to the database."""
+        self._lock_acquire()
+        try:
+            self._closed = True
+            self._drop_load_connection()
+            self._drop_store_connection()
+            for wref in self._instances:
+                instance = wref()
+                if instance is not None:
+                    instance.close()
+        finally:
+            self._lock_release()
+
+    def bind_connection(self, zodb_conn):
+        """Get a connection-bound storage instance.
+
+        Connections have their own storage instances so that the
+        database, rather than ZODB, can provide the MVCC semantics.
+        """
+        res = BoundRelStorage(self, zodb_conn)
+        self._instances.append(weakref.ref(res))
+        return res
+
+    def connection_closing(self):
+        """Release resources."""
+        self._rollback_load_connection()
+
+    def __len__(self):
+        return self._adapter.get_object_count()
+
+    def getSize(self):
+        """Return database size in bytes"""
+        return self._adapter.get_db_size()
+
+    def load(self, oid, version):
+        self._lock_acquire()
+        try:
+            if not self._load_started:
+                self._start_load()
+            cursor = self._load_cursor
+            state, tid_int = self._adapter.load_current(cursor, u64(oid))
+        finally:
+            self._lock_release()
+        if tid_int is not None:
+            if state:
+                state = str(state)
+            if not state:
+                # This can happen if something attempts to load
+                # an object whose creation has been undone.
+                raise KeyError(oid)
+            return state, p64(tid_int)
+        else:
+            raise KeyError(oid)
+
+    def loadEx(self, oid, version):
+        # Since we don't support versions, just tack the empty version
+        # string onto load's result.
+        return self.load(oid, version) + ("",)
+
+    def loadSerial(self, oid, serial):
+        """Load a specific revision of an object"""
+        self._lock_acquire()
+        try:
+            if self._store_cursor is not None:
+                # Allow loading data from later transactions
+                # for conflict resolution.
+                cursor = self._store_cursor
+            else:
+                if not self._load_started:
+                    self._start_load()
+                cursor = self._load_cursor
+            state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
+            if state is not None:
+                state = str(state)
+                if not state:
+                    raise POSKeyError(oid)
+                return state
+            else:
+                raise KeyError(oid)
+        finally:
+            self._lock_release()
+
+    def loadBefore(self, oid, tid):
+        """Return the most recent revision of oid committed before tid."""
+        oid_int = u64(oid)
+
+        self._lock_acquire()
+        try:
+            if self._store_cursor is not None:
+                # Allow loading data from later transactions
+                # for conflict resolution.
+                cursor = self._store_cursor
+            else:
+                if not self._load_started:
+                    self._start_load()
+                cursor = self._load_cursor
+            if not self._adapter.exists(cursor, oid_int):
+                raise KeyError(oid)
+
+            state, start_tid = self._adapter.load_before(
+                cursor, oid_int, u64(tid))
+            if start_tid is not None:
+                end_int = self._adapter.get_object_tid_after(
+                    cursor, oid_int, start_tid)
+                if end_int is not None:
+                    end = p64(end_int)
+                else:
+                    end = None
+                if state is not None:
+                    state = str(state)
+                return state, p64(start_tid), end
+            else:
+                return None
+        finally:
+            self._lock_release()
+
+
+    def store(self, oid, serial, data, version, transaction):
+        if self._is_read_only:
+            raise POSException.ReadOnlyError()
+        if transaction is not self._transaction:
+            raise POSException.StorageTransactionError(self, transaction)
+        if version:
+            raise POSException.Unsupported("Versions aren't supported")
+
+        # If self._prepared_txn is not None, something is attempting to
+        # store objects after the vote phase has finished.  That should
+        # never happen.
+        assert self._prepared_txn is None
+        md5sum = md5.new(data).hexdigest()
+
+        self._lock_acquire()
+        try:
+            # buffer the stores
+            if self._tbuf is None:
+                self._tbuf = AutoTemporaryFile()
+                self._pickler = cPickle.Pickler(
+                    self._tbuf, cPickle.HIGHEST_PROTOCOL)
+                self._stored_oids = []
+
+            self._pickler.dump((oid, serial, md5sum, data))
+            self._stored_oids.append(u64(oid))
+            return None
+        finally:
+            self._lock_release()
+
+    def tpc_begin(self, transaction, tid=None, status=' '):
+        if self._is_read_only:
+            raise POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            if self._transaction is transaction:
+                return
+            self._lock_release()
+            self._commit_lock_acquire()
+            self._lock_acquire()
+            self._transaction = transaction
+            self._clear_temp()
+
+            user = transaction.user
+            desc = transaction.description
+            ext = transaction._extension
+            if ext:
+                ext = cPickle.dumps(ext, 1)
+            else:
+                ext = ""
+            self._ude = user, desc, ext
+            self._tstatus = status
+
+            if tid is not None:
+                # get the commit lock and add the transaction now
+                adapter = self._adapter
+                conn, cursor = adapter.open_for_commit()
+                self._store_conn, self._store_cursor = conn, cursor
+                tid_int = u64(tid)
+                try:
+                    adapter.add_transaction(cursor, tid_int, user, desc, ext)
+                except:
+                    self._drop_store_connection()
+                    raise
+            # else choose the tid later
+            self._tid = tid
+
+        finally:
+            self._lock_release()
+
+    def _prepare_tid(self):
+        """Choose a tid for the current transaction.
+
+        This should be done as late in the commit as possible, since
+        it must hold an exclusive commit lock.
+        """
+        if self._tid is not None:
+            return
+        if self._transaction is None:
+            raise POSException.StorageError("No transaction in progress")
+
+        adapter = self._adapter
+        conn, cursor = adapter.open_for_commit()
+        self._store_conn, self._store_cursor = conn, cursor
+        user, desc, ext = self._ude
+
+        attempt = 1
+        while True:
+            try:
+                # Choose a transaction ID.
+                # Base the transaction ID on the database time,
+                # while ensuring that the tid of this transaction
+                # is greater than any existing tid.
+                last_tid, now = adapter.get_tid_and_time(cursor)
+                stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+                stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+                tid = repr(stamp)
+
+                tid_int = u64(tid)
+                adapter.add_transaction(cursor, tid_int, user, desc, ext)
+                self._tid = tid
+                break
+
+            except POSException.ConflictError:
+                if attempt < 3:
+                    # A concurrent transaction claimed the tid.
+                    # Rollback and try again.
+                    adapter.restart_commit(cursor)
+                    attempt += 1
+                else:
+                    self._drop_store_connection()
+                    raise
+            except:
+                self._drop_store_connection()
+                raise
+
+
+    def _clear_temp(self):
+        # It is assumed that self._lock_acquire was called before this
+        # method was called.
+        self._prepared_txn = None
+        tbuf = self._tbuf
+        if tbuf is not None:
+            self._tbuf = None
+            self._pickler = None
+            self._stored_oids = None
+            tbuf.close()
+
+
+    def _send_stored(self):
+        """Send the buffered pickles to the database.
+
+        Returns a list of (oid, tid) to be received by
+        Connection._handle_serial().
+        """
+        cursor = self._store_cursor
+        adapter = self._adapter
+        tid_int = u64(self._tid)
+        self._pickler = None  # Don't allow any more store operations
+        self._tbuf.seek(0)
+        unpickler = cPickle.Unpickler(self._tbuf)
+
+        serials = []  # [(oid, serial)]
+        prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
+
+        while True:
+            try:
+                oid, serial, md5sum, data = unpickler.load()
+            except EOFError:
+                break
+            oid_int = u64(oid)
+            if not serial:
+                serial = z64
+
+            prev_tid_int = prev_tids.get(oid_int)
+            if prev_tid_int is None:
+                prev_tid_int = 0
+                prev_tid = z64
+            else:
+                prev_tid = p64(prev_tid_int)
+                if prev_tid != serial:
+                    # conflict detected
+                    rdata = self.tryToResolveConflict(
+                        oid, prev_tid, serial, data)
+                    if rdata is None:
+                        raise POSException.ConflictError(
+                            oid=oid, serials=(prev_tid, serial), data=data)
+                    else:
+                        data = rdata
+                        md5sum = md5.new(data).hexdigest()
+
+            self._adapter.store(
+                cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
+
+            if prev_tid and serial != prev_tid:
+                serials.append((oid, ConflictResolution.ResolvedSerial))
+            else:
+                serials.append((oid, self._tid))
+
+        return serials
+
+
+    def _vote(self):
+        """Prepare the transaction for final commit."""
+        # This method initiates a two-phase commit process,
+        # saving the name of the prepared transaction in self._prepared_txn.
+
+        # It is assumed that self._lock_acquire was called before this
+        # method was called.
+
+        if self._prepared_txn is not None:
+            # the vote phase has already completed
+            return
+
+        self._prepare_tid()
+        tid_int = u64(self._tid)
+        cursor = self._store_cursor
+        assert cursor is not None
+
+        if self._tbuf is not None:
+            serials = self._send_stored()
+        else:
+            serials = []
+
+        self._adapter.update_current(cursor, tid_int)
+        self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+        # From the point of view of self._store_cursor,
+        # it now looks like the transaction has been rolled back.
+
+        return serials
+
+
+    def tpc_vote(self, transaction):
+        self._lock_acquire()
+        try:
+            if transaction is not self._transaction:
+                return
+            try:
+                return self._vote()
+            except:
+                if abort_early:
+                    # abort early to avoid lockups while running the
+                    # somewhat brittle ZODB test suite
+                    self.tpc_abort(transaction)
+                raise
+        finally:
+            self._lock_release()
+
+
+    def _finish(self, tid, user, desc, ext):
+        """Commit the transaction."""
+        # It is assumed that self._lock_acquire was called before this
+        # method was called.
+        assert self._tid is not None
+        self._rollback_load_connection()
+        txn = self._prepared_txn
+        assert txn is not None
+        self._adapter.commit_phase2(self._store_cursor, txn)
+        self._drop_store_connection()
+        self._prepared_txn = None
+        self._ltid = self._tid
+        self._tid = None
+
+    def _abort(self):
+        # the lock is held here
+        self._rollback_load_connection()
+        if self._store_cursor is not None:
+            self._adapter.abort(self._store_cursor, self._prepared_txn)
+        self._prepared_txn = None
+        self._drop_store_connection()
+        self._tid = None
+
+    def lastTransaction(self):
+        return self._ltid
+
+    def new_oid(self):
+        if self._is_read_only:
+            raise POSException.ReadOnlyError()
+        self._lock_acquire()
+        try:
+            cursor = self._load_cursor
+            if cursor is None:
+                self._open_load_connection()
+                cursor = self._load_cursor
+            oid_int = self._adapter.new_oid(cursor)
+            return p64(oid_int)
+        finally:
+            self._lock_release()
+
+    def supportsUndo(self):
+        return True
+
+    def supportsTransactionalUndo(self):
+        return True
+
+    def undoLog(self, first=0, last=-20, filter=None):
+        if last < 0:
+            last = first - last
+
+        # use a private connection to ensure the most current results
+        adapter = self._adapter
+        conn, cursor = adapter.open()
+        try:
+            rows = adapter.iter_transactions(cursor)
+            i = 0
+            res = []
+            for tid_int, user, desc, ext in rows:
+                tid = p64(tid_int)
+                d = {'id': base64.encodestring(tid)[:-1],
+                     'time': TimeStamp(tid).timeTime(),
+                     'user_name': user,
+                     'description': desc}
+                if ext:
+                    ext = str(ext)
+                if ext:
+                    d.update(cPickle.loads(ext))
+                if filter is None or filter(d):
+                    if i >= first:
+                        res.append(d)
+                    i += 1
+                    if i >= last:
+                        break
+            return res
+
+        finally:
+            adapter.close(conn, cursor)
+
+    def history(self, oid, version=None, size=1, filter=None):
+        self._lock_acquire()
+        try:
+            cursor = self._load_cursor
+            oid_int = u64(oid)
+            try:
+                rows = self._adapter.iter_object_history(cursor, oid_int)
+            except KeyError:
+                raise KeyError(oid)
+
+            res = []
+            for tid_int, username, description, extension, length in rows:
+                tid = p64(tid_int)
+                if extension:
+                    d = cPickle.loads(extension)
+                else:
+                    d = {}
+                d.update({"time": TimeStamp(tid).timeTime(),
+                          "user_name": username,
+                          "description": description,
+                          "tid": tid,
+                          "version": '',
+                          "size": length,
+                          })
+                if filter is None or filter(d):
+                    res.append(d)
+                    if size is not None and len(res) >= size:
+                        break
+            return res
+        finally:
+            self._lock_release()
+
+
+    def undo(self, transaction_id, transaction):
+        """Undo a transaction identified by transaction_id.
+
+        Do so by writing new data that reverses the action taken by
+        the transaction.
+        """
+
+        if self._is_read_only:
+            raise POSException.ReadOnlyError()
+        if transaction is not self._transaction:
+            raise POSException.StorageTransactionError(self, transaction)
+
+        undo_tid = base64.decodestring(transaction_id + '\n')
+        assert len(undo_tid) == 8
+        undo_tid_int = u64(undo_tid)
+
+        self._lock_acquire()
+        try:
+            self._prepare_tid()
+            self_tid_int = u64(self._tid)
+            cursor = self._store_cursor
+            assert cursor is not None
+            adapter = self._adapter
+
+            adapter.verify_undoable(cursor, undo_tid_int)
+            oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
+            oids = [p64(oid_int) for oid_int in oid_ints]
+
+            # Update the current object pointers immediately, so that
+            # subsequent undo operations within this transaction will see
+            # the new current objects.
+            adapter.update_current(cursor, self_tid_int)
+
+            return self._tid, oids
+
+        finally:
+            self._lock_release()
+
+
+    def pack(self, t, referencesf):
+        if self._is_read_only:
+            raise POSException.ReadOnlyError()
+
+        pack_point = repr(TimeStamp(*(time.gmtime(t)[:5] + (t % 60,))))
+        pack_point_int = u64(pack_point)
+
+        def get_references(state):
+            """Return the set of OIDs the given state refers to."""
+            refs = set()
+            if state:
+                for oid in referencesf(str(state)):
+                    refs.add(u64(oid))
+            return refs
+
+        # Use a private connection (lock_conn and lock_cursor) to
+        # hold the pack lock, while using a second private connection
+        # (conn and cursor) to decide exactly what to pack.
+        # Close the second connection.  Then, with the lock still held,
+        # perform the pack in a third connection opened by the adapter.
+        # This structure is designed to maximize the scalability
+        # of packing and minimize conflicts with concurrent writes.
+        # A consequence of this structure is that the adapter must
+        # not choke on transactions that may have been added between
+        # pre_pack and pack.
+        adapter = self._adapter
+        lock_conn, lock_cursor = adapter.open()
+        try:
+            adapter.hold_pack_lock(lock_cursor)
+
+            conn, cursor = adapter.open()
+            try:
+                try:
+                    # Find the latest commit before or at the pack time.
+                    tid_int = adapter.choose_pack_transaction(
+                        cursor, pack_point_int)
+                    if tid_int is None:
+                        # Nothing needs to be packed.
+                        # TODO: log the fact that nothing needs to be packed.
+                        return
+
+                    # In pre_pack, the adapter fills tables with information
+                    # about what to pack.  The adapter should not actually
+                    # pack anything yet.
+                    adapter.pre_pack(cursor, tid_int, get_references)
+                except:
+                    conn.rollback()
+                    raise
+                else:
+                    conn.commit()
+            finally:
+                adapter.close(conn, cursor)
+
+            # Now pack.  The adapter makes its own connection just for the
+            # pack operation, possibly using a special transaction mode
+            # and connection flags.
+            adapter.pack(tid_int)
+            self._after_pack()
+
+        finally:
+            lock_conn.rollback()
+            adapter.close(lock_conn, lock_cursor)
+
+
+    def _after_pack(self):
+        """Reset the transaction state after packing."""
+        # The tests depend on this.
+        self._rollback_load_connection()
+
+
+class BoundRelStorage(RelStorage):
+    """Storage to a database, bound to a particular ZODB.Connection."""
+
+    # The propagate_invalidations flag, set to a false value, tells
+    # the Connection not to propagate object invalidations across
+    # connections, since that ZODB feature is detrimental when the
+    # storage provides its own MVCC.
+    propagate_invalidations = False
+
+    def __init__(self, parent, zodb_conn):
+        # self._conn = conn
+        RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
+            read_only=parent._is_read_only, create=False)
+        # _prev_polled_tid contains the tid at the previous poll
+        self._prev_polled_tid = None
+        self._showed_disconnect = False
+
+    def poll_invalidations(self, retry=True):
+        """Look for OIDs of objects that changed since _prev_polled_tid.
+
+        Return {oid: 1}, or None if all objects need to be invalidated
+        because _prev_polled_tid is no longer in the database (presumably
+        it has been packed).
+        """
+        self._lock_acquire()
+        try:
+            if self._closed:
+                return {}
+            try:
+                self._rollback_load_connection()
+                self._start_load()
+                conn = self._load_conn
+                cursor = self._load_cursor
+
+                # Ignore changes made by the last transaction committed
+                # by this connection.
+                ignore_tid = None
+                if self._ltid is not None:
+                    ignore_tid = u64(self._ltid)
+
+                # get a list of changed OIDs and the most recent tid
+                oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+                    conn, cursor, self._prev_polled_tid, ignore_tid)
+                self._prev_polled_tid = new_polled_tid
+
+                if oid_ints is None:
+                    oids = None
+                else:
+                    oids = {}
+                    for oid_int in oid_ints:
+                        oids[p64(oid_int)] = 1
+                return oids
+            except POSException.StorageError:
+                # disconnected
+                if not retry:
+                    raise
+                if not self._showed_disconnect:
+                    log.warning("Lost connection in %s", repr(self))
+                    self._showed_disconnect = True
+                self._open_load_connection()
+                log.info("Reconnected in %s", repr(self))
+                self._showed_disconnect = False
+                return self.poll_invalidations(retry=False)
+        finally:
+            self._lock_release()
+
+    def _after_pack(self):
+        # Disable transaction reset after packing.  The connection
+        # should call sync() to see the new state.
+        pass
+
+
+# very basic test... ought to be moved or deleted.
+def test():
+    import transaction
+    import pprint
+    from ZODB.DB import DB
+    from persistent.mapping import PersistentMapping
+    from relstorage.adapters.postgresql import PostgreSQLAdapter
+
+    adapter = PostgreSQLAdapter(params='dbname=relstoragetest')
+    storage = RelStorage(adapter)
+    db = DB(storage)
+
+    if True:
+        for i in range(100):
+            c = db.open()
+            c.root()['foo'] = PersistentMapping()
+            transaction.get().note('added %d' % i)
+            transaction.commit()
+            c.close()
+            print 'wrote', i
+
+        # undo 2 transactions, then redo them and undo the first again.
+        for i in range(2):
+            log = db.undoLog()
+            db.undo(log[0]['id'])
+            db.undo(log[1]['id'])
+            transaction.get().note('undone! (%d)' % i)
+            transaction.commit()
+            print 'undid', i
+
+    pprint.pprint(db.undoLog())
+    db.pack(time.time() - 0.1)
+    pprint.pprint(db.undoLog())
+    db.close()
+
+
+if __name__ == '__main__':
+    import logging
+    logging.basicConfig()
+    test()

Copied: relstorage/trunk/relstorage/tests (from rev 83260, relstorage/tests)
