[Zope-Checkins] CVS: StandaloneZODB/ZODB/tests - MTStorage.py:1.2 ReadOnlyStorage.py:1.2

Jeremy Hylton jeremy@zope.com
Mon, 21 Jan 2002 11:45:41 -0500


Update of /cvs-repository/StandaloneZODB/ZODB/tests
In directory cvs.zope.org:/tmp/cvs-serv9778

Added Files:
	MTStorage.py ReadOnlyStorage.py 
Log Message:
Add new tests from the Standby-branch branch


=== StandaloneZODB/ZODB/tests/MTStorage.py 1.1 => 1.2 ===
+import threading
+import time
+
+import ZODB
+from PersistentMapping import PersistentMapping
+
+from ZODB.tests.StorageTestBase \
+     import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
+from ZODB.tests.MinPO import MinPO
+from ZODB.Transaction import Transaction
+from ZODB.POSException import ConflictError
+
+SHORT_DELAY = 0.01
+
+def sort(l):
+    "Sort a list in place and return it."
+    l.sort()
+    return l
+
+class ZODBClientThread(threading.Thread):
+
+    __super_init = threading.Thread.__init__
+
+    def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
+        self.__super_init()
+        self.db = db
+        self.test = test
+        self.commits = commits
+        self.delay = delay
+
+    def run(self):
+        conn = self.db.open()
+        root = conn.root()
+        d = self.get_thread_dict(root)
+        if d is None:
+            self.test.fail()
+        else:
+            for i in range(self.commits):
+                self.commit(d, i)
+        self.test.assertEqual(sort(d.keys()), range(self.commits))
+
+    def commit(self, d, num):
+        d[num] = time.time()
+        time.sleep(self.delay)
+        get_transaction().commit()
+        time.sleep(self.delay)
+
+    def get_thread_dict(self, root):
+        name = self.getName()
+        # arbitrarily limit to 10 re-tries
+        for i in range(10):
+            try:
+                m = PersistentMapping()
+                root[name] = m
+                get_transaction().commit()
+                break
+            except ConflictError:
+                get_transaction().abort()
+        for i in range(10):
+            try:
+                return root.get(name)
+            except ConflictError:
+                get_transaction().abort()
+
+class StorageClientThread(threading.Thread):
+
+    __super_init = threading.Thread.__init__
+
+    def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
+        self.__super_init()
+        self.storage = storage
+        self.test = test
+        self.commits = commits
+        self.delay = delay
+        self.oids = {}
+
+    def run(self):
+        for i in range(self.commits):
+            self.dostore(i)
+        self.check()
+
+    def check(self):
+        for oid, revid in self.oids.items():
+            data, serial = self.storage.load(oid, '')
+            self.test.assertEqual(serial, revid)
+            obj = zodb_unpickle(data)
+            self.test.assertEqual(obj.value[0], self.getName())
+
+    def pause(self):
+        time.sleep(self.delay)
+
+    def oid(self):
+        oid = self.storage.new_oid()
+        self.oids[oid] = None
+        return oid
+
+    def dostore(self, i):
+        data = zodb_pickle(MinPO((self.getName(), i)))
+        t = Transaction()
+        oid = self.oid()
+        self.pause()
+
+        self.storage.tpc_begin(t)
+        self.pause()
+
+        # Always create a new object, signified by None for revid
+        r1 = self.storage.store(oid, None, data, '', t)
+        self.pause()
+
+        r2 = self.storage.tpc_vote(t)
+        self.pause()
+
+        self.storage.tpc_finish(t)
+        self.pause()
+
+        revid = handle_serials(oid, r1, r2)
+        self.oids[oid] = revid
+
+class ExtStorageClientThread(StorageClientThread):
+
+    def run(self):
+        # pick some other storage ops to execute
+        ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
+               if meth.startswith('do_')]
+        assert ops, "Didn't find an storage ops in %s" % self.storage
+        # do a store to guarantee there's at least one oid in self.oids
+        self.dostore(0)
+
+        for i in range(self.commits - 1):
+            meth = random.choice(ops)
+            meth()
+            self.dostore(i)
+        self.check()
+
+    def pick_oid(self):
+        return random.choice(self.oids.keys())
+
+    def do_load(self):
+        oid = self.pick_oid()
+        self.storage.load(oid, '')
+
+    def do_loadSerial(self):
+        oid = self.pick_oid()
+        self.storage.loadSerial(oid, self.oids[oid])
+
+    def do_modifiedInVersion(self):
+        oid = self.pick_oid()
+        self.storage.modifiedInVersion(oid)
+
+    def do_undoLog(self):
+        self.storage.undoLog(0, -20)
+
+    def do_iterator(self):
+        try:
+            iter = self.storage.iterator()
+        except AttributeError:
+            # XXX It's hard to detect that a ZEO ClientStorage
+            # doesn't have this method, but does have all the others.
+            return
+        for obj in iter:
+            pass
+
+class MTStorage:
+    "Test a storage with multiple client threads executing concurrently."
+
+    def _checkNThreads(self, n, constructor, *args):
+        threads = [constructor(*args) for i in range(n)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+    
+    def check2ZODBThreads(self):
+        db = ZODB.DB(self._storage)
+        self._checkNThreads(2, ZODBClientThread, db, self)
+
+    def check7ZODBThreads(self):
+        db = ZODB.DB(self._storage)
+        self._checkNThreads(7, ZODBClientThread, db, self)
+
+    def check2StorageThreads(self):
+        self._checkNThreads(2, StorageClientThread, self._storage, self)
+    
+    def check7StorageThreads(self):
+        self._checkNThreads(7, StorageClientThread, self._storage, self)
+
+    def check4ExtStorageThread(self):
+        self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
+        


=== StandaloneZODB/ZODB/tests/ReadOnlyStorage.py 1.1 => 1.2 ===
+from ZODB.Transaction import Transaction
+
+class ReadOnlyStorage:
+
+    def _create_data(self):
+        # test a read-only storage that already has some data
+        self.oids = {}
+        for i in range(10):
+            oid = self._storage.new_oid()
+            revid = self._dostore(oid)
+            self.oids[oid] = revid
+
+    def _make_readonly(self):
+        self._storage.close()
+        self.open(read_only=1)
+        self.assert_(self._storage.isReadOnly())
+
+    def checkReadMethods(self):
+        self._create_data()
+        self._make_readonly()
+        # XXX not going to bother checking all read methods
+        for oid in self.oids.keys():
+            data, revid = self._storage.load(oid, '')
+            self.assertEqual(revid, self.oids[oid])
+            self.assert_(not self._storage.modifiedInVersion(oid))
+            _data = self._storage.loadSerial(oid, revid)
+            self.assertEqual(data, _data)
+
+    def checkWriteMethods(self):
+        self._make_readonly()
+        self.assertRaises(ReadOnlyError, self._storage.new_oid)
+        self.assertRaises(ReadOnlyError, self._storage.undo,
+                          '\000' * 8)
+
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self.assertRaises(ReadOnlyError, self._storage.abortVersion,
+                          '', t)
+        self._storage.tpc_abort(t)
+        
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self.assertRaises(ReadOnlyError, self._storage.commitVersion,
+                          '', '', t)
+        self._storage.tpc_abort(t)
+
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self.assertRaises(ReadOnlyError, self._storage.store,
+                          '\000' * 8, None, '', '', t)
+        self._storage.tpc_abort(t)
+
+        if self._storage.supportsTransactionalUndo():
+            t = Transaction()
+            self._storage.tpc_begin(t)
+            self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
+                              '\000' * 8, t)
+            self._storage.tpc_abort(t)
+            
+