[Checkins] SVN: relstorage/branches/1.1/ Added optional memcache integration. This is useful when the connection to the relational database has high latency.

Shane Hathaway shane at hathawaymix.org
Sat Jul 19 17:55:27 EDT 2008


Log message for revision 88623:
  Added optional memcache integration.  This is useful when the connection
  to the relational database has high latency.
  
  Also made it possible to set the pack and memcache options in zope.conf.
  

Changed:
  U   relstorage/branches/1.1/CHANGES.txt
  U   relstorage/branches/1.1/relstorage/adapters/common.py
  U   relstorage/branches/1.1/relstorage/adapters/mysql.py
  U   relstorage/branches/1.1/relstorage/adapters/oracle.py
  U   relstorage/branches/1.1/relstorage/adapters/postgresql.py
  U   relstorage/branches/1.1/relstorage/component.xml
  U   relstorage/branches/1.1/relstorage/config.py
  U   relstorage/branches/1.1/relstorage/relstorage.py
  U   relstorage/branches/1.1/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/branches/1.1/CHANGES.txt
===================================================================
--- relstorage/branches/1.1/CHANGES.txt	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/CHANGES.txt	2008-07-19 21:55:26 UTC (rev 88623)
@@ -1,4 +1,12 @@
 
+RelStorage 1.1c1
+
+- Added optional memcache integration.  This is useful when the connection
+  to the relational database has high latency.
+
+- Made it possible to set the pack and memcache options in zope.conf.
+
+
 RelStorage 1.1b2
 
 - Made the MySQL locks database-specific rather than server-wide.  This is

Modified: relstorage/branches/1.1/relstorage/adapters/common.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/common.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/adapters/common.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -392,7 +392,7 @@
         return self.open()
 
 
-    def pre_pack(self, pack_tid, get_references, gc):
+    def pre_pack(self, pack_tid, get_references, options):
         """Decide what to pack.
 
         tid specifies the most recent transaction to pack.
@@ -400,15 +400,16 @@
         get_references is a function that accepts a pickled state and
         returns a set of OIDs that state refers to.
 
-        gc is a boolean indicating whether to run garbage collection.
-        If gc is false, at least one revision of every object is kept,
-        even if nothing refers to it.  Packing with gc disabled can be
+        options is an instance of relstorage.Options.
+        The options.pack_gc flag indicates whether to run garbage collection.
+        If pack_gc is false, at least one revision of every object is kept,
+        even if nothing refers to it.  Packing with pack_gc disabled can be
         much faster.
         """
         conn, cursor = self.open_for_pre_pack()
         try:
             try:
-                if gc:
+                if options.pack_gc:
                     log.info("pre_pack: start with gc enabled")
                     self._pre_pack_with_gc(
                         conn, cursor, pack_tid, get_references)
@@ -423,7 +424,8 @@
                 self._run_script_stmt(cursor, stmt)
                 to_remove = 0
 
-                if gc:
+                if options.pack_gc:
+                    # Pack objects with the keep flag set to false.
                     stmt = """
                     INSERT INTO pack_state (tid, zoid)
                     SELECT tid, zoid
@@ -437,6 +439,7 @@
                         pack_tid})
                     to_remove += cursor.rowcount
 
+                # Pack object states with the keep flag set to true.
                 stmt = """
                 INSERT INTO pack_state (tid, zoid)
                 SELECT tid, zoid
@@ -724,8 +727,7 @@
         pass
 
 
-    def pack(self, pack_tid, batch_timeout=5.0, delay_ratio=1.0,
-            max_delay=20.0):
+    def pack(self, pack_tid, options):
         """Pack.  Requires populated pack tables."""
 
         # Read committed mode is sufficient.
@@ -757,17 +759,21 @@
                 for tid, packed, has_removable in tid_rows:
                     self._pack_transaction(
                         cursor, pack_tid, tid, packed, has_removable)
-                    if time.time() >= start + batch_timeout:
+                    if time.time() >= start + options.pack_batch_timeout:
                         # commit the work done so far and release the
                         # commit lock for a short time
                         conn.commit()
                         self._release_commit_lock(cursor)
-                        # Add a delay.
+                        # Add a delay based on the configured duty cycle.
                         elapsed = time.time() - start
-                        delay = min(max_delay, elapsed * delay_ratio)
-                        if delay > 0:
-                            log.debug('pack: sleeping %.4g second(s)', delay)
-                            time.sleep(delay)
+                        duty_cycle = options.pack_duty_cycle
+                        if duty_cycle > 0.0 and duty_cycle < 1.0:
+                            delay = min(options.pack_max_delay,
+                                elapsed * (1.0 / duty_cycle - 1.0))
+                            if delay > 0:
+                                log.debug('pack: sleeping %.4g second(s)',
+                                    delay)
+                                time.sleep(delay)
                         self._hold_commit_lock(cursor)
                         start = time.time()
 

Modified: relstorage/branches/1.1/relstorage/adapters/mysql.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/mysql.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/adapters/mysql.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -280,6 +280,21 @@
         # do later
         return 0
 
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM current_object
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        return None
+
     def load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 

Modified: relstorage/branches/1.1/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/oracle.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/adapters/oracle.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -351,6 +351,20 @@
         # May not be possible without access to the dba_* objects
         return 0
 
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM current_object
+        WHERE zoid = :1
+        """, (oid,))
+        for (tid,) in cursor:
+            return tid
+        return None
+
     def load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 

Modified: relstorage/branches/1.1/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/postgresql.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/adapters/postgresql.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -263,6 +263,21 @@
         finally:
             self.close(conn, cursor)
 
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM current_object
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        return None
+
     def load_current(self, cursor, oid):
         """Returns the current pickle and integer tid for an object.
 

Modified: relstorage/branches/1.1/relstorage/component.xml
===================================================================
--- relstorage/branches/1.1/relstorage/component.xml	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/component.xml	2008-07-19 21:55:26 UTC (rev 88623)
@@ -55,6 +55,59 @@
         inter-database references never break.
       </description>
     </key>
+    <key name="pack-batch-timeout" datatype="float" required="no">
+      <description>
+        Packing occurs in batches of transactions; this specifies the
+        timeout in seconds for each batch.  Note that some database
+        configurations have unpredictable I/O performance
+        and might stall much longer than the timeout.
+        The default timeout is 5.0 seconds.
+      </description>
+    </key>
+    <key name="pack-duty-cycle" datatype="float" required="no">
+      <description>
+        After each batch, the pack code pauses for a time to
+        allow concurrent transactions to commit.  The pack-duty-cycle
+        specifies what fraction of time should be spent on packing.
+        For example, if the duty cycle is 0.75, then 75% of the time
+        will be spent packing: a 6 second pack batch
+        will be followed by a 2 second delay.  The duty cycle should
+        be greater than 0.0 and less than or equal to 1.0.  Specify
+        1.0 for no delay between batches.
+
+        The default is 0.5.  Raise it to finish packing faster; lower it
+        to reduce the effect of packing on transaction commit performance.
+      </description>
+    </key>
+    <key name="pack-max-delay" datatype="float" required="no">
+      <description>
+        This specifies a maximum delay between pack batches.  Sometimes
+        the database takes an extra long time to finish a pack batch; at
+        those times it is useful to cap the delay imposed by the
+        pack-duty-cycle.  The default is 20 seconds.
+      </description>
+    </key>
+    <key name="cache-servers" datatype="string" required="no">
+      <description>
+        Specifies a list of memcache servers.  Enabling memcache integration
+        is useful if the connection to the relational database has high
+        latency and the connection to memcache has significantly lower
+        latency.  On the other hand, if the connection to the relational
+        database already has low latency, memcache integration may actually
+        hurt overall performance.
+
+        Provide a list of host:port pairs, separated by whitespace.
+        "127.0.0.1:11211" is a common setting.  The default is to disable
+        memcache integration.
+      </description>
+    </key>
+    <key name="cache-module-name" datatype="string" required="no">
+      <description>
+        Specifies which Python memcache module to use.  The default is
+        "memcache", a pure Python module.  An alternative module is
+        "cmemcache".  This setting has no effect unless cache-servers is set.
+      </description>
+    </key>
   </sectiontype>
 
   <sectiontype name="postgresql" implements="relstorage.adapter"

Modified: relstorage/branches/1.1/relstorage/config.py
===================================================================
--- relstorage/branches/1.1/relstorage/config.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/config.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -15,7 +15,7 @@
 
 from ZODB.config import BaseConfig
 
-from relstorage import RelStorage
+from relstorage import RelStorage, Options
 
 
 class RelStorageFactory(BaseConfig):
@@ -23,9 +23,13 @@
     def open(self):
         config = self.config
         adapter = config.adapter.open()
+        options = Options()
+        for key in options.__dict__.keys():
+            value = getattr(config, key, None)
+            if value is not None:
+                setattr(options, key, value)
         return RelStorage(adapter, name=config.name, create=config.create,
-            read_only=config.read_only, poll_interval=config.poll_interval,
-            pack_gc=config.pack_gc)
+            read_only=config.read_only, options=options)
 
 
 class PostgreSQLAdapterFactory(BaseConfig):

Modified: relstorage/branches/1.1/relstorage/relstorage.py
===================================================================
--- relstorage/branches/1.1/relstorage/relstorage.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/relstorage.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -43,15 +43,17 @@
     """Storage to a relational database, based on invalidation polling"""
 
     def __init__(self, adapter, name=None, create=True,
-            read_only=False, poll_interval=0, pack_gc=True):
+            read_only=False, options=None):
         if name is None:
             name = 'RelStorage on %s' % adapter.__class__.__name__
 
         self._adapter = adapter
         self._name = name
         self._is_read_only = read_only
-        self._poll_interval = poll_interval
-        self._pack_gc = pack_gc
+        if options is None:
+            options = Options()
+        self._options = options
+        self._cache_client = None
 
         if create:
             self._adapter.prepare_schema()
@@ -91,7 +93,18 @@
         # _max_new_oid is the highest OID provided by new_oid()
         self._max_new_oid = 0
 
+        # set _cache_client
+        if options.cache_servers:
+            module_name = options.cache_module_name
+            module = __import__(module_name, {}, {}, ['Client'])
+            servers = options.cache_servers
+            if isinstance(servers, basestring):
+                servers = servers.split()
+            self._cache_client = module.Client(servers)
+        else:
+            self._cache_client = None
 
+
     def _open_load_connection(self):
         """Open the load connection to the database.  Return nothing."""
         conn, cursor = self._adapter.open_for_load()
@@ -173,6 +186,9 @@
         """
         self._adapter.zap_all()
         self._rollback_load_connection()
+        cache = self._cache_client
+        if cache is not None:
+            cache.flush_all()
 
     def close(self):
         """Close the connections to the database."""
@@ -210,15 +226,49 @@
         """Return database size in bytes"""
         return self._adapter.get_db_size()
 
+    def _get_oid_cache_key(self, oid_int):
+        """Return the cache key for finding the current tid.
+
+        This is overridden by BoundRelStorage.
+        """
+        return None
+
     def load(self, oid, version):
+        oid_int = u64(oid)
+        cache = self._cache_client
+
         self._lock_acquire()
         try:
             if not self._load_transaction_open:
                 self._restart_load()
             cursor = self._load_cursor
-            state, tid_int = self._adapter.load_current(cursor, u64(oid))
+            if cache is None:
+                state, tid_int = self._adapter.load_current(cursor, oid_int)
+            else:
+                # get tid_int from the cache or the database
+                cachekey = self._get_oid_cache_key(oid_int)
+                tid_int = cachekey and cache.get(cachekey)
+                if not tid_int:
+                    # Cache miss: ask the database, then remember the answer.
+                    tid_int = self._adapter.get_current_tid(
+                        cursor, oid_int)
+                    if tid_int is None:
+                        raise KeyError(oid)
+                    if cachekey:
+                        cache.set(cachekey, tid_int)
+
+                # get state from the cache or the database
+                cachekey = 'state:%d:%d' % (oid_int, tid_int)
+                state = cache.get(cachekey)
+                if not state:
+                    state = self._adapter.load_revision(
+                        cursor, oid_int, tid_int)
+                    if state:
+                        state = str(state)
+                        cache.set(cachekey, state)
         finally:
             self._lock_release()
+
         if tid_int is not None:
             if state:
                 state = str(state)
@@ -237,6 +287,15 @@
 
     def loadSerial(self, oid, serial):
         """Load a specific revision of an object"""
+        oid_int = u64(oid)
+        tid_int = u64(serial)
+        cache = self._cache_client
+        if cache is not None:
+            cachekey = 'state:%d:%d' % (oid_int, tid_int)
+            state = cache.get(cachekey)
+            if state:
+                return state
+
         self._lock_acquire()
         try:
             if self._store_cursor is not None:
@@ -247,17 +306,20 @@
                 if not self._load_transaction_open:
                     self._restart_load()
                 cursor = self._load_cursor
-            state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
-            if state is not None:
-                state = str(state)
-                if not state:
-                    raise POSKeyError(oid)
-                return state
-            else:
-                raise KeyError(oid)
+            state = self._adapter.load_revision(cursor, oid_int, tid_int)
         finally:
             self._lock_release()
 
+        if state is not None:
+            state = str(state)
+            if not state:
+                raise POSKeyError(oid)
+            if cache is not None:
+                cache.set(cachekey, state)
+            return state
+        else:
+            raise KeyError(oid)
+
     def loadBefore(self, oid, tid):
         """Return the most recent revision of oid before tid committed."""
         oid_int = u64(oid)
@@ -702,22 +764,6 @@
             self._lock_release()
 
 
-    def set_pack_gc(self, pack_gc):
-        """Configures whether garbage collection during packing is enabled.
-
-        Garbage collection is enabled by default.  If GC is disabled,
-        packing keeps at least one revision of every object.
-        With GC disabled, the pack code does not need to follow object
-        references, making packing conceivably much faster.
-        However, some of that benefit may be lost due to an ever
-        increasing number of unused objects.
-
-        Disabling garbage collection is also a hack that ensures
-        inter-database references never break.
-        """
-        self._pack_gc = pack_gc
-
-
     def pack(self, t, referencesf):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -751,10 +797,10 @@
                 # In pre_pack, the adapter fills tables with
                 # information about what to pack.  The adapter
                 # should not actually pack anything yet.
-                adapter.pre_pack(tid_int, get_references, self._pack_gc)
+                adapter.pre_pack(tid_int, get_references, self._options)
 
                 # Now pack.
-                adapter.pack(tid_int)
+                adapter.pack(tid_int, self._options)
                 self._after_pack()
             finally:
                 adapter.release_pack_lock(lock_cursor)
@@ -785,14 +831,20 @@
         # self._zodb_conn = zodb_conn
         RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
             create=False, read_only=parent._is_read_only,
-            poll_interval=parent._poll_interval, pack_gc=parent._pack_gc)
+            options=parent._options)
         # _prev_polled_tid contains the tid at the previous poll
         self._prev_polled_tid = None
         self._poll_at = 0
 
+    def _get_oid_cache_key(self, oid_int):
+        my_tid = self._prev_polled_tid
+        if my_tid is None:
+            return None
+        return 'tid:%d:%d' % (oid_int, my_tid)
+
     def connection_closing(self):
         """Release resources."""
-        if not self._poll_interval:
+        if not self._options.poll_interval:
             self._rollback_load_connection()
         # else keep the load transaction open so that it's possible
         # to ignore the next poll.
@@ -818,7 +870,7 @@
             if self._closed:
                 return {}
 
-            if self._poll_interval:
+            if self._options.poll_interval:
                 now = time.time()
                 if self._load_transaction_open and now < self._poll_at:
                     # It's not yet time to poll again.  The previous load
@@ -826,7 +878,7 @@
                     # ignore this poll.
                     return {}
                 # else poll now after resetting the timeout
-                self._poll_at = now + self._poll_interval
+                self._poll_at = now + self._options.poll_interval
 
             self._restart_load()
             conn = self._load_conn
@@ -949,3 +1001,27 @@
         else:
             self.data = None
 
+
+class Options(object):
+    """Options for tuning RelStorage.
+
+    Values found in zope.conf override the defaults set here.
+    """
+    def __init__(self):
+        # Seconds between invalidation polls; 0 polls on every call.
+        self.poll_interval = 0
+        # If false, packing keeps at least one revision of every object.
+        self.pack_gc = True
+        # Seconds of pack work per batch before the commit lock is
+        # briefly released.
+        self.pack_batch_timeout = 5.0
+        # Fraction of time spent packing; values outside (0.0, 1.0)
+        # mean no delay between batches.
+        self.pack_duty_cycle = 0.5
+        # Upper bound, in seconds, on the delay between pack batches.
+        self.pack_max_delay = 20.0
+        # memcache servers as "host:port" strings, or one string separated
+        # by whitespace.  Empty disables memcache integration.
+        self.cache_servers = ()  # ['127.0.0.1:11211']
+        # Module that provides the memcache Client class.
+        self.cache_module_name = 'memcache'

Modified: relstorage/branches/1.1/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/branches/1.1/relstorage/tests/reltestbase.py	2008-07-19 21:33:27 UTC (rev 88622)
+++ relstorage/branches/1.1/relstorage/tests/reltestbase.py	2008-07-19 21:55:26 UTC (rev 88623)
@@ -260,7 +260,7 @@
     def checkPollInterval(self):
         # Verify the poll_interval parameter causes RelStorage to
         # delay invalidation polling.
-        self._storage._poll_interval = 3600 
+        self._storage._options.poll_interval = 3600
         db = DB(self._storage)
         try:
             c1 = db.open()
@@ -446,7 +446,7 @@
             db.close()
 
     def checkPackGCDisabled(self):
-        self._storage.set_pack_gc(False)
+        self._storage._options.pack_gc = False
         self.checkPackGC(gc_enabled=False)
 
 
@@ -526,5 +526,3 @@
 
     def new_dest(self):
         return self._dst
-
-



More information about the Checkins mailing list