[Checkins] SVN: relstorage/trunk/relstorage/ Tidying: moved memcache integration to a separate module (cache.py)
Shane Hathaway
shane at hathawaymix.org
Wed Oct 14 04:35:11 EDT 2009
Log message for revision 105059:
Tidying: moved memcache integration to a separate module (cache.py)
and removed BaseStorage as a base class. Also added code that
adds to memcache on object store.
Changed:
A relstorage/trunk/relstorage/autotemp.py
A relstorage/trunk/relstorage/cache.py
U relstorage/trunk/relstorage/storage.py
U relstorage/trunk/relstorage/tests/fakecache.py
U relstorage/trunk/relstorage/tests/reltestbase.py
A relstorage/trunk/relstorage/tests/test_autotemp.py
A relstorage/trunk/relstorage/tests/test_cache.py
U relstorage/trunk/relstorage/tests/testmysql.py
U relstorage/trunk/relstorage/tests/testoracle.py
U relstorage/trunk/relstorage/tests/testpostgresql.py
-=-
Added: relstorage/trunk/relstorage/autotemp.py
===================================================================
--- relstorage/trunk/relstorage/autotemp.py (rev 0)
+++ relstorage/trunk/relstorage/autotemp.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from cStringIO import StringIO
+import tempfile
+
+class AutoTemporaryFile(object):
+ """Initially a StringIO, but becomes a TemporaryFile if it grows large.
+
+ Not thread safe.
+ """
+
+ def __init__(self, threshold=10*1024*1024):
+ self._threshold = threshold
+ self._f = StringIO()
+
+ def read(self, n=None):
+ if n is not None:
+ return self._f.read(n)
+ else:
+ return self._f.read()
+
+ def seek(self, pos, mode=0):
+ self._f.seek(pos, mode)
+
+ def tell(self):
+ return self._f.tell()
+
+ def close(self):
+ self._f.close()
+
+ def write(self, data):
+ threshold = self._threshold
+ if threshold and self._f.tell() + len(data) >= threshold:
+ # convert to TemporaryFile
+ self._threshold = 0
+ f = tempfile.TemporaryFile()
+ f.write(self._f.getvalue())
+ f.seek(self._f.tell())
+ self._f = f
+ self._f.write(data)
Added: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py (rev 0)
+++ relstorage/trunk/relstorage/cache.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,183 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.autotemp import AutoTemporaryFile
+from ZODB.utils import p64
+from ZODB.utils import u64
+import time
+
+
+class StorageCache(object):
+ """RelStorage integration with memcached or similar.
+ """
+
+ # send_limit: max approx. bytes to buffer before sending to the cache
+ send_limit = 1024 * 1024
+
+ def __init__(self, options):
+ module_name = options.cache_module_name
+ module = __import__(module_name, {}, {}, ['Client'])
+ servers = options.cache_servers
+ if isinstance(servers, basestring):
+ servers = servers.split()
+ self.client = module.Client(servers)
+ self.prefix = options.cache_prefix or ''
+
+ # queue is an AutoTemporaryFile during txn commit.
+ self.queue = None
+
+ # queue_contents is a map of {oid: (startpos, endpos)}
+ # during txn commit.
+ self.queue_contents = None
+
+ # commit_count_key is the cache key to poll for changes
+ self.commit_count_key = '%s:commit_count' % self.prefix
+
+ # polled_commit_count contains the last polled value of the
+ # 'commit_count' cache key
+ self.polled_commit_count = 0
+
+ def flush_all(self):
+ """Remove all data from the cache. Called by RelStorage.zap_all()"""
+ self.client.flush_all()
+
+ def load(self, cursor, oid_int, prev_polled_tid, adapter):
+ """Load the given object from cache if possible.
+
+ Fall back to loading from the database.
+ """
+ client = self.client
+ state_key = '%s:state:%d' % (self.prefix, oid_int)
+ if prev_polled_tid:
+ backptr_key = '%s:back:%d:%d' % (
+ self.prefix, prev_polled_tid, oid_int)
+ v = client.get_multi([state_key, backptr_key])
+ if v is not None:
+ cache_data = v.get(state_key)
+ backptr = v.get(backptr_key)
+ else:
+ cache_data = None
+ backptr = None
+ else:
+ cache_data = client.get(state_key)
+ backptr = None
+
+ state = None
+ if cache_data and len(cache_data) >= 8:
+ # validate the cache result
+ tid = cache_data[:8]
+ tid_int = u64(tid)
+ if tid_int == prev_polled_tid or tid == backptr:
+ # the cached data is current.
+ state = cache_data[8:]
+
+ if state is None:
+ # could not load from cache, so get from the database
+ state, tid_int = adapter.mover.load_current(
+ cursor, oid_int)
+ state = str(state or '')
+ if tid_int is not None:
+ # cache the result
+ to_cache = {}
+ tid = p64(tid_int)
+ new_cache_data = tid + state
+ if new_cache_data != cache_data:
+ to_cache[state_key] = new_cache_data
+ if prev_polled_tid and prev_polled_tid != tid_int:
+ to_cache[backptr_key] = tid
+ if to_cache:
+ client.set_multi(to_cache)
+
+ return state, tid_int
+
+ def tpc_begin(self):
+ """Prepare temp space for objects to cache."""
+ self.queue = AutoTemporaryFile()
+ self.queue_contents = {}
+
+ def store_temp(self, oid_int, state):
+ """Queue an object for caching.
+
+ Typically, we can't actually cache the object yet, because its
+ transaction ID is not yet chosen.
+ """
+ assert isinstance(state, str)
+ queue = self.queue
+ queue.seek(0, 2) # seek to end
+ startpos = queue.tell()
+ queue.write(state)
+ endpos = queue.tell()
+ self.queue_contents[oid_int] = (startpos, endpos)
+
+ def tpc_vote(self, tid):
+ """Now that the tid is chosen, send queued objects to the cache.
+ """
+ client = self.client
+ assert len(tid) == 8
+ send_size = 0
+ to_send = {}
+
+ items = [
+ (startpos, endpos, oid_int)
+ for (oid_int, (startpos, endpos)) in self.queue_contents.items()
+ ]
+ items.sort()
+
+ for startpos, endpos, oid_int in items:
+ self.queue.seek(startpos)
+ length = endpos - startpos
+ state = self.queue.read(length)
+ if len(state) != length:
+ raise AssertionError("Queued cache data is truncated")
+ cachekey = '%s:state:%d' % (self.prefix, oid_int)
+ to_send[cachekey] = '%s%s' % (tid, state)
+ send_size += length + len(cachekey)
+ if send_size >= self.send_limit:
+ client.set_multi(to_send)
+ to_send.clear()
+ send_size = 0
+
+ if to_send:
+ client.set_multi(to_send)
+
+ self.queue_contents.clear()
+ self.queue.seek(0)
+
+
+ def tpc_finish(self):
+ """Update the commit count in the cache."""
+ client = self.client
+ cachekey = self.commit_count_key
+ if client.incr(cachekey) is None:
+ # Use the current time as an initial commit_count value.
+ client.add(cachekey, int(time.time()))
+ # A concurrent committer could have won the race to set the
+ # initial commit_count. Increment commit_count so that it
+ # doesn't matter who won.
+ client.incr(cachekey)
+
+ def clear_temp(self):
+ """Clear any transactional data. Called after txn finish or abort."""
+ self.queue_contents = None
+ if self.queue is not None:
+ self.queue.close()
+ self.queue = None
+
+ def need_poll(self):
+ """Return True if the commit count has changed"""
+ new_commit_count = self.client.get(self.commit_count_key)
+ if new_commit_count != self.polled_commit_count:
+ self.polled_commit_count = new_commit_count
+ return True
+ return False
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py 2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/storage.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -17,14 +17,15 @@
"""
from persistent.TimeStamp import TimeStamp
+from relstorage.cache import StorageCache
from relstorage.options import Options
from relstorage.util import is_blob_record
-from ZODB.BaseStorage import BaseStorage
from ZODB.BaseStorage import DataRecord
from ZODB.BaseStorage import TransactionRecord
from ZODB import ConflictResolution
from ZODB import POSException
from ZODB.POSException import POSKeyError
+from ZODB.UndoLogCompatible import UndoLogCompatible
from ZODB.utils import p64
from ZODB.utils import u64
from zope.interface import implements
@@ -35,6 +36,7 @@
import os
import sys
import tempfile
+import threading
import time
import weakref
import ZODB.interfaces
@@ -70,11 +72,72 @@
abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
-class RelStorage(BaseStorage,
- ConflictResolution.ConflictResolvingStorage):
+class RelStorage(
+ UndoLogCompatible,
+ ConflictResolution.ConflictResolvingStorage
+ ):
"""Storage to a relational database, based on invalidation polling"""
implements(*_relstorage_interfaces)
+ _transaction=None # Transaction that is being committed
+ _tstatus=' ' # Transaction status, used for copying data
+ _is_read_only = False
+
+ # load_conn and load_cursor are open most of the time.
+ _load_conn = None
+ _load_cursor = None
+ _load_transaction_open = False
+
+ # store_conn and store_cursor are open during commit,
+ # but not necessarily open at other times.
+ _store_conn = None
+ _store_cursor = None
+
+ # _tid is the current transaction ID being committed; generally
+ # only set after tpc_vote().
+ _tid = None
+
+ # _ltid is the ID of the last transaction committed by this instance.
+ _ltid = None
+
+ # _prepared_txn is the name of the transaction to commit in the
+ # second phase.
+ _prepared_txn = None
+
+ # _closed is True after self.close() is called. Since close()
+ # can be called from another thread, access to self._closed should
+ # be inside a _lock_acquire()/_lock_release() block.
+ _closed = False
+
+ # _max_stored_oid is the highest OID stored by the current
+ # transaction
+ _max_stored_oid = 0
+
+ # _max_new_oid is the highest OID provided by new_oid()
+ _max_new_oid = 0
+
+ # _cache, if set, is a StorageCache object.
+ _cache = None
+
+ # _prev_polled_tid contains the tid at the previous poll
+ _prev_polled_tid = None
+
+ # _poll_at is the time to force a poll
+ _poll_at = 0
+
+ # If the blob directory is set, fshelper is a filesystem blob
+ # helper. Otherwise, fshelper is None.
+ fshelper = None
+
+ # _txn_blobs: {oid->filename}; contains blob data for the
+ # currently uncommitted transaction.
+ _txn_blobs = None
+
+ # _batcher: An object that accumulates store operations
+ # so they can be executed in batch (to minimize latency).
+ _batcher = None
+
+
def __init__(self, adapter, name=None, create=True,
options=None, **kwoptions):
self._adapter = adapter
@@ -90,92 +153,39 @@
name = options.name
if not name:
name = 'RelStorage: %s' % adapter
- self._name = name
+ self.__name__ = name
self._is_read_only = options.read_only
- self._cache_client = None
if create:
self._adapter.schema.prepare()
- # load_conn and load_cursor are open most of the time.
- self._load_conn = None
- self._load_cursor = None
- self._load_transaction_open = False
self._open_load_connection()
- # store_conn and store_cursor are open during commit,
- # but not necessarily open at other times.
- self._store_conn = None
- self._store_cursor = None
- BaseStorage.__init__(self, name)
+ self.__lock = threading.RLock()
+ self.__commit_lock = threading.Lock()
+ self._lock_acquire = self.__lock.acquire
+ self._lock_release = self.__lock.release
+ self._commit_lock_acquire = self.__commit_lock.acquire
+ self._commit_lock_release = self.__commit_lock.release
- self._tid = None
- self._ltid = None
-
- # _prepared_txn is the name of the transaction to commit in the
- # second phase.
- self._prepared_txn = None
-
# _instances is a list of weak references to storage instances bound
# to the same database.
self._instances = []
- # _closed is True after self.close() is called. Since close()
- # can be called from another thread, access to self._closed should
- # be inside a _lock_acquire()/_lock_release() block.
- self._closed = False
-
- # _max_stored_oid is the highest OID stored by the current
- # transaction
- self._max_stored_oid = 0
-
- # _max_new_oid is the highest OID provided by new_oid()
- self._max_new_oid = 0
-
# _preallocated_oids contains OIDs provided by the database
# but not yet used.
self._preallocated_oids = []
- # set _cache_client
if options.cache_servers:
- module_name = options.cache_module_name
- module = __import__(module_name, {}, {}, ['Client'])
- servers = options.cache_servers
- if isinstance(servers, basestring):
- servers = servers.split()
- self._cache_client = module.Client(servers)
- self._cache_prefix = options.cache_prefix or ''
- else:
- self._cache_client = None
- self._cache_prefix = ''
+ self._cache = StorageCache(options)
- # _prev_polled_tid contains the tid at the previous poll
- self._prev_polled_tid = None
-
- # _polled_commit_count contains the last polled value of the
- # 'commit_count' cache key
- self._polled_commit_count = 0
-
- # _poll_at is the time to poll regardless of commit_count
- self._poll_at = 0
-
- # _txn_blobs: {oid->filename}; contains blob data for the
- # currently uncommitted transaction.
- self._txn_blobs = None
-
- # _batcher: An object that accumulates store operations
- # so they can be executed in batch (to minimize latency).
- self._batcher = None
-
if options.blob_dir:
from ZODB.blob import FilesystemHelper
self.fshelper = FilesystemHelper(options.blob_dir)
if create:
self.fshelper.create()
self.fshelper.checkSecure()
- else:
- self.fshelper = None
def _open_load_connection(self):
"""Open the load connection to the database. Return nothing."""
@@ -260,7 +270,7 @@
"""
self._adapter.schema.zap_all()
self._rollback_load_connection()
- cache = self._cache_client
+ cache = self._cache
if cache is not None:
cache.flush_all()
@@ -294,7 +304,7 @@
See ZODB.interfaces.IMVCCStorage.
"""
adapter = self._adapter.new_instance()
- other = RelStorage(adapter=adapter, name=self._name,
+ other = RelStorage(adapter=adapter, name=self.__name__,
create=False, options=self._options)
self._instances.append(weakref.ref(other))
return other
@@ -302,10 +312,27 @@
def __len__(self):
return self._adapter.stats.get_object_count()
+ def sortKey(self):
+ """Return a string that can be used to sort storage instances.
+
+ The key must uniquely identify a storage and must be the same
+ across multiple instantiations of the same storage.
+ """
+ return self.__name__
+
+ def getName(self):
+ return self.__name__
+
def getSize(self):
"""Return database size in bytes"""
return self._adapter.stats.get_db_size()
+ def registerDB(self, db):
+ pass # we don't care
+
+ def isReadOnly(self):
+ return self._is_read_only
+
def _log_keyerror(self, oid_int, reason):
"""Log just before raising POSKeyError in load().
@@ -356,7 +383,7 @@
def load(self, oid, version):
oid_int = u64(oid)
- cache = self._cache_client
+ cache = self._cache
self._lock_acquire()
try:
@@ -367,51 +394,10 @@
if cache is None:
state, tid_int = self._adapter.mover.load_current(
cursor, oid_int)
- state = str(state or '')
-
else:
- # try to load from cache
- prefix = self._cache_prefix
- state_key = '%s:state:%d' % (prefix, oid_int)
- my_tid = self._prev_polled_tid
- if my_tid:
- backptr_key = '%s:back:%d:%d' % (prefix, my_tid, oid_int)
- v = cache.get_multi([state_key, backptr_key])
- if v is not None:
- cache_data = v.get(state_key)
- backptr = v.get(backptr_key)
- else:
- cache_data = None
- backptr = None
- else:
- cache_data = cache.get(state_key)
- backptr = None
+ state, tid_int = cache.load(
+ cursor, oid_int, self._prev_polled_tid, self._adapter)
- state = None
- if cache_data and len(cache_data) >= 8:
- # validate the cache result
- tid = cache_data[:8]
- tid_int = u64(tid)
- if tid_int == my_tid or tid == backptr:
- # the cached data is current.
- state = cache_data[8:]
-
- if state is None:
- # could not load from cache, so get from the database
- state, tid_int = self._adapter.mover.load_current(
- cursor, oid_int)
- state = str(state or '')
- if tid_int is not None:
- # cache the result
- to_cache = {}
- tid = p64(tid_int)
- new_cache_data = tid + state
- if new_cache_data != cache_data:
- to_cache[state_key] = new_cache_data
- if my_tid and my_tid != tid_int:
- to_cache[backptr_key] = tid
- if to_cache:
- cache.set_multi(to_cache)
finally:
self._lock_release()
@@ -421,11 +407,16 @@
# an object whose creation has been undone.
self._log_keyerror(oid_int, "creation has been undone")
raise POSKeyError(oid)
+ state = str(state or '')
return state, p64(tid_int)
else:
self._log_keyerror(oid_int, "no tid found")
raise POSKeyError(oid)
+ def getTid(self, oid):
+ state, serial = self.load(oid, '')
+ return serial
+
def loadEx(self, oid, version):
# Since we don't support versions, just tack the empty version
# string onto load's result.
@@ -521,6 +512,9 @@
# save the data in a temporary table
adapter.mover.store_temp(
cursor, self._batcher, oid_int, prev_tid_int, data)
+ cache = self._cache
+ if cache is not None:
+ cache.store_temp(oid_int, data)
return None
finally:
self._lock_release()
@@ -566,8 +560,8 @@
self._lock_release()
self._commit_lock_acquire()
self._lock_acquire()
+ self._clear_temp()
self._transaction = transaction
- self._clear_temp()
user = str(transaction.user)
desc = str(transaction.description)
@@ -583,9 +577,12 @@
adapter = self._adapter
self._batcher = self._adapter.mover.make_batcher(
self._store_cursor)
+ cache = self._cache
+ if cache is not None:
+ cache.tpc_begin()
if tid is not None:
- # get the commit lock and add the transaction now
+ # hold the commit lock and add the transaction now
cursor = self._store_cursor
packed = (status == 'p')
adapter.locker.hold_commit_lock(cursor, ensure_current=True)
@@ -602,6 +599,9 @@
finally:
self._lock_release()
+ def tpc_transaction(self):
+ return self._transaction
+
def _prepare_tid(self):
"""Choose a tid for the current transaction.
@@ -633,11 +633,19 @@
def _clear_temp(self):
+ # Clear all attributes used for transaction commit.
# It is assumed that self._lock_acquire was called before this
# method was called.
+ self._transaction = None
+ self._ude = None
+ self._tid = None
self._prepared_txn = None
self._max_stored_oid = 0
self._batcher = None
+ self._txn_blobs = None
+ cache = self._cache
+ if cache is not None:
+ cache.clear_temp()
def _finish_store(self):
@@ -649,6 +657,7 @@
assert self._tid is not None
cursor = self._store_cursor
adapter = self._adapter
+ cache = self._cache
# Detect conflicting changes.
# Try to resolve the conflicts.
@@ -674,6 +683,8 @@
self._adapter.mover.replace_temp(
cursor, oid_int, prev_tid_int, data)
resolved.add(oid)
+ if cache is not None:
+ cache.store_temp(oid_int, data)
# Move the new states into the permanent table
tid_int = u64(self._tid)
@@ -690,6 +701,22 @@
return serials
+ def tpc_vote(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ return self._vote()
+ except:
+ if abort_early:
+ # abort early to avoid lockups while running the
+ # somewhat brittle ZODB test suite
+ self.tpc_abort(transaction)
+ raise
+ finally:
+ self._lock_release()
+
def _vote(self):
"""Prepare the transaction for final commit."""
# This method initiates a two-phase commit process,
@@ -731,22 +758,28 @@
ZODB.blob.rename_or_copy_blob(sourcename, targetname)
self._txn_blobs[oid] = targetname
+ cache = self._cache
+ if cache is not None:
+ cache.tpc_vote(self._tid)
+
return serials
- def tpc_vote(self, transaction):
+ def tpc_finish(self, transaction, f=None):
self._lock_acquire()
try:
if transaction is not self._transaction:
return
try:
- return self._vote()
- except:
- if abort_early:
- # abort early to avoid lockups while running the
- # somewhat brittle ZODB test suite
- self.tpc_abort(transaction)
- raise
+ try:
+ if f is not None:
+ f(self._tid)
+ u, d, e = self._ude
+ self._finish(self._tid, u, d, e)
+ finally:
+ self._clear_temp()
+ finally:
+ self._commit_lock_release()
finally:
self._lock_release()
@@ -756,70 +789,65 @@
# It is assumed that self._lock_acquire was called before this
# method was called.
assert self._tid is not None
- try:
- self._rollback_load_connection()
- txn = self._prepared_txn
- assert txn is not None
- self._adapter.txncontrol.commit_phase2(
- self._store_conn, self._store_cursor, txn)
- self._adapter.locker.release_commit_lock(self._store_cursor)
- cache = self._cache_client
- if cache is not None:
- cachekey = '%s:commit_count' % self._cache_prefix
- if cache.incr(cachekey) is None:
- # Use the current time as an initial commit_count value.
- cache.add(cachekey, int(time.time()))
- # A concurrent committer could have won the race to set the
- # initial commit_count. Increment commit_count so that it
- # doesn't matter who won.
- cache.incr(cachekey)
- self._ltid = self._tid
+ self._rollback_load_connection()
+ txn = self._prepared_txn
+ assert txn is not None
+ self._adapter.txncontrol.commit_phase2(
+ self._store_conn, self._store_cursor, txn)
+ self._adapter.locker.release_commit_lock(self._store_cursor)
+ cache = self._cache
+ if cache is not None:
+ cache.tpc_finish()
+ self._ltid = self._tid
- #if self._txn_blobs and not self._adapter.keep_history:
- ## For each blob just committed, get the name of
- ## one earlier revision (if any) and write the
- ## name of the file to a log. At pack time,
- ## all the files in the log will be deleted and
- ## the log will be cleared.
- #for oid, filename in self._txn_blobs.iteritems():
- #dirname, current_name = os.path.split(filename)
- #names = os.listdir(dirname)
- #names.sort()
- #if current_name in names:
- #i = names.index(current_name)
- #if i > 0:
- # to_delete = os.path.join(dirname, names[i-1])
- # log.write('%s\n') % to_delete
+ #if self._txn_blobs and not self._adapter.keep_history:
+ ## For each blob just committed, get the name of
+ ## one earlier revision (if any) and write the
+ ## name of the file to a log. At pack time,
+ ## all the files in the log will be deleted and
+ ## the log will be cleared.
+ #for oid, filename in self._txn_blobs.iteritems():
+ #dirname, current_name = os.path.split(filename)
+ #names = os.listdir(dirname)
+ #names.sort()
+ #if current_name in names:
+ #i = names.index(current_name)
+ #if i > 0:
+ # to_delete = os.path.join(dirname, names[i-1])
+ # log.write('%s\n') % to_delete
+
+ def tpc_abort(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ try:
+ self._abort()
+ finally:
+ self._clear_temp()
+ finally:
+ self._commit_lock_release()
finally:
- self._txn_blobs = None
- self._prepared_txn = None
- self._tid = None
- self._transaction = None
- self._batcher = None
+ self._lock_release()
def _abort(self):
# the lock is held here
- try:
- self._rollback_load_connection()
- if self._store_cursor is not None:
- self._adapter.txncontrol.abort(
- self._store_conn, self._store_cursor, self._prepared_txn)
- self._adapter.locker.release_commit_lock(self._store_cursor)
- if self._txn_blobs:
- for oid, filename in self._txn_blobs.iteritems():
- if os.path.exists(filename):
- ZODB.blob.remove_committed(filename)
- dirname = os.path.dirname(filename)
- if not os.listdir(dirname):
- ZODB.blob.remove_committed_dir(dirname)
- finally:
- self._txn_blobs = None
- self._prepared_txn = None
- self._tid = None
- self._transaction = None
- self._batcher = None
+ self._rollback_load_connection()
+ if self._store_cursor is not None:
+ self._adapter.txncontrol.abort(
+ self._store_conn, self._store_cursor, self._prepared_txn)
+ self._adapter.locker.release_commit_lock(self._store_cursor)
+ if self._txn_blobs:
+ for oid, filename in self._txn_blobs.iteritems():
+ if os.path.exists(filename):
+ ZODB.blob.remove_committed(filename)
+ dirname = os.path.dirname(filename)
+ if not os.listdir(dirname):
+ ZODB.blob.remove_committed_dir(dirname)
+
def lastTransaction(self):
return self._ltid
@@ -1119,13 +1147,10 @@
"""Return true if polling is needed"""
now = time.time()
- cache = self._cache_client
+ cache = self._cache
if cache is not None:
- new_commit_count = cache.get(
- '%s:commit_count' % self._cache_prefix)
- if new_commit_count != self._polled_commit_count:
+ if cache.need_poll():
# There is new data ready to poll
- self._polled_commit_count = new_commit_count
self._poll_at = now
return True
Modified: relstorage/trunk/relstorage/tests/fakecache.py
===================================================================
--- relstorage/trunk/relstorage/tests/fakecache.py 2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/fakecache.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -44,3 +44,7 @@
value = int(value) + 1
data[key] = value
return value
+
+ def flush_all(self):
+ data.clear()
+
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -224,7 +224,7 @@
self.assertEqual(len(got), len(data))
self.assertEqual(got, data)
- def checkLoadFromCache(self):
+ def checkUseCache(self):
# Store an object, cache it, then retrieve it from the cache
self._storage._options.cache_servers = 'x:1 y:2'
self._storage._options.cache_module_name = fakecache.__name__
@@ -233,7 +233,7 @@
db = DB(self._storage)
try:
c1 = db.open()
- self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
+ self.assert_(c1._storage._cache.client.servers, ['x:1', 'y:2'])
fakecache.data.clear()
r1 = c1.root()
# the root state should now be cached
@@ -243,11 +243,9 @@
transaction.commit()
self.assertTrue('zzz:commit_count' in fakecache.data)
self.assertEqual(sorted(fakecache.data.keys()),
- ['zzz:commit_count', 'zzz:state:0'])
- oid = r1['alpha']._p_oid
- self.assertEqual(sorted(fakecache.data.keys()),
- ['zzz:commit_count', 'zzz:state:0'])
+ ['zzz:commit_count', 'zzz:state:0', 'zzz:state:1'])
+ oid = r1['alpha']._p_oid
got, serial = c1._storage.load(oid, '')
# another state should now be cached
self.assertEqual(len(fakecache.data.keys()), 3)
Added: relstorage/trunk/relstorage/tests/test_autotemp.py
===================================================================
--- relstorage/trunk/relstorage/tests/test_autotemp.py (rev 0)
+++ relstorage/trunk/relstorage/tests/test_autotemp.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,76 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import unittest
+
+class AutoTemporaryFileTests(unittest.TestCase):
+
+ def getClass(self):
+ from relstorage.autotemp import AutoTemporaryFile
+ return AutoTemporaryFile
+
+ def test_defaults(self):
+ t = self.getClass()()
+ self.assertEqual(t._threshold, 10*1024*1024)
+
+ def test_write_and_read_limited(self):
+ t = self.getClass()()
+ t.write('abc')
+ self.assertEqual(t.tell(), 3)
+ t.seek(0)
+ self.assertEqual(t.tell(), 0)
+ self.assertEqual(t.read(2), 'ab')
+ self.assertEqual(t.tell(), 2)
+
+ def test_write_and_read_unlimited(self):
+ t = self.getClass()()
+ t.write('abc')
+ t.seek(0)
+ self.assertEqual(t.read(), 'abc')
+
+ def test_convert_to_temporary_file(self):
+ t = self.getClass()(threshold=4)
+ try:
+ self.assertEqual(t._threshold, 4)
+ t.write('abc')
+ self.assertEqual(t._threshold, 4)
+ t.write('d')
+ self.assertEqual(t._threshold, 0)
+ t.write('e')
+ t.seek(0)
+ self.assertEqual(t.read(), 'abcde')
+ finally:
+ t.close()
+
+ def test_overwrite_during_conversion(self):
+ t = self.getClass()(threshold=4)
+ try:
+ t.write('abc')
+ self.assertEqual(t._threshold, 4)
+ t.seek(1)
+ t.write('0')
+ self.assertEqual(t._threshold, 4)
+ t.write('1')
+ self.assertEqual(t._threshold, 4)
+ t.write('23')
+ self.assertEqual(t._threshold, 0)
+ t.seek(0)
+ self.assertEqual(t.read(), 'a0123')
+ finally:
+ t.close()
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(AutoTemporaryFileTests))
+ return suite
Added: relstorage/trunk/relstorage/tests/test_cache.py
===================================================================
--- relstorage/trunk/relstorage/tests/test_cache.py (rev 0)
+++ relstorage/trunk/relstorage/tests/test_cache.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,218 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import unittest
+
+class StorageCacheTests(unittest.TestCase):
+
+ def setUp(self):
+ from relstorage.tests.fakecache import data
+ data.clear()
+
+ tearDown = setUp
+
+ def getClass(self):
+ from relstorage.cache import StorageCache
+ return StorageCache
+
+ def test_ctor(self):
+ from relstorage.tests.fakecache import Client
+ c = self.getClass()(MockOptions())
+ self.assert_(isinstance(c.client, Client))
+ self.assertEqual(c.client.servers, ['host:9999'])
+ self.assertEqual(c.prefix, 'myprefix')
+
+ def test_flush_all(self):
+ from relstorage.tests.fakecache import data
+ data.clear()
+ c = self.getClass()(MockOptions())
+ data['x'] = '1'
+ c.flush_all()
+ self.assert_(not data)
+
+ def test_load_from_current_transaction(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ tid_int = 50
+ tid = p64(tid_int)
+ data['myprefix:state:2'] = tid + 'STATE'
+ state, got_tid_int = c.load(None, 2, tid_int, None)
+ self.assertEqual(state, 'STATE')
+ self.assertEqual(got_tid_int, tid_int)
+
+ def test_load_from_backptr(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ tid_int = 50
+ tid = p64(tid_int)
+ data['myprefix:state:2'] = tid + 'STATE'
+ data['myprefix:back:60:2'] = tid
+ state, got_tid_int = c.load(None, 2, 60, None)
+ self.assertEqual(state, 'STATE')
+ self.assertEqual(got_tid_int, tid_int)
+
+ def test_load_backptr_missing(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ tid_int = 50
+ tid = p64(tid_int)
+ data['myprefix:state:2'] = tid + 'STATE'
+ adapter = MockAdapter()
+ adapter.mover.data[2] = ('STATE', 50)
+ state, got_tid_int = c.load(None, 2, 60, adapter)
+ self.assertEqual(state, 'STATE')
+ self.assertEqual(got_tid_int, 50)
+ self.assertEqual(data, {
+ 'myprefix:state:2': tid + 'STATE',
+ 'myprefix:back:60:2': tid,
+ })
+
+ def test_load_state_expired(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ tid_int = 50
+ tid = p64(tid_int)
+ data['myprefix:state:2'] = tid + 'STATE'
+ adapter = MockAdapter()
+ adapter.mover.data[2] = ('NEWSTATE', 55)
+ state, got_tid_int = c.load(None, 2, 60, adapter)
+ self.assertEqual(state, 'NEWSTATE')
+ self.assertEqual(got_tid_int, 55)
+ self.assertEqual(data, {
+ 'myprefix:state:2': p64(55) + 'NEWSTATE',
+ 'myprefix:back:60:2': p64(55),
+ })
+
+ def test_load_state_missing(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ tid_int = 50
+ tid = p64(tid_int)
+ adapter = MockAdapter()
+ adapter.mover.data[2] = ('NEWSTATE', 55)
+ state, got_tid_int = c.load(None, 2, 60, adapter)
+ self.assertEqual(state, 'NEWSTATE')
+ self.assertEqual(got_tid_int, 55)
+ self.assertEqual(data, {
+ 'myprefix:state:2': p64(55) + 'NEWSTATE',
+ 'myprefix:back:60:2': p64(55),
+ })
+
+ def test_load_no_object(self):
+ c = self.getClass()(MockOptions())
+ adapter = MockAdapter()
+ state, got_tid_int = c.load(None, 2, 60, adapter)
+ self.assertEqual(state, '')
+ self.assertEqual(got_tid_int, None)
+
+ def test_store_temp(self):
+ c = self.getClass()(MockOptions())
+ c.tpc_begin()
+ c.store_temp(2, 'abc')
+ c.store_temp(1, 'def')
+ c.store_temp(2, 'ghi')
+ self.assertEqual(c.queue_contents, {1: (3, 6), 2: (6, 9)})
+ c.queue.seek(0)
+ self.assertEqual(c.queue.read(), 'abcdefghi')
+
+ def test_tpc_vote_small(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ c.tpc_begin()
+ c.store_temp(2, 'abc')
+ c.store_temp(3, 'def')
+ tid = p64(55)
+ c.tpc_vote(tid)
+ self.assertEqual(data, {
+ 'myprefix:state:2': tid + 'abc',
+ 'myprefix:state:3': tid + 'def',
+ })
+
+ def test_tpc_vote_large(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ c.send_limit = 100
+ c.tpc_begin()
+ c.store_temp(2, 'abc')
+ c.store_temp(3, 'def' * 100)
+ tid = p64(55)
+ c.tpc_vote(tid)
+ self.assertEqual(data, {
+ 'myprefix:state:2': tid + 'abc',
+ 'myprefix:state:3': tid + ('def' * 100),
+ })
+
+ def test_tpc_vote_none(self):
+ from relstorage.tests.fakecache import data
+ from ZODB.utils import p64
+ c = self.getClass()(MockOptions())
+ c.tpc_begin()
+ tid = p64(55)
+ c.tpc_vote(tid)
+ self.assertEqual(data, {})
+
+ def test_tpc_finish(self):
+ from relstorage.tests.fakecache import data
+ c = self.getClass()(MockOptions())
+ c.tpc_finish()
+ count = data['myprefix:commit_count']
+ self.assert_(count > 0)
+ c.tpc_finish()
+ newcount = data['myprefix:commit_count']
+ self.assert_(newcount == count + 1)
+
+ def test_clear_temp(self):
+ c = self.getClass()(MockOptions())
+ c.tpc_begin()
+ c.clear_temp()
+ self.assertEqual(c.queue_contents, None)
+ self.assertEqual(c.queue, None)
+
+ def test_need_poll(self):
+ c = self.getClass()(MockOptions())
+ self.assertTrue(c.need_poll())
+ self.assertFalse(c.need_poll())
+ self.assertFalse(c.need_poll())
+ c.tpc_finish()
+ self.assertTrue(c.need_poll())
+ self.assertFalse(c.need_poll())
+ self.assertFalse(c.need_poll())
+
+
+class MockOptions:
+ cache_module_name = 'relstorage.tests.fakecache'
+ cache_servers = 'host:9999'
+ cache_prefix = 'myprefix'
+
+class MockAdapter:
+ def __init__(self):
+ self.mover = MockObjectMover()
+
+class MockObjectMover:
+ def __init__(self):
+ self.data = {} # {oid_int: (state, tid_int)}
+ def load_current(self, cursor, oid_int):
+ return self.data.get(oid_int, (None, None))
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(StorageCacheTests))
+ return suite
Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py 2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/testmysql.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -85,8 +85,8 @@
if storage is None:
# ZODB < 3.9
storage = db._storage
- self.assertEqual(storage._is_read_only, False)
- self.assertEqual(storage._name, "xyz")
+ self.assertEqual(storage.isReadOnly(), False)
+ self.assertEqual(storage.getName(), "xyz")
adapter = storage._adapter
from relstorage.adapters.mysql import MySQLAdapter
self.assert_(isinstance(adapter, MySQLAdapter))
Modified: relstorage/trunk/relstorage/tests/testoracle.py
===================================================================
--- relstorage/trunk/relstorage/tests/testoracle.py 2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/testoracle.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -90,10 +90,10 @@
try:
storage = getattr(db, 'storage', None)
if storage is None:
- # ZODB < 3.8 and before
+ # ZODB < 3.9
storage = db._storage
- self.assertEqual(storage._is_read_only, False)
- self.assertEqual(storage._name, "xyz")
+ self.assertEqual(storage.isReadOnly(), False)
+ self.assertEqual(storage.getName(), "xyz")
adapter = storage._adapter
from relstorage.adapters.oracle import OracleAdapter
self.assert_(isinstance(adapter, OracleAdapter))
Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py 2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py 2009-10-14 08:35:10 UTC (rev 105059)
@@ -84,8 +84,8 @@
if storage is None:
# ZODB < 3.9
storage = db._storage
- self.assertEqual(storage._is_read_only, False)
- self.assertEqual(storage._name, "xyz")
+ self.assertEqual(storage.isReadOnly(), False)
+ self.assertEqual(storage.getName(), "xyz")
adapter = storage._adapter
from relstorage.adapters.postgresql import PostgreSQLAdapter
self.assert_(isinstance(adapter, PostgreSQLAdapter))
More information about the checkins
mailing list