[Checkins] SVN: relstorage/trunk/relstorage/ Tidying: moved memcache integration to a separate module (cache.py)

Shane Hathaway shane at hathawaymix.org
Wed Oct 14 04:35:11 EDT 2009


Log message for revision 105059:
  Tidying: moved memcache integration to a separate module (cache.py)
  and removed BaseStorage as a base class.  Also added code that
  writes objects to memcache as they are stored.
  

Changed:
  A   relstorage/trunk/relstorage/autotemp.py
  A   relstorage/trunk/relstorage/cache.py
  U   relstorage/trunk/relstorage/storage.py
  U   relstorage/trunk/relstorage/tests/fakecache.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py
  A   relstorage/trunk/relstorage/tests/test_autotemp.py
  A   relstorage/trunk/relstorage/tests/test_cache.py
  U   relstorage/trunk/relstorage/tests/testmysql.py
  U   relstorage/trunk/relstorage/tests/testoracle.py
  U   relstorage/trunk/relstorage/tests/testpostgresql.py

-=-
Added: relstorage/trunk/relstorage/autotemp.py
===================================================================
--- relstorage/trunk/relstorage/autotemp.py	                        (rev 0)
+++ relstorage/trunk/relstorage/autotemp.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from cStringIO import StringIO
+import tempfile
+
+class AutoTemporaryFile(object):
+    """Initially a StringIO, but becomes a TemporaryFile if it grows large.
+
+    Not thread safe.
+    """
+
+    def __init__(self, threshold=10*1024*1024):
+        self._threshold = threshold
+        self._f = StringIO()
+
+    def read(self, n=None):
+        if n is not None:
+            return self._f.read(n)
+        else:
+            return self._f.read()
+
+    def seek(self, pos, mode=0):
+        self._f.seek(pos, mode)
+
+    def tell(self):
+        return self._f.tell()
+
+    def close(self):
+        self._f.close()
+
+    def write(self, data):
+        threshold = self._threshold
+        if threshold and self._f.tell() + len(data) >= threshold:
+            # convert to TemporaryFile
+            self._threshold = 0
+            f = tempfile.TemporaryFile()
+            f.write(self._f.getvalue())
+            f.seek(self._f.tell())
+            self._f = f
+        self._f.write(data)

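For reference, AutoTemporaryFile stays a cStringIO until a write would
push it past the threshold, then silently converts itself to a real
TemporaryFile.  A minimal usage sketch (the threshold here is
artificially small for illustration):

    from relstorage.autotemp import AutoTemporaryFile

    buf = AutoTemporaryFile(threshold=8)
    buf.write('abcd')      # 0 + 4 < 8: still backed by a StringIO
    buf.write('efghij')    # 4 + 6 >= 8: converted to a TemporaryFile
    buf.seek(0)
    assert buf.read() == 'abcdefghij'
    buf.close()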
Added: relstorage/trunk/relstorage/cache.py
===================================================================
--- relstorage/trunk/relstorage/cache.py	                        (rev 0)
+++ relstorage/trunk/relstorage/cache.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,183 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+from relstorage.autotemp import AutoTemporaryFile
+from ZODB.utils import p64
+from ZODB.utils import u64
+import time
+
+
+class StorageCache(object):
+    """RelStorage integration with memcached or similar.
+    """
+
+    # send_limit: max approx. bytes to buffer before sending to the cache
+    send_limit = 1024 * 1024
+
+    def __init__(self, options):
+        module_name = options.cache_module_name
+        module = __import__(module_name, {}, {}, ['Client'])
+        servers = options.cache_servers
+        if isinstance(servers, basestring):
+            servers = servers.split()
+        self.client = module.Client(servers)
+        self.prefix = options.cache_prefix or ''
+
+        # queue is an AutoTemporaryFile during txn commit.
+        self.queue = None
+
+        # queue_contents is a map of {oid: (startpos, endpos)}
+        # during txn commit.
+        self.queue_contents = None
+
+        # commit_count_key is the cache key to poll for changes
+        self.commit_count_key = '%s:commit_count' % self.prefix
+
+        # polled_commit_count contains the last polled value of the
+        # 'commit_count' cache key
+        self.polled_commit_count = 0
+
+    def flush_all(self):
+        """Remove all data from the cache.  Called by RelStorage.zap_all()"""
+        self.client.flush_all()
+
+    def load(self, cursor, oid_int, prev_polled_tid, adapter):
+        """Load the given object from cache if possible.
+
+        Fall back to loading from the database.
+        """
+        client = self.client
+        state_key = '%s:state:%d' % (self.prefix, oid_int)
+        if prev_polled_tid:
+            backptr_key = '%s:back:%d:%d' % (
+                self.prefix, prev_polled_tid, oid_int)
+            v = client.get_multi([state_key, backptr_key])
+            if v is not None:
+                cache_data = v.get(state_key)
+                backptr = v.get(backptr_key)
+            else:
+                cache_data = None
+                backptr = None
+        else:
+            cache_data = client.get(state_key)
+            backptr = None
+
+        state = None
+        if cache_data and len(cache_data) >= 8:
+            # validate the cache result
+            tid = cache_data[:8]
+            tid_int = u64(tid)
+            if tid_int == prev_polled_tid or tid == backptr:
+                # the cached data is current.
+                state = cache_data[8:]
+
+        if state is None:
+            # could not load from cache, so get from the database
+            state, tid_int = adapter.mover.load_current(
+                cursor, oid_int)
+            state = str(state or '')
+            if tid_int is not None:
+                # cache the result
+                to_cache = {}
+                tid = p64(tid_int)
+                new_cache_data = tid + state
+                if new_cache_data != cache_data:
+                    to_cache[state_key] = new_cache_data
+                if prev_polled_tid and prev_polled_tid != tid_int:
+                    to_cache[backptr_key] = tid
+                if to_cache:
+                    client.set_multi(to_cache)
+
+        return state, tid_int
+
+    def tpc_begin(self):
+        """Prepare temp space for objects to cache."""
+        self.queue = AutoTemporaryFile()
+        self.queue_contents = {}
+
+    def store_temp(self, oid_int, state):
+        """Queue an object for caching.
+
+        Typically, we can't actually cache the object yet, because its
+        transaction ID is not yet chosen.
+        """
+        assert isinstance(state, str)
+        queue = self.queue
+        queue.seek(0, 2)  # seek to end
+        startpos = queue.tell()
+        queue.write(state)
+        endpos = queue.tell()
+        self.queue_contents[oid_int] = (startpos, endpos)
+
+    def tpc_vote(self, tid):
+        """Now that the tid is chosen, send queued objects to the cache.
+        """
+        client = self.client
+        assert len(tid) == 8
+        send_size = 0
+        to_send = {}
+
+        items = [
+            (startpos, endpos, oid_int)
+            for (oid_int, (startpos, endpos)) in self.queue_contents.items()
+            ]
+        items.sort()
+
+        for startpos, endpos, oid_int in items:
+            self.queue.seek(startpos)
+            length = endpos - startpos
+            state = self.queue.read(length)
+            if len(state) != length:
+                raise AssertionError("Queued cache data is truncated")
+            cachekey = '%s:state:%d' % (self.prefix, oid_int)
+            to_send[cachekey] = '%s%s' % (tid, state)
+            send_size += length + len(cachekey)
+            if send_size >= self.send_limit:
+                client.set_multi(to_send)
+                to_send.clear()
+                send_size = 0
+
+        if to_send:
+            client.set_multi(to_send)
+
+        self.queue_contents.clear()
+        self.queue.seek(0)
+
+
+    def tpc_finish(self):
+        """Update the commit count in the cache."""
+        client = self.client
+        cachekey = self.commit_count_key
+        if client.incr(cachekey) is None:
+            # Use the current time as an initial commit_count value.
+            client.add(cachekey, int(time.time()))
+            # A concurrent committer could have won the race to set the
+            # initial commit_count.  Increment commit_count so that it
+            # doesn't matter who won.
+            client.incr(cachekey)
+
+    def clear_temp(self):
+        """Clear any transactional data.  Called after txn finish or abort."""
+        self.queue_contents = None
+        if self.queue is not None:
+            self.queue.close()
+            self.queue = None
+
+    def need_poll(self):
+        """Return True if the commit count has changed"""
+        new_commit_count = self.client.get(self.commit_count_key)
+        if new_commit_count != self.polled_commit_count:
+            self.polled_commit_count = new_commit_count
+            return True
+        return False

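The StorageCache lifecycle mirrors the storage's two-phase commit: state
is queued during the transaction, pushed to the cache under
'<prefix>:state:<oid>' keys once the tid is known, and a shared
'<prefix>:commit_count' counter is bumped at commit.  A rough sketch
against the fake memcache client from the test suite (the Options
stand-in and the oid/tid values here are made up for illustration):

    from relstorage.cache import StorageCache
    from ZODB.utils import p64

    class Options(object):
        # only the attributes StorageCache.__init__ reads
        cache_module_name = 'relstorage.tests.fakecache'
        cache_servers = 'host:11211'
        cache_prefix = 'myprefix'

    cache = StorageCache(Options())
    cache.tpc_begin()                      # open the temp queue
    cache.store_temp(2, 'pickled state')   # tid not chosen yet, so queue it
    cache.tpc_vote(p64(55))                # tid chosen: push queued states
    cache.tpc_finish()                     # bump the shared commit counter
    cache.clear_temp()                     # discard the temp queue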
Modified: relstorage/trunk/relstorage/storage.py
===================================================================
--- relstorage/trunk/relstorage/storage.py	2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/storage.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -17,14 +17,15 @@
 """
 
 from persistent.TimeStamp import TimeStamp
+from relstorage.cache import StorageCache
 from relstorage.options import Options
 from relstorage.util import is_blob_record
-from ZODB.BaseStorage import BaseStorage
 from ZODB.BaseStorage import DataRecord
 from ZODB.BaseStorage import TransactionRecord
 from ZODB import ConflictResolution
 from ZODB import POSException
 from ZODB.POSException import POSKeyError
+from ZODB.UndoLogCompatible import UndoLogCompatible
 from ZODB.utils import p64
 from ZODB.utils import u64
 from zope.interface import implements
@@ -35,6 +36,7 @@
 import os
 import sys
 import tempfile
+import threading
 import time
 import weakref
 import ZODB.interfaces
@@ -70,11 +72,72 @@
 abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
 
 
-class RelStorage(BaseStorage,
-                ConflictResolution.ConflictResolvingStorage):
+class RelStorage(
+        UndoLogCompatible,
+        ConflictResolution.ConflictResolvingStorage
+        ):
     """Storage to a relational database, based on invalidation polling"""
     implements(*_relstorage_interfaces)
 
+    _transaction=None # Transaction that is being committed
+    _tstatus=' '      # Transaction status, used for copying data
+    _is_read_only = False
+
+    # load_conn and load_cursor are open most of the time.
+    _load_conn = None
+    _load_cursor = None
+    _load_transaction_open = False
+
+    # store_conn and store_cursor are open during commit,
+    # but not necessarily open at other times.
+    _store_conn = None
+    _store_cursor = None
+
+    # _tid is the current transaction ID being committed; generally
+    # only set after tpc_vote().
+    _tid = None
+
+    # _ltid is the ID of the last transaction committed by this instance.
+    _ltid = None
+
+    # _prepared_txn is the name of the transaction to commit in the
+    # second phase.
+    _prepared_txn = None
+
+    # _closed is True after self.close() is called.  Since close()
+    # can be called from another thread, access to self._closed should
+    # be inside a _lock_acquire()/_lock_release() block.
+    _closed = False
+
+    # _max_stored_oid is the highest OID stored by the current
+    # transaction
+    _max_stored_oid = 0
+
+    # _max_new_oid is the highest OID provided by new_oid()
+    _max_new_oid = 0
+
+    # _cache, if set, is a StorageCache object.
+    _cache = None
+
+    # _prev_polled_tid contains the tid at the previous poll
+    _prev_polled_tid = None
+
+    # _poll_at is the time to force a poll
+    _poll_at = 0
+
+    # If the blob directory is set, fshelper is a filesystem blob
+    # helper.  Otherwise, fshelper is None.
+    fshelper = None
+
+    # _txn_blobs: {oid->filename}; contains blob data for the
+    # currently uncommitted transaction.
+    _txn_blobs = None
+
+    # _batcher: An object that accumulates store operations
+    # so they can be executed in batch (to minimize latency).
+    _batcher = None
+
+
     def __init__(self, adapter, name=None, create=True,
             options=None, **kwoptions):
         self._adapter = adapter
@@ -90,92 +153,39 @@
             name = options.name
             if not name:
                 name = 'RelStorage: %s' % adapter
-        self._name = name
+        self.__name__ = name
 
         self._is_read_only = options.read_only
-        self._cache_client = None
 
         if create:
             self._adapter.schema.prepare()
 
-        # load_conn and load_cursor are open most of the time.
-        self._load_conn = None
-        self._load_cursor = None
-        self._load_transaction_open = False
         self._open_load_connection()
-        # store_conn and store_cursor are open during commit,
-        # but not necessarily open at other times.
-        self._store_conn = None
-        self._store_cursor = None
 
-        BaseStorage.__init__(self, name)
+        self.__lock = threading.RLock()
+        self.__commit_lock = threading.Lock()
+        self._lock_acquire = self.__lock.acquire
+        self._lock_release = self.__lock.release
+        self._commit_lock_acquire = self.__commit_lock.acquire
+        self._commit_lock_release = self.__commit_lock.release
 
-        self._tid = None
-        self._ltid = None
-
-        # _prepared_txn is the name of the transaction to commit in the
-        # second phase.
-        self._prepared_txn = None
-
         # _instances is a list of weak references to storage instances bound
         # to the same database.
         self._instances = []
 
-        # _closed is True after self.close() is called.  Since close()
-        # can be called from another thread, access to self._closed should
-        # be inside a _lock_acquire()/_lock_release() block.
-        self._closed = False
-
-        # _max_stored_oid is the highest OID stored by the current
-        # transaction
-        self._max_stored_oid = 0
-
-        # _max_new_oid is the highest OID provided by new_oid()
-        self._max_new_oid = 0
-
         # _preallocated_oids contains OIDs provided by the database
         # but not yet used.
         self._preallocated_oids = []
 
-        # set _cache_client
         if options.cache_servers:
-            module_name = options.cache_module_name
-            module = __import__(module_name, {}, {}, ['Client'])
-            servers = options.cache_servers
-            if isinstance(servers, basestring):
-                servers = servers.split()
-            self._cache_client = module.Client(servers)
-            self._cache_prefix = options.cache_prefix or ''
-        else:
-            self._cache_client = None
-            self._cache_prefix = ''
+            self._cache = StorageCache(options)
 
-        # _prev_polled_tid contains the tid at the previous poll
-        self._prev_polled_tid = None
-
-        # _polled_commit_count contains the last polled value of the
-        # 'commit_count' cache key
-        self._polled_commit_count = 0
-
-        # _poll_at is the time to poll regardless of commit_count
-        self._poll_at = 0
-
-        # _txn_blobs: {oid->filename}; contains blob data for the
-        # currently uncommitted transaction.
-        self._txn_blobs = None
-
-        # _batcher: An object that accumulates store operations
-        # so they can be executed in batch (to minimize latency).
-        self._batcher = None
-
         if options.blob_dir:
             from ZODB.blob import FilesystemHelper
             self.fshelper = FilesystemHelper(options.blob_dir)
             if create:
                 self.fshelper.create()
                 self.fshelper.checkSecure()
-        else:
-            self.fshelper = None
 
     def _open_load_connection(self):
         """Open the load connection to the database.  Return nothing."""
@@ -260,7 +270,7 @@
         """
         self._adapter.schema.zap_all()
         self._rollback_load_connection()
-        cache = self._cache_client
+        cache = self._cache
         if cache is not None:
             cache.flush_all()
 
@@ -294,7 +304,7 @@
         See ZODB.interfaces.IMVCCStorage.
         """
         adapter = self._adapter.new_instance()
-        other = RelStorage(adapter=adapter, name=self._name,
+        other = RelStorage(adapter=adapter, name=self.__name__,
             create=False, options=self._options)
         self._instances.append(weakref.ref(other))
         return other
@@ -302,10 +312,27 @@
     def __len__(self):
         return self._adapter.stats.get_object_count()
 
+    def sortKey(self):
+        """Return a string that can be used to sort storage instances.
+
+        The key must uniquely identify a storage and must be the same
+        across multiple instantiations of the same storage.
+        """
+        return self.__name__
+
+    def getName(self):
+        return self.__name__
+
     def getSize(self):
         """Return database size in bytes"""
         return self._adapter.stats.get_db_size()
 
+    def registerDB(self, db):
+        pass # we don't care
+
+    def isReadOnly(self):
+        return self._is_read_only
+
     def _log_keyerror(self, oid_int, reason):
         """Log just before raising POSKeyError in load().
 
@@ -356,7 +383,7 @@
 
     def load(self, oid, version):
         oid_int = u64(oid)
-        cache = self._cache_client
+        cache = self._cache
 
         self._lock_acquire()
         try:
@@ -367,51 +394,10 @@
             if cache is None:
                 state, tid_int = self._adapter.mover.load_current(
                     cursor, oid_int)
-                state = str(state or '')
-
             else:
-                # try to load from cache
-                prefix = self._cache_prefix
-                state_key = '%s:state:%d' % (prefix, oid_int)
-                my_tid = self._prev_polled_tid
-                if my_tid:
-                    backptr_key = '%s:back:%d:%d' % (prefix, my_tid, oid_int)
-                    v = cache.get_multi([state_key, backptr_key])
-                    if v is not None:
-                        cache_data = v.get(state_key)
-                        backptr = v.get(backptr_key)
-                    else:
-                        cache_data = None
-                        backptr = None
-                else:
-                    cache_data = cache.get(state_key)
-                    backptr = None
+                state, tid_int = cache.load(
+                    cursor, oid_int, self._prev_polled_tid, self._adapter)
 
-                state = None
-                if cache_data and len(cache_data) >= 8:
-                    # validate the cache result
-                    tid = cache_data[:8]
-                    tid_int = u64(tid)
-                    if tid_int == my_tid or tid == backptr:
-                        # the cached data is current.
-                        state = cache_data[8:]
-
-                if state is None:
-                    # could not load from cache, so get from the database
-                    state, tid_int = self._adapter.mover.load_current(
-                        cursor, oid_int)
-                    state = str(state or '')
-                    if tid_int is not None:
-                        # cache the result
-                        to_cache = {}
-                        tid = p64(tid_int)
-                        new_cache_data = tid + state
-                        if new_cache_data != cache_data:
-                            to_cache[state_key] = new_cache_data
-                        if my_tid and my_tid != tid_int:
-                            to_cache[backptr_key] = tid
-                        if to_cache:
-                            cache.set_multi(to_cache)
         finally:
             self._lock_release()
 
@@ -421,11 +407,16 @@
                 # an object whose creation has been undone.
                 self._log_keyerror(oid_int, "creation has been undone")
                 raise POSKeyError(oid)
+            state = str(state or '')
             return state, p64(tid_int)
         else:
             self._log_keyerror(oid_int, "no tid found")
             raise POSKeyError(oid)
 
+    def getTid(self, oid):
+        state, serial = self.load(oid, '')
+        return serial
+
     def loadEx(self, oid, version):
         # Since we don't support versions, just tack the empty version
         # string onto load's result.
@@ -521,6 +512,9 @@
             # save the data in a temporary table
             adapter.mover.store_temp(
                 cursor, self._batcher, oid_int, prev_tid_int, data)
+            cache = self._cache
+            if cache is not None:
+                cache.store_temp(oid_int, data)
             return None
         finally:
             self._lock_release()
@@ -566,8 +560,8 @@
             self._lock_release()
             self._commit_lock_acquire()
             self._lock_acquire()
+            self._clear_temp()
             self._transaction = transaction
-            self._clear_temp()
 
             user = str(transaction.user)
             desc = str(transaction.description)
@@ -583,9 +577,12 @@
             adapter = self._adapter
             self._batcher = self._adapter.mover.make_batcher(
                 self._store_cursor)
+            cache = self._cache
+            if cache is not None:
+                cache.tpc_begin()
 
             if tid is not None:
-                # get the commit lock and add the transaction now
+                # hold the commit lock and add the transaction now
                 cursor = self._store_cursor
                 packed = (status == 'p')
                 adapter.locker.hold_commit_lock(cursor, ensure_current=True)
@@ -602,6 +599,9 @@
         finally:
             self._lock_release()
 
+    def tpc_transaction(self):
+        return self._transaction
+
     def _prepare_tid(self):
         """Choose a tid for the current transaction.
 
@@ -633,11 +633,19 @@
 
 
     def _clear_temp(self):
+        # Clear all attributes used for transaction commit.
         # It is assumed that self._lock_acquire was called before this
         # method was called.
+        self._transaction = None
+        self._ude = None
+        self._tid = None
         self._prepared_txn = None
         self._max_stored_oid = 0
         self._batcher = None
+        self._txn_blobs = None
+        cache = self._cache
+        if cache is not None:
+            cache.clear_temp()
 
 
     def _finish_store(self):
@@ -649,6 +657,7 @@
         assert self._tid is not None
         cursor = self._store_cursor
         adapter = self._adapter
+        cache = self._cache
 
         # Detect conflicting changes.
         # Try to resolve the conflicts.
@@ -674,6 +683,8 @@
                 self._adapter.mover.replace_temp(
                     cursor, oid_int, prev_tid_int, data)
                 resolved.add(oid)
+                if cache is not None:
+                    cache.store_temp(oid_int, data)
 
         # Move the new states into the permanent table
         tid_int = u64(self._tid)
@@ -690,6 +701,22 @@
         return serials
 
 
+    def tpc_vote(self, transaction):
+        self._lock_acquire()
+        try:
+            if transaction is not self._transaction:
+                return
+            try:
+                return self._vote()
+            except:
+                if abort_early:
+                    # abort early to avoid lockups while running the
+                    # somewhat brittle ZODB test suite
+                    self.tpc_abort(transaction)
+                raise
+        finally:
+            self._lock_release()
+
     def _vote(self):
         """Prepare the transaction for final commit."""
         # This method initiates a two-phase commit process,
@@ -731,22 +758,28 @@
                     ZODB.blob.rename_or_copy_blob(sourcename, targetname)
                     self._txn_blobs[oid] = targetname
 
+        cache = self._cache
+        if cache is not None:
+            cache.tpc_vote(self._tid)
+
         return serials
 
 
-    def tpc_vote(self, transaction):
+    def tpc_finish(self, transaction, f=None):
         self._lock_acquire()
         try:
             if transaction is not self._transaction:
                 return
             try:
-                return self._vote()
-            except:
-                if abort_early:
-                    # abort early to avoid lockups while running the
-                    # somewhat brittle ZODB test suite
-                    self.tpc_abort(transaction)
-                raise
+                try:
+                    if f is not None:
+                        f(self._tid)
+                    u, d, e = self._ude
+                    self._finish(self._tid, u, d, e)
+                finally:
+                    self._clear_temp()
+            finally:
+                self._commit_lock_release()
         finally:
             self._lock_release()
 
@@ -756,70 +789,65 @@
         # It is assumed that self._lock_acquire was called before this
         # method was called.
         assert self._tid is not None
-        try:
-            self._rollback_load_connection()
-            txn = self._prepared_txn
-            assert txn is not None
-            self._adapter.txncontrol.commit_phase2(
-                self._store_conn, self._store_cursor, txn)
-            self._adapter.locker.release_commit_lock(self._store_cursor)
-            cache = self._cache_client
-            if cache is not None:
-                cachekey = '%s:commit_count' % self._cache_prefix
-                if cache.incr(cachekey) is None:
-                    # Use the current time as an initial commit_count value.
-                    cache.add(cachekey, int(time.time()))
-                    # A concurrent committer could have won the race to set the
-                    # initial commit_count.  Increment commit_count so that it
-                    # doesn't matter who won.
-                    cache.incr(cachekey)
-            self._ltid = self._tid
+        self._rollback_load_connection()
+        txn = self._prepared_txn
+        assert txn is not None
+        self._adapter.txncontrol.commit_phase2(
+            self._store_conn, self._store_cursor, txn)
+        self._adapter.locker.release_commit_lock(self._store_cursor)
+        cache = self._cache
+        if cache is not None:
+            cache.tpc_finish()
+        self._ltid = self._tid
 
-            #if self._txn_blobs and not self._adapter.keep_history:
-                ## For each blob just committed, get the name of
-                ## one earlier revision (if any) and write the
-                ## name of the file to a log.  At pack time,
-                ## all the files in the log will be deleted and
-                ## the log will be cleared.
-                #for oid, filename in self._txn_blobs.iteritems():
-                    #dirname, current_name = os.path.split(filename)
-                    #names = os.listdir(dirname)
-                    #names.sort()
-                    #if current_name in names:
-                        #i = names.index(current_name)
-                        #if i > 0:
-                        #    to_delete = os.path.join(dirname, names[i-1])
-                        #    log.write('%s\n') % to_delete
+        #if self._txn_blobs and not self._adapter.keep_history:
+            ## For each blob just committed, get the name of
+            ## one earlier revision (if any) and write the
+            ## name of the file to a log.  At pack time,
+            ## all the files in the log will be deleted and
+            ## the log will be cleared.
+            #for oid, filename in self._txn_blobs.iteritems():
+                #dirname, current_name = os.path.split(filename)
+                #names = os.listdir(dirname)
+                #names.sort()
+                #if current_name in names:
+                    #i = names.index(current_name)
+                    #if i > 0:
+                    #    to_delete = os.path.join(dirname, names[i-1])
+                    #    log.write('%s\n') % to_delete
 
+
+    def tpc_abort(self, transaction):
+        self._lock_acquire()
+        try:
+            if transaction is not self._transaction:
+                return
+            try:
+                try:
+                    self._abort()
+                finally:
+                    self._clear_temp()
+            finally:
+                self._commit_lock_release()
         finally:
-            self._txn_blobs = None
-            self._prepared_txn = None
-            self._tid = None
-            self._transaction = None
-            self._batcher = None
+            self._lock_release()
 
     def _abort(self):
         # the lock is held here
-        try:
-            self._rollback_load_connection()
-            if self._store_cursor is not None:
-                self._adapter.txncontrol.abort(
-                    self._store_conn, self._store_cursor, self._prepared_txn)
-                self._adapter.locker.release_commit_lock(self._store_cursor)
-            if self._txn_blobs:
-                for oid, filename in self._txn_blobs.iteritems():
-                    if os.path.exists(filename):
-                        ZODB.blob.remove_committed(filename)
-                        dirname = os.path.dirname(filename)
-                        if not os.listdir(dirname):
-                            ZODB.blob.remove_committed_dir(dirname)
-        finally:
-            self._txn_blobs = None
-            self._prepared_txn = None
-            self._tid = None
-            self._transaction = None
-            self._batcher = None
+        self._rollback_load_connection()
+        if self._store_cursor is not None:
+            self._adapter.txncontrol.abort(
+                self._store_conn, self._store_cursor, self._prepared_txn)
+            self._adapter.locker.release_commit_lock(self._store_cursor)
+        if self._txn_blobs:
+            for oid, filename in self._txn_blobs.iteritems():
+                if os.path.exists(filename):
+                    ZODB.blob.remove_committed(filename)
+                    dirname = os.path.dirname(filename)
+                    if not os.listdir(dirname):
+                        ZODB.blob.remove_committed_dir(dirname)
 
+
     def lastTransaction(self):
         return self._ltid
 
@@ -1119,13 +1147,10 @@
         """Return true if polling is needed"""
         now = time.time()
 
-        cache = self._cache_client
+        cache = self._cache
         if cache is not None:
-            new_commit_count = cache.get(
-                '%s:commit_count' % self._cache_prefix)
-            if new_commit_count != self._polled_commit_count:
+            if cache.need_poll():
                 # There is new data ready to poll
-                self._polled_commit_count = new_commit_count
                 self._poll_at = now
                 return True
 

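Note that the poll check above now delegates to StorageCache.need_poll()
instead of reading and comparing the commit counter inline.  The
observable behavior, continuing the earlier sketch (fake client; a fresh
instance always polls once because it has not yet seen the counter):

    cache = StorageCache(Options())   # Options as sketched earlier
    assert cache.need_poll()          # counter not seen yet: poll
    assert not cache.need_poll()      # nothing committed since
    cache.tpc_finish()                # a commit bumps the counter
    assert cache.need_poll()          # so the next check triggers a poll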
Modified: relstorage/trunk/relstorage/tests/fakecache.py
===================================================================
--- relstorage/trunk/relstorage/tests/fakecache.py	2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/fakecache.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -44,3 +44,7 @@
         value = int(value) + 1
         data[key] = value
         return value
+
+    def flush_all(self):
+        data.clear()
+

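For context, the fake client in fakecache.py is an in-memory stand-in
for a memcache client, backed by a module-level dict so tests can
inspect it.  Roughly (a reconstruction inferred from the calls the cache
code makes, not the actual module; only the methods exercised here are
sketched):

    data = {}  # module-level store shared by every Client instance

    class Client(object):
        def __init__(self, servers):
            self.servers = servers
        def get(self, key):
            return data.get(key)
        def get_multi(self, keys):
            res = dict((key, data[key]) for key in keys if key in data)
            return res or None
        def set_multi(self, d):
            data.update(d)
        def add(self, key, value):
            if key not in data:
                data[key] = value
        def incr(self, key):
            value = data.get(key)
            if value is None:
                return None
            value = int(value) + 1
            data[key] = value
            return value
        def flush_all(self):
            data.clear()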
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -224,7 +224,7 @@
         self.assertEqual(len(got), len(data))
         self.assertEqual(got, data)
 
-    def checkLoadFromCache(self):
+    def checkUseCache(self):
         # Store an object, cache it, then retrieve it from the cache
         self._storage._options.cache_servers = 'x:1 y:2'
         self._storage._options.cache_module_name = fakecache.__name__
@@ -233,7 +233,7 @@
         db = DB(self._storage)
         try:
             c1 = db.open()
-            self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
+            self.assertEqual(c1._storage._cache.client.servers, ['x:1', 'y:2'])
             fakecache.data.clear()
             r1 = c1.root()
             # the root state should now be cached
@@ -243,11 +243,9 @@
             transaction.commit()
             self.assertTrue('zzz:commit_count' in fakecache.data)
             self.assertEqual(sorted(fakecache.data.keys()),
-                ['zzz:commit_count', 'zzz:state:0'])
-            oid = r1['alpha']._p_oid
-            self.assertEqual(sorted(fakecache.data.keys()),
-                ['zzz:commit_count', 'zzz:state:0'])
+                ['zzz:commit_count', 'zzz:state:0', 'zzz:state:1'])
 
+            oid = r1['alpha']._p_oid
             got, serial = c1._storage.load(oid, '')
             # another state should now be cached
             self.assertEqual(len(fakecache.data.keys()), 3)

Added: relstorage/trunk/relstorage/tests/test_autotemp.py
===================================================================
--- relstorage/trunk/relstorage/tests/test_autotemp.py	                        (rev 0)
+++ relstorage/trunk/relstorage/tests/test_autotemp.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,76 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import unittest
+
+class AutoTemporaryFileTests(unittest.TestCase):
+
+    def getClass(self):
+        from relstorage.autotemp import AutoTemporaryFile
+        return AutoTemporaryFile
+
+    def test_defaults(self):
+        t = self.getClass()()
+        self.assertEqual(t._threshold, 10*1024*1024)
+
+    def test_write_and_read_limited(self):
+        t = self.getClass()()
+        t.write('abc')
+        self.assertEqual(t.tell(), 3)
+        t.seek(0)
+        self.assertEqual(t.tell(), 0)
+        self.assertEqual(t.read(2), 'ab')
+        self.assertEqual(t.tell(), 2)
+
+    def test_write_and_read_unlimited(self):
+        t = self.getClass()()
+        t.write('abc')
+        t.seek(0)
+        self.assertEqual(t.read(), 'abc')
+
+    def test_convert_to_temporary_file(self):
+        t = self.getClass()(threshold=4)
+        try:
+            self.assertEqual(t._threshold, 4)
+            t.write('abc')
+            self.assertEqual(t._threshold, 4)
+            t.write('d')
+            self.assertEqual(t._threshold, 0)
+            t.write('e')
+            t.seek(0)
+            self.assertEqual(t.read(), 'abcde')
+        finally:
+            t.close()
+
+    def test_overwrite_during_conversion(self):
+        t = self.getClass()(threshold=4)
+        try:
+            t.write('abc')
+            self.assertEqual(t._threshold, 4)
+            t.seek(1)
+            t.write('0')
+            self.assertEqual(t._threshold, 4)
+            t.write('1')
+            self.assertEqual(t._threshold, 4)
+            t.write('23')
+            self.assertEqual(t._threshold, 0)
+            t.seek(0)
+            self.assertEqual(t.read(), 'a0123')
+        finally:
+            t.close()
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(AutoTemporaryFileTests))
+    return suite

Added: relstorage/trunk/relstorage/tests/test_cache.py
===================================================================
--- relstorage/trunk/relstorage/tests/test_cache.py	                        (rev 0)
+++ relstorage/trunk/relstorage/tests/test_cache.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -0,0 +1,218 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import unittest
+
+class StorageCacheTests(unittest.TestCase):
+
+    def setUp(self):
+        from relstorage.tests.fakecache import data
+        data.clear()
+
+    tearDown = setUp
+
+    def getClass(self):
+        from relstorage.cache import StorageCache
+        return StorageCache
+
+    def test_ctor(self):
+        from relstorage.tests.fakecache import Client
+        c = self.getClass()(MockOptions())
+        self.assert_(isinstance(c.client, Client))
+        self.assertEqual(c.client.servers, ['host:9999'])
+        self.assertEqual(c.prefix, 'myprefix')
+
+    def test_flush_all(self):
+        from relstorage.tests.fakecache import data
+        data.clear()
+        c = self.getClass()(MockOptions())
+        data['x'] = '1'
+        c.flush_all()
+        self.assert_(not data)
+
+    def test_load_from_current_transaction(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        tid_int = 50
+        tid = p64(tid_int)
+        data['myprefix:state:2'] = tid + 'STATE'
+        state, got_tid_int = c.load(None, 2, tid_int, None)
+        self.assertEqual(state, 'STATE')
+        self.assertEqual(got_tid_int, tid_int)
+
+    def test_load_from_backptr(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        tid_int = 50
+        tid = p64(tid_int)
+        data['myprefix:state:2'] = tid + 'STATE'
+        data['myprefix:back:60:2'] = tid
+        state, got_tid_int = c.load(None, 2, 60, None)
+        self.assertEqual(state, 'STATE')
+        self.assertEqual(got_tid_int, tid_int)
+
+    def test_load_backptr_missing(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        tid_int = 50
+        tid = p64(tid_int)
+        data['myprefix:state:2'] = tid + 'STATE'
+        adapter = MockAdapter()
+        adapter.mover.data[2] = ('STATE', 50)
+        state, got_tid_int = c.load(None, 2, 60, adapter)
+        self.assertEqual(state, 'STATE')
+        self.assertEqual(got_tid_int, 50)
+        self.assertEqual(data, {
+            'myprefix:state:2': tid + 'STATE',
+            'myprefix:back:60:2': tid,
+            })
+
+    def test_load_state_expired(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        tid_int = 50
+        tid = p64(tid_int)
+        data['myprefix:state:2'] = tid + 'STATE'
+        adapter = MockAdapter()
+        adapter.mover.data[2] = ('NEWSTATE', 55)
+        state, got_tid_int = c.load(None, 2, 60, adapter)
+        self.assertEqual(state, 'NEWSTATE')
+        self.assertEqual(got_tid_int, 55)
+        self.assertEqual(data, {
+            'myprefix:state:2': p64(55) + 'NEWSTATE',
+            'myprefix:back:60:2': p64(55),
+            })
+
+    def test_load_state_missing(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        tid_int = 50
+        tid = p64(tid_int)
+        adapter = MockAdapter()
+        adapter.mover.data[2] = ('NEWSTATE', 55)
+        state, got_tid_int = c.load(None, 2, 60, adapter)
+        self.assertEqual(state, 'NEWSTATE')
+        self.assertEqual(got_tid_int, 55)
+        self.assertEqual(data, {
+            'myprefix:state:2': p64(55) + 'NEWSTATE',
+            'myprefix:back:60:2': p64(55),
+            })
+
+    def test_load_no_object(self):
+        c = self.getClass()(MockOptions())
+        adapter = MockAdapter()
+        state, got_tid_int = c.load(None, 2, 60, adapter)
+        self.assertEqual(state, '')
+        self.assertEqual(got_tid_int, None)
+
+    def test_store_temp(self):
+        c = self.getClass()(MockOptions())
+        c.tpc_begin()
+        c.store_temp(2, 'abc')
+        c.store_temp(1, 'def')
+        c.store_temp(2, 'ghi')
+        self.assertEqual(c.queue_contents, {1: (3, 6), 2: (6, 9)})
+        c.queue.seek(0)
+        self.assertEqual(c.queue.read(), 'abcdefghi')
+
+    def test_tpc_vote_small(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        c.tpc_begin()
+        c.store_temp(2, 'abc')
+        c.store_temp(3, 'def')
+        tid = p64(55)
+        c.tpc_vote(tid)
+        self.assertEqual(data, {
+            'myprefix:state:2': tid + 'abc',
+            'myprefix:state:3': tid + 'def',
+            })
+
+    def test_tpc_vote_large(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        c.send_limit = 100
+        c.tpc_begin()
+        c.store_temp(2, 'abc')
+        c.store_temp(3, 'def' * 100)
+        tid = p64(55)
+        c.tpc_vote(tid)
+        self.assertEqual(data, {
+            'myprefix:state:2': tid + 'abc',
+            'myprefix:state:3': tid + ('def' * 100),
+            })
+
+    def test_tpc_vote_none(self):
+        from relstorage.tests.fakecache import data
+        from ZODB.utils import p64
+        c = self.getClass()(MockOptions())
+        c.tpc_begin()
+        tid = p64(55)
+        c.tpc_vote(tid)
+        self.assertEqual(data, {})
+
+    def test_tpc_finish(self):
+        from relstorage.tests.fakecache import data
+        c = self.getClass()(MockOptions())
+        c.tpc_finish()
+        count = data['myprefix:commit_count']
+        self.assert_(count > 0)
+        c.tpc_finish()
+        newcount = data['myprefix:commit_count']
+        self.assert_(newcount == count + 1)
+
+    def test_clear_temp(self):
+        c = self.getClass()(MockOptions())
+        c.tpc_begin()
+        c.clear_temp()
+        self.assertEqual(c.queue_contents, None)
+        self.assertEqual(c.queue, None)
+
+    def test_need_poll(self):
+        c = self.getClass()(MockOptions())
+        self.assertTrue(c.need_poll())
+        self.assertFalse(c.need_poll())
+        self.assertFalse(c.need_poll())
+        c.tpc_finish()
+        self.assertTrue(c.need_poll())
+        self.assertFalse(c.need_poll())
+        self.assertFalse(c.need_poll())
+
+
+class MockOptions:
+    cache_module_name = 'relstorage.tests.fakecache'
+    cache_servers = 'host:9999'
+    cache_prefix = 'myprefix'
+
+class MockAdapter:
+    def __init__(self):
+        self.mover = MockObjectMover()
+
+class MockObjectMover:
+    def __init__(self):
+        self.data = {}  # {oid_int: (state, tid_int)}
+    def load_current(self, cursor, oid_int):
+        return self.data.get(oid_int, (None, None))
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(StorageCacheTests))
+    return suite

Modified: relstorage/trunk/relstorage/tests/testmysql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testmysql.py	2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/testmysql.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -85,8 +85,8 @@
             if storage is None:
                 # ZODB < 3.9
                 storage = db._storage
-            self.assertEqual(storage._is_read_only, False)
-            self.assertEqual(storage._name, "xyz")
+            self.assertEqual(storage.isReadOnly(), False)
+            self.assertEqual(storage.getName(), "xyz")
             adapter = storage._adapter
             from relstorage.adapters.mysql import MySQLAdapter
             self.assert_(isinstance(adapter, MySQLAdapter))

Modified: relstorage/trunk/relstorage/tests/testoracle.py
===================================================================
--- relstorage/trunk/relstorage/tests/testoracle.py	2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/testoracle.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -90,10 +90,10 @@
             try:
                 storage = getattr(db, 'storage', None)
                 if storage is None:
-                    # ZODB < 3.8 and before
+                    # ZODB < 3.9
                     storage = db._storage
-                self.assertEqual(storage._is_read_only, False)
-                self.assertEqual(storage._name, "xyz")
+                self.assertEqual(storage.isReadOnly(), False)
+                self.assertEqual(storage.getName(), "xyz")
                 adapter = storage._adapter
                 from relstorage.adapters.oracle import OracleAdapter
                 self.assert_(isinstance(adapter, OracleAdapter))

Modified: relstorage/trunk/relstorage/tests/testpostgresql.py
===================================================================
--- relstorage/trunk/relstorage/tests/testpostgresql.py	2009-10-13 22:41:29 UTC (rev 105058)
+++ relstorage/trunk/relstorage/tests/testpostgresql.py	2009-10-14 08:35:10 UTC (rev 105059)
@@ -84,8 +84,8 @@
             if storage is None:
                 # ZODB < 3.9
                 storage = db._storage
-            self.assertEqual(storage._is_read_only, False)
-            self.assertEqual(storage._name, "xyz")
+            self.assertEqual(storage.isReadOnly(), False)
+            self.assertEqual(storage.getName(), "xyz")
             adapter = storage._adapter
             from relstorage.adapters.postgresql import PostgreSQLAdapter
             self.assert_(isinstance(adapter, PostgreSQLAdapter))


