[Checkins] SVN: relstorage/trunk/ Fixed the blob handling tests for ZODB 3.8.
Shane Hathaway
shane at hathawaymix.org
Fri Sep 25 18:13:40 EDT 2009
Log message for revision 104556:
Fixed the blob handling tests for ZODB 3.8.
This involved:
- Renamed relstorage.py to storage.py to overcome import issues.
- Updated the patch for ZODB 3.7 and 3.8 to fix an issue with
blobs and subtransactions.
Changed:
U relstorage/trunk/CHANGES.txt
D relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch
D relstorage/trunk/poll-invalidation-1-zodb-3-8-0.patch
A relstorage/trunk/poll-invalidation-zodb-3-7.patch
A relstorage/trunk/poll-invalidation-zodb-3-8.patch
U relstorage/trunk/relstorage/config.py
D relstorage/trunk/relstorage/relstorage.py
A relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/RecoveryStorage.py
U relstorage/trunk/relstorage/tests/blob/blob_connection.txt
U relstorage/trunk/relstorage/tests/blob/blob_transaction.txt
U relstorage/trunk/relstorage/tests/blob/testblob.py
U relstorage/trunk/relstorage/tests/packstresstest.py
U relstorage/trunk/relstorage/tests/reltestbase.py
U relstorage/trunk/relstorage/tests/speedtest.py
U relstorage/trunk/relstorage/tests/testmysql.py
U relstorage/trunk/relstorage/tests/testoracle.py
U relstorage/trunk/relstorage/tests/testpostgresql.py
A relstorage/trunk/relstorage/util.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/CHANGES.txt 2009-09-25 22:13:39 UTC (rev 104556)
@@ -2,6 +2,15 @@
Unreleased
----------
+- Added the keep_history option to the database adapters. Set it
+ to false to keep no history. (Packing is still required for
+ garbage collection and blob deletion.)
+
+- Renamed relstorage.py to storage.py to overcome import issues.
+
+- Updated the patch for ZODB 3.7 and 3.8 to fix an issue with
+ blobs and subtransactions.
+
- Divided the implementation of database adapters into many small
objects, making the adapter code more modular. Added interfaces
that describe the duties of each part.
Deleted: relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch
===================================================================
--- relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch 2009-09-25 22:13:39 UTC (rev 104556)
@@ -1,96 +0,0 @@
-Index: Connection.py
-===================================================================
---- Connection.py (revision 87280)
-+++ Connection.py (working copy)
-@@ -75,8 +75,14 @@
- """Create a new Connection."""
-
- self._db = db
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
- self._savepoint_storage = None
-
- self.transaction_manager = self._synch = self._mvcc = None
-@@ -170,6 +176,12 @@
- # Multi-database support
- self.connections = {self._db.database_name: self}
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid."""
-@@ -267,6 +279,11 @@
- self.transaction_manager.unregisterSynch(self)
- self._synch = None
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -295,6 +312,10 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -438,8 +459,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: DB.py
-===================================================================
---- DB.py (revision 87280)
-+++ DB.py (working copy)
-@@ -260,6 +260,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Deleted: relstorage/trunk/poll-invalidation-1-zodb-3-8-0.patch
===================================================================
--- relstorage/trunk/poll-invalidation-1-zodb-3-8-0.patch 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/poll-invalidation-1-zodb-3-8-0.patch 2009-09-25 22:13:39 UTC (rev 104556)
@@ -1,97 +0,0 @@
-Index: Connection.py
-===================================================================
---- Connection.py (revision 87666)
-+++ Connection.py (working copy)
-@@ -90,8 +90,15 @@
- self.connections = {self._db.database_name: self}
-
- self._version = version
-- self._normal_storage = self._storage = db._storage
-- self.new_oid = db._storage.new_oid
-+
-+ storage = db._storage
-+ m = getattr(storage, 'bind_connection', None)
-+ if m is not None:
-+ # Use a storage instance bound to this connection.
-+ storage = m(self)
-+ self._normal_storage = self._storage = storage
-+ self.new_oid = storage.new_oid
-+
- self._savepoint_storage = None
-
- # Do we need to join a txn manager?
-@@ -151,6 +158,12 @@
- # in the cache on abort and in other connections on finish.
- self._modified = []
-
-+ # Allow the storage to decide whether invalidations should
-+ # propagate between connections. If the storage provides MVCC
-+ # semantics, it is better to not propagate invalidations between
-+ # connections.
-+ self._propagate_invalidations = getattr(
-+ self._storage, 'propagate_invalidations', True)
-
- # _invalidated queues invalidate messages delivered from the DB
- # _inv_lock prevents one thread from modifying the set while
-@@ -297,6 +310,11 @@
- if self._opened:
- self.transaction_manager.unregisterSynch(self)
-
-+ # If the storage wants to know, tell it this connection is closing.
-+ m = getattr(self._storage, 'connection_closing', None)
-+ if m is not None:
-+ m()
-+
- if primary:
- for connection in self.connections.values():
- if connection is not self:
-@@ -328,6 +346,10 @@
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
-+ if not self._propagate_invalidations:
-+ # The storage disabled inter-connection invalidation.
-+ return
-+
- self._inv_lock.acquire()
- try:
- if self._txn_time is None:
-@@ -469,8 +491,23 @@
- self._registered_objects = []
- self._creating.clear()
-
-+ def _poll_invalidations(self):
-+ """Poll and process object invalidations provided by the storage.
-+ """
-+ m = getattr(self._storage, 'poll_invalidations', None)
-+ if m is not None:
-+ # Poll the storage for invalidations.
-+ invalidated = m()
-+ if invalidated is None:
-+ # special value: the transaction is so old that
-+ # we need to flush the whole cache.
-+ self._cache.invalidate(self._cache.cache_data.keys())
-+ elif invalidated:
-+ self._cache.invalidate(invalidated)
-+
- # Process pending invalidations.
- def _flush_invalidations(self):
-+ self._poll_invalidations()
- self._inv_lock.acquire()
- try:
- # Non-ghostifiable objects may need to read when they are
-Index: DB.py
-===================================================================
---- DB.py (revision 87666)
-+++ DB.py (working copy)
-@@ -284,6 +284,10 @@
- storage.store(z64, None, file.getvalue(), '', t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-+ if hasattr(storage, 'connection_closing'):
-+ # Let the storage release whatever resources it used for loading
-+ # the root object.
-+ storage.connection_closing()
-
- # Multi-database setup.
- if databases is None:
Copied: relstorage/trunk/poll-invalidation-zodb-3-7.patch (from rev 104541, relstorage/trunk/poll-invalidation-1-zodb-3-7-1.patch)
===================================================================
--- relstorage/trunk/poll-invalidation-zodb-3-7.patch (rev 0)
+++ relstorage/trunk/poll-invalidation-zodb-3-7.patch 2009-09-25 22:13:39 UTC (rev 104556)
@@ -0,0 +1,96 @@
+Index: Connection.py
+===================================================================
+--- Connection.py (revision 104552)
++++ Connection.py (working copy)
+@@ -75,8 +75,14 @@
+ """Create a new Connection."""
+
+ self._db = db
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
+ self._savepoint_storage = None
+
+ self.transaction_manager = self._synch = self._mvcc = None
+@@ -170,6 +176,12 @@
+ # Multi-database support
+ self.connections = {self._db.database_name: self}
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ def add(self, obj):
+ """Add a new object 'obj' to the database and assign it an oid."""
+@@ -267,6 +279,11 @@
+ self.transaction_manager.unregisterSynch(self)
+ self._synch = None
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -438,8 +455,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+@@ -698,6 +730,10 @@
+ """Indicate confirmation that the transaction is done."""
+
+ def callback(tid):
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
++
+ d = dict.fromkeys(self._modified)
+ self._db.invalidate(tid, d, self)
+ # It's important that the storage calls the passed function
+Index: DB.py
+===================================================================
+--- DB.py (revision 104552)
++++ DB.py (working copy)
+@@ -260,6 +260,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Property changes on: relstorage/trunk/poll-invalidation-zodb-3-7.patch
___________________________________________________________________
Added: svn:mergeinfo
+
Copied: relstorage/trunk/poll-invalidation-zodb-3-8.patch (from rev 104541, relstorage/trunk/poll-invalidation-1-zodb-3-8-0.patch)
===================================================================
--- relstorage/trunk/poll-invalidation-zodb-3-8.patch (rev 0)
+++ relstorage/trunk/poll-invalidation-zodb-3-8.patch 2009-09-25 22:13:39 UTC (rev 104556)
@@ -0,0 +1,96 @@
+Index: Connection.py
+===================================================================
+--- Connection.py (revision 104546)
++++ Connection.py (working copy)
+@@ -90,8 +90,15 @@
+ self.connections = {self._db.database_name: self}
+
+ self._version = version
+- self._normal_storage = self._storage = db._storage
+- self.new_oid = db._storage.new_oid
++
++ storage = db._storage
++ m = getattr(storage, 'bind_connection', None)
++ if m is not None:
++ # Use a storage instance bound to this connection.
++ storage = m(self)
++ self._normal_storage = self._storage = storage
++ self.new_oid = storage.new_oid
++
+ self._savepoint_storage = None
+
+ # Do we need to join a txn manager?
+@@ -151,6 +158,12 @@
+ # in the cache on abort and in other connections on finish.
+ self._modified = []
+
++ # Allow the storage to decide whether invalidations should
++ # propagate between connections. If the storage provides MVCC
++ # semantics, it is better to not propagate invalidations between
++ # connections.
++ self._propagate_invalidations = getattr(
++ self._storage, 'propagate_invalidations', True)
+
+ # _invalidated queues invalidate messages delivered from the DB
+ # _inv_lock prevents one thread from modifying the set while
+@@ -297,6 +310,11 @@
+ if self._opened:
+ self.transaction_manager.unregisterSynch(self)
+
++ # If the storage wants to know, tell it this connection is closing.
++ m = getattr(self._storage, 'connection_closing', None)
++ if m is not None:
++ m()
++
+ if primary:
+ for connection in self.connections.values():
+ if connection is not self:
+@@ -469,8 +487,23 @@
+ self._registered_objects = []
+ self._creating.clear()
+
++ def _poll_invalidations(self):
++ """Poll and process object invalidations provided by the storage.
++ """
++ m = getattr(self._storage, 'poll_invalidations', None)
++ if m is not None:
++ # Poll the storage for invalidations.
++ invalidated = m()
++ if invalidated is None:
++ # special value: the transaction is so old that
++ # we need to flush the whole cache.
++ self._cache.invalidate(self._cache.cache_data.keys())
++ elif invalidated:
++ self._cache.invalidate(invalidated)
++
+ # Process pending invalidations.
+ def _flush_invalidations(self):
++ self._poll_invalidations()
+ self._inv_lock.acquire()
+ try:
+ # Non-ghostifiable objects may need to read when they are
+@@ -748,6 +781,9 @@
+ """Indicate confirmation that the transaction is done."""
+
+ def callback(tid):
++ if not self._propagate_invalidations:
++ # The storage disabled inter-connection invalidation.
++ return
+ d = dict.fromkeys(self._modified)
+ self._db.invalidate(tid, d, self)
+ # It's important that the storage calls the passed function
+Index: DB.py
+===================================================================
+--- DB.py (revision 104546)
++++ DB.py (working copy)
+@@ -284,6 +284,10 @@
+ storage.store(z64, None, file.getvalue(), '', t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
++ if hasattr(storage, 'connection_closing'):
++ # Let the storage release whatever resources it used for loading
++ # the root object.
++ storage.connection_closing()
+
+ # Multi-database setup.
+ if databases is None:
Property changes on: relstorage/trunk/poll-invalidation-zodb-3-8.patch
___________________________________________________________________
Added: svn:mergeinfo
+
Modified: relstorage/trunk/relstorage/config.py
===================================================================
--- relstorage/trunk/relstorage/config.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/config.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -15,7 +15,7 @@
from ZODB.config import BaseConfig
-from relstorage import RelStorage, Options
+from relstorage.storage import RelStorage, Options
class RelStorageFactory(BaseConfig):
Deleted: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/relstorage.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -1,1425 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""The core of RelStorage, a ZODB storage for relational databases.
-
-Stores pickles in the database.
-"""
-
-from persistent.TimeStamp import TimeStamp
-from ZODB.BaseStorage import BaseStorage
-from ZODB.BaseStorage import DataRecord
-from ZODB.BaseStorage import TransactionRecord
-from ZODB import ConflictResolution
-from ZODB import POSException
-from ZODB.POSException import POSKeyError
-from ZODB.utils import p64
-from ZODB.utils import u64
-from zope.interface import implements
-from zope.interface import Interface
-import base64
-import cPickle
-import logging
-import os
-import sys
-import tempfile
-import time
-import weakref
-import ZODB.interfaces
-
-try:
- from ZODB.interfaces import StorageStopIteration
-except ImportError:
- class StorageStopIteration(IndexError, StopIteration):
- """A combination of StopIteration and IndexError to provide a
- backwards-compatible exception.
- """
-
-_relstorage_interfaces = []
-for name in (
- 'IStorage',
- 'IMVCCStorage',
- 'IStorageRestoreable',
- 'IStorageIteration',
- 'IStorageUndoable',
- 'IBlobStorage',
- 'IBlobStorageRestoreable',
- ):
- if hasattr(ZODB.interfaces, name):
- _relstorage_interfaces.append(getattr(ZODB.interfaces, name))
-
-log = logging.getLogger("relstorage")
-
-# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
-# a failure revealed by the ZODB test suite. The test suite often fails
-# to call tpc_abort in the event of an error, leading to deadlocks.
-# This variable causes RelStorage to abort failed transactions
-# early rather than wait for an explicit abort.
-abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
-
-
-class RelStorage(BaseStorage,
- ConflictResolution.ConflictResolvingStorage):
- """Storage to a relational database, based on invalidation polling"""
- implements(*_relstorage_interfaces)
-
- def __init__(self, adapter, name=None, create=True,
- read_only=False, options=None, **kwoptions):
- if name is None:
- name = 'RelStorage on %s' % adapter.__class__.__name__
-
- self._adapter = adapter
- self._name = name
- self._is_read_only = read_only
- if options is None:
- options = Options()
- for key, value in kwoptions.iteritems():
- if key in options.__dict__:
- setattr(options, key, value)
- else:
- raise TypeError("Unknown parameter: %s" % key)
- elif kwoptions:
- raise TypeError("The RelStorage constructor accepts either "
- "an options parameter or keyword arguments, not both")
- self._options = options
- self._cache_client = None
-
- if create:
- self._adapter.schema.prepare()
-
- # load_conn and load_cursor are open most of the time.
- self._load_conn = None
- self._load_cursor = None
- self._load_transaction_open = False
- self._open_load_connection()
- # store_conn and store_cursor are open during commit,
- # but not necessarily open at other times.
- self._store_conn = None
- self._store_cursor = None
-
- BaseStorage.__init__(self, name)
-
- self._tid = None
- self._ltid = None
-
- # _prepared_txn is the name of the transaction to commit in the
- # second phase.
- self._prepared_txn = None
-
- # _instances is a list of weak references to storage instances bound
- # to the same database.
- self._instances = []
-
- # _closed is True after self.close() is called. Since close()
- # can be called from another thread, access to self._closed should
- # be inside a _lock_acquire()/_lock_release() block.
- self._closed = False
-
- # _max_stored_oid is the highest OID stored by the current
- # transaction
- self._max_stored_oid = 0
-
- # _max_new_oid is the highest OID provided by new_oid()
- self._max_new_oid = 0
-
- # set _cache_client
- if options.cache_servers:
- module_name = options.cache_module_name
- module = __import__(module_name, {}, {}, ['Client'])
- servers = options.cache_servers
- if isinstance(servers, basestring):
- servers = servers.split()
- self._cache_client = module.Client(servers)
- else:
- self._cache_client = None
-
- # _prev_polled_tid contains the tid at the previous poll
- self._prev_polled_tid = None
-
- # _polled_commit_count contains the last polled value of the
- # 'commit_count' cache key
- self._polled_commit_count = 0
-
- # _poll_at is the time to poll regardless of commit_count
- self._poll_at = 0
-
- # _txn_blobs: {oid->filename}; contains blob data for the
- # currently uncommitted transaction.
- self._txn_blobs = None
-
- if options.blob_dir:
- from ZODB.blob import FilesystemHelper
- self.fshelper = FilesystemHelper(options.blob_dir)
- if create:
- self.fshelper.create()
- self.fshelper.checkSecure()
- else:
- self.fshelper = None
-
- def _open_load_connection(self):
- """Open the load connection to the database. Return nothing."""
- conn, cursor = self._adapter.connmanager.open_for_load()
- self._drop_load_connection()
- self._load_conn, self._load_cursor = conn, cursor
- self._load_transaction_open = True
-
- def _drop_load_connection(self):
- """Unconditionally drop the load connection"""
- conn, cursor = self._load_conn, self._load_cursor
- self._load_conn, self._load_cursor = None, None
- self._adapter.connmanager.close(conn, cursor)
- self._load_transaction_open = False
-
- def _rollback_load_connection(self):
- if self._load_conn is not None:
- try:
- self._load_conn.rollback()
- except:
- self._drop_load_connection()
- raise
- self._load_transaction_open = False
-
- def _restart_load(self):
- """Restart the load connection, creating a new connection if needed"""
- if self._load_cursor is None:
- self._open_load_connection()
- else:
- try:
- self._adapter.connmanager.restart_load(
- self._load_conn, self._load_cursor)
- except POSException.StorageError, e:
- log.warning("Reconnecting load_conn: %s", e)
- self._drop_load_connection()
- try:
- self._open_load_connection()
- except:
- log.exception("Reconnect failed.")
- raise
- else:
- log.info("Reconnected.")
- self._load_transaction_open = True
-
-
- def _open_store_connection(self):
- """Open the store connection to the database. Return nothing."""
- conn, cursor = self._adapter.connmanager.open_for_store()
- self._drop_store_connection()
- self._store_conn, self._store_cursor = conn, cursor
-
- def _drop_store_connection(self):
- """Unconditionally drop the store connection"""
- conn, cursor = self._store_conn, self._store_cursor
- self._store_conn, self._store_cursor = None, None
- self._adapter.connmanager.close(conn, cursor)
-
- def _restart_store(self):
- """Restart the store connection, creating a new connection if needed"""
- if self._store_cursor is None:
- self._open_store_connection()
- else:
- try:
- self._adapter.connmanager.restart_store(
- self._store_conn, self._store_cursor)
- except POSException.StorageError, e:
- log.warning("Reconnecting store_conn: %s", e)
- self._drop_store_connection()
- try:
- self._open_store_connection()
- except:
- log.exception("Reconnect failed.")
- raise
- else:
- log.info("Reconnected.")
-
-
- def zap_all(self):
- """Clear all objects and transactions out of the database.
-
- Used by the test suite and the ZODBConvert script.
- """
- self._adapter.schema.zap_all()
- self._rollback_load_connection()
- cache = self._cache_client
- if cache is not None:
- cache.flush_all()
-
- def release(self):
- """Release back end database sessions used by this storage instance.
- """
- self._lock_acquire()
- try:
- self._drop_load_connection()
- self._drop_store_connection()
- finally:
- self._lock_release()
-
- def close(self):
- """Close the storage and all instances."""
- self._lock_acquire()
- try:
- self._closed = True
- self._drop_load_connection()
- self._drop_store_connection()
- for wref in self._instances:
- instance = wref()
- if instance is not None:
- instance.close()
- finally:
- self._lock_release()
-
- def new_instance(self):
- """Creates and returns another storage instance.
-
- See ZODB.interfaces.IMVCCStorage.
- """
- other = RelStorage(adapter=self._adapter, name=self._name,
- create=False, read_only=self._is_read_only,
- options=self._options)
- self._instances.append(weakref.ref(other))
- return other
-
- def __len__(self):
- return self._adapter.stats.get_object_count()
-
- def getSize(self):
- """Return database size in bytes"""
- return self._adapter.stats.get_db_size()
-
- def _log_keyerror(self, oid_int, reason):
- """Log just before raising KeyError in load().
-
- KeyErrors in load() are generally not supposed to happen,
- so this is a good place to gather information.
- """
- cursor = self._load_cursor
- adapter = self._adapter
- logfunc = log.warning
- msg = ["Storage KeyError on oid %d: %s" % (oid_int, reason)]
- rows = adapter.dbiter.iter_transactions(cursor)
- row = None
- for row in rows:
- # just get the first row
- break
- if not row:
- # This happens when initializing a new database, so it's
- # not a warning.
- logfunc = log.debug
- msg.append("No transactions exist")
- else:
- msg.append("Current transaction is %d" % row[0])
-
- tids = []
- try:
- rows = adapter.dbiter.iter_object_history(cursor, oid_int)
- except KeyError:
- # The object has no history, at least from the point of view
- # of the current database load connection.
- pass
- else:
- for row in rows:
- tids.append(row[0])
- if len(tids) >= 10:
- break
- msg.append("Recent object tids: %s" % repr(tids))
- logfunc('; '.join(msg))
-
- def _get_oid_cache_key(self, oid_int):
- """Return the cache key for finding the current tid."""
- my_tid = self._prev_polled_tid
- if my_tid is None:
- return None
- return 'tid:%d:%d' % (oid_int, my_tid)
-
- def load(self, oid, version):
- oid_int = u64(oid)
- cache = self._cache_client
-
- self._lock_acquire()
- try:
- if not self._load_transaction_open:
- self._restart_load()
- cursor = self._load_cursor
- if cache is None:
- state, tid_int = self._adapter.mover.load_current(
- cursor, oid_int)
- else:
- # get tid_int from the cache or the database
- cachekey = self._get_oid_cache_key(oid_int)
- if cachekey:
- tid_int = cache.get(cachekey)
- if not cachekey or not tid_int:
- tid_int = self._adapter.mover.get_current_tid(
- cursor, oid_int)
- if cachekey and tid_int is not None:
- cache.set(cachekey, tid_int)
- if tid_int is None:
- self._log_keyerror(oid_int, "no tid found(1)")
- raise POSKeyError(oid)
-
- # get state from the cache or the database
- cachekey = 'state:%d:%d' % (oid_int, tid_int)
- state = cache.get(cachekey)
- if not state:
- state = self._adapter.mover.load_revision(
- cursor, oid_int, tid_int)
- if state:
- state = str(state)
- cache.set(cachekey, state)
- finally:
- self._lock_release()
-
- if tid_int is not None:
- if state:
- state = str(state)
- if not state:
- # This can happen if something attempts to load
- # an object whose creation has been undone.
- self._log_keyerror(oid_int, "creation has been undone")
- raise POSKeyError(oid)
- return state, p64(tid_int)
- else:
- self._log_keyerror(oid_int, "no tid found(2)")
- raise POSKeyError(oid)
-
- def loadEx(self, oid, version):
- # Since we don't support versions, just tack the empty version
- # string onto load's result.
- return self.load(oid, version) + ("",)
-
- def loadSerial(self, oid, serial):
- """Load a specific revision of an object"""
- oid_int = u64(oid)
- tid_int = u64(serial)
- cache = self._cache_client
- if cache is not None:
- cachekey = 'state:%d:%d' % (oid_int, tid_int)
- state = cache.get(cachekey)
- if state:
- return state
-
- self._lock_acquire()
- try:
- if not self._load_transaction_open:
- self._restart_load()
- state = self._adapter.mover.load_revision(
- self._load_cursor, oid_int, tid_int)
- if state is None and self._store_cursor is not None:
- # Allow loading data from later transactions
- # for conflict resolution.
- state = self._adapter.mover.load_revision(
- self._store_cursor, oid_int, tid_int)
- finally:
- self._lock_release()
-
- if state is not None:
- state = str(state)
- if not state:
- raise POSKeyError(oid)
- if cache is not None:
- cache.set(cachekey, state)
- return state
- else:
- raise POSKeyError(oid)
-
- def loadBefore(self, oid, tid):
- """Return the most recent revision of oid before tid committed."""
- oid_int = u64(oid)
-
- self._lock_acquire()
- try:
- if self._store_cursor is not None:
- # Allow loading data from later transactions
- # for conflict resolution.
- cursor = self._store_cursor
- else:
- if not self._load_transaction_open:
- self._restart_load()
- cursor = self._load_cursor
- if not self._adapter.mover.exists(cursor, u64(oid)):
- raise POSKeyError(oid)
-
- state, start_tid = self._adapter.mover.load_before(
- cursor, oid_int, u64(tid))
- if start_tid is not None:
- end_int = self._adapter.mover.get_object_tid_after(
- cursor, oid_int, start_tid)
- if end_int is not None:
- end = p64(end_int)
- else:
- end = None
- if state is not None:
- state = str(state)
- return state, p64(start_tid), end
- else:
- return None
- finally:
- self._lock_release()
-
-
- def store(self, oid, serial, data, version, transaction):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
- if version:
- raise POSException.Unsupported("Versions aren't supported")
-
- # If self._prepared_txn is not None, that means something is
- # attempting to store objects after the vote phase has finished.
- # That should not happen, should it?
- assert self._prepared_txn is None
-
- adapter = self._adapter
- cursor = self._store_cursor
- assert cursor is not None
- oid_int = u64(oid)
- if serial:
- prev_tid_int = u64(serial)
- else:
- prev_tid_int = 0
-
- self._lock_acquire()
- try:
- self._max_stored_oid = max(self._max_stored_oid, oid_int)
- # save the data in a temporary table
- adapter.mover.store_temp(cursor, oid_int, prev_tid_int, data)
- return None
- finally:
- self._lock_release()
-
-
- def restore(self, oid, serial, data, version, prev_txn, transaction):
- # Like store(), but used for importing transactions. See the
- # comments in FileStorage.restore(). The prev_txn optimization
- # is not used.
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
- if version:
- raise POSException.Unsupported("Versions aren't supported")
-
- assert self._tid is not None
- assert self._prepared_txn is None
-
- adapter = self._adapter
- cursor = self._store_cursor
- assert cursor is not None
- oid_int = u64(oid)
- tid_int = u64(serial)
-
- self._lock_acquire()
- try:
- self._max_stored_oid = max(self._max_stored_oid, oid_int)
- # save the data. Note that data can be None.
- adapter.mover.restore(cursor, oid_int, tid_int, data)
- finally:
- self._lock_release()
-
-
- def tpc_begin(self, transaction, tid=None, status=' '):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- self._lock_acquire()
- try:
- if self._transaction is transaction:
- return
- self._lock_release()
- self._commit_lock_acquire()
- self._lock_acquire()
- self._transaction = transaction
- self._clear_temp()
-
- user = str(transaction.user)
- desc = str(transaction.description)
- ext = transaction._extension
- if ext:
- ext = cPickle.dumps(ext, 1)
- else:
- ext = ""
- self._ude = user, desc, ext
- self._tstatus = status
-
- adapter = self._adapter
- self._restart_store()
-
- if tid is not None:
- # get the commit lock and add the transaction now
- cursor = self._store_cursor
- packed = (status == 'p')
- adapter.locker.hold_commit_lock(cursor, ensure_current=True)
- tid_int = u64(tid)
- try:
- adapter.txncontrol.add_transaction(
- cursor, tid_int, user, desc, ext, packed)
- except:
- self._drop_store_connection()
- raise
- # else choose the tid later
- self._tid = tid
-
- finally:
- self._lock_release()
-
- def _prepare_tid(self):
- """Choose a tid for the current transaction.
-
- This should be done as late in the commit as possible, since
- it must hold an exclusive commit lock.
- """
- if self._tid is not None:
- return
- if self._transaction is None:
- raise POSException.StorageError("No transaction in progress")
-
- adapter = self._adapter
- cursor = self._store_cursor
- adapter.locker.hold_commit_lock(cursor, ensure_current=True)
- user, desc, ext = self._ude
-
- # Choose a transaction ID.
- # Base the transaction ID on the database time,
- # while ensuring that the tid of this transaction
- # is greater than any existing tid.
- last_tid, now = adapter.txncontrol.get_tid_and_time(cursor)
- stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
- stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
- tid = repr(stamp)
-
- tid_int = u64(tid)
- adapter.txncontrol.add_transaction(cursor, tid_int, user, desc, ext)
- self._tid = tid
-
-
- def _clear_temp(self):
- # It is assumed that self._lock_acquire was called before this
- # method was called.
- self._prepared_txn = None
- self._max_stored_oid = 0
-
-
- def _finish_store(self):
- """Move stored objects from the temporary table to final storage.
-
- Returns a list of (oid, tid) to be received by
- Connection._handle_serial().
- """
- assert self._tid is not None
- cursor = self._store_cursor
- adapter = self._adapter
-
- # Detect conflicting changes.
- # Try to resolve the conflicts.
- resolved = set() # a set of OIDs
- while True:
- conflict = adapter.mover.detect_conflict(cursor)
- if conflict is None:
- break
-
- oid_int, prev_tid_int, serial_int, data = conflict
- oid = p64(oid_int)
- prev_tid = p64(prev_tid_int)
- serial = p64(serial_int)
-
- rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
- if rdata is None:
- # unresolvable; kill the whole transaction
- raise POSException.ConflictError(
- oid=oid, serials=(prev_tid, serial), data=data)
- else:
- # resolved
- data = rdata
- self._adapter.mover.replace_temp(
- cursor, oid_int, prev_tid_int, data)
- resolved.add(oid)
-
- # Move the new states into the permanent table
- tid_int = u64(self._tid)
- serials = []
- oid_ints = adapter.mover.move_from_temp(cursor, tid_int)
- for oid_int in oid_ints:
- oid = p64(oid_int)
- if oid in resolved:
- serial = ConflictResolution.ResolvedSerial
- else:
- serial = self._tid
- serials.append((oid, serial))
-
- return serials
-
-
- def _vote(self):
- """Prepare the transaction for final commit."""
- # This method initiates a two-phase commit process,
- # saving the name of the prepared transaction in self._prepared_txn.
-
- # It is assumed that self._lock_acquire was called before this
- # method was called.
-
- if self._prepared_txn is not None:
- # the vote phase has already completed
- return
-
- cursor = self._store_cursor
- assert cursor is not None
- conn = self._store_conn
-
- if self._max_stored_oid > self._max_new_oid:
- self._adapter.oidallocator.set_min_oid(
- cursor, self._max_stored_oid + 1)
-
- self._prepare_tid()
- tid_int = u64(self._tid)
-
- serials = self._finish_store()
- self._adapter.mover.update_current(cursor, tid_int)
- self._prepared_txn = self._adapter.txncontrol.commit_phase1(
- conn, cursor, tid_int)
-
- if self._txn_blobs:
- # We now have a transaction ID, so rename all the blobs
- # accordingly.
- for oid, sourcename in self._txn_blobs.items():
- targetname = self.fshelper.getBlobFilename(oid, self._tid)
- if sourcename != targetname:
- ZODB.blob.rename_or_copy_blob(sourcename, targetname)
- self._txn_blobs[oid] = targetname
-
- return serials
-
-
- def tpc_vote(self, transaction):
- self._lock_acquire()
- try:
- if transaction is not self._transaction:
- return
- try:
- return self._vote()
- except:
- if abort_early:
- # abort early to avoid lockups while running the
- # somewhat brittle ZODB test suite
- self.tpc_abort(transaction)
- raise
- finally:
- self._lock_release()
-
-
- def _finish(self, tid, user, desc, ext):
- """Commit the transaction."""
- # It is assumed that self._lock_acquire was called before this
- # method was called.
- assert self._tid is not None
- try:
- self._rollback_load_connection()
- txn = self._prepared_txn
- assert txn is not None
- self._adapter.txncontrol.commit_phase2(
- self._store_conn, self._store_cursor, txn)
- self._adapter.locker.release_commit_lock(self._store_cursor)
- cache = self._cache_client
- if cache is not None:
- if cache.incr('commit_count') is None:
- # Use the current time as an initial commit_count value.
- cache.add('commit_count', int(time.time()))
- # A concurrent committer could have won the race to set the
- # initial commit_count. Increment commit_count so that it
- # doesn't matter who won.
- cache.incr('commit_count')
- self._ltid = self._tid
-
- #if self._txn_blobs and not self._adapter.keep_history:
- ## For each blob just committed, get the name of
- ## one earlier revision (if any) and write the
- ## name of the file to a log. At pack time,
- ## all the files in the log will be deleted and
- ## the log will be cleared.
- #for oid, filename in self._txn_blobs.iteritems():
- #dirname, current_name = os.path.split(filename)
- #names = os.listdir(dirname)
- #names.sort()
- #if current_name in names:
- #i = names.index(current_name)
- #if i > 0:
- # to_delete = os.path.join(dirname, names[i-1])
- # log.write('%s\n') % to_delete
-
- finally:
- self._txn_blobs = None
- self._prepared_txn = None
- self._tid = None
- self._transaction = None
-
- def _abort(self):
- # the lock is held here
- try:
- self._rollback_load_connection()
- if self._store_cursor is not None:
- self._adapter.txncontrol.abort(
- self._store_conn, self._store_cursor, self._prepared_txn)
- self._adapter.locker.release_commit_lock(self._store_cursor)
- if self._txn_blobs:
- for oid, filename in self._txn_blobs.iteritems():
- if os.path.exists(filename):
- ZODB.blob.remove_committed(filename)
- dirname = os.path.dirname(filename)
- if not os.listdir(dirname):
- ZODB.blob.remove_committed_dir(dirname)
- finally:
- self._txn_blobs = None
- self._prepared_txn = None
- self._tid = None
- self._transaction = None
-
- def lastTransaction(self):
- return self._ltid
-
- def new_oid(self):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- self._lock_acquire()
- try:
- cursor = self._load_cursor
- if cursor is None:
- self._open_load_connection()
- cursor = self._load_cursor
- oid_int = self._adapter.oidallocator.new_oid(cursor)
- self._max_new_oid = max(self._max_new_oid, oid_int)
- return p64(oid_int)
- finally:
- self._lock_release()
-
- def cleanup(self):
- pass
-
- def supportsVersions(self):
- return False
-
- def modifiedInVersion(self, oid):
- return ''
-
- def supportsUndo(self):
- return self._adapter.keep_history
-
- def supportsTransactionalUndo(self):
- return self._adapter.keep_history
-
- def undoLog(self, first=0, last=-20, filter=None):
- if last < 0:
- last = first - last
-
- # use a private connection to ensure the most current results
- adapter = self._adapter
- conn, cursor = adapter.connmanager.open()
- try:
- rows = adapter.dbiter.iter_transactions(cursor)
- i = 0
- res = []
- for tid_int, user, desc, ext in rows:
- tid = p64(tid_int)
- d = {'id': base64.encodestring(tid)[:-1],
- 'time': TimeStamp(tid).timeTime(),
- 'user_name': user or '',
- 'description': desc or ''}
- if ext:
- d.update(cPickle.loads(ext))
- if filter is None or filter(d):
- if i >= first:
- res.append(d)
- i += 1
- if i >= last:
- break
- return res
-
- finally:
- adapter.connmanager.close(conn, cursor)
-
- def history(self, oid, version=None, size=1, filter=None):
- self._lock_acquire()
- try:
- cursor = self._load_cursor
- oid_int = u64(oid)
- try:
- rows = self._adapter.dbiter.iter_object_history(
- cursor, oid_int)
- except KeyError:
- raise POSKeyError(oid)
-
- res = []
- for tid_int, username, description, extension, length in rows:
- tid = p64(tid_int)
- if extension:
- d = loads(extension)
- else:
- d = {}
- d.update({"time": TimeStamp(tid).timeTime(),
- "user_name": username or '',
- "description": description or '',
- "tid": tid,
- "version": '',
- "size": length,
- })
- if filter is None or filter(d):
- res.append(d)
- if size is not None and len(res) >= size:
- break
- return res
- finally:
- self._lock_release()
-
-
- def undo(self, transaction_id, transaction):
- """Undo a transaction identified by transaction_id.
-
- transaction_id is the base 64 encoding of an 8 byte tid.
- Undo by writing new data that reverses the action taken by
- the transaction.
- """
-
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
-
- undo_tid = base64.decodestring(transaction_id + '\n')
- assert len(undo_tid) == 8
- undo_tid_int = u64(undo_tid)
-
- self._lock_acquire()
- try:
- adapter = self._adapter
- cursor = self._store_cursor
- assert cursor is not None
-
- adapter.locker.hold_pack_lock(cursor)
- try:
- # Note that _prepare_tid acquires the commit lock.
- # The commit lock must be acquired after the pack lock
- # because the database adapters also acquire in that
- # order during packing.
- self._prepare_tid()
- adapter.packundo.verify_undoable(cursor, undo_tid_int)
-
- self_tid_int = u64(self._tid)
- copied = adapter.packundo.undo(
- cursor, undo_tid_int, self_tid_int)
- oids = [p64(oid_int) for oid_int, _ in copied]
-
- # Update the current object pointers immediately, so that
- # subsequent undo operations within this transaction will see
- # the new current objects.
- adapter.mover.update_current(cursor, self_tid_int)
-
- if self.fshelper is not None:
- self._copy_undone_blobs(copied)
-
- return self._tid, oids
- finally:
- adapter.locker.release_pack_lock(cursor)
- finally:
- self._lock_release()
-
- def _copy_undone_blobs(self, copied):
- """After an undo operation, copy the matching blobs forward.
-
- The copied parameter is a list of (integer oid, integer tid).
- """
- for oid_int, old_tid_int in copied:
- oid = p64(oid_int)
- old_tid = p64(old_tid_int)
- orig_fn = self.fshelper.getBlobFilename(oid, old_tid)
- if not os.path.exists(orig_fn):
- # not a blob
- continue
-
- new_fn = self.fshelper.getBlobFilename(oid, self._tid)
- orig = open(orig_fn, 'r')
- new = open(new_fn, 'wb')
- ZODB.utils.cp(orig, new)
- orig.close()
- new.close()
-
- self._add_blob_to_transaction(oid, new_fn)
-
- def pack(self, t, referencesf, sleep=None):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
-
- pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
- pack_point_int = u64(pack_point)
-
- def get_references(state):
- """Return the set of OIDs the given state refers to."""
- refs = set()
- if state:
- for oid in referencesf(str(state)):
- refs.add(u64(oid))
- return refs
-
- # Use a private connection (lock_conn and lock_cursor) to
- # hold the pack lock. Have the adapter open temporary
- # connections to do the actual work, allowing the adapter
- # to use special transaction modes for packing.
- adapter = self._adapter
- lock_conn, lock_cursor = adapter.connmanager.open()
- try:
- adapter.locker.hold_pack_lock(lock_cursor)
- try:
- # Find the latest commit before or at the pack time.
- tid_int = adapter.packundo.choose_pack_transaction(
- pack_point_int)
- if tid_int is None:
- log.debug("all transactions before %s have already "
- "been packed", time.ctime(t))
- return
-
- if self._options.pack_dry_run:
- log.info("pack: beginning dry run")
-
- s = time.ctime(TimeStamp(p64(tid_int)).timeTime())
- log.info("pack: analyzing transactions committed "
- "%s or before", s)
-
- # In pre_pack, the adapter fills tables with
- # information about what to pack. The adapter
- # must not actually pack anything yet.
- adapter.packundo.pre_pack(
- tid_int, get_references, self._options)
-
- if self._options.pack_dry_run:
- log.info("pack: dry run complete")
- else:
- # Now pack.
- if self.fshelper is not None:
- packed_func = self._after_pack
- else:
- packed_func = None
- adapter.packundo.pack(tid_int, self._options, sleep=sleep,
- packed_func=packed_func)
- finally:
- adapter.locker.release_pack_lock(lock_cursor)
- finally:
- lock_conn.rollback()
- adapter.connmanager.close(lock_conn, lock_cursor)
- self.sync()
-
- self._pack_finished()
-
- def _after_pack(self, oid_int, tid_int):
- """Called after an object state has been removed by packing.
-
- Removes the corresponding blob file.
- """
- oid = p64(oid_int)
- tid = p64(tid_int)
- fn = self.fshelper.getBlobFilename(oid, tid)
- if self._adapter.keep_history:
- # remove only the revision just packed
- if os.path.exists(fn):
- ZODB.blob.remove_committed(fn)
- dirname = os.path.dirname(fn)
- if not os.listdir(dirname):
- ZODB.blob.remove_committed_dir(dirname)
- else:
- # remove all revisions
- dirname = os.path.dirname(fn)
- if os.path.exists(dirname):
- for name in os.listdir(dirname):
- ZODB.blob.remove_committed(os.path.join(dirname, name))
- ZODB.blob.remove_committed_dir(dirname)
-
- def _pack_finished(self):
- if self.fshelper is None or self._adapter.keep_history:
- return
-
- # Remove all old revisions of blobs.
-
- def iterator(self, start=None, stop=None):
- return TransactionIterator(self._adapter, start, stop)
-
- def sync(self, force=True):
- """Updates to a current view of the database.
-
- This is implemented by rolling back the relational database
- transaction.
-
- If force is False and a poll interval has been set, this call
- is ignored. The poll_invalidations method will later choose to
- sync with the database only if enough time has elapsed since
- the last poll.
- """
- if not force and self._options.poll_interval:
- # keep the load transaction open so that it's possible
- # to ignore the next poll.
- return
- self._lock_acquire()
- try:
- if self._load_transaction_open:
- self._rollback_load_connection()
- finally:
- self._lock_release()
-
- def need_poll(self):
- """Return true if polling is needed"""
- now = time.time()
-
- cache = self._cache_client
- if cache is not None:
- new_commit_count = cache.get('commit_count')
- if new_commit_count != self._polled_commit_count:
- # There is new data ready to poll
- self._polled_commit_count = new_commit_count
- self._poll_at = now
- return True
-
- if not self._load_transaction_open:
- # Since the load connection is closed or does not have
- # a transaction in progress, polling is required.
- return True
-
- if now >= self._poll_at:
- # The poll timeout has expired
- return True
-
- return False
-
- def poll_invalidations(self):
- """Looks for OIDs of objects that changed since _prev_polled_tid
-
- Returns {oid: 1}, or None if all objects need to be invalidated
- because prev_polled_tid is not in the database (presumably it
- has been packed).
- """
- self._lock_acquire()
- try:
- if self._closed:
- return {}
-
- if self._options.poll_interval:
- if not self.need_poll():
- return {}
- # reset the timeout
- self._poll_at = time.time() + self._options.poll_interval
-
- self._restart_load()
- conn = self._load_conn
- cursor = self._load_cursor
-
- # Ignore changes made by the last transaction committed
- # by this connection.
- if self._ltid is not None:
- ignore_tid = u64(self._ltid)
- else:
- ignore_tid = None
-
- # get a list of changed OIDs and the most recent tid
- oid_ints, new_polled_tid = self._adapter.poller.poll_invalidations(
- conn, cursor, self._prev_polled_tid, ignore_tid)
- self._prev_polled_tid = new_polled_tid
-
- if oid_ints is None:
- oids = None
- else:
- oids = {}
- for oid_int in oid_ints:
- oids[p64(oid_int)] = 1
- return oids
- finally:
- self._lock_release()
-
- def loadBlob(self, oid, serial):
- """Return the filename of the Blob data for this OID and serial.
-
- Returns a filename.
-
- Raises POSKeyError if the blobfile cannot be found.
- """
- if self.fshelper is None:
- raise POSException.Unsupported("No blob directory is configured.")
-
- blob_filename = self.fshelper.getBlobFilename(oid, serial)
- if os.path.exists(blob_filename):
- return blob_filename
- else:
- raise POSKeyError("No blob file", oid, serial)
-
- def openCommittedBlobFile(self, oid, serial, blob=None):
- """Return a file for committed data for the given object id and serial
-
- If a blob is provided, then a BlobFile object is returned,
- otherwise, an ordinary file is returned. In either case, the
- file is opened for binary reading.
-
- This method is used to allow storages that cache blob data to
- make sure that data are available at least long enough for the
- file to be opened.
- """
- blob_filename = self.loadBlob(oid, serial)
- if blob is None:
- return open(blob_filename, 'rb')
- else:
- return ZODB.blob.BlobFile(blob_filename, 'r', blob)
-
- def temporaryDirectory(self):
- """Return a directory that should be used for uncommitted blob data.
-
- If Blobs use this, then commits can be performed with a simple rename.
- """
- return self.fshelper.temp_dir
-
- def storeBlob(self, oid, oldserial, data, blobfilename, version, txn):
- """Stores data that has a BLOB attached.
-
- The blobfilename argument names a file containing blob data.
- The storage will take ownership of the file and will rename it
- (or copy and remove it) immediately, or at transaction-commit
- time. The file must not be open.
-
- The new serial is returned.
- """
- assert not version
- self.store(oid, oldserial, data, '', txn)
- self._store_blob_data(oid, oldserial, blobfilename)
- return None
-
- def restoreBlob(self, oid, serial, data, blobfilename, prev_txn, txn):
- """Write blob data already committed in a separate database
-
- See the restore and storeBlob methods.
- """
- self.restore(oid, serial, data, '', prev_txn, txn)
- self._lock_acquire()
- try:
- self.fshelper.getPathForOID(oid, create=True)
- targetname = self.fshelper.getBlobFilename(oid, serial)
- ZODB.blob.rename_or_copy_blob(blobfilename, targetname)
- finally:
- self._lock_release()
-
- def _store_blob_data(self, oid, oldserial, filename):
- self.fshelper.getPathForOID(oid, create=True)
- fd, target = self.fshelper.blob_mkstemp(oid, oldserial)
- os.close(fd)
- if sys.platform == 'win32':
- # On windows, we can't rename to an existing file. We'll
- # use a slightly different file name. We keep the old one
- # until we're done to avoid conflicts. Then remove the old name.
- target += 'w'
- ZODB.blob.rename_or_copy_blob(filename, target)
- os.remove(target[:-1])
- else:
- ZODB.blob.rename_or_copy_blob(filename, target)
-
- self._add_blob_to_transaction(oid, target)
-
- def _add_blob_to_transaction(self, oid, filename):
- if self._txn_blobs is None:
- self._txn_blobs = {}
- else:
- old_filename = self._txn_blobs.get(oid)
- if old_filename is not None and old_filename != filename:
- ZODB.blob.remove_committed(old_filename)
- self._txn_blobs[oid] = filename
-
- def copyTransactionsFrom(self, other):
- # adapted from ZODB.blob.BlobStorageMixin
- for trans in other.iterator():
- self.tpc_begin(trans, trans.tid, trans.status)
- for record in trans:
- blobfilename = None
- if self.fshelper is not None:
- if ZODB.blob.is_blob_record(record.data):
- try:
- blobfilename = other.loadBlob(
- record.oid, record.tid)
- except POSKeyError:
- pass
- if blobfilename is not None:
- fd, name = tempfile.mkstemp(
- suffix='.tmp', dir=self.fshelper.temp_dir)
- os.close(fd)
- ZODB.utils.cp(open(blobfilename, 'rb'), open(name, 'wb'))
- self.restoreBlob(record.oid, record.tid, record.data,
- name, record.data_txn, trans)
- else:
- self.restore(record.oid, record.tid, record.data,
- '', record.data_txn, trans)
-
- self.tpc_vote(trans)
- self.tpc_finish(trans)
-
- # The propagate_invalidations flag implements the old
- # invalidation polling API and is not otherwise used. Set to a
- # false value, it tells the Connection not to propagate object
- # invalidations across connections, since that ZODB feature is
- # detrimental when the storage provides its own MVCC.
- propagate_invalidations = False
-
- def bind_connection(self, zodb_conn):
- """Make a new storage instance.
-
- This implements the old invalidation polling API and is not
- otherwise used.
- """
- return self.new_instance()
-
- def connection_closing(self):
- """Release resources
-
- This implements the old invalidation polling API and is not
- otherwise used.
- """
- self.sync(False)
-
-
-class TransactionIterator(object):
- """Iterate over the transactions in a RelStorage instance."""
-
- def __init__(self, adapter, start, stop):
- self._adapter = adapter
- self._conn, self._cursor = self._adapter.connmanager.open_for_load()
- self._closed = False
-
- if start is not None:
- start_int = u64(start)
- else:
- start_int = 1
- if stop is not None:
- stop_int = u64(stop)
- else:
- stop_int = None
-
- # _transactions: [(tid, username, description, extension, packed)]
- self._transactions = list(adapter.dbiter.iter_transactions_range(
- self._cursor, start_int, stop_int))
- self._index = 0
-
- def close(self):
- self._adapter.connmanager.close(self._conn, self._cursor)
- self._closed = True
-
- def iterator(self):
- return self
-
- def __iter__(self):
- return self
-
- def __len__(self):
- return len(self._transactions)
-
- def __getitem__(self, n):
- self._index = n
- return self.next()
-
- def next(self):
- if self._closed:
- raise IOError("TransactionIterator already closed")
- if self._index >= len(self._transactions):
- raise StorageStopIteration()
- params = self._transactions[self._index]
- res = RelStorageTransactionRecord(self, *params)
- self._index += 1
- return res
-
-
-class RelStorageTransactionRecord(TransactionRecord):
-
- def __init__(self, trans_iter, tid_int, user, desc, ext, packed):
- self._trans_iter = trans_iter
- self._tid_int = tid_int
- self.tid = p64(tid_int)
- self.status = packed and 'p' or ' '
- self.user = user or ''
- self.description = desc or ''
- if ext:
- self.extension = cPickle.loads(ext)
- else:
- self.extension = {}
-
- # maintain compatibility with the old (ZODB 3.8 and below) name of
- # the extension attribute.
- def _ext_set(self, value):
- self.extension = value
- def _ext_get(self):
- return self.extension
- _extension = property(fset=_ext_set, fget=_ext_get)
-
- def __iter__(self):
- return RecordIterator(self)
-
-
-class RecordIterator(object):
- """Iterate over the objects in a transaction."""
- def __init__(self, record):
- # record is a RelStorageTransactionRecord.
- cursor = record._trans_iter._cursor
- adapter = record._trans_iter._adapter
- tid_int = record._tid_int
- self.tid = record.tid
- self._records = list(adapter.dbiter.iter_objects(cursor, tid_int))
- self._index = 0
-
- def __iter__(self):
- return self
-
- def __len__(self):
- return len(self._records)
-
- def __getitem__(self, n):
- self._index = n
- return self.next()
-
- def next(self):
- if self._index >= len(self._records):
- raise StorageStopIteration()
- params = self._records[self._index]
- res = Record(self.tid, *params)
- self._index += 1
- return res
-
-
-class Record(DataRecord):
- """An object state in a transaction"""
- version = ''
- data_txn = None
-
- def __init__(self, tid, oid_int, data):
- self.tid = tid
- self.oid = p64(oid_int)
- if data is not None:
- self.data = str(data)
- else:
- self.data = None
-
-
-class Options:
- """Options for tuning RelStorage.
-
- These parameters can be provided as keyword options in the RelStorage
- constructor. For example:
-
- storage = RelStorage(adapter, pack_gc=True, pack_dry_run=True)
-
- Alternatively, the RelStorage constructor accepts an options
- parameter, which should be an Options instance.
- """
- def __init__(self):
- self.blob_dir = None
- self.poll_interval = 0
- self.pack_gc = True
- self.pack_dry_run = False
- self.pack_batch_timeout = 5.0
- self.pack_duty_cycle = 0.5
- self.pack_max_delay = 20.0
- self.cache_servers = () # ['127.0.0.1:11211']
- self.cache_module_name = 'memcache'
Copied: relstorage/trunk/relstorage/storage.py (from rev 104541, relstorage/trunk/relstorage/relstorage.py)
===================================================================
--- relstorage/trunk/relstorage/storage.py (rev 0)
+++ relstorage/trunk/relstorage/storage.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -0,0 +1,1426 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""The core of RelStorage, a ZODB storage for relational databases.
+
+Stores pickles in the database.
+"""
+
+from persistent.TimeStamp import TimeStamp
+from relstorage.util import is_blob_record
+from ZODB.BaseStorage import BaseStorage
+from ZODB.BaseStorage import DataRecord
+from ZODB.BaseStorage import TransactionRecord
+from ZODB import ConflictResolution
+from ZODB import POSException
+from ZODB.POSException import POSKeyError
+from ZODB.utils import p64
+from ZODB.utils import u64
+from zope.interface import implements
+from zope.interface import Interface
+import base64
+import cPickle
+import logging
+import os
+import sys
+import tempfile
+import time
+import weakref
+import ZODB.interfaces
+
+try:
+ from ZODB.interfaces import StorageStopIteration
+except ImportError:
+ class StorageStopIteration(IndexError, StopIteration):
+ """A combination of StopIteration and IndexError to provide a
+ backwards-compatible exception.
+ """
+
+_relstorage_interfaces = []
+for name in (
+ 'IStorage',
+ 'IMVCCStorage',
+ 'IStorageRestoreable',
+ 'IStorageIteration',
+ 'IStorageUndoable',
+ 'IBlobStorage',
+ 'IBlobStorageRestoreable',
+ ):
+ if hasattr(ZODB.interfaces, name):
+ _relstorage_interfaces.append(getattr(ZODB.interfaces, name))
+
+log = logging.getLogger("relstorage")
+
+# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
+# a failure revealed by the ZODB test suite. The test suite often fails
+# to call tpc_abort in the event of an error, leading to deadlocks.
+# This variable causes RelStorage to abort failed transactions
+# early rather than wait for an explicit abort.
+abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
+
+
+class RelStorage(BaseStorage,
+ ConflictResolution.ConflictResolvingStorage):
+ """Storage to a relational database, based on invalidation polling"""
+ implements(*_relstorage_interfaces)
+
+ def __init__(self, adapter, name=None, create=True,
+ read_only=False, options=None, **kwoptions):
+ if name is None:
+ name = 'RelStorage on %s' % adapter.__class__.__name__
+
+ self._adapter = adapter
+ self._name = name
+ self._is_read_only = read_only
+ if options is None:
+ options = Options()
+ for key, value in kwoptions.iteritems():
+ if key in options.__dict__:
+ setattr(options, key, value)
+ else:
+ raise TypeError("Unknown parameter: %s" % key)
+ elif kwoptions:
+ raise TypeError("The RelStorage constructor accepts either "
+ "an options parameter or keyword arguments, not both")
+ self._options = options
+ self._cache_client = None
+
+ if create:
+ self._adapter.schema.prepare()
+
+ # load_conn and load_cursor are open most of the time.
+ self._load_conn = None
+ self._load_cursor = None
+ self._load_transaction_open = False
+ self._open_load_connection()
+ # store_conn and store_cursor are open during commit,
+ # but not necessarily open at other times.
+ self._store_conn = None
+ self._store_cursor = None
+
+ BaseStorage.__init__(self, name)
+
+ self._tid = None
+ self._ltid = None
+
+ # _prepared_txn is the name of the transaction to commit in the
+ # second phase.
+ self._prepared_txn = None
+
+ # _instances is a list of weak references to storage instances bound
+ # to the same database.
+ self._instances = []
+
+ # _closed is True after self.close() is called. Since close()
+ # can be called from another thread, access to self._closed should
+ # be inside a _lock_acquire()/_lock_release() block.
+ self._closed = False
+
+ # _max_stored_oid is the highest OID stored by the current
+ # transaction
+ self._max_stored_oid = 0
+
+ # _max_new_oid is the highest OID provided by new_oid()
+ self._max_new_oid = 0
+
+ # set _cache_client
+ if options.cache_servers:
+ module_name = options.cache_module_name
+ module = __import__(module_name, {}, {}, ['Client'])
+ servers = options.cache_servers
+ if isinstance(servers, basestring):
+ servers = servers.split()
+ self._cache_client = module.Client(servers)
+ else:
+ self._cache_client = None
+
+ # _prev_polled_tid contains the tid at the previous poll
+ self._prev_polled_tid = None
+
+ # _polled_commit_count contains the last polled value of the
+ # 'commit_count' cache key
+ self._polled_commit_count = 0
+
+ # _poll_at is the time to poll regardless of commit_count
+ self._poll_at = 0
+
+ # _txn_blobs: {oid->filename}; contains blob data for the
+ # currently uncommitted transaction.
+ self._txn_blobs = None
+
+ if options.blob_dir:
+ from ZODB.blob import FilesystemHelper
+ self.fshelper = FilesystemHelper(options.blob_dir)
+ if create:
+ self.fshelper.create()
+ self.fshelper.checkSecure()
+ else:
+ self.fshelper = None
+
+ def _open_load_connection(self):
+ """Open the load connection to the database. Return nothing."""
+ conn, cursor = self._adapter.connmanager.open_for_load()
+ self._drop_load_connection()
+ self._load_conn, self._load_cursor = conn, cursor
+ self._load_transaction_open = True
+
+ def _drop_load_connection(self):
+ """Unconditionally drop the load connection"""
+ conn, cursor = self._load_conn, self._load_cursor
+ self._load_conn, self._load_cursor = None, None
+ self._adapter.connmanager.close(conn, cursor)
+ self._load_transaction_open = False
+
+ def _rollback_load_connection(self):
+ if self._load_conn is not None:
+ try:
+ self._load_conn.rollback()
+ except:
+ self._drop_load_connection()
+ raise
+ self._load_transaction_open = False
+
+ def _restart_load(self):
+ """Restart the load connection, creating a new connection if needed"""
+ if self._load_cursor is None:
+ self._open_load_connection()
+ else:
+ try:
+ self._adapter.connmanager.restart_load(
+ self._load_conn, self._load_cursor)
+ except POSException.StorageError, e:
+ log.warning("Reconnecting load_conn: %s", e)
+ self._drop_load_connection()
+ try:
+ self._open_load_connection()
+ except:
+ log.exception("Reconnect failed.")
+ raise
+ else:
+ log.info("Reconnected.")
+ self._load_transaction_open = True
+
+
+ def _open_store_connection(self):
+ """Open the store connection to the database. Return nothing."""
+ conn, cursor = self._adapter.connmanager.open_for_store()
+ self._drop_store_connection()
+ self._store_conn, self._store_cursor = conn, cursor
+
+ def _drop_store_connection(self):
+ """Unconditionally drop the store connection"""
+ conn, cursor = self._store_conn, self._store_cursor
+ self._store_conn, self._store_cursor = None, None
+ self._adapter.connmanager.close(conn, cursor)
+
+ def _restart_store(self):
+ """Restart the store connection, creating a new connection if needed"""
+ if self._store_cursor is None:
+ self._open_store_connection()
+ else:
+ try:
+ self._adapter.connmanager.restart_store(
+ self._store_conn, self._store_cursor)
+ except POSException.StorageError, e:
+ log.warning("Reconnecting store_conn: %s", e)
+ self._drop_store_connection()
+ try:
+ self._open_store_connection()
+ except:
+ log.exception("Reconnect failed.")
+ raise
+ else:
+ log.info("Reconnected.")
+
+
+ def zap_all(self):
+ """Clear all objects and transactions out of the database.
+
+ Used by the test suite and the ZODBConvert script.
+ """
+ self._adapter.schema.zap_all()
+ self._rollback_load_connection()
+ cache = self._cache_client
+ if cache is not None:
+ cache.flush_all()
+
+ def release(self):
+ """Release back end database sessions used by this storage instance.
+ """
+ self._lock_acquire()
+ try:
+ self._drop_load_connection()
+ self._drop_store_connection()
+ finally:
+ self._lock_release()
+
+ def close(self):
+ """Close the storage and all instances."""
+ self._lock_acquire()
+ try:
+ self._closed = True
+ self._drop_load_connection()
+ self._drop_store_connection()
+ for wref in self._instances:
+ instance = wref()
+ if instance is not None:
+ instance.close()
+ finally:
+ self._lock_release()
+
+ def new_instance(self):
+ """Creates and returns another storage instance.
+
+ See ZODB.interfaces.IMVCCStorage.
+ """
+ other = RelStorage(adapter=self._adapter, name=self._name,
+ create=False, read_only=self._is_read_only,
+ options=self._options)
+ self._instances.append(weakref.ref(other))
+ return other
+
+ def __len__(self):
+ return self._adapter.stats.get_object_count()
+
+ def getSize(self):
+ """Return database size in bytes"""
+ return self._adapter.stats.get_db_size()
+
+ def _log_keyerror(self, oid_int, reason):
+ """Log just before raising KeyError in load().
+
+ KeyErrors in load() are generally not supposed to happen,
+ so this is a good place to gather information.
+ """
+ cursor = self._load_cursor
+ adapter = self._adapter
+ logfunc = log.warning
+ msg = ["Storage KeyError on oid %d: %s" % (oid_int, reason)]
+ rows = adapter.dbiter.iter_transactions(cursor)
+ row = None
+ for row in rows:
+ # just get the first row
+ break
+ if not row:
+ # This happens when initializing a new database, so it's
+ # not a warning.
+ logfunc = log.debug
+ msg.append("No transactions exist")
+ else:
+ msg.append("Current transaction is %d" % row[0])
+
+ tids = []
+ try:
+ rows = adapter.dbiter.iter_object_history(cursor, oid_int)
+ except KeyError:
+ # The object has no history, at least from the point of view
+ # of the current database load connection.
+ pass
+ else:
+ for row in rows:
+ tids.append(row[0])
+ if len(tids) >= 10:
+ break
+ msg.append("Recent object tids: %s" % repr(tids))
+ logfunc('; '.join(msg))
+
+ def _get_oid_cache_key(self, oid_int):
+ """Return the cache key for finding the current tid."""
+ my_tid = self._prev_polled_tid
+ if my_tid is None:
+ return None
+ return 'tid:%d:%d' % (oid_int, my_tid)
+
+ def load(self, oid, version):
+ oid_int = u64(oid)
+ cache = self._cache_client
+
+ self._lock_acquire()
+ try:
+ if not self._load_transaction_open:
+ self._restart_load()
+ cursor = self._load_cursor
+ if cache is None:
+ state, tid_int = self._adapter.mover.load_current(
+ cursor, oid_int)
+ else:
+ # get tid_int from the cache or the database
+ cachekey = self._get_oid_cache_key(oid_int)
+ if cachekey:
+ tid_int = cache.get(cachekey)
+ if not cachekey or not tid_int:
+ tid_int = self._adapter.mover.get_current_tid(
+ cursor, oid_int)
+ if cachekey and tid_int is not None:
+ cache.set(cachekey, tid_int)
+ if tid_int is None:
+ self._log_keyerror(oid_int, "no tid found(1)")
+ raise POSKeyError(oid)
+
+ # get state from the cache or the database
+ cachekey = 'state:%d:%d' % (oid_int, tid_int)
+ state = cache.get(cachekey)
+ if not state:
+ state = self._adapter.mover.load_revision(
+ cursor, oid_int, tid_int)
+ if state:
+ state = str(state)
+ cache.set(cachekey, state)
+ finally:
+ self._lock_release()
+
+ if tid_int is not None:
+ if state:
+ state = str(state)
+ if not state:
+ # This can happen if something attempts to load
+ # an object whose creation has been undone.
+ self._log_keyerror(oid_int, "creation has been undone")
+ raise POSKeyError(oid)
+ return state, p64(tid_int)
+ else:
+ self._log_keyerror(oid_int, "no tid found(2)")
+ raise POSKeyError(oid)
+
+ def loadEx(self, oid, version):
+ # Since we don't support versions, just tack the empty version
+ # string onto load's result.
+ return self.load(oid, version) + ("",)
+
+ def loadSerial(self, oid, serial):
+ """Load a specific revision of an object"""
+ oid_int = u64(oid)
+ tid_int = u64(serial)
+ cache = self._cache_client
+ if cache is not None:
+ cachekey = 'state:%d:%d' % (oid_int, tid_int)
+ state = cache.get(cachekey)
+ if state:
+ return state
+
+ self._lock_acquire()
+ try:
+ if not self._load_transaction_open:
+ self._restart_load()
+ state = self._adapter.mover.load_revision(
+ self._load_cursor, oid_int, tid_int)
+ if state is None and self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ state = self._adapter.mover.load_revision(
+ self._store_cursor, oid_int, tid_int)
+ finally:
+ self._lock_release()
+
+ if state is not None:
+ state = str(state)
+ if not state:
+ raise POSKeyError(oid)
+ if cache is not None:
+ cache.set(cachekey, state)
+ return state
+ else:
+ raise POSKeyError(oid)
+
+ def loadBefore(self, oid, tid):
+ """Return the most recent revision of oid before tid committed."""
+ oid_int = u64(oid)
+
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_transaction_open:
+ self._restart_load()
+ cursor = self._load_cursor
+ if not self._adapter.mover.exists(cursor, u64(oid)):
+ raise POSKeyError(oid)
+
+ state, start_tid = self._adapter.mover.load_before(
+ cursor, oid_int, u64(tid))
+ if start_tid is not None:
+ end_int = self._adapter.mover.get_object_tid_after(
+ cursor, oid_int, start_tid)
+ if end_int is not None:
+ end = p64(end_int)
+ else:
+ end = None
+ if state is not None:
+ state = str(state)
+ return state, p64(start_tid), end
+ else:
+ return None
+ finally:
+ self._lock_release()
+
+
+ def store(self, oid, serial, data, version, transaction):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ # If self._prepared_txn is not None, that means something is
+ # attempting to store objects after the vote phase has finished.
+ # That should not happen, should it?
+ assert self._prepared_txn is None
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+ oid_int = u64(oid)
+ if serial:
+ prev_tid_int = u64(serial)
+ else:
+ prev_tid_int = 0
+
+ self._lock_acquire()
+ try:
+ self._max_stored_oid = max(self._max_stored_oid, oid_int)
+ # save the data in a temporary table
+ adapter.mover.store_temp(cursor, oid_int, prev_tid_int, data)
+ return None
+ finally:
+ self._lock_release()
+
+
+ def restore(self, oid, serial, data, version, prev_txn, transaction):
+ # Like store(), but used for importing transactions. See the
+ # comments in FileStorage.restore(). The prev_txn optimization
+ # is not used.
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ assert self._tid is not None
+ assert self._prepared_txn is None
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+ oid_int = u64(oid)
+ tid_int = u64(serial)
+
+ self._lock_acquire()
+ try:
+ self._max_stored_oid = max(self._max_stored_oid, oid_int)
+ # save the data. Note that data can be None.
+ adapter.mover.restore(cursor, oid_int, tid_int, data)
+ finally:
+ self._lock_release()
+
+
+ def tpc_begin(self, transaction, tid=None, status=' '):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ if self._transaction is transaction:
+ return
+ self._lock_release()
+ self._commit_lock_acquire()
+ self._lock_acquire()
+ self._transaction = transaction
+ self._clear_temp()
+
+ user = str(transaction.user)
+ desc = str(transaction.description)
+ ext = transaction._extension
+ if ext:
+ ext = cPickle.dumps(ext, 1)
+ else:
+ ext = ""
+ self._ude = user, desc, ext
+ self._tstatus = status
+
+ adapter = self._adapter
+ self._restart_store()
+
+ if tid is not None:
+ # get the commit lock and add the transaction now
+ cursor = self._store_cursor
+ packed = (status == 'p')
+ adapter.locker.hold_commit_lock(cursor, ensure_current=True)
+ tid_int = u64(tid)
+ try:
+ adapter.txncontrol.add_transaction(
+ cursor, tid_int, user, desc, ext, packed)
+ except:
+ self._drop_store_connection()
+ raise
+ # else choose the tid later
+ self._tid = tid
+
+ finally:
+ self._lock_release()
+
+ def _prepare_tid(self):
+ """Choose a tid for the current transaction.
+
+ This should be done as late in the commit as possible, since
+ it must hold an exclusive commit lock.
+ """
+ if self._tid is not None:
+ return
+ if self._transaction is None:
+ raise POSException.StorageError("No transaction in progress")
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ adapter.locker.hold_commit_lock(cursor, ensure_current=True)
+ user, desc, ext = self._ude
+
+ # Choose a transaction ID.
+ # Base the transaction ID on the database time,
+ # while ensuring that the tid of this transaction
+ # is greater than any existing tid.
+ last_tid, now = adapter.txncontrol.get_tid_and_time(cursor)
+ stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+ stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+ tid = repr(stamp)
+
+ tid_int = u64(tid)
+ adapter.txncontrol.add_transaction(cursor, tid_int, user, desc, ext)
+ self._tid = tid
+
+
+ def _clear_temp(self):
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ self._prepared_txn = None
+ self._max_stored_oid = 0
+
+
+ def _finish_store(self):
+ """Move stored objects from the temporary table to final storage.
+
+ Returns a list of (oid, tid) to be received by
+ Connection._handle_serial().
+ """
+ assert self._tid is not None
+ cursor = self._store_cursor
+ adapter = self._adapter
+
+ # Detect conflicting changes.
+ # Try to resolve the conflicts.
+ resolved = set() # a set of OIDs
+ while True:
+ conflict = adapter.mover.detect_conflict(cursor)
+ if conflict is None:
+ break
+
+ oid_int, prev_tid_int, serial_int, data = conflict
+ oid = p64(oid_int)
+ prev_tid = p64(prev_tid_int)
+ serial = p64(serial_int)
+
+ rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
+ if rdata is None:
+ # unresolvable; kill the whole transaction
+ raise POSException.ConflictError(
+ oid=oid, serials=(prev_tid, serial), data=data)
+ else:
+ # resolved
+ data = rdata
+ self._adapter.mover.replace_temp(
+ cursor, oid_int, prev_tid_int, data)
+ resolved.add(oid)
+
+ # Move the new states into the permanent table
+ tid_int = u64(self._tid)
+ serials = []
+ oid_ints = adapter.mover.move_from_temp(cursor, tid_int)
+ for oid_int in oid_ints:
+ oid = p64(oid_int)
+ if oid in resolved:
+ serial = ConflictResolution.ResolvedSerial
+ else:
+ serial = self._tid
+ serials.append((oid, serial))
+
+ return serials
+
+
+ def _vote(self):
+ """Prepare the transaction for final commit."""
+ # This method initiates a two-phase commit process,
+ # saving the name of the prepared transaction in self._prepared_txn.
+
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+
+ if self._prepared_txn is not None:
+ # the vote phase has already completed
+ return
+
+ cursor = self._store_cursor
+ assert cursor is not None
+ conn = self._store_conn
+
+ if self._max_stored_oid > self._max_new_oid:
+ self._adapter.oidallocator.set_min_oid(
+ cursor, self._max_stored_oid + 1)
+
+ self._prepare_tid()
+ tid_int = u64(self._tid)
+
+ serials = self._finish_store()
+ self._adapter.mover.update_current(cursor, tid_int)
+ self._prepared_txn = self._adapter.txncontrol.commit_phase1(
+ conn, cursor, tid_int)
+
+ if self._txn_blobs:
+ # We now have a transaction ID, so rename all the blobs
+ # accordingly.
+ for oid, sourcename in self._txn_blobs.items():
+ targetname = self.fshelper.getBlobFilename(oid, self._tid)
+ if sourcename != targetname:
+ ZODB.blob.rename_or_copy_blob(sourcename, targetname)
+ self._txn_blobs[oid] = targetname
+
+ return serials
+
+
+ def tpc_vote(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ return self._vote()
+ except:
+ if abort_early:
+ # abort early to avoid lockups while running the
+ # somewhat brittle ZODB test suite
+ self.tpc_abort(transaction)
+ raise
+ finally:
+ self._lock_release()
+
+
+ def _finish(self, tid, user, desc, ext):
+ """Commit the transaction."""
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ assert self._tid is not None
+ try:
+ self._rollback_load_connection()
+ txn = self._prepared_txn
+ assert txn is not None
+ self._adapter.txncontrol.commit_phase2(
+ self._store_conn, self._store_cursor, txn)
+ self._adapter.locker.release_commit_lock(self._store_cursor)
+ cache = self._cache_client
+ if cache is not None:
+ if cache.incr('commit_count') is None:
+ # Use the current time as an initial commit_count value.
+ cache.add('commit_count', int(time.time()))
+ # A concurrent committer could have won the race to set the
+ # initial commit_count. Increment commit_count so that it
+ # doesn't matter who won.
+ cache.incr('commit_count')
+ self._ltid = self._tid
+
+ #if self._txn_blobs and not self._adapter.keep_history:
+ ## For each blob just committed, get the name of
+ ## one earlier revision (if any) and write the
+ ## name of the file to a log. At pack time,
+ ## all the files in the log will be deleted and
+ ## the log will be cleared.
+ #for oid, filename in self._txn_blobs.iteritems():
+ #dirname, current_name = os.path.split(filename)
+ #names = os.listdir(dirname)
+ #names.sort()
+ #if current_name in names:
+ #i = names.index(current_name)
+ #if i > 0:
+ # to_delete = os.path.join(dirname, names[i-1])
+ # log.write('%s\n') % to_delete
+
+ finally:
+ self._txn_blobs = None
+ self._prepared_txn = None
+ self._tid = None
+ self._transaction = None
+
+ def _abort(self):
+ # the lock is held here
+ try:
+ self._rollback_load_connection()
+ if self._store_cursor is not None:
+ self._adapter.txncontrol.abort(
+ self._store_conn, self._store_cursor, self._prepared_txn)
+ self._adapter.locker.release_commit_lock(self._store_cursor)
+ if self._txn_blobs:
+ for oid, filename in self._txn_blobs.iteritems():
+ if os.path.exists(filename):
+ ZODB.blob.remove_committed(filename)
+ dirname = os.path.dirname(filename)
+ if not os.listdir(dirname):
+ ZODB.blob.remove_committed_dir(dirname)
+ finally:
+ self._txn_blobs = None
+ self._prepared_txn = None
+ self._tid = None
+ self._transaction = None
+
+ def lastTransaction(self):
+ return self._ltid
+
+ def new_oid(self):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ if cursor is None:
+ self._open_load_connection()
+ cursor = self._load_cursor
+ oid_int = self._adapter.oidallocator.new_oid(cursor)
+ self._max_new_oid = max(self._max_new_oid, oid_int)
+ return p64(oid_int)
+ finally:
+ self._lock_release()
+
+ def cleanup(self):
+ pass
+
+ def supportsVersions(self):
+ return False
+
+ def modifiedInVersion(self, oid):
+ return ''
+
+ def supportsUndo(self):
+ return self._adapter.keep_history
+
+ def supportsTransactionalUndo(self):
+ return self._adapter.keep_history
+
+ def undoLog(self, first=0, last=-20, filter=None):
+ if last < 0:
+ last = first - last
+
+ # use a private connection to ensure the most current results
+ adapter = self._adapter
+ conn, cursor = adapter.connmanager.open()
+ try:
+ rows = adapter.dbiter.iter_transactions(cursor)
+ i = 0
+ res = []
+ for tid_int, user, desc, ext in rows:
+ tid = p64(tid_int)
+ d = {'id': base64.encodestring(tid)[:-1],
+ 'time': TimeStamp(tid).timeTime(),
+ 'user_name': user or '',
+ 'description': desc or ''}
+ if ext:
+ d.update(cPickle.loads(ext))
+ if filter is None or filter(d):
+ if i >= first:
+ res.append(d)
+ i += 1
+ if i >= last:
+ break
+ return res
+
+ finally:
+ adapter.connmanager.close(conn, cursor)
+
+ def history(self, oid, version=None, size=1, filter=None):
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ oid_int = u64(oid)
+ try:
+ rows = self._adapter.dbiter.iter_object_history(
+ cursor, oid_int)
+ except KeyError:
+ raise POSKeyError(oid)
+
+ res = []
+ for tid_int, username, description, extension, length in rows:
+ tid = p64(tid_int)
+ if extension:
+ d = loads(extension)
+ else:
+ d = {}
+ d.update({"time": TimeStamp(tid).timeTime(),
+ "user_name": username or '',
+ "description": description or '',
+ "tid": tid,
+ "version": '',
+ "size": length,
+ })
+ if filter is None or filter(d):
+ res.append(d)
+ if size is not None and len(res) >= size:
+ break
+ return res
+ finally:
+ self._lock_release()
+
+
+ def undo(self, transaction_id, transaction):
+ """Undo a transaction identified by transaction_id.
+
+ transaction_id is the base 64 encoding of an 8 byte tid.
+ Undo by writing new data that reverses the action taken by
+ the transaction.
+ """
+
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+
+ undo_tid = base64.decodestring(transaction_id + '\n')
+ assert len(undo_tid) == 8
+ undo_tid_int = u64(undo_tid)
+
+ self._lock_acquire()
+ try:
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+
+ adapter.locker.hold_pack_lock(cursor)
+ try:
+ # Note that _prepare_tid acquires the commit lock.
+ # The commit lock must be acquired after the pack lock
+ # because the database adapters also acquire in that
+ # order during packing.
+ self._prepare_tid()
+ adapter.packundo.verify_undoable(cursor, undo_tid_int)
+
+ self_tid_int = u64(self._tid)
+ copied = adapter.packundo.undo(
+ cursor, undo_tid_int, self_tid_int)
+ oids = [p64(oid_int) for oid_int, _ in copied]
+
+ # Update the current object pointers immediately, so that
+ # subsequent undo operations within this transaction will see
+ # the new current objects.
+ adapter.mover.update_current(cursor, self_tid_int)
+
+ if self.fshelper is not None:
+ self._copy_undone_blobs(copied)
+
+ return self._tid, oids
+ finally:
+ adapter.locker.release_pack_lock(cursor)
+ finally:
+ self._lock_release()
+
+ def _copy_undone_blobs(self, copied):
+ """After an undo operation, copy the matching blobs forward.
+
+ The copied parameter is a list of (integer oid, integer tid).
+ """
+ for oid_int, old_tid_int in copied:
+ oid = p64(oid_int)
+ old_tid = p64(old_tid_int)
+ orig_fn = self.fshelper.getBlobFilename(oid, old_tid)
+ if not os.path.exists(orig_fn):
+ # not a blob
+ continue
+
+ new_fn = self.fshelper.getBlobFilename(oid, self._tid)
+ orig = open(orig_fn, 'r')
+ new = open(new_fn, 'wb')
+ ZODB.utils.cp(orig, new)
+ orig.close()
+ new.close()
+
+ self._add_blob_to_transaction(oid, new_fn)
+
+ def pack(self, t, referencesf, sleep=None):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+
+ pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
+ pack_point_int = u64(pack_point)
+
+ def get_references(state):
+ """Return the set of OIDs the given state refers to."""
+ refs = set()
+ if state:
+ for oid in referencesf(str(state)):
+ refs.add(u64(oid))
+ return refs
+
+ # Use a private connection (lock_conn and lock_cursor) to
+ # hold the pack lock. Have the adapter open temporary
+ # connections to do the actual work, allowing the adapter
+ # to use special transaction modes for packing.
+ adapter = self._adapter
+ lock_conn, lock_cursor = adapter.connmanager.open()
+ try:
+ adapter.locker.hold_pack_lock(lock_cursor)
+ try:
+ # Find the latest commit before or at the pack time.
+ tid_int = adapter.packundo.choose_pack_transaction(
+ pack_point_int)
+ if tid_int is None:
+ log.debug("all transactions before %s have already "
+ "been packed", time.ctime(t))
+ return
+
+ if self._options.pack_dry_run:
+ log.info("pack: beginning dry run")
+
+ s = time.ctime(TimeStamp(p64(tid_int)).timeTime())
+ log.info("pack: analyzing transactions committed "
+ "%s or before", s)
+
+ # In pre_pack, the adapter fills tables with
+ # information about what to pack. The adapter
+ # must not actually pack anything yet.
+ adapter.packundo.pre_pack(
+ tid_int, get_references, self._options)
+
+ if self._options.pack_dry_run:
+ log.info("pack: dry run complete")
+ else:
+ # Now pack.
+ if self.fshelper is not None:
+ packed_func = self._after_pack
+ else:
+ packed_func = None
+ adapter.packundo.pack(tid_int, self._options, sleep=sleep,
+ packed_func=packed_func)
+ finally:
+ adapter.locker.release_pack_lock(lock_cursor)
+ finally:
+ lock_conn.rollback()
+ adapter.connmanager.close(lock_conn, lock_cursor)
+ self.sync()
+
+ self._pack_finished()
+
+ def _after_pack(self, oid_int, tid_int):
+ """Called after an object state has been removed by packing.
+
+ Removes the corresponding blob file.
+ """
+ oid = p64(oid_int)
+ tid = p64(tid_int)
+ fn = self.fshelper.getBlobFilename(oid, tid)
+ if self._adapter.keep_history:
+ # remove only the revision just packed
+ if os.path.exists(fn):
+ ZODB.blob.remove_committed(fn)
+ dirname = os.path.dirname(fn)
+ if not os.listdir(dirname):
+ ZODB.blob.remove_committed_dir(dirname)
+ else:
+ # remove all revisions
+ dirname = os.path.dirname(fn)
+ if os.path.exists(dirname):
+ for name in os.listdir(dirname):
+ ZODB.blob.remove_committed(os.path.join(dirname, name))
+ ZODB.blob.remove_committed_dir(dirname)
+
+ def _pack_finished(self):
+ if self.fshelper is None or self._adapter.keep_history:
+ return
+
+ # Remove all old revisions of blobs.
+
+ def iterator(self, start=None, stop=None):
+ return TransactionIterator(self._adapter, start, stop)
+
+ def sync(self, force=True):
+ """Updates to a current view of the database.
+
+ This is implemented by rolling back the relational database
+ transaction.
+
+ If force is False and a poll interval has been set, this call
+ is ignored. The poll_invalidations method will later choose to
+ sync with the database only if enough time has elapsed since
+ the last poll.
+ """
+ if not force and self._options.poll_interval:
+ # keep the load transaction open so that it's possible
+ # to ignore the next poll.
+ return
+ self._lock_acquire()
+ try:
+ if self._load_transaction_open:
+ self._rollback_load_connection()
+ finally:
+ self._lock_release()
+
+ def need_poll(self):
+ """Return true if polling is needed"""
+ now = time.time()
+
+ cache = self._cache_client
+ if cache is not None:
+ new_commit_count = cache.get('commit_count')
+ if new_commit_count != self._polled_commit_count:
+ # There is new data ready to poll
+ self._polled_commit_count = new_commit_count
+ self._poll_at = now
+ return True
+
+ if not self._load_transaction_open:
+ # Since the load connection is closed or does not have
+ # a transaction in progress, polling is required.
+ return True
+
+ if now >= self._poll_at:
+ # The poll timeout has expired
+ return True
+
+ return False
+
+ def poll_invalidations(self):
+ """Looks for OIDs of objects that changed since _prev_polled_tid
+
+ Returns {oid: 1}, or None if all objects need to be invalidated
+ because prev_polled_tid is not in the database (presumably it
+ has been packed).
+ """
+ self._lock_acquire()
+ try:
+ if self._closed:
+ return {}
+
+ if self._options.poll_interval:
+ if not self.need_poll():
+ return {}
+ # reset the timeout
+ self._poll_at = time.time() + self._options.poll_interval
+
+ self._restart_load()
+ conn = self._load_conn
+ cursor = self._load_cursor
+
+ # Ignore changes made by the last transaction committed
+ # by this connection.
+ if self._ltid is not None:
+ ignore_tid = u64(self._ltid)
+ else:
+ ignore_tid = None
+
+ # get a list of changed OIDs and the most recent tid
+ oid_ints, new_polled_tid = self._adapter.poller.poll_invalidations(
+ conn, cursor, self._prev_polled_tid, ignore_tid)
+ self._prev_polled_tid = new_polled_tid
+
+ if oid_ints is None:
+ oids = None
+ else:
+ oids = {}
+ for oid_int in oid_ints:
+ oids[p64(oid_int)] = 1
+ return oids
+ finally:
+ self._lock_release()
+
+ def loadBlob(self, oid, serial):
+ """Return the filename of the Blob data for this OID and serial.
+
+ Returns a filename.
+
+ Raises POSKeyError if the blobfile cannot be found.
+ """
+ if self.fshelper is None:
+ raise POSException.Unsupported("No blob directory is configured.")
+
+ blob_filename = self.fshelper.getBlobFilename(oid, serial)
+ if os.path.exists(blob_filename):
+ return blob_filename
+ else:
+ raise POSKeyError("No blob file", oid, serial)
+
+ def openCommittedBlobFile(self, oid, serial, blob=None):
+ """Return a file for committed data for the given object id and serial
+
+ If a blob is provided, then a BlobFile object is returned,
+ otherwise, an ordinary file is returned. In either case, the
+ file is opened for binary reading.
+
+ This method is used to allow storages that cache blob data to
+ make sure that data are available at least long enough for the
+ file to be opened.
+ """
+ blob_filename = self.loadBlob(oid, serial)
+ if blob is None:
+ return open(blob_filename, 'rb')
+ else:
+ return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+
+ def temporaryDirectory(self):
+ """Return a directory that should be used for uncommitted blob data.
+
+ If Blobs use this, then commits can be performed with a simple rename.
+ """
+ return self.fshelper.temp_dir
+
+ def storeBlob(self, oid, oldserial, data, blobfilename, version, txn):
+ """Stores data that has a BLOB attached.
+
+ The blobfilename argument names a file containing blob data.
+ The storage will take ownership of the file and will rename it
+ (or copy and remove it) immediately, or at transaction-commit
+ time. The file must not be open.
+
+ The new serial is returned.
+ """
+ assert not version
+ self.store(oid, oldserial, data, '', txn)
+ self._store_blob_data(oid, oldserial, blobfilename)
+ return None
+
+ def restoreBlob(self, oid, serial, data, blobfilename, prev_txn, txn):
+ """Write blob data already committed in a separate database
+
+ See the restore and storeBlob methods.
+ """
+ self.restore(oid, serial, data, '', prev_txn, txn)
+ self._lock_acquire()
+ try:
+ self.fshelper.getPathForOID(oid, create=True)
+ targetname = self.fshelper.getBlobFilename(oid, serial)
+ ZODB.blob.rename_or_copy_blob(blobfilename, targetname)
+ finally:
+ self._lock_release()
+
+ def _store_blob_data(self, oid, oldserial, filename):
+ self.fshelper.getPathForOID(oid, create=True)
+ fd, target = self.fshelper.blob_mkstemp(oid, oldserial)
+ os.close(fd)
+ if sys.platform == 'win32':
+ # On windows, we can't rename to an existing file. We'll
+ # use a slightly different file name. We keep the old one
+ # until we're done to avoid conflicts. Then remove the old name.
+ target += 'w'
+ ZODB.blob.rename_or_copy_blob(filename, target)
+ os.remove(target[:-1])
+ else:
+ ZODB.blob.rename_or_copy_blob(filename, target)
+
+ self._add_blob_to_transaction(oid, target)
+
+ def _add_blob_to_transaction(self, oid, filename):
+ if self._txn_blobs is None:
+ self._txn_blobs = {}
+ else:
+ old_filename = self._txn_blobs.get(oid)
+ if old_filename is not None and old_filename != filename:
+ ZODB.blob.remove_committed(old_filename)
+ self._txn_blobs[oid] = filename
+
+ def copyTransactionsFrom(self, other):
+ # adapted from ZODB.blob.BlobStorageMixin
+ for trans in other.iterator():
+ self.tpc_begin(trans, trans.tid, trans.status)
+ for record in trans:
+ blobfilename = None
+ if self.fshelper is not None:
+ if is_blob_record(record.data):
+ try:
+ blobfilename = other.loadBlob(
+ record.oid, record.tid)
+ except POSKeyError:
+ pass
+ if blobfilename is not None:
+ fd, name = tempfile.mkstemp(
+ suffix='.tmp', dir=self.fshelper.temp_dir)
+ os.close(fd)
+ ZODB.utils.cp(open(blobfilename, 'rb'), open(name, 'wb'))
+ self.restoreBlob(record.oid, record.tid, record.data,
+ name, record.data_txn, trans)
+ else:
+ self.restore(record.oid, record.tid, record.data,
+ '', record.data_txn, trans)
+
+ self.tpc_vote(trans)
+ self.tpc_finish(trans)
+
+ # The propagate_invalidations flag implements the old
+ # invalidation polling API and is not otherwise used. Set to a
+ # false value, it tells the Connection not to propagate object
+ # invalidations across connections, since that ZODB feature is
+ # detrimental when the storage provides its own MVCC.
+ propagate_invalidations = False
+
+ def bind_connection(self, zodb_conn):
+ """Make a new storage instance.
+
+ This implements the old invalidation polling API and is not
+ otherwise used.
+ """
+ return self.new_instance()
+
+ def connection_closing(self):
+ """Release resources
+
+ This implements the old invalidation polling API and is not
+ otherwise used.
+ """
+ self.sync(False)
+
+
+class TransactionIterator(object):
+ """Iterate over the transactions in a RelStorage instance."""
+
+ def __init__(self, adapter, start, stop):
+ self._adapter = adapter
+ self._conn, self._cursor = self._adapter.connmanager.open_for_load()
+ self._closed = False
+
+ if start is not None:
+ start_int = u64(start)
+ else:
+ start_int = 1
+ if stop is not None:
+ stop_int = u64(stop)
+ else:
+ stop_int = None
+
+ # _transactions: [(tid, username, description, extension, packed)]
+ self._transactions = list(adapter.dbiter.iter_transactions_range(
+ self._cursor, start_int, stop_int))
+ self._index = 0
+
+ def close(self):
+ self._adapter.connmanager.close(self._conn, self._cursor)
+ self._closed = True
+
+ def iterator(self):
+ return self
+
+ def __iter__(self):
+ return self
+
+ def __len__(self):
+ return len(self._transactions)
+
+ def __getitem__(self, n):
+ self._index = n
+ return self.next()
+
+ def next(self):
+ if self._closed:
+ raise IOError("TransactionIterator already closed")
+ if self._index >= len(self._transactions):
+ raise StorageStopIteration()
+ params = self._transactions[self._index]
+ res = RelStorageTransactionRecord(self, *params)
+ self._index += 1
+ return res
+
+
+class RelStorageTransactionRecord(TransactionRecord):
+
+ def __init__(self, trans_iter, tid_int, user, desc, ext, packed):
+ self._trans_iter = trans_iter
+ self._tid_int = tid_int
+ self.tid = p64(tid_int)
+ self.status = packed and 'p' or ' '
+ self.user = user or ''
+ self.description = desc or ''
+ if ext:
+ self.extension = cPickle.loads(ext)
+ else:
+ self.extension = {}
+
+ # maintain compatibility with the old (ZODB 3.8 and below) name of
+ # the extension attribute.
+ def _ext_set(self, value):
+ self.extension = value
+ def _ext_get(self):
+ return self.extension
+ _extension = property(fset=_ext_set, fget=_ext_get)
+
+ def __iter__(self):
+ return RecordIterator(self)
+
+
+class RecordIterator(object):
+ """Iterate over the objects in a transaction."""
+ def __init__(self, record):
+ # record is a RelStorageTransactionRecord.
+ cursor = record._trans_iter._cursor
+ adapter = record._trans_iter._adapter
+ tid_int = record._tid_int
+ self.tid = record.tid
+ self._records = list(adapter.dbiter.iter_objects(cursor, tid_int))
+ self._index = 0
+
+ def __iter__(self):
+ return self
+
+ def __len__(self):
+ return len(self._records)
+
+ def __getitem__(self, n):
+ self._index = n
+ return self.next()
+
+ def next(self):
+ if self._index >= len(self._records):
+ raise StorageStopIteration()
+ params = self._records[self._index]
+ res = Record(self.tid, *params)
+ self._index += 1
+ return res
+
+
+class Record(DataRecord):
+ """An object state in a transaction"""
+ version = ''
+ data_txn = None
+
+ def __init__(self, tid, oid_int, data):
+ self.tid = tid
+ self.oid = p64(oid_int)
+ if data is not None:
+ self.data = str(data)
+ else:
+ self.data = None
+
+
+class Options:
+ """Options for tuning RelStorage.
+
+ These parameters can be provided as keyword options in the RelStorage
+ constructor. For example:
+
+ storage = RelStorage(adapter, pack_gc=True, pack_dry_run=True)
+
+ Alternatively, the RelStorage constructor accepts an options
+ parameter, which should be an Options instance.
+ """
+ def __init__(self):
+ self.blob_dir = None
+ self.poll_interval = 0
+ self.pack_gc = True
+ self.pack_dry_run = False
+ self.pack_batch_timeout = 5.0
+ self.pack_duty_cycle = 0.5
+ self.pack_max_delay = 20.0
+ self.cache_servers = () # ['127.0.0.1:11211']
+ self.cache_module_name = 'memcache'
Property changes on: relstorage/trunk/relstorage/storage.py
___________________________________________________________________
Added: svn:mergeinfo
+
Modified: relstorage/trunk/relstorage/tests/RecoveryStorage.py
===================================================================
--- relstorage/trunk/relstorage/tests/RecoveryStorage.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/RecoveryStorage.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -17,22 +17,16 @@
# history-free storages.
import itertools
+import time
import transaction
from transaction import Transaction
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
import ZODB.POSException
from ZODB.serialize import referencesf
+from relstorage.util import is_blob_record
-import time
-try:
- from ZODB.blob import is_blob_record
-except ImportError:
- def is_blob_record(data):
- return False
-
-
class IteratorDeepCompare:
def compare(self, storage1, storage2):
Modified: relstorage/trunk/relstorage/tests/blob/blob_connection.txt
===================================================================
--- relstorage/trunk/relstorage/tests/blob/blob_connection.txt 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/blob/blob_connection.txt 2009-09-25 22:13:39 UTC (rev 104556)
@@ -74,7 +74,7 @@
>>> transaction2.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage object at ...> is not supported.
+ Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage ...> is not supported.
>>> transaction2.abort()
>>> connection2.close()
Modified: relstorage/trunk/relstorage/tests/blob/blob_transaction.txt
===================================================================
--- relstorage/trunk/relstorage/tests/blob/blob_transaction.txt 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/blob/blob_transaction.txt 2009-09-25 22:13:39 UTC (rev 104556)
@@ -242,9 +242,9 @@
the committed location again:
>>> transaction.commit()
- >>> len([name for name in os.listdir(os.path.join(blob_dir, 'tmp'))
- ... if name.startswith('savepoint')])
- 0
+ >>> savepoint_dir = os.path.join(blob_dir, 'tmp', 'savepoint')
+ >>> os.path.exists(savepoint_dir) and len(os.listdir(savepoint_dir)) > 0
+ False
We support non-optimistic savepoints too:
@@ -268,16 +268,14 @@
The savepoint blob directory gets cleaned up on an abort:
- >>> len([name for name in os.listdir(os.path.join(blob_dir, 'tmp'))
- ... if name.startswith('savepoint')])
- 0
+ >>> os.path.exists(savepoint_dir) and len(os.listdir(savepoint_dir)) > 0
+ False
-Reading Blobs outside of a transaction
---------------------------------------
+tpc_abort
+---------
-If you want to read from a Blob outside of transaction boundaries (e.g. to
-stream a file to the browser), committed method to get the name of a
-file that can be opened.
+If a transaction is aborted in the middle of 2-phase commit, any data
+stored are discarded.
>>> connection6 = database.open()
>>> root6 = connection6.root()
@@ -290,81 +288,6 @@
>>> open(blob.committed()).read()
"I'm a happy blob."
-We can also read committed data by calling open with a 'c' flag:
-
- >>> f = blob.open('c')
-
-This just returns a regular file object:
-
- >>> type(f)
- <type 'file'>
-
-and doesn't prevent us from opening the blob for writing:
-
- >>> blob.open('w').write('x')
- >>> blob.open().read()
- 'x'
-
- >>> f.read()
- "I'm a happy blob."
-
- >>> f.close()
- >>> transaction.abort()
-
-An exception is raised if we call committed on a blob that has
-uncommitted changes:
-
- >>> blob = ZODB.blob.Blob()
- >>> blob.committed()
- Traceback (most recent call last):
- ...
- BlobError: Uncommitted changes
-
- >>> blob.open('c')
- Traceback (most recent call last):
- ...
- BlobError: Uncommitted changes
-
- >>> blob.open('w').write("I'm a happy blob.")
- >>> root6['blob6'] = blob
- >>> blob.committed()
- Traceback (most recent call last):
- ...
- BlobError: Uncommitted changes
-
- >>> blob.open('c')
- Traceback (most recent call last):
- ...
- BlobError: Uncommitted changes
-
- >>> s = transaction.savepoint()
- >>> blob.committed()
- Traceback (most recent call last):
- ...
- BlobError: Uncommitted changes
-
- >>> blob.open('c')
- Traceback (most recent call last):
- ...
- BlobError: Uncommitted changes
-
- >>> transaction.commit()
- >>> open(blob.committed()).read()
- "I'm a happy blob."
-
-You can't open a committed blob file for writing:
-
- >>> open(blob.committed(), 'w') # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- IOError: ...
-
-tpc_abort
----------
-
-If a transaction is aborted in the middle of 2-phase commit, any data
-stored are discarded.
-
>>> olddata, oldserial = blob_storage.load(blob._p_oid, '')
>>> t = transaction.get()
>>> blob_storage.tpc_begin(t)
Modified: relstorage/trunk/relstorage/tests/blob/testblob.py
===================================================================
--- relstorage/trunk/relstorage/tests/blob/testblob.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/blob/testblob.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -184,9 +184,11 @@
# Requires a setUp() that creates a self._dst destination storage
def testSimpleBlobRecovery(self):
- self.assert_(
- ZODB.interfaces.IBlobStorageRestoreable.providedBy(self._storage)
- )
+ if hasattr(ZODB.interfaces, 'IBlobStorageRestoreable'):
+ self.assert_(
+ ZODB.interfaces.IBlobStorageRestoreable.providedBy(
+ self._storage)
+ )
db = DB(self._storage)
conn = db.open()
conn.root()[1] = ZODB.blob.Blob()
@@ -337,7 +339,11 @@
>>> database.close()
>>> from ZODB.Connection import TmpStore
- >>> tmpstore = TmpStore(blob_storage)
+ >>> try:
+ ... tmpstore = TmpStore(blob_storage)
+ ... except TypeError:
+ ... # ZODB 3.8
+ ... tmpstore = TmpStore('', blob_storage)
We can access the blob correctly:
@@ -350,35 +356,6 @@
>>> database.close()
"""
-def is_blob_record():
- r"""
- >>> bs = create_storage()
- >>> db = DB(bs)
- >>> conn = db.open()
- >>> conn.root()['blob'] = ZODB.blob.Blob()
- >>> transaction.commit()
- >>> ZODB.blob.is_blob_record(bs.load(ZODB.utils.p64(0), '')[0])
- False
- >>> ZODB.blob.is_blob_record(bs.load(ZODB.utils.p64(1), '')[0])
- True
-
- An invalid pickle yields a false value:
-
- >>> ZODB.blob.is_blob_record("Hello world!")
- False
- >>> ZODB.blob.is_blob_record('c__main__\nC\nq\x01.')
- False
- >>> ZODB.blob.is_blob_record('cWaaaa\nC\nq\x01.')
- False
-
- As does None, which may occur in delete records:
-
- >>> ZODB.blob.is_blob_record(None)
- False
-
- >>> db.close()
- """
-
def do_not_depend_on_cwd():
"""
>>> bs = create_storage()
@@ -397,31 +374,35 @@
>>> bs.close()
"""
-def savepoint_isolation():
- """Make sure savepoint data is distinct accross transactions
+if False:
+ # ZODB 3.8 fails this test because it creates a single
+ # 'savepoints' directory.
+ def savepoint_isolation():
+ """Make sure savepoint data is distinct accross transactions
- >>> bs = create_storage()
- >>> db = DB(bs)
- >>> conn = db.open()
- >>> conn.root.b = ZODB.blob.Blob('initial')
- >>> transaction.commit()
- >>> conn.root.b.open('w').write('1')
- >>> _ = transaction.savepoint()
- >>> tm = transaction.TransactionManager()
- >>> conn2 = db.open(transaction_manager=tm)
- >>> conn2.root.b.open('w').write('2')
- >>> _ = tm.savepoint()
- >>> conn.root.b.open().read()
- '1'
- >>> conn2.root.b.open().read()
- '2'
- >>> transaction.abort()
- >>> tm.commit()
- >>> conn.sync()
- >>> conn.root.b.open().read()
- '2'
- >>> db.close()
- """
+ >>> bs = create_storage()
+ >>> db = DB(bs)
+ >>> conn = db.open()
+ >>> conn.root().b = ZODB.blob.Blob()
+ >>> conn.root().b.open('w').write('initial')
+ >>> transaction.commit()
+ >>> conn.root().b.open('w').write('1')
+ >>> _ = transaction.savepoint()
+ >>> tm = transaction.TransactionManager()
+ >>> conn2 = db.open(transaction_manager=tm)
+ >>> conn2.root().b.open('w').write('2')
+ >>> _ = tm.savepoint()
+ >>> conn.root().b.open().read()
+ '1'
+ >>> conn2.root().b.open().read()
+ '2'
+ >>> transaction.abort()
+ >>> tm.commit()
+ >>> conn.sync()
+ >>> conn.root().b.open().read()
+ '2'
+ >>> db.close()
+ """
def savepoint_cleanup():
"""Make sure savepoint data gets cleaned up.
@@ -433,20 +414,23 @@
>>> db = DB(bs)
>>> conn = db.open()
- >>> conn.root.b = ZODB.blob.Blob('initial')
+ >>> conn.root().b = ZODB.blob.Blob()
+ >>> conn.root().b.open('w').write('initial')
>>> _ = transaction.savepoint()
>>> len(os.listdir(tdir))
1
>>> transaction.abort()
- >>> os.listdir(tdir)
- []
- >>> conn.root.b = ZODB.blob.Blob('initial')
+ >>> savepoint_dir = os.path.join(tdir, 'savepoint')
+ >>> os.path.exists(savepoint_dir) and len(os.listdir(savepoint_dir)) > 0
+ False
+ >>> conn.root().b = ZODB.blob.Blob()
+ >>> conn.root().b.open('w').write('initial')
>>> transaction.commit()
- >>> conn.root.b.open('w').write('1')
+ >>> conn.root().b.open('w').write('1')
>>> _ = transaction.savepoint()
>>> transaction.abort()
- >>> os.listdir(tdir)
- []
+ >>> os.path.exists(savepoint_dir) and len(os.listdir(savepoint_dir)) > 0
+ False
>>> db.close()
"""
@@ -479,9 +463,13 @@
os.chdir(self.here)
shutil.rmtree(self.tmp)
- testSetUp = testTearDown = lambda self: None
+ def testSetUp(self):
+ transaction.abort()
+ def testTearDown(self):
+ transaction.abort()
+
def clean(tmp):
if os.path.isdir(tmp):
shutil.rmtree(tmp)
Modified: relstorage/trunk/relstorage/tests/packstresstest.py
===================================================================
--- relstorage/trunk/relstorage/tests/packstresstest.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/packstresstest.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -2,7 +2,7 @@
import logging
from ZODB.DB import DB
-from relstorage.relstorage import RelStorage
+from relstorage.storage import RelStorage
import transaction
from persistent.mapping import PersistentMapping
import random
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -15,7 +15,6 @@
from persistent import Persistent
from persistent.mapping import PersistentMapping
-from relstorage.relstorage import RelStorage
from relstorage.tests import fakecache
from ZODB.DB import DB
from ZODB.serialize import referencesf
@@ -42,6 +41,7 @@
raise NotImplementedError
def open(self, **kwargs):
+ from relstorage.storage import RelStorage
adapter = self.make_adapter()
self._storage = RelStorage(adapter, **kwargs)
Modified: relstorage/trunk/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/speedtest.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -32,7 +32,7 @@
from ZODB.DB import DB
from ZODB.Connection import Connection
-from relstorage.relstorage import RelStorage
+from relstorage.storage import RelStorage
debug = False
txn_count = 10
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/testmysql.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -13,7 +13,6 @@
##############################################################################
"""Tests of relstorage.adapters.mysql"""
-from relstorage.adapters.mysql import MySQLAdapter
from relstorage.tests.hftestbase import HistoryFreeFromFileStorage
from relstorage.tests.hftestbase import HistoryFreeRelStorageTests
from relstorage.tests.hftestbase import HistoryFreeToFileStorage
@@ -21,10 +20,12 @@
from relstorage.tests.hptestbase import HistoryPreservingRelStorageTests
from relstorage.tests.hptestbase import HistoryPreservingToFileStorage
import logging
+import os
import unittest
class UseMySQLAdapter:
def make_adapter(self):
+ from relstorage.adapters.mysql import MySQLAdapter
if self.keep_history:
db = 'relstoragetest'
else:
@@ -82,7 +83,8 @@
from relstorage.tests.blob.testblob import storage_reusable_suite
for keep_history in (False, True):
def create_storage(name, blob_dir, keep_history=keep_history):
- from relstorage.relstorage import RelStorage
+ from relstorage.storage import RelStorage
+ from relstorage.adapters.mysql import MySQLAdapter
db = db_names[name]
if not keep_history:
db += '_hf'
@@ -93,7 +95,7 @@
passwd='relstoragetest',
)
storage = RelStorage(adapter, name=name, create=True,
- blob_dir=blob_dir)
+ blob_dir=os.path.abspath(blob_dir))
storage.zap_all()
return storage
Modified: relstorage/trunk/relstorage/tests/testoracle.py
===================================================================
--- relstorage/trunk/relstorage/tests/testoracle.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/testoracle.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -18,7 +18,6 @@
import re
import unittest
-from relstorage.adapters.oracle import OracleAdapter
from relstorage.tests.hptestbase import HistoryPreservingFromFileStorage
from relstorage.tests.hptestbase import HistoryPreservingRelStorageTests
from relstorage.tests.hptestbase import HistoryPreservingToFileStorage
@@ -42,6 +41,7 @@
class UseOracleAdapter:
def make_adapter(self):
+ from relstorage.adapters.oracle import OracleAdapter
user, password, dsn = getOracleParams()
return OracleAdapter(user, password, dsn)
Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py 2009-09-25 22:09:42 UTC (rev 104555)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -13,7 +13,6 @@
##############################################################################
"""Tests of relstorage.adapters.postgresql"""
-from relstorage.adapters.postgresql import PostgreSQLAdapter
from relstorage.tests.hftestbase import HistoryFreeFromFileStorage
from relstorage.tests.hftestbase import HistoryFreeRelStorageTests
from relstorage.tests.hftestbase import HistoryFreeToFileStorage
@@ -21,10 +20,12 @@
from relstorage.tests.hptestbase import HistoryPreservingRelStorageTests
from relstorage.tests.hptestbase import HistoryPreservingToFileStorage
import logging
+import os
import unittest
class UsePostgreSQLAdapter:
def make_adapter(self):
+ from relstorage.adapters.postgresql import PostgreSQLAdapter
if self.keep_history:
db = 'relstoragetest'
else:
@@ -81,7 +82,8 @@
from relstorage.tests.blob.testblob import storage_reusable_suite
for keep_history in (False, True):
def create_storage(name, blob_dir, keep_history=keep_history):
- from relstorage.relstorage import RelStorage
+ from relstorage.storage import RelStorage
+ from relstorage.adapters.postgresql import PostgreSQLAdapter
db = db_names[name]
if not keep_history:
db += '_hf'
@@ -90,7 +92,7 @@
adapter = PostgreSQLAdapter(
keep_history=keep_history, dsn=dsn)
storage = RelStorage(adapter, name=name, create=True,
- blob_dir=blob_dir)
+ blob_dir=os.path.abspath(blob_dir))
storage.zap_all()
return storage
Added: relstorage/trunk/relstorage/util.py
===================================================================
--- relstorage/trunk/relstorage/util.py (rev 0)
+++ relstorage/trunk/relstorage/util.py 2009-09-25 22:13:39 UTC (rev 104556)
@@ -0,0 +1,53 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Utilities needed by RelStorage"""
+
+try:
+ from ZODB.blob import is_blob_record
+ # ZODB 3.9
+except ImportError:
+ try:
+ from ZODB.blob import Blob
+ except ImportError:
+ # ZODB < 3.8
+ def is_blob_record(record):
+ False
+ else:
+ # ZODB 3.8
+ import cPickle
+ import cStringIO
+
+ def find_global_Blob(module, class_):
+ if module == 'ZODB.blob' and class_ == 'Blob':
+ return Blob
+
+ def is_blob_record(record):
+ """Check whether a database record is a blob record.
+
+ This is primarily intended to be used when copying data from one
+ storage to another.
+
+ """
+ if record and ('ZODB.blob' in record):
+ unpickler = cPickle.Unpickler(cStringIO.StringIO(record))
+ unpickler.find_global = find_global_Blob
+
+ try:
+ return unpickler.load() is Blob
+ except (MemoryError, KeyboardInterrupt, SystemExit):
+ raise
+ except Exception:
+ pass
+
+ return False
More information about the checkins
mailing list