[Checkins] SVN: relstorage/ moved to trunk
Shane Hathaway
shane at hathawaymix.org
Sat Jan 26 19:36:01 EST 2008
Log message for revision 83261:
moved to trunk
Changed:
D relstorage/CHANGELOG.txt
D relstorage/README.txt
D relstorage/__init__.py
D relstorage/adapters/
D relstorage/autotemp.py
D relstorage/component.xml
D relstorage/config.py
D relstorage/notes/
D relstorage/poll-invalidation-1-zodb-3-7-1.patch
D relstorage/relstorage.py
D relstorage/tests/
A relstorage/trunk/
A relstorage/trunk/CHANGELOG.txt
A relstorage/trunk/README.txt
A relstorage/trunk/notes/
A relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch
A relstorage/trunk/relstorage/
A relstorage/trunk/relstorage/__init__.py
A relstorage/trunk/relstorage/adapters/
A relstorage/trunk/relstorage/autotemp.py
A relstorage/trunk/relstorage/component.xml
A relstorage/trunk/relstorage/config.py
A relstorage/trunk/relstorage/relstorage.py
A relstorage/trunk/relstorage/tests/
-=-
Deleted: relstorage/CHANGELOG.txt
===================================================================
--- relstorage/CHANGELOG.txt 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/CHANGELOG.txt 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,75 +0,0 @@
-
-relstorage 0.9
-
-- Renamed to reflect expanding database support.
-
-- Support for Oracle added.
-
-- Major overhaul with many scalability and reliability improvements,
- particularly in the area of packing.
-
-- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
- on svn.zope.org.)
-
-
-PGStorage 0.4
-
-- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
- for invalidation polling.
-
-- Removed the commit_order code. The commit_order idea was intended to
- allow concurrent commits, but that idea is a little too ambitious while
- other more important ideas are being tested. Something like it may
- come later.
-
-- Improved connection management: only one database connection is
- held continuously open per storage instance.
-
-- Reconnect to the database automatically.
-
-- Removed test mode.
-
-- Switched from using a ZODB.Connection subclass to a ZODB patch. The
- Connection class changes in subtle ways too often to subclass reliably;
- a patch is much safer.
-
-- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
-
-- Fixed an undo bug. Symptom: attempting to examine the undo log revealed
- broken pickles. Cause: the extension field was not being wrapped in
- psycopg2.Binary upon insert. Solution: used psycopg2.Binary.
- Unfortunately, this doesn't fix existing transactions people have
- committed. If anyone has any data to keep, fixing the old transactions
- should be easy.
-
-- Moved from a private CVS repository to Sourceforge.
- See http://pgstorage.sourceforge.net . Also switched to the MIT license.
-
-- David Pratt added a basic getSize() implementation so that the Zope
- management interface displays an estimate of the size of the database.
-
-- Turned PGStorage into a top-level package. Python generally makes
- top-level packages easier to install.
-
-
-PGStorage 0.3
-
-- Made compatible with Zope 3, although an undo bug apparently remains.
-
-
-PGStorage 0.2
-
-- Fixed concurrent commits, which were generating deadlocks. Fixed by
- adding a special table, "commit_lock", which is used for
- synchronizing increments of commit_seq (but only at final commit.)
- If you are upgrading from version 0.1, you need to change your
- database using the 'psql' prompt:
-
- create table commit_lock ();
-
-- Added speed tests and an OpenDocument spreadsheet comparing
- FileStorage / ZEO with PGStorage. PGStorage wins at reading objects
- and writing a lot of small transactions, while FileStorage / ZEO
- wins at writing big transactions. Interestingly, they tie when
- writing a RAM disk.
-
Deleted: relstorage/README.txt
===================================================================
--- relstorage/README.txt 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/README.txt 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,15 +0,0 @@
-
-To make Zope store in RelStorage, patch ZODB/Connection.py using the
-provided patch, then add the following to zope.conf, modifying the
-adapter and params lines to fit your needs.
-
-
-%import relstorage
-<zodb_db main>
- mount-point /
- <relstorage>
- adapter postgresql
- params host=localhost dbname=zodb user=zope password=...
- </relstorage>
-</zodb_db>
-
Deleted: relstorage/__init__.py
===================================================================
--- relstorage/__init__.py 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/__init__.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,22 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""relstorage package"""
-
-# perform a compatibility test
-from ZODB.Connection import Connection
-
-if not hasattr(Connection, '_poll_invalidations'):
- raise ImportError('RelStorage requires the invalidation polling '
- 'patch for ZODB.')
-del Connection
Deleted: relstorage/autotemp.py
===================================================================
--- relstorage/autotemp.py 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/autotemp.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,43 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""A temporary file that switches from StringIO to TemporaryFile if needed.
-
-This could be a useful addition to Python's tempfile module.
-"""
-
-from cStringIO import StringIO
-import tempfile
-
-class AutoTemporaryFile:
- """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
- def __init__(self, threshold=10485760):
- self._threshold = threshold
- self._f = f = StringIO()
- # delegate most methods
- for m in ('read', 'readline', 'seek', 'tell', 'close'):
- setattr(self, m, getattr(f, m))
-
- def write(self, data):
- threshold = self._threshold
- if threshold > 0 and self.tell() + len(data) >= threshold:
- # convert to TemporaryFile
- f = tempfile.TemporaryFile()
- f.write(self._f.getvalue())
- f.seek(self.tell())
- self._f = f
- self._threshold = 0
- # delegate all important methods
- for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
- setattr(self, m, getattr(f, m))
- self._f.write(data)
Deleted: relstorage/component.xml
===================================================================
--- relstorage/component.xml 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/component.xml 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,64 +0,0 @@
-<?xml version="1.0"?>
-
-<!-- RelStorage configuration via ZConfig -->
-
-<component prefix="relstorage.config">
-
- <import package="ZODB"/>
- <abstracttype name="relstorage.adapter"/>
-
- <sectiontype name="relstorage" implements="ZODB.storage"
- datatype=".RelStorageFactory">
- <section type="relstorage.adapter" name="*" attribute="adapter"/>
- <key name="name" default="RelStorage"/>
- <key name="create" datatype="boolean" default="true">
- <description>
- Flag that indicates whether the storage should be initialized if
- it does not already exist.
- </description>
- </key>
- <key name="read-only" datatype="boolean" default="false">
- <description>
- If true, only reads may be executed against the storage. Note
- that the "pack" operation is not considered a write operation
- and is still allowed on a read-only filestorage.
- </description>
- </key>
- </sectiontype>
-
- <sectiontype name="postgresql" implements="relstorage.adapter"
- datatype=".PostgreSQLAdapterFactory">
- <key name="dsn" datatype="string" required="no" default="">
- <description>
- The PostgreSQL data source name. For example:
-
- dsn dbname='template1' user='user' host='localhost' password='pass'
-
- If dsn is omitted, the adapter will connect to a local database with
- no password. Both the user and database name will match the
- name of the owner of the current process.
- </description>
- </key>
- </sectiontype>
-
- <sectiontype name="oracle" implements="relstorage.adapter"
- datatype=".OracleAdapterFactory">
- <key name="user" datatype="string" required="yes">
- <description>
- The Oracle account name
- </description>
- </key>
- <key name="password" datatype="string" required="yes">
- <description>
- The Oracle account password
- </description>
- </key>
- <key name="dsn" datatype="string" required="yes">
- <description>
- The Oracle data source name. The Oracle client library will
- normally expect to find the DSN in /etc/oratab.
- </description>
- </key>
- </sectiontype>
-
-</component>
Deleted: relstorage/config.py
===================================================================
--- relstorage/config.py 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/config.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,41 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""ZConfig directive implementations for binding RelStorage to Zope"""
-
-from ZODB.config import BaseConfig
-
-from relstorage import RelStorage
-
-
-class RelStorageFactory(BaseConfig):
- """Open a storage configured via ZConfig"""
- def open(self):
- config = self.config
- adapter = config.adapter.open()
- return RelStorage(adapter, name=config.name, create=config.create,
- read_only=config.read_only)
-
-
-class PostgreSQLAdapterFactory(BaseConfig):
- def open(self):
- from adapters.postgresql import PostgreSQLAdapter
- return PostgreSQLAdapter(self.config.dsn)
-
-
-class OracleAdapterFactory(BaseConfig):
- def open(self):
- from adapters.oracle import OracleAdapter
- config = self.config
- return OracleAdapter(config.user, config.password, config.dsn)
-
Deleted: relstorage/poll-invalidation-1-zodb-3-7-1.patch
===================================================================
--- relstorage/poll-invalidation-1-zodb-3-7-1.patch 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/poll-invalidation-1-zodb-3-7-1.patch 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,94 +0,0 @@
-diff -r 34747fbd09ec Connection.py
---- a/Connection.py Tue Nov 20 21:57:31 2007 -0700
-+++ b/Connection.py Fri Jan 11 21:19:00 2008 -0700
-@@ -75,8 +75,14 @@ class Connection(ExportImport, object):
- """Create a new Connection."""
-
- self._db = db
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
- self._savepoint_storage = None
-
- self.transaction_manager = self._synch = self._mvcc = None
-@@ -170,6 +176,12 @@ class Connection(ExportImport, object):
- # Multi-database support
- self.connections = {self._db.database_name: self}
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid."""
-@@ -267,6 +279,11 @@ class Connection(ExportImport, object):
- self.transaction_manager.unregisterSynch(self)
- self._synch = None
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -295,6 +312,10 @@ class Connection(ExportImport, object):
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -438,8 +459,23 @@ class Connection(ExportImport, object):
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-diff -r 34747fbd09ec DB.py
---- a/DB.py Tue Nov 20 21:57:31 2007 -0700
-+++ b/DB.py Wed Nov 28 18:33:12 2007 -0700
-@@ -260,6 +260,10 @@ class DB(object):
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Deleted: relstorage/relstorage.py
===================================================================
--- relstorage/relstorage.py 2008-01-27 00:27:45 UTC (rev 83260)
+++ relstorage/relstorage.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -1,804 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""The core of RelStorage, a ZODB storage for relational databases.
-
-Stores pickles in the database.
-"""
-
-import base64
-import cPickle
-import logging
-import md5
-import os
-import time
-import weakref
-from ZODB.utils import p64, u64, z64
-from ZODB.BaseStorage import BaseStorage
-from ZODB import ConflictResolution, POSException
-from persistent.TimeStamp import TimeStamp
-
-from autotemp import AutoTemporaryFile
-
-log = logging.getLogger("relstorage")
-
-# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
-# a failure revealed by the ZODB test suite. The test suite often fails
-# to call tpc_abort in the event of an error, leading to deadlocks.
-# The variable causes RelStorage to abort failed transactions
-# early rather than wait for an explicit abort.
-abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
-
-
-class RelStorage(BaseStorage,
- ConflictResolution.ConflictResolvingStorage):
- """Storage to a relational database, based on invalidation polling"""
-
- def __init__(self, adapter, name='RelStorage', create=True,
- read_only=False):
- self._adapter = adapter
- self._name = name
- self._is_read_only = read_only
-
- # load_conn and load_cursor are always open
- self._load_conn = None
- self._load_cursor = None
- self._load_started = False
- self._open_load_connection()
- # store_conn and store_cursor are open only during commit
- self._store_conn = None
- self._store_cursor = None
-
- if create:
- self._adapter.prepare_schema()
-
- BaseStorage.__init__(self, name)
-
- self._tid = None
- self._ltid = None
-
- # _tbuf is a temporary file that contains pickles of data to be
- # committed. _pickler writes pickles to that file. _stored_oids
- # is a list of integer OIDs to be stored.
- self._tbuf = None
- self._pickler = None
- self._stored_oids = None
-
- # _prepared_txn is the name of the transaction to commit in the
- # second phase.
- self._prepared_txn = None
-
- # _instances is a list of weak references to storage instances bound
- # to the same database.
- self._instances = []
-
- # _closed is True after self.close() is called. Since close()
- # can be called from another thread, access to self._closed should
- # be inside a _lock_acquire()/_lock_release() block.
- self._closed = False
-
- def _open_load_connection(self):
- """Open the load connection to the database. Return nothing."""
- conn, cursor = self._adapter.open_for_load()
- self._drop_load_connection()
- self._load_conn, self._load_cursor = conn, cursor
- self._load_started = True
-
- def _drop_load_connection(self):
- conn, cursor = self._load_conn, self._load_cursor
- self._load_conn, self._load_cursor = None, None
- self._adapter.close(conn, cursor)
-
- def _drop_store_connection(self):
- conn, cursor = self._store_conn, self._store_cursor
- self._store_conn, self._store_cursor = None, None
- self._adapter.close(conn, cursor)
-
- def _rollback_load_connection(self):
- if self._load_conn is not None:
- self._load_started = False
- self._load_conn.rollback()
-
- def _start_load(self):
- if self._load_cursor is None:
- self._open_load_connection()
- else:
- self._adapter.restart_load(self._load_cursor)
- self._load_started = True
-
- def _zap(self):
- """Clear all objects out of the database.
-
- Used by the test suite.
- """
- self._adapter.zap()
- self._rollback_load_connection()
-
- def close(self):
- """Close the connections to the database."""
- self._lock_acquire()
- try:
- self._closed = True
- self._drop_load_connection()
- self._drop_store_connection()
- for wref in self._instances:
- instance = wref()
- if instance is not None:
- instance.close()
- finally:
- self._lock_release()
-
- def bind_connection(self, zodb_conn):
- """Get a connection-bound storage instance.
-
- Connections have their own storage instances so that
- the database can provide the MVCC semantics rather than ZODB.
- """
- res = BoundRelStorage(self, zodb_conn)
- self._instances.append(weakref.ref(res))
- return res
-
- def connection_closing(self):
- """Release resources."""
- self._rollback_load_connection()
-
- def __len__(self):
- return self._adapter.get_object_count()
-
- def getSize(self):
- """Return database size in bytes"""
- return self._adapter.get_db_size()
-
- def load(self, oid, version):
- self._lock_acquire()
- try:
- if not self._load_started:
- self._start_load()
- cursor = self._load_cursor
- state, tid_int = self._adapter.load_current(cursor, u64(oid))
- finally:
- self._lock_release()
- if tid_int is not None:
- if state:
- state = str(state)
- if not state:
- # This can happen if something attempts to load
- # an object whose creation has been undone.
- raise KeyError(oid)
- return state, p64(tid_int)
- else:
- raise KeyError(oid)
-
- def loadEx(self, oid, version):
- # Since we don't support versions, just tack the empty version
- # string onto load's result.
- return self.load(oid, version) + ("",)
-
- def loadSerial(self, oid, serial):
- """Load a specific revision of an object"""
- self._lock_acquire()
- try:
- if self._store_cursor is not None:
- # Allow loading data from later transactions
- # for conflict resolution.
- cursor = self._store_cursor
- else:
- if not self._load_started:
- self._start_load()
- cursor = self._load_cursor
- state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
- if state is not None:
- state = str(state)
- if not state:
- raise POSKeyError(oid)
- return state
- else:
- raise KeyError(oid)
- finally:
- self._lock_release()
-
- def loadBefore(self, oid, tid):
- """Return the most recent revision of oid before tid committed."""
- oid_int = u64(oid)
-
- self._lock_acquire()
- try:
- if self._store_cursor is not None:
- # Allow loading data from later transactions
- # for conflict resolution.
- cursor = self._store_cursor
- else:
- if not self._load_started:
- self._start_load()
- cursor = self._load_cursor
- if not self._adapter.exists(cursor, u64(oid)):
- raise KeyError(oid)
-
- state, start_tid = self._adapter.load_before(
- cursor, oid_int, u64(tid))
- if start_tid is not None:
- end_int = self._adapter.get_object_tid_after(
- cursor, oid_int, start_tid)
- if end_int is not None:
- end = p64(end_int)
- else:
- end = None
- if state is not None:
- state = str(state)
- return state, p64(start_tid), end
- else:
- return None
- finally:
- self._lock_release()
-
-
- def store(self, oid, serial, data, version, transaction):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
- if version:
- raise POSException.Unsupported("Versions aren't supported")
-
- # If self._prepared_txn is not None, that means something is
- # attempting to store objects after the vote phase has finished.
- # That should not happen, should it?
- assert self._prepared_txn is None
- md5sum = md5.new(data).hexdigest()
-
- self._lock_acquire()
- try:
- # buffer the stores
- if self._tbuf is None:
- self._tbuf = AutoTemporaryFile()
- self._pickler = cPickle.Pickler(
- self._tbuf, cPickle.HIGHEST_PROTOCOL)
- self._stored_oids = []
-
- self._pickler.dump((oid, serial, md5sum, data))
- self._stored_oids.append(u64(oid))
- return None
- finally:
- self._lock_release()
-
- def tpc_begin(self, transaction, tid=None, status=' '):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- self._lock_acquire()
- try:
- if self._transaction is transaction:
- return
- self._lock_release()
- self._commit_lock_acquire()
- self._lock_acquire()
- self._transaction = transaction
- self._clear_temp()
-
- user = transaction.user
- desc = transaction.description
- ext = transaction._extension
- if ext:
- ext = cPickle.dumps(ext, 1)
- else:
- ext = ""
- self._ude = user, desc, ext
- self._tstatus = status
-
- if tid is not None:
- # get the commit lock and add the transaction now
- adapter = self._adapter
- conn, cursor = adapter.open_for_commit()
- self._store_conn, self._store_cursor = conn, cursor
- tid_int = u64(tid)
- try:
- adapter.add_transaction(cursor, tid_int, user, desc, ext)
- except:
- self._drop_store_connection()
- raise
- # else choose the tid later
- self._tid = tid
-
- finally:
- self._lock_release()
-
- def _prepare_tid(self):
- """Choose a tid for the current transaction.
-
- This should be done as late in the commit as possible, since
- it must hold an exclusive commit lock.
- """
- if self._tid is not None:
- return
- if self._transaction is None:
- raise POSException.StorageError("No transaction in progress")
-
- adapter = self._adapter
- conn, cursor = adapter.open_for_commit()
- self._store_conn, self._store_cursor = conn, cursor
- user, desc, ext = self._ude
-
- attempt = 1
- while True:
- try:
- # Choose a transaction ID.
- # Base the transaction ID on the database time,
- # while ensuring that the tid of this transaction
- # is greater than any existing tid.
- last_tid, now = adapter.get_tid_and_time(cursor)
- stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
- stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
- tid = repr(stamp)
-
- tid_int = u64(tid)
- adapter.add_transaction(cursor, tid_int, user, desc, ext)
- self._tid = tid
- break
-
- except POSException.ConflictError:
- if attempt < 3:
- # A concurrent transaction claimed the tid.
- # Rollback and try again.
- adapter.restart_commit(cursor)
- attempt += 1
- else:
- self._drop_store_connection()
- raise
- except:
- self._drop_store_connection()
- raise
-
-
- def _clear_temp(self):
- # It is assumed that self._lock_acquire was called before this
- # method was called.
- self._prepared_txn = None
- tbuf = self._tbuf
- if tbuf is not None:
- self._tbuf = None
- self._pickler = None
- self._stored_oids = None
- tbuf.close()
-
-
- def _send_stored(self):
- """Send the buffered pickles to the database.
-
- Returns a list of (oid, tid) to be received by
- Connection._handle_serial().
- """
- cursor = self._store_cursor
- adapter = self._adapter
- tid_int = u64(self._tid)
- self._pickler = None # Don't allow any more store operations
- self._tbuf.seek(0)
- unpickler = cPickle.Unpickler(self._tbuf)
-
- serials = [] # [(oid, serial)]
- prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
-
- while True:
- try:
- oid, serial, md5sum, data = unpickler.load()
- except EOFError:
- break
- oid_int = u64(oid)
- if not serial:
- serial = z64
-
- prev_tid_int = prev_tids.get(oid_int)
- if prev_tid_int is None:
- prev_tid_int = 0
- prev_tid = z64
- else:
- prev_tid = p64(prev_tid_int)
- if prev_tid != serial:
- # conflict detected
- rdata = self.tryToResolveConflict(
- oid, prev_tid, serial, data)
- if rdata is None:
- raise POSException.ConflictError(
- oid=oid, serials=(prev_tid, serial), data=data)
- else:
- data = rdata
- md5sum = md5.new(data).hexdigest()
-
- self._adapter.store(
- cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
-
- if prev_tid and serial != prev_tid:
- serials.append((oid, ConflictResolution.ResolvedSerial))
- else:
- serials.append((oid, self._tid))
-
- return serials
-
-
- def _vote(self):
- """Prepare the transaction for final commit."""
- # This method initiates a two-phase commit process,
- # saving the name of the prepared transaction in self._prepared_txn.
-
- # It is assumed that self._lock_acquire was called before this
- # method was called.
-
- if self._prepared_txn is not None:
- # the vote phase has already completed
- return
-
- self._prepare_tid()
- tid_int = u64(self._tid)
- cursor = self._store_cursor
- assert cursor is not None
-
- if self._tbuf is not None:
- serials = self._send_stored()
- else:
- serials = []
-
- self._adapter.update_current(cursor, tid_int)
- self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
- # From the point of view of self._store_cursor,
- # it now looks like the transaction has been rolled back.
-
- return serials
-
-
- def tpc_vote(self, transaction):
- self._lock_acquire()
- try:
- if transaction is not self._transaction:
- return
- try:
- return self._vote()
- except:
- if abort_early:
- # abort early to avoid lockups while running the
- # somewhat brittle ZODB test suite
- self.tpc_abort(transaction)
- raise
- finally:
- self._lock_release()
-
-
- def _finish(self, tid, user, desc, ext):
- """Commit the transaction."""
- # It is assumed that self._lock_acquire was called before this
- # method was called.
- assert self._tid is not None
- self._rollback_load_connection()
- txn = self._prepared_txn
- assert txn is not None
- self._adapter.commit_phase2(self._store_cursor, txn)
- self._drop_store_connection()
- self._prepared_txn = None
- self._ltid = self._tid
- self._tid = None
-
- def _abort(self):
- # the lock is held here
- self._rollback_load_connection()
- if self._store_cursor is not None:
- self._adapter.abort(self._store_cursor, self._prepared_txn)
- self._prepared_txn = None
- self._drop_store_connection()
- self._tid = None
-
- def lastTransaction(self):
- return self._ltid
-
- def new_oid(self):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- self._lock_acquire()
- try:
- cursor = self._load_cursor
- if cursor is None:
- self._open_load_connection()
- cursor = self._load_cursor
- oid_int = self._adapter.new_oid(cursor)
- return p64(oid_int)
- finally:
- self._lock_release()
-
- def supportsUndo(self):
- return True
-
- def supportsTransactionalUndo(self):
- return True
-
- def undoLog(self, first=0, last=-20, filter=None):
- if last < 0:
- last = first - last
-
- # use a private connection to ensure the most current results
- adapter = self._adapter
- conn, cursor = adapter.open()
- try:
- rows = adapter.iter_transactions(cursor)
- i = 0
- res = []
- for tid_int, user, desc, ext in rows:
- tid = p64(tid_int)
- d = {'id': base64.encodestring(tid)[:-1],
- 'time': TimeStamp(tid).timeTime(),
- 'user_name': user,
- 'description': desc}
- if ext:
- ext = str(ext)
- if ext:
- d.update(cPickle.loads(ext))
- if filter is None or filter(d):
- if i >= first:
- res.append(d)
- i += 1
- if i >= last:
- break
- return res
-
- finally:
- adapter.close(conn, cursor)
-
- def history(self, oid, version=None, size=1, filter=None):
- self._lock_acquire()
- try:
- cursor = self._load_cursor
- oid_int = u64(oid)
- try:
- rows = self._adapter.iter_object_history(cursor, oid_int)
- except KeyError:
- raise KeyError(oid)
-
- res = []
- for tid_int, username, description, extension, length in rows:
- tid = p64(tid_int)
- if extension:
- d = loads(extension)
- else:
- d = {}
- d.update({"time": TimeStamp(tid).timeTime(),
- "user_name": username,
- "description": description,
- "tid": tid,
- "version": '',
- "size": length,
- })
- if filter is None or filter(d):
- res.append(d)
- if size is not None and len(res) >= size:
- break
- return res
- finally:
- self._lock_release()
-
-
- def undo(self, transaction_id, transaction):
- """Undo a transaction identified by transaction_id.
-
- Do so by writing new data that reverses the action taken by
- the transaction.
- """
-
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
-
- undo_tid = base64.decodestring(transaction_id + '\n')
- assert len(undo_tid) == 8
- undo_tid_int = u64(undo_tid)
-
- self._lock_acquire()
- try:
- self._prepare_tid()
- self_tid_int = u64(self._tid)
- cursor = self._store_cursor
- assert cursor is not None
- adapter = self._adapter
-
- adapter.verify_undoable(cursor, undo_tid_int)
- oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
- oids = [p64(oid_int) for oid_int in oid_ints]
-
- # Update the current object pointers immediately, so that
- # subsequent undo operations within this transaction will see
- # the new current objects.
- adapter.update_current(cursor, self_tid_int)
-
- return self._tid, oids
-
- finally:
- self._lock_release()
-
-
- def pack(self, t, referencesf):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
-
- pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
- pack_point_int = u64(pack_point)
-
- def get_references(state):
- """Return the set of OIDs the given state refers to."""
- refs = set()
- if state:
- for oid in referencesf(str(state)):
- refs.add(u64(oid))
- return refs
-
- # Use a private connection (lock_conn and lock_cursor) to
- # hold the pack lock, while using a second private connection
- # (conn and cursor) to decide exactly what to pack.
- # Close the second connection. Then, with the lock still held,
- # perform the pack in a third connection opened by the adapter.
- # This structure is designed to maximize the scalability
- # of packing and minimize conflicts with concurrent writes.
- # A consequence of this structure is that the adapter must
- # not choke on transactions that may have been added between
- # pre_pack and pack.
- adapter = self._adapter
- lock_conn, lock_cursor = adapter.open()
- try:
- adapter.hold_pack_lock(lock_cursor)
-
- conn, cursor = adapter.open()
- try:
- try:
- # Find the latest commit before or at the pack time.
- tid_int = adapter.choose_pack_transaction(
- cursor, pack_point_int)
- if tid_int is None:
- # Nothing needs to be packed.
- # TODO: log the fact that nothing needs to be packed.
- return
-
- # In pre_pack, the adapter fills tables with information
- # about what to pack. The adapter should not actually
- # pack anything yet.
- adapter.pre_pack(cursor, tid_int, get_references)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- adapter.close(conn, cursor)
-
- # Now pack. The adapter makes its own connection just for the
- # pack operation, possibly using a special transaction mode
- # and connection flags.
- adapter.pack(tid_int)
- self._after_pack()
-
- finally:
- lock_conn.rollback()
- adapter.close(lock_conn, lock_cursor)
-
-
- def _after_pack(self):
- """Reset the transaction state after packing."""
- # The tests depend on this.
- self._rollback_load_connection()
-
-
-class BoundRelStorage(RelStorage):
- """Storage to a database, bound to a particular ZODB.Connection."""
-
- # The propagate_invalidations flag, set to a false value, tells
- # the Connection not to propagate object invalidations across
- # connections, since that ZODB feature is detrimental when the
- # storage provides its own MVCC.
- propagate_invalidations = False
-
- def __init__(self, parent, zodb_conn):
- # self._conn = conn
- RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
- read_only=parent._is_read_only, create=False)
- # _prev_polled_tid contains the tid at the previous poll
- self._prev_polled_tid = None
- self._showed_disconnect = False
-
- def poll_invalidations(self, retry=True):
- """Looks for OIDs of objects that changed since _prev_polled_tid
-
- Returns {oid: 1}, or None if all objects need to be invalidated
- because prev_polled_tid is not in the database (presumably it
- has been packed).
- """
- self._lock_acquire()
- try:
- if self._closed:
- return {}
- try:
- self._rollback_load_connection()
- self._start_load()
- conn = self._load_conn
- cursor = self._load_cursor
-
- # Ignore changes made by the last transaction committed
- # by this connection.
- ignore_tid = None
- if self._ltid is not None:
- ignore_tid = u64(self._ltid)
-
- # get a list of changed OIDs and the most recent tid
- oid_ints, new_polled_tid = self._adapter.poll_invalidations(
- conn, cursor, self._prev_polled_tid, ignore_tid)
- self._prev_polled_tid = new_polled_tid
-
- if oid_ints is None:
- oids = None
- else:
- oids = {}
- for oid_int in oid_ints:
- oids[p64(oid_int)] = 1
- return oids
- except POSException.StorageError:
- # disconnected
- if not retry:
- raise
- if not self._showed_disconnect:
- log.warning("Lost connection in %s", repr(self))
- self._showed_disconnect = True
- self._open_load_connection()
- log.info("Reconnected in %s", repr(self))
- self._showed_disconnect = False
- return self.poll_invalidations(retry=False)
- finally:
- self._lock_release()
-
- def _after_pack(self):
- # Disable transaction reset after packing. The connection
- # should call sync() to see the new state.
- pass
-
-
-# very basic test... ought to be moved or deleted.
-def test():
- import transaction
- import pprint
- from ZODB.DB import DB
- from persistent.mapping import PersistentMapping
- from relstorage.adapters.postgresql import PostgreSQLAdapter
-
- adapter = PostgreSQLAdapter(params='dbname=relstoragetest')
- storage = RelStorage(adapter)
- db = DB(storage)
-
- if True:
- for i in range(100):
- c = db.open()
- c.root()['foo'] = PersistentMapping()
- transaction.get().note('added %d' % i)
- transaction.commit()
- c.close()
- print 'wrote', i
-
- # undo 2 transactions, then redo them and undo the first again.
- for i in range(2):
- log = db.undoLog()
- db.undo(log[0]['id'])
- db.undo(log[1]['id'])
- transaction.get().note('undone! (%d)' % i)
- transaction.commit()
- print 'undid', i
-
- pprint.pprint(db.undoLog())
- db.pack(time.time() - 0.1)
- pprint.pprint(db.undoLog())
- db.close()
-
-
-if __name__ == '__main__':
- import logging
- logging.basicConfig()
- test()
Copied: relstorage/trunk/CHANGELOG.txt (from rev 83260, relstorage/CHANGELOG.txt)
===================================================================
--- relstorage/trunk/CHANGELOG.txt (rev 0)
+++ relstorage/trunk/CHANGELOG.txt 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,75 @@
+
+relstorage 0.9
+
+- Renamed to reflect expanding database support.
+
+- Support for Oracle added.
+
+- Major overhaul with many scalability and reliability improvements,
+ particularly in the area of packing.
+
+- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
+ on svn.zope.org.)
+
+
+PGStorage 0.4
+
+- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
+ for invalidation polling.
+
+- Removed the commit_order code. The commit_order idea was intended to
+ allow concurrent commits, but that idea is a little too ambitious while
+ other more important ideas are being tested. Something like it may
+ come later.
+
+- Improved connection management: only one database connection is
+ held continuously open per storage instance.
+
+- Reconnect to the database automatically.
+
+- Removed test mode.
+
+- Switched from using a ZODB.Connection subclass to a ZODB patch. The
+ Connection class changes in subtle ways too often to subclass reliably;
+ a patch is much safer.
+
+- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
+
+- Fixed an undo bug. Symptom: attempting to examine the undo log revealed
+ broken pickles. Cause: the extension field was not being wrapped in
+ psycopg2.Binary upon insert. Solution: used psycopg2.Binary.
+ Unfortunately, this doesn't fix existing transactions people have
+ committed. If anyone has any data to keep, fixing the old transactions
+ should be easy.
+
+- Moved from a private CVS repository to Sourceforge.
+ See http://pgstorage.sourceforge.net . Also switched to the MIT license.
+
+- David Pratt added a basic getSize() implementation so that the Zope
+ management interface displays an estimate of the size of the database.
+
+- Turned PGStorage into a top-level package. Python generally makes
+ top-level packages easier to install.
+
+
+PGStorage 0.3
+
+- Made compatible with Zope 3, although an undo bug apparently remains.
+
+
+PGStorage 0.2
+
+- Fixed concurrent commits, which were generating deadlocks. Fixed by
+ adding a special table, "commit_lock", which is used for
+ synchronizing increments of commit_seq (but only at final commit.)
+ If you are upgrading from version 0.1, you need to change your
+ database using the 'psql' prompt:
+
+ create table commit_lock ();
+
+- Added speed tests and an OpenDocument spreadsheet comparing
+ FileStorage / ZEO with PGStorage. PGStorage wins at reading objects
+ and writing a lot of small transactions, while FileStorage / ZEO
+ wins at writing big transactions. Interestingly, they tie when
+ writing a RAM disk.
+
Copied: relstorage/trunk/README.txt (from rev 83260, relstorage/README.txt)
===================================================================
--- relstorage/trunk/README.txt (rev 0)
+++ relstorage/trunk/README.txt 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,15 @@
+
+To make Zope store in RelStorage, patch ZODB/Connection.py using the
+provided patch, then add the following to zope.conf, modifying the
+adapter and params lines to fit your needs.
+
+
+%import relstorage
+<zodb_db main>
+ mount-point /
+ <relstorage>
+ adapter postgresql
+ params host=localhost dbname=zodb user=zope password=...
+ </relstorage>
+</zodb_db>
+
Copied: relstorage/trunk/notes (from rev 83260, relstorage/notes)
Copied: relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch (from rev 83260, relstorage/poll-invalidation-1-zodb-3-7-1.patch)
===================================================================
--- relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch (rev 0)
+++ relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,94 @@
+diff -r 34747fbd09ec Connection.py
+--- a/Connection.py Tue Nov 20 21:57:31 2007 -0700
++++ b/Connection.py Fri Jan 11 21:19:00 2008 -0700
+@@ -75,8 +75,14 @@ class Connection(ExportImport, object):
+ """Create a new Connection."""
+
+ self._db = db
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
+ self._savepoint_storage = None
+
+ self.transaction_manager = self._synch = self._mvcc = None
+@@ -170,6 +176,12 @@ class Connection(ExportImport, object):
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ def add(self, obj):
+ """Add a new object 'obj' to the database and assign it an oid."""
+@@ -267,6 +279,11 @@ class Connection(ExportImport, object):
+ self.transaction_manager.unregisterSynch(self)
+ self._synch = None
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -295,6 +312,10 @@ class Connection(ExportImport, object):
+
+ def invalidate(self, tid, oids):
+ """Notify the Connection that transaction 'tid' invalidated oids."""
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
++
+ self._inv_lock.acquire()
+ try:
+ if self._txn_time is None:
+@@ -438,8 +459,23 @@ class Connection(ExportImport, object):
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+diff -r 34747fbd09ec DB.py
+--- a/DB.py Tue Nov 20 21:57:31 2007 -0700
++++ b/DB.py Wed Nov 28 18:33:12 2007 -0700
+@@ -260,6 +260,10 @@ class DB(object):
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Copied: relstorage/trunk/relstorage/__init__.py (from rev 83260, relstorage/__init__.py)
===================================================================
--- relstorage/trunk/relstorage/__init__.py (rev 0)
+++ relstorage/trunk/relstorage/__init__.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,22 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""relstorage package"""
+
+# perform a compatibility test
+from ZODB.Connection import Connection
+
+if not hasattr(Connection, '_poll_invalidations'):
+ raise ImportError('RelStorage requires the invalidation polling '
+ 'patch for ZODB.')
+del Connection
Copied: relstorage/trunk/relstorage/adapters (from rev 83260, relstorage/adapters)
Copied: relstorage/trunk/relstorage/autotemp.py (from rev 83260, relstorage/autotemp.py)
===================================================================
--- relstorage/trunk/relstorage/autotemp.py (rev 0)
+++ relstorage/trunk/relstorage/autotemp.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,43 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A temporary file that switches from StringIO to TemporaryFile if needed.
+
+This could be a useful addition to Python's tempfile module.
+"""
+
+from cStringIO import StringIO
+import tempfile
+
+class AutoTemporaryFile:
+ """Initially a StringIO, but becomes a TemporaryFile if it grows too big"""
+ def __init__(self, threshold=10485760):
+ self._threshold = threshold
+ self._f = f = StringIO()
+ # delegate most methods
+ for m in ('read', 'readline', 'seek', 'tell', 'close'):
+ setattr(self, m, getattr(f, m))
+
+ def write(self, data):
+ threshold = self._threshold
+ if threshold > 0 and self.tell() + len(data) >= threshold:
+ # convert to TemporaryFile
+ f = tempfile.TemporaryFile()
+ f.write(self._f.getvalue())
+ f.seek(self.tell())
+ self._f = f
+ self._threshold = 0
+ # delegate all important methods
+ for m in ('write', 'read', 'readline', 'seek', 'tell', 'close'):
+ setattr(self, m, getattr(f, m))
+ self._f.write(data)
Copied: relstorage/trunk/relstorage/component.xml (from rev 83260, relstorage/component.xml)
===================================================================
--- relstorage/trunk/relstorage/component.xml (rev 0)
+++ relstorage/trunk/relstorage/component.xml 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+
+<!-- RelStorage configuration via ZConfig -->
+
+<component prefix="relstorage.config">
+
+ <import package="ZODB"/>
+ <abstracttype name="relstorage.adapter"/>
+
+ <sectiontype name="relstorage" implements="ZODB.storage"
+ datatype=".RelStorageFactory">
+ <section type="relstorage.adapter" name="*" attribute="adapter"/>
+ <key name="name" default="RelStorage"/>
+ <key name="create" datatype="boolean" default="true">
+ <description>
+ Flag that indicates whether the storage should be initialized if
+ it does not already exist.
+ </description>
+ </key>
+ <key name="read-only" datatype="boolean" default="false">
+ <description>
+ If true, only reads may be executed against the storage. Note
+ that the "pack" operation is not considered a write operation
+ and is still allowed on a read-only filestorage.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="postgresql" implements="relstorage.adapter"
+ datatype=".PostgreSQLAdapterFactory">
+ <key name="dsn" datatype="string" required="no" default="">
+ <description>
+ The PostgreSQL data source name. For example:
+
+ dsn dbname='template1' user='user' host='localhost' password='pass'
+
+ If dsn is omitted, the adapter will connect to a local database with
+ no password. Both the user and database name will match the
+ name of the owner of the current process.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="oracle" implements="relstorage.adapter"
+ datatype=".OracleAdapterFactory">
+ <key name="user" datatype="string" required="yes">
+ <description>
+ The Oracle account name
+ </description>
+ </key>
+ <key name="password" datatype="string" required="yes">
+ <description>
+ The Oracle account password
+ </description>
+ </key>
+ <key name="dsn" datatype="string" required="yes">
+ <description>
+ The Oracle data source name. The Oracle client library will
+ normally expect to find the DSN in /etc/oratab.
+ </description>
+ </key>
+ </sectiontype>
+
+</component>
Copied: relstorage/trunk/relstorage/config.py (from rev 83260, relstorage/config.py)
===================================================================
--- relstorage/trunk/relstorage/config.py (rev 0)
+++ relstorage/trunk/relstorage/config.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,41 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZConfig directive implementations for binding RelStorage to Zope"""
+
+from ZODB.config import BaseConfig
+
+from relstorage import RelStorage
+
+
+class RelStorageFactory(BaseConfig):
+ """Open a storage configured via ZConfig"""
+ def open(self):
+ config = self.config
+ adapter = config.adapter.open()
+ return RelStorage(adapter, name=config.name, create=config.create,
+ read_only=config.read_only)
+
+
+class PostgreSQLAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.postgresql import PostgreSQLAdapter
+ return PostgreSQLAdapter(self.config.dsn)
+
+
+class OracleAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.oracle import OracleAdapter
+ config = self.config
+ return OracleAdapter(config.user, config.password, config.dsn)
+
Copied: relstorage/trunk/relstorage/relstorage.py (from rev 83260, relstorage/relstorage.py)
===================================================================
--- relstorage/trunk/relstorage/relstorage.py (rev 0)
+++ relstorage/trunk/relstorage/relstorage.py 2008-01-27 00:36:00 UTC (rev 83261)
@@ -0,0 +1,804 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""The core of RelStorage, a ZODB storage for relational databases.
+
+Stores pickles in the database.
+"""
+
+import base64
+import cPickle
+import logging
+import md5
+import os
+import time
+import weakref
+from ZODB.utils import p64, u64, z64
+from ZODB.BaseStorage import BaseStorage
+from ZODB import ConflictResolution, POSException
+from persistent.TimeStamp import TimeStamp
+
+from autotemp import AutoTemporaryFile
+
+log = logging.getLogger("relstorage")
+
+# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
+# a failure revealed by the ZODB test suite. The test suite often fails
+# to call tpc_abort in the event of an error, leading to deadlocks.
+# The variable causes RelStorage to abort failed transactions
+# early rather than wait for an explicit abort.
+abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
+
+
+class RelStorage(BaseStorage,
+ ConflictResolution.ConflictResolvingStorage):
+ """Storage to a relational database, based on invalidation polling"""
+
+ def __init__(self, adapter, name='RelStorage', create=True,
+ read_only=False):
+ self._adapter = adapter
+ self._name = name
+ self._is_read_only = read_only
+
+ # load_conn and load_cursor are always open
+ self._load_conn = None
+ self._load_cursor = None
+ self._load_started = False
+ self._open_load_connection()
+ # store_conn and store_cursor are open only during commit
+ self._store_conn = None
+ self._store_cursor = None
+
+ if create:
+ self._adapter.prepare_schema()
+
+ BaseStorage.__init__(self, name)
+
+ self._tid = None
+ self._ltid = None
+
+ # _tbuf is a temporary file that contains pickles of data to be
+ # committed. _pickler writes pickles to that file. _stored_oids
+ # is a list of integer OIDs to be stored.
+ self._tbuf = None
+ self._pickler = None
+ self._stored_oids = None
+
+ # _prepared_txn is the name of the transaction to commit in the
+ # second phase.
+ self._prepared_txn = None
+
+ # _instances is a list of weak references to storage instances bound
+ # to the same database.
+ self._instances = []
+
+ # _closed is True after self.close() is called. Since close()
+ # can be called from another thread, access to self._closed should
+ # be inside a _lock_acquire()/_lock_release() block.
+ self._closed = False
+
+ def _open_load_connection(self):
+ """Open the load connection to the database. Return nothing."""
+ conn, cursor = self._adapter.open_for_load()
+ self._drop_load_connection()
+ self._load_conn, self._load_cursor = conn, cursor
+ self._load_started = True
+
+ def _drop_load_connection(self):
+ conn, cursor = self._load_conn, self._load_cursor
+ self._load_conn, self._load_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _drop_store_connection(self):
+ conn, cursor = self._store_conn, self._store_cursor
+ self._store_conn, self._store_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _rollback_load_connection(self):
+ if self._load_conn is not None:
+ self._load_started = False
+ self._load_conn.rollback()
+
+ def _start_load(self):
+ if self._load_cursor is None:
+ self._open_load_connection()
+ else:
+ self._adapter.restart_load(self._load_cursor)
+ self._load_started = True
+
+ def _zap(self):
+ """Clear all objects out of the database.
+
+ Used by the test suite.
+ """
+ self._adapter.zap()
+ self._rollback_load_connection()
+
+ def close(self):
+ """Close the connections to the database."""
+ self._lock_acquire()
+ try:
+ self._closed = True
+ self._drop_load_connection()
+ self._drop_store_connection()
+ for wref in self._instances:
+ instance = wref()
+ if instance is not None:
+ instance.close()
+ finally:
+ self._lock_release()
+
+ def bind_connection(self, zodb_conn):
+ """Get a connection-bound storage instance.
+
+ Connections have their own storage instances so that
+ the database can provide the MVCC semantics rather than ZODB.
+ """
+ res = BoundRelStorage(self, zodb_conn)
+ self._instances.append(weakref.ref(res))
+ return res
+
+ def connection_closing(self):
+ """Release resources."""
+ self._rollback_load_connection()
+
+ def __len__(self):
+ return self._adapter.get_object_count()
+
+ def getSize(self):
+ """Return database size in bytes"""
+ return self._adapter.get_db_size()
+
+ def load(self, oid, version):
+ self._lock_acquire()
+ try:
+ if not self._load_started:
+ self._start_load()
+ cursor = self._load_cursor
+ state, tid_int = self._adapter.load_current(cursor, u64(oid))
+ finally:
+ self._lock_release()
+ if tid_int is not None:
+ if state:
+ state = str(state)
+ if not state:
+ # This can happen if something attempts to load
+ # an object whose creation has been undone.
+ raise KeyError(oid)
+ return state, p64(tid_int)
+ else:
+ raise KeyError(oid)
+
+ def loadEx(self, oid, version):
+ # Since we don't support versions, just tack the empty version
+ # string onto load's result.
+ return self.load(oid, version) + ("",)
+
+ def loadSerial(self, oid, serial):
+ """Load a specific revision of an object"""
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_started:
+ self._start_load()
+ cursor = self._load_cursor
+ state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
+ if state is not None:
+ state = str(state)
+ if not state:
+ raise POSKeyError(oid)
+ return state
+ else:
+ raise KeyError(oid)
+ finally:
+ self._lock_release()
+
+ def loadBefore(self, oid, tid):
+ """Return the most recent revision of oid before tid committed."""
+ oid_int = u64(oid)
+
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_started:
+ self._start_load()
+ cursor = self._load_cursor
+ if not self._adapter.exists(cursor, u64(oid)):
+ raise KeyError(oid)
+
+ state, start_tid = self._adapter.load_before(
+ cursor, oid_int, u64(tid))
+ if start_tid is not None:
+ end_int = self._adapter.get_object_tid_after(
+ cursor, oid_int, start_tid)
+ if end_int is not None:
+ end = p64(end_int)
+ else:
+ end = None
+ if state is not None:
+ state = str(state)
+ return state, p64(start_tid), end
+ else:
+ return None
+ finally:
+ self._lock_release()
+
+
+ def store(self, oid, serial, data, version, transaction):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ # If self._prepared_txn is not None, that means something is
+ # attempting to store objects after the vote phase has finished.
+ # That should not happen, should it?
+ assert self._prepared_txn is None
+ md5sum = md5.new(data).hexdigest()
+
+ self._lock_acquire()
+ try:
+ # buffer the stores
+ if self._tbuf is None:
+ self._tbuf = AutoTemporaryFile()
+ self._pickler = cPickle.Pickler(
+ self._tbuf, cPickle.HIGHEST_PROTOCOL)
+ self._stored_oids = []
+
+ self._pickler.dump((oid, serial, md5sum, data))
+ self._stored_oids.append(u64(oid))
+ return None
+ finally:
+ self._lock_release()
+
+ def tpc_begin(self, transaction, tid=None, status=' '):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ if self._transaction is transaction:
+ return
+ self._lock_release()
+ self._commit_lock_acquire()
+ self._lock_acquire()
+ self._transaction = transaction
+ self._clear_temp()
+
+ user = transaction.user
+ desc = transaction.description
+ ext = transaction._extension
+ if ext:
+ ext = cPickle.dumps(ext, 1)
+ else:
+ ext = ""
+ self._ude = user, desc, ext
+ self._tstatus = status
+
+ if tid is not None:
+ # get the commit lock and add the transaction now
+ adapter = self._adapter
+ conn, cursor = adapter.open_for_commit()
+ self._store_conn, self._store_cursor = conn, cursor
+ tid_int = u64(tid)
+ try:
+ adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ except:
+ self._drop_store_connection()
+ raise
+ # else choose the tid later
+ self._tid = tid
+
+ finally:
+ self._lock_release()
+
+ def _prepare_tid(self):
+ """Choose a tid for the current transaction.
+
+ This should be done as late in the commit as possible, since
+ it must hold an exclusive commit lock.
+ """
+ if self._tid is not None:
+ return
+ if self._transaction is None:
+ raise POSException.StorageError("No transaction in progress")
+
+ adapter = self._adapter
+ conn, cursor = adapter.open_for_commit()
+ self._store_conn, self._store_cursor = conn, cursor
+ user, desc, ext = self._ude
+
+ attempt = 1
+ while True:
+ try:
+ # Choose a transaction ID.
+ # Base the transaction ID on the database time,
+ # while ensuring that the tid of this transaction
+ # is greater than any existing tid.
+ last_tid, now = adapter.get_tid_and_time(cursor)
+ stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+ stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+ tid = repr(stamp)
+
+ tid_int = u64(tid)
+ adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ self._tid = tid
+ break
+
+ except POSException.ConflictError:
+ if attempt < 3:
+ # A concurrent transaction claimed the tid.
+ # Rollback and try again.
+ adapter.restart_commit(cursor)
+ attempt += 1
+ else:
+ self._drop_store_connection()
+ raise
+ except:
+ self._drop_store_connection()
+ raise
+
+
+ def _clear_temp(self):
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ self._prepared_txn = None
+ tbuf = self._tbuf
+ if tbuf is not None:
+ self._tbuf = None
+ self._pickler = None
+ self._stored_oids = None
+ tbuf.close()
+
+
+ def _send_stored(self):
+ """Send the buffered pickles to the database.
+
+ Returns a list of (oid, tid) to be received by
+ Connection._handle_serial().
+ """
+ cursor = self._store_cursor
+ adapter = self._adapter
+ tid_int = u64(self._tid)
+ self._pickler = None # Don't allow any more store operations
+ self._tbuf.seek(0)
+ unpickler = cPickle.Unpickler(self._tbuf)
+
+ serials = [] # [(oid, serial)]
+ prev_tids = adapter.get_object_tids(cursor, self._stored_oids)
+
+ while True:
+ try:
+ oid, serial, md5sum, data = unpickler.load()
+ except EOFError:
+ break
+ oid_int = u64(oid)
+ if not serial:
+ serial = z64
+
+ prev_tid_int = prev_tids.get(oid_int)
+ if prev_tid_int is None:
+ prev_tid_int = 0
+ prev_tid = z64
+ else:
+ prev_tid = p64(prev_tid_int)
+ if prev_tid != serial:
+ # conflict detected
+ rdata = self.tryToResolveConflict(
+ oid, prev_tid, serial, data)
+ if rdata is None:
+ raise POSException.ConflictError(
+ oid=oid, serials=(prev_tid, serial), data=data)
+ else:
+ data = rdata
+ md5sum = md5.new(data).hexdigest()
+
+ self._adapter.store(
+ cursor, oid_int, tid_int, prev_tid_int, md5sum, data)
+
+ if prev_tid and serial != prev_tid:
+ serials.append((oid, ConflictResolution.ResolvedSerial))
+ else:
+ serials.append((oid, self._tid))
+
+ return serials
+
+
+ def _vote(self):
+ """Prepare the transaction for final commit."""
+ # This method initiates a two-phase commit process,
+ # saving the name of the prepared transaction in self._prepared_txn.
+
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+
+ if self._prepared_txn is not None:
+ # the vote phase has already completed
+ return
+
+ self._prepare_tid()
+ tid_int = u64(self._tid)
+ cursor = self._store_cursor
+ assert cursor is not None
+
+ if self._tbuf is not None:
+ serials = self._send_stored()
+ else:
+ serials = []
+
+ self._adapter.update_current(cursor, tid_int)
+ self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+ # From the point of view of self._store_cursor,
+ # it now looks like the transaction has been rolled back.
+
+ return serials
+
+
+ def tpc_vote(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ return self._vote()
+ except:
+ if abort_early:
+ # abort early to avoid lockups while running the
+ # somewhat brittle ZODB test suite
+ self.tpc_abort(transaction)
+ raise
+ finally:
+ self._lock_release()
+
+
+ def _finish(self, tid, user, desc, ext):
+ """Commit the transaction."""
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ assert self._tid is not None
+ self._rollback_load_connection()
+ txn = self._prepared_txn
+ assert txn is not None
+ self._adapter.commit_phase2(self._store_cursor, txn)
+ self._drop_store_connection()
+ self._prepared_txn = None
+ self._ltid = self._tid
+ self._tid = None
+
+ def _abort(self):
+ # the lock is held here
+ self._rollback_load_connection()
+ if self._store_cursor is not None:
+ self._adapter.abort(self._store_cursor, self._prepared_txn)
+ self._prepared_txn = None
+ self._drop_store_connection()
+ self._tid = None
+
+ def lastTransaction(self):
+ return self._ltid
+
+ def new_oid(self):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ if cursor is None:
+ self._open_load_connection()
+ cursor = self._load_cursor
+ oid_int = self._adapter.new_oid(cursor)
+ return p64(oid_int)
+ finally:
+ self._lock_release()
+
+ def supportsUndo(self):
+ return True
+
+ def supportsTransactionalUndo(self):
+ return True
+
+ def undoLog(self, first=0, last=-20, filter=None):
+ if last < 0:
+ last = first - last
+
+ # use a private connection to ensure the most current results
+ adapter = self._adapter
+ conn, cursor = adapter.open()
+ try:
+ rows = adapter.iter_transactions(cursor)
+ i = 0
+ res = []
+ for tid_int, user, desc, ext in rows:
+ tid = p64(tid_int)
+ d = {'id': base64.encodestring(tid)[:-1],
+ 'time': TimeStamp(tid).timeTime(),
+ 'user_name': user,
+ 'description': desc}
+ if ext:
+ ext = str(ext)
+ if ext:
+ d.update(cPickle.loads(ext))
+ if filter is None or filter(d):
+ if i >= first:
+ res.append(d)
+ i += 1
+ if i >= last:
+ break
+ return res
+
+ finally:
+ adapter.close(conn, cursor)
+
+ def history(self, oid, version=None, size=1, filter=None):
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ oid_int = u64(oid)
+ try:
+ rows = self._adapter.iter_object_history(cursor, oid_int)
+ except KeyError:
+ raise KeyError(oid)
+
+ res = []
+ for tid_int, username, description, extension, length in rows:
+ tid = p64(tid_int)
+ if extension:
+ d = loads(extension)
+ else:
+ d = {}
+ d.update({"time": TimeStamp(tid).timeTime(),
+ "user_name": username,
+ "description": description,
+ "tid": tid,
+ "version": '',
+ "size": length,
+ })
+ if filter is None or filter(d):
+ res.append(d)
+ if size is not None and len(res) >= size:
+ break
+ return res
+ finally:
+ self._lock_release()
+
+
+ def undo(self, transaction_id, transaction):
+ """Undo a transaction identified by transaction_id.
+
+ Do so by writing new data that reverses the action taken by
+ the transaction.
+ """
+
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+
+ undo_tid = base64.decodestring(transaction_id + '\n')
+ assert len(undo_tid) == 8
+ undo_tid_int = u64(undo_tid)
+
+ self._lock_acquire()
+ try:
+ self._prepare_tid()
+ self_tid_int = u64(self._tid)
+ cursor = self._store_cursor
+ assert cursor is not None
+ adapter = self._adapter
+
+ adapter.verify_undoable(cursor, undo_tid_int)
+ oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
+ oids = [p64(oid_int) for oid_int in oid_ints]
+
+ # Update the current object pointers immediately, so that
+ # subsequent undo operations within this transaction will see
+ # the new current objects.
+ adapter.update_current(cursor, self_tid_int)
+
+ return self._tid, oids
+
+ finally:
+ self._lock_release()
+
+
+ def pack(self, t, referencesf):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+
+ pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
+ pack_point_int = u64(pack_point)
+
+ def get_references(state):
+ """Return the set of OIDs the given state refers to."""
+ refs = set()
+ if state:
+ for oid in referencesf(str(state)):
+ refs.add(u64(oid))
+ return refs
+
+ # Use a private connection (lock_conn and lock_cursor) to
+ # hold the pack lock, while using a second private connection
+ # (conn and cursor) to decide exactly what to pack.
+ # Close the second connection. Then, with the lock still held,
+ # perform the pack in a third connection opened by the adapter.
+ # This structure is designed to maximize the scalability
+ # of packing and minimize conflicts with concurrent writes.
+ # A consequence of this structure is that the adapter must
+ # not choke on transactions that may have been added between
+ # pre_pack and pack.
+ adapter = self._adapter
+ lock_conn, lock_cursor = adapter.open()
+ try:
+ adapter.hold_pack_lock(lock_cursor)
+
+ conn, cursor = adapter.open()
+ try:
+ try:
+ # Find the latest commit before or at the pack time.
+ tid_int = adapter.choose_pack_transaction(
+ cursor, pack_point_int)
+ if tid_int is None:
+ # Nothing needs to be packed.
+ # TODO: log the fact that nothing needs to be packed.
+ return
+
+ # In pre_pack, the adapter fills tables with information
+ # about what to pack. The adapter should not actually
+ # pack anything yet.
+ adapter.pre_pack(cursor, tid_int, get_references)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ adapter.close(conn, cursor)
+
+ # Now pack. The adapter makes its own connection just for the
+ # pack operation, possibly using a special transaction mode
+ # and connection flags.
+ adapter.pack(tid_int)
+ self._after_pack()
+
+ finally:
+ lock_conn.rollback()
+ adapter.close(lock_conn, lock_cursor)
+
+
+ def _after_pack(self):
+ """Reset the transaction state after packing."""
+ # The tests depend on this.
+ self._rollback_load_connection()
+
+
+class BoundRelStorage(RelStorage):
+ """Storage to a database, bound to a particular ZODB.Connection."""
+
+ # The propagate_invalidations flag, set to a false value, tells
+ # the Connection not to propagate object invalidations across
+ # connections, since that ZODB feature is detrimental when the
+ # storage provides its own MVCC.
+ propagate_invalidations = False
+
+ def __init__(self, parent, zodb_conn):
+ # self._conn = conn
+ RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
+ read_only=parent._is_read_only, create=False)
+ # _prev_polled_tid contains the tid at the previous poll
+ self._prev_polled_tid = None
+ self._showed_disconnect = False
+
+ def poll_invalidations(self, retry=True):
+ """Looks for OIDs of objects that changed since _prev_polled_tid
+
+ Returns {oid: 1}, or None if all objects need to be invalidated
+ because prev_polled_tid is not in the database (presumably it
+ has been packed).
+ """
+ self._lock_acquire()
+ try:
+ if self._closed:
+ return {}
+ try:
+ self._rollback_load_connection()
+ self._start_load()
+ conn = self._load_conn
+ cursor = self._load_cursor
+
+ # Ignore changes made by the last transaction committed
+ # by this connection.
+ ignore_tid = None
+ if self._ltid is not None:
+ ignore_tid = u64(self._ltid)
+
+ # get a list of changed OIDs and the most recent tid
+ oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+ conn, cursor, self._prev_polled_tid, ignore_tid)
+ self._prev_polled_tid = new_polled_tid
+
+ if oid_ints is None:
+ oids = None
+ else:
+ oids = {}
+ for oid_int in oid_ints:
+ oids[p64(oid_int)] = 1
+ return oids
+ except POSException.StorageError:
+ # disconnected
+ if not retry:
+ raise
+ if not self._showed_disconnect:
+ log.warning("Lost connection in %s", repr(self))
+ self._showed_disconnect = True
+ self._open_load_connection()
+ log.info("Reconnected in %s", repr(self))
+ self._showed_disconnect = False
+ return self.poll_invalidations(retry=False)
+ finally:
+ self._lock_release()
+
+ def _after_pack(self):
+ # Disable transaction reset after packing. The connection
+ # should call sync() to see the new state.
+ pass
+
+
+# very basic test... ought to be moved or deleted.
+def test():
+ import transaction
+ import pprint
+ from ZODB.DB import DB
+ from persistent.mapping import PersistentMapping
+ from relstorage.adapters.postgresql import PostgreSQLAdapter
+
+ adapter = PostgreSQLAdapter(params='dbname=relstoragetest')
+ storage = RelStorage(adapter)
+ db = DB(storage)
+
+ if True:
+ for i in range(100):
+ c = db.open()
+ c.root()['foo'] = PersistentMapping()
+ transaction.get().note('added %d' % i)
+ transaction.commit()
+ c.close()
+ print 'wrote', i
+
+ # undo 2 transactions, then redo them and undo the first again.
+ for i in range(2):
+ log = db.undoLog()
+ db.undo(log[0]['id'])
+ db.undo(log[1]['id'])
+ transaction.get().note('undone! (%d)' % i)
+ transaction.commit()
+ print 'undid', i
+
+ pprint.pprint(db.undoLog())
+ db.pack(time.time() - 0.1)
+ pprint.pprint(db.undoLog())
+ db.close()
+
+
+if __name__ == '__main__':
+ import logging
+ logging.basicConfig()
+ test()
Copied: relstorage/trunk/relstorage/tests (from rev 83260, relstorage/tests)
More information about the Checkins
mailing list