[Checkins] SVN: ZODB/trunk/src/ZODB/ Reimplemented MappingStorage to be more fully featured, with a cleaner and more instructive implementation.

Jim Fulton jim at zope.com
Sat Oct 25 20:36:37 EDT 2008


Log message for revision 92565:
  Reimplemented MappingStorage to be more fully featured, with a cleaner
  and more instructive implementation.
  

Changed:
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/tests/testMappingStorage.py

-=-
Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py	2008-10-26 00:36:28 UTC (rev 92564)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2008-10-26 00:36:36 UTC (rev 92565)
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
+# Copyright (c) Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -11,135 +11,346 @@
 # FOR A PARTICULAR PURPOSE
 #
 ##############################################################################
-"""Very Simple Mapping ZODB storage
+"""A simple in-memory mapping-based ZODB storage
 
-The Mapping storage provides an extremely simple storage implementation that
-doesn't provide undo or version support.
+This storage provides an example implementation of a fairly full
+storage without distracting storage details.
+"""
 
-It is meant to illustrate the simplest possible storage.
+import BTrees
+import cPickle
+import time
+import threading
+import ZODB.interfaces
+import ZODB.POSException
+import ZODB.TimeStamp
+import ZODB.utils
+import zope.interface
 
-The Mapping storage uses a single data structure to map object ids to data.
-"""
+class MappingStorage:
+    
+    zope.interface.implements(
+        ZODB.interfaces.IStorage,
+        ZODB.interfaces.IStorageIteration,
+        )
 
-import ZODB.BaseStorage
-from ZODB.utils import u64, z64
-from ZODB import POSException
-from persistent.TimeStamp import TimeStamp
+    def __init__(self, name='MappingStorage'):
+        self.__name__ = name
+        self._data = {}                               # {oid->{tid->pickle}}
+        self._transactions = BTrees.OOBTree.OOBTree() # {tid->transaction}
+        self._ltid = None
+        self._last_pack = None
+        _lock = threading.RLock()
+        self._lock_acquire = _lock.acquire
+        self._lock_release = _lock.release
+        self._commit_lock = threading.Lock()
+        self._opened = True
+        self._transaction = None
+        self._oid = 0
 
+    ######################################################################
+    # Preconditions:
+    
+    def opened(self):
+        """The storage is open
+        """
+        return self._opened
 
-class MappingStorage(ZODB.BaseStorage.BaseStorage):
+    def not_in_transaction(self):
+        """The storage is not committing a transaction
+        """
+        return self._transaction is None
 
-    def __init__(self, name='Mapping Storage'):
-        ZODB.BaseStorage.BaseStorage.__init__(self, name)
-        # ._index maps an oid to a string s.  s[:8] is the tid of the
-        # transaction that created oid's current state, and s[8:] is oid's
-        # current state.
-        self._index = {}
-        self._clear_temp()
-        self._ltid = None
-        # Note: If you subclass this and use a persistent mapping facility
-        # (e.g. a dbm file), you will need to get the maximum key and save it
-        # as self._oid.  See dbmStorage.
+    #
+    ######################################################################
 
-    def __len__(self):
-        return len(self._index)
+    # testing framework (lame)
+    def cleanup(self):
+        pass
 
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked
+    def close(self):
+        self._opened = False
+
+    # ZODB.interfaces.IStorage
+    def getName(self):
+        return self.__name__
+
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
     def getSize(self):
-        self._lock_acquire()
-        try:
-            # These constants are for Python object memory overheads.  Heh.
-            s = 32
-            for p in self._index.itervalues():
-                s += 56 + len(p)
-            return s
-        finally:
-            self._lock_release()
+        size = 0
+        for oid, tid_data in self._data.items():
+            size += 50
+            for tid, pickle in tid_data.items():
+                size += 100+len(pickle)
+        return size
 
-    def load(self, oid, version):
-        self._lock_acquire()
-        try:
+    # ZEO.interfaces.IServeable
+    @ZODB.utils.locked(opened)
+    def getTid(self, oid):
+        tid_data = self._data.get(oid)
+        if tid_data:
+            return tid_data.maxKey()
+        raise ZODB.POSException.POSKeyError(oid)
+        
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def history(self, oid, size=1):
+        tid_data = self._data.get(oid)
+        if not tid_data:
+            raise ZODB.POSException.POSKeyError(oid)
+
+        tids = tid_data.keys()[-size:]
+        tids.reverse()
+        return [
+            dict(
+                time = ZODB.TimeStamp.TimeStamp(tid),
+                tid = tid,
+                serial = tid,
+                user_name = self._transactions[tid].user,
+                description = self._transactions[tid].description,
+                extension = self._transactions[tid].extension,
+                size = len(tid_data[tid])
+                )
+            for tid in tids]
+
+    # ZODB.interfaces.IStorage
+    def isReadOnly(self):
+        return False
+
+    # ZODB.interfaces.IStorageIteration
+    def iterator(self, start=None, end=None):
+        for transaction_record in self._transactions.values(start, end):
+            yield transaction_record
+        
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def lastTransaction(self):
+        if self._ltid is not None:
+            return self._ltid
+
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def __len__(self):
+        return len(self._data)
+
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def load(self, oid, version=''):
+        assert not version, "Versions are not supported"
+        tid_data = self._data.get(oid)
+        if tid_data:
+            tid = tid_data.maxKey()
+            return tid_data[tid], tid
+        raise ZODB.POSException.POSKeyError(oid)
+
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def loadBefore(self, oid, tid):
+        tid_data = self._data.get(oid)
+        if tid_data:
+            before = ZODB.utils.u64(tid)
+            if not before:
+                return None
+            before = ZODB.utils.p64(before-1)
+            tids_before = tid_data.keys(None, before)
+            if tids_before:
+                tids_after = tid_data.keys(tid, None)
+                tid = tids_before[-1]
+                return (tid_data[tid], tid,
+                        (tids_after and tids_after[0] or None)
+                        )
+        else:
+            raise ZODB.POSException.POSKeyError(oid)
+
+            
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def loadSerial(self, oid, serial):
+        tid_data = self._data.get(oid)
+        if tid_data:
             try:
-                p = self._index[oid]
-                return p[8:], p[:8] # pickle, serial
+                return tid_data[serial]
             except KeyError:
-                raise POSException.POSKeyError(oid)
-        finally:
-            self._lock_release()
+                pass
 
-    def getTid(self, oid):
-        self._lock_acquire()
-        try:
-            # The tid is the first 8 bytes of the buffer.
-            return self._index[oid][:8]
-        finally:
-            self._lock_release()
+        raise ZODB.POSException.POSBeforeKeyError(oid)
 
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def new_oid(self):
+        self._oid += 1
+        return ZODB.utils.p64(self._oid)
+
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def pack(self, t, referencesf, gc=True):
+        if not self._data:
+            return
+        
+        stop = `ZODB.TimeStamp.TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
+        if self._last_pack is not None and self._last_pack >= stop:
+            if self._last_pack == stop:
+                return
+            raise ValueError("Already packed to a later time")
+
+        self._last_pack = stop
+        transactions = self._transactions
+
+        # Step 1, remove old non-current records
+        for oid, tid_data in self._data.items():
+            tids_to_remove = tid_data.keys(None, stop)
+            if tids_to_remove:
+                tids_to_remove.pop()    # Keep the last, if any
+
+                if tids_to_remove:
+                    for tid in tids_to_remove:
+                        del tid_data[tid]
+                        if transactions[tid].pack(oid):
+                            del transactions[tid]
+
+        if gc:
+            # Step 2, GC.  A simple sweep+copy
+            new_data = BTrees.OOBTree.OOBTree()
+            to_copy = set([ZODB.utils.z64])
+            while to_copy:
+                oid = to_copy.pop()
+                tid_data = self._data.pop(oid)
+                new_data[oid] = tid_data
+                for pickle in tid_data.values():
+                    for oid in referencesf(pickle):
+                        if oid in new_data:
+                            continue
+                        to_copy.add(oid)
+
+            # Remove left over data from transactions
+            for oid, tid_data in self._data.items():
+                for tid in tid_data:
+                    if transactions[tid].pack(oid):
+                        del transactions[tid]
+
+            self._data = new_data
+
+    # ZODB.interfaces.IStorage
+    def registerDB(self, db):
+        pass
+
+    # ZODB.interfaces.IStorage
+    def sortKey(self):
+        return self.__name__
+
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
     def store(self, oid, serial, data, version, transaction):
+        assert not version, "Versions are not supported"
         if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
+            raise ZODB.POSException.StorageTransactionError(self, transaction)
 
-        if version:
-            raise POSException.Unsupported("Versions aren't supported")
+        old_tid = None
+        tid_data = self._data.get(oid)
+        if tid_data:
+            old_tid = tid_data.maxKey()
+            if serial != old_tid:
+                raise ZODB.POSException.ConflictError(
+                    oid=oid, serials=(old_tid, serial), data=data)
 
-        self._lock_acquire()
-        try:
-            if oid in self._index:
-                oserial = self._index[oid][:8]
-                if serial != oserial:
-                    raise POSException.ConflictError(oid=oid,
-                                                     serials=(oserial, serial),
-                                                     data=data)
-            self._tindex[oid] = self._tid + data
-        finally:
-            self._lock_release()
+        self._tdata[oid] = data
+
         return self._tid
 
-    def _clear_temp(self):
-        # store() saves data in _tindex; if the transaction completes
-        # successfully, _finish() merges _tindex into _index.
-        self._tindex = {}
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def tpc_abort(self, transaction):
+        if transaction is not self._transaction:
+            return
+        self._transaction = None
+        self._commit_lock.release()
 
-    def _finish(self, tid, user, desc, ext):
-        self._index.update(self._tindex)
-        self._ltid = self._tid
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def tpc_begin(self, transaction, tid=None):
+        # The tid argument exists to support testing.
+        if transaction is self._transaction:
+            return
+        self._lock_release()
+        self._commit_lock.acquire()
+        self._lock_acquire()
+        self._transaction = transaction
+        self._tdata = {}
+        if tid is None:
+            tid = ZODB.utils.newTid(self._ltid)
+        self._tid = tid
 
-    def lastTransaction(self):
-        return self._ltid
+    # ZODB.interfaces.IStorage
+    @ZODB.utils.locked(opened)
+    def tpc_finish(self, transaction, func = lambda tid: None):
+        if (transaction is not self._transaction) or not self._tdata:
+            return
 
-    def pack(self, t, referencesf):
-        self._lock_acquire()
-        try:
-            if not self._index:
-                return
-            # Build an index of *only* those objects reachable from the root.
-            rootl = [z64]
-            pindex = {}
-            while rootl:
-                oid = rootl.pop()
-                if oid not in pindex:
-                    # Scan non-version pickle for references.
-                    r = self._index[oid]
-                    pindex[oid] = r
-                    referencesf(r[8:], rootl)
-            self._index = pindex
+        tid = self._tid
+        func(tid)
 
-        finally:
-            self._lock_release()
+        tdata = self._tdata
+        for oid in tdata:
+            tid_data = self._data.get(oid)
+            if tid_data is None:
+                tid_data = BTrees.OOBTree.OOBucket()
+                self._data[oid] = tid_data
+            tid_data[tid] = tdata[oid]
 
-    def _splat(self):
-        """Spit out a string showing state."""
-        o = ['Index:']
-        keys = self._index.keys()
-        keys.sort()
-        for oid in keys:
-            r = self._index[oid]
-            o.append('  %s: %s, %s' %
-                     (u64(oid), TimeStamp(r[:8]), repr(r[8:])))
+        self._ltid = tid
+        self._transactions[tid] = TransactionRecord(tid, transaction, tdata)
+        self._transaction = None
+        self._commit_lock.release()
+ 
+    # ZEO.interfaces.IServeable
+    @ZODB.utils.locked(opened)
+    def tpc_transaction(self):
+        return self._transaction
 
-        return '\n'.join(o)
-
-    def cleanup(self):
+    # ZODB.interfaces.IStorage
+    def tpc_vote(self, transaction):
         pass
 
-    def close(self):
-        pass
+class TransactionRecord:
+
+    status = ' '
+
+    def __init__(self, tid, transaction, data):
+        self.tid = tid
+        self.user = transaction.user
+        self.description = transaction.description
+        extension = transaction._extension
+        self.extension = extension
+        self.data = data
+
+    _extension = property(lambda self: self._extension,
+                          lambda self, v: setattr(self, '_extension', v),
+                          )
+
+    def __iter__(self):
+        for oid, data in self.data.items():
+            yield DataRecord(oid, self.tid, data, None)
+
+    def pack(self, oid):
+        self.status = 'p'
+        del self.data[oid]
+        return not self.data
+
+class DataRecord(object):
+    """Abstract base class for iterator protocol"""
+
+    zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
+
+    version = ''
+
+    def __init__(self, oid, tid, data, prev):
+        self.oid = oid
+        self.tid = tid
+        self.data = data
+        self.data_txn = prev
+
+def DB(*args, **kw):
+    return ZODB.DB(MappingStorage(), *args, **kw)

Modified: ZODB/trunk/src/ZODB/tests/testMappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testMappingStorage.py	2008-10-26 00:36:28 UTC (rev 92564)
+++ ZODB/trunk/src/ZODB/tests/testMappingStorage.py	2008-10-26 00:36:36 UTC (rev 92565)
@@ -14,17 +14,31 @@
 import ZODB.MappingStorage
 import unittest
 
-from ZODB.tests import StorageTestBase
-from ZODB.tests import BasicStorage, MTStorage, Synchronization
-from ZODB.tests import PackableStorage
 
-class MappingStorageTests(StorageTestBase.StorageTestBase,
-                          BasicStorage.BasicStorage,
-                          MTStorage.MTStorage,
-                          PackableStorage.PackableStorage,
-                          Synchronization.SynchronizedStorage,
-                          ):
+from ZODB.tests import (
+    BasicStorage,
+    HistoryStorage,
+    IteratorStorage,
+    MTStorage,
+    PackableStorage,
+    RevisionStorage,
+    StorageTestBase,
+    Synchronization,
+    )
 
+class MappingStorageTests(
+    StorageTestBase.StorageTestBase,
+    BasicStorage.BasicStorage,
+
+    HistoryStorage.HistoryStorage,
+    IteratorStorage.ExtendedIteratorStorage,
+    IteratorStorage.IteratorStorage,
+    MTStorage.MTStorage,
+    PackableStorage.PackableStorage,
+    RevisionStorage.RevisionStorage,
+    Synchronization.SynchronizedStorage,
+    ):
+
     def setUp(self):
         self._storage = ZODB.MappingStorage.MappingStorage()
 
@@ -36,8 +50,11 @@
         # doesnt support huge transaction metadata. This storage doesnt
         # have this limit, so we inhibit this test here.
         pass
+        
+    def checkLoadBeforeUndo(self):
+        pass # we don't support undo yet
+    checkUndoZombie = checkLoadBeforeUndo
 
-
 def test_suite():
     suite = unittest.makeSuite(MappingStorageTests, 'check')
     return suite



More information about the Checkins mailing list