[Zope3-checkins] CVS: ZODB4/src/zodb/storage/tests - test_autopack.py:1.3

Barry Warsaw barry@wooz.org
Wed, 22 Jan 2003 15:44:39 -0500


Update of /cvs-repository/ZODB4/src/zodb/storage/tests
In directory cvs.zope.org:/tmp/cvs-serv23266

Modified Files:
	test_autopack.py 
Log Message:
Cleanups and forward ports from ZODB 3.2, such as:

check* -> test*

check the berkeley_is_available flag before attempting to run the BDB
storage tests.

add pack race condition tests


=== ZODB4/src/zodb/storage/tests/test_autopack.py 1.2 => 1.3 ===
--- ZODB4/src/zodb/storage/tests/test_autopack.py:1.2	Wed Dec 25 09:12:20 2002
+++ ZODB4/src/zodb/storage/tests/test_autopack.py	Wed Jan 22 15:44:36 2003
@@ -12,26 +12,30 @@
 #
 ##############################################################################
 
-try:
-    import bsddb3
-except ImportError:
-    raise RuntimeError, 'BerkeleyDB not available'
-
 import os
 import time
 import unittest
+import threading
 
-from zodb.db import DB
-from zodb.storage.tests.minpo import MinPO
 from persistence import Persistent
 from transaction import get_transaction
 
-from zodb.storage.bdbfull import BDBFullStorage
-from zodb.storage.bdbminimal import BDBMinimalStorage
-from zodb.storage.base import BerkeleyConfig
-from zodb.storage.tests.base import BerkeleyTestBase
+from zodb.db import DB
+from zodb.timestamp import TimeStamp
+from zodb.ztransaction import Transaction
+from zodb.storage.base import ZERO, berkeley_is_available
+from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
+from zodb.storage.tests.minpo import MinPO
+
+if berkeley_is_available:
+    from zodb.storage.bdbfull import BDBFullStorage
+    from zodb.storage.bdbminimal import BDBMinimalStorage
+    from zodb.storage.base import BerkeleyConfig
+    from zodb.storage.tests.base import BerkeleyTestBase
+else:
+    class fake: pass
+    BDBFullStorage = BDBMinimalStorage = fake
 
-ZERO = '\0'*8
 
 class C(Persistent):
     pass
@@ -63,11 +67,12 @@
             self._zap_dbhome(dir)
             raise
 
+
 
 class TestAutopack(TestAutopackBase):
     ConcreteStorage = BDBFullStorage
 
-    def checkAutopack(self):
+    def testAutopack(self):
         unless = self.failUnless
         raises = self.assertRaises
         storage = self._storage
@@ -108,7 +113,7 @@
         config.classicpack = 1
         return config
 
-    def checkAutomaticClassicPack(self):
+    def testAutomaticClassicPack(self):
         unless = self.failUnless
         raises = self.assertRaises
         storage = self._storage
@@ -136,7 +141,7 @@
         raises(KeyError, storage.loadSerial, oid, revid2)
         raises(KeyError, storage.loadSerial, oid, revid3)
 
-    def checkCycleUnreachable(self):
+    def testCycleUnreachable(self):
         unless = self.failUnless
         raises = self.assertRaises
         storage = self._storage
@@ -187,7 +192,7 @@
         config.frequency = 3
         return config
 
-    def checkRootUnreachable(self):
+    def testRootUnreachable(self):
         unless = self.failUnless
         raises = self.assertRaises
         storage = self._storage
@@ -216,7 +221,7 @@
         unless(storage.load(ZERO, ''))
         raises(KeyError, storage.load, oid, '')
 
-    def checkCycleUnreachable(self):
+    def testCycleUnreachable(self):
         unless = self.failUnless
         raises = self.assertRaises
         storage = self._storage
@@ -257,11 +262,267 @@
 
 
 
+class RaceConditionBase(BerkeleyTestBase):
+    def setUp(self):
+        BerkeleyTestBase.setUp(self)
+        self._cv = threading.Condition()
+        self._storage.cv = self._cv
+
+    def tearDown(self):
+        # clean up any outstanding transactions
+        get_transaction().abort()
+
+
+
+# Subclass which does ugly things to _dopack so we can actually test the race
+# condition.  We need to store a new object in the database between the
+# _mark() call and the _sweep() call.
+class SynchronizedFullStorage(BDBFullStorage):
+    # XXX Cut and paste copy from BDBFullStorage, except where indicated
+    def _dopack(self, t, gc=True):
+        # t is a TimeTime, or time float, convert this to a TimeStamp object,
+        # using an algorithm similar to what's used in FileStorage.  We know
+        # that our transaction ids, a.k.a. revision ids, are timestamps.
+        #
+        # BAW: should a pack time in the future be a ValueError?  We'd have to
+        # worry about clock skew, so for now, we just set the pack time to the
+        # minimum of t and now.
+        packtime = min(t, time.time())
+        t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
+        packtid = `t0`
+        # Collect all revisions of all objects earlier than the pack time.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect_revs, packtid)
+        finally:
+            self._lock_release()
+        # Collect any objects with refcount zero.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect_objs)
+        finally:
+            self._lock_release()
+        # If we're not doing a classic pack, we're done.
+        if not gc:
+            return
+        # Do a mark and sweep for garbage collection.  Calculate the set of
+        # objects reachable from the root.  Anything else is a candidate for
+        # having all their revisions packed away.  The set of reachable
+        # objects lives in the _packmark table.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._mark, packtid)
+        finally:
+            self._lock_release()
+        # XXX thread coordination code start
+        self.cv.acquire()
+        self.cv.notify()
+        self.cv.wait()
+        # XXX thread coordination code stop
+        #
+        # Now perform a sweep, using oidqueue to hold all object ids for
+        # objects which are not root reachable as of the pack time.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._sweep, packtid)
+        finally:
+            self._lock_release()
+        # Once again, collect any objects with refcount zero due to the mark
+        # and sweep garbage collection pass.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect_objs)
+        finally:
+            self._lock_release()
+        # XXX thread coordination code start
+        self.cv.notify()
+        self.cv.release()
+        # XXX thread coordination code stop
+
+
+class FullPackThread(threading.Thread):
+    def __init__(self, storage):
+        threading.Thread.__init__(self)
+        self._storage = storage
+
+    def run(self):
+        self._storage.autopack(time.time(), gc=True)
+
+
+class TestFullClassicPackRaceCondition(RaceConditionBase):
+    ConcreteStorage = SynchronizedFullStorage
+
+    def testRaceCondition(self):
+        unless = self.failUnless
+        storage = self._storage
+        db = DB(storage)
+        conn = db.open()
+        root = conn.root()
+        # Start by storing a root reachable object.
+        obj1 = C()
+        obj1.value = 888
+        root.obj1 = obj1
+        txn = get_transaction()
+        txn.note('root -> obj1')
+        txn.commit()
+        # Now, start a transaction, store an object, but don't yet complete
+        # the transaction.  This will ensure that the second object has a tid
+        # < packtime, but it won't be root reachable yet.
+        obj2 = C()
+        t = Transaction()
+        storage.tpc_begin(t)
+        obj2sn = storage.store('\0'*7 + '\2', ZERO, zodb_pickle(obj2), '', t)
+        # Now, acquire the condvar lock and start a thread that will do a
+        # pack, up to the _sweep call.  Wait for the _mark() call to
+        # complete.
+        now = time.time()
+        while now == time.time():
+            time.sleep(0.5)
+        self._cv.acquire()
+        packthread = FullPackThread(storage)
+        packthread.start()
+        self._cv.wait()
+        # Now that the _mark() has finished, complete the transaction, which
+        # links the object to root.
+        root.obj2 = obj2
+        rootsn = storage.getSerial(ZERO)
+        rootsn = storage.store(ZERO, rootsn, zodb_pickle(root), '', t)
+        storage.tpc_vote(t)
+        storage.tpc_finish(t)
+        # And notify the pack thread that it can do the sweep and collect
+        self._cv.notify()
+        self._cv.wait()
+        # We're done with the condvar and the thread
+        self._cv.release()
+        packthread.join()
+        # Now make sure that all the interesting objects are still available
+        rootsn = storage.getSerial(ZERO)
+        obj1sn = storage.getSerial('\0'*7 + '\1')
+        obj2sn = storage.getSerial('\0'*7 + '\2')
+        # obj1 revision was written before the second revision of the root
+        unless(obj1sn < rootsn)
+        unless(rootsn == obj2sn)
+        unless(obj1sn < obj2sn)
+
+
+
+# Subclass which does ugly things to _dopack so we can actually test the race
+# condition.  We need to store a new object in the database between the
+# _mark() call and the _sweep() call.
+class SynchronizedMinimalStorage(BDBMinimalStorage):
+    # XXX Cut and paste copy from BDBMinimalStorage, except where indicated
+    def _dopack(self):
+        # Do a mark and sweep for garbage collection.  Calculate the set of
+        # objects reachable from the root.  Anything else is a candidate for
+        # having all their revisions packed away.  The set of reachable
+        # objects lives in the _packmark table.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._mark)
+        finally:
+            self._lock_release()
+        # XXX thread coordination code start
+        self.cv.acquire()
+        self.cv.notify()
+        self.cv.wait()
+        # XXX thread coordination code stop
+        #
+        # Now perform a sweep, using oidqueue to hold all object ids for
+        # objects which are not root reachable as of the pack time.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._sweep)
+        finally:
+            self._lock_release()
+        # Once again, collect any objects with refcount zero due to the mark
+        # and sweep garbage collection pass.
+        self._lock_acquire()
+        try:
+            self._withtxn(self._collect_objs)
+        finally:
+            self._lock_release()
+        # XXX thread coordination code start
+        self.cv.notify()
+        self.cv.release()
+        # XXX thread coordination code stop
+
+
+class MinimalPackThread(threading.Thread):
+    def __init__(self, storage):
+        threading.Thread.__init__(self)
+        self._storage = storage
+
+    def run(self):
+        self._storage.pack(time.time())
+
+
+class TestMinimalClassicPackRaceCondition(RaceConditionBase):
+    ConcreteStorage = SynchronizedMinimalStorage
+
+    def testRaceCondition(self):
+        unless = self.failUnless
+        storage = self._storage
+        db = DB(storage)
+        conn = db.open()
+        root = conn.root()
+        # Start by storing a root reachable object.
+        obj1 = C()
+        obj1.value = 888
+        root.obj1 = obj1
+        txn = get_transaction()
+        txn.note('root -> obj1')
+        txn.commit()
+        # Now, start a transaction, store an object, but don't yet complete
+        # the transaction.  This will ensure that the second object has a tid
+        # < packtime, but it won't be root reachable yet.
+        obj2 = C()
+        t = Transaction()
+        storage.tpc_begin(t)
+        obj2sn = storage.store('\0'*7 + '\2', ZERO, zodb_pickle(obj2), '', t)
+        # Now, acquire the condvar lock and start a thread that will do a
+        # pack, up to the _sweep call.  Wait for the _mark() call to
+        # complete.
+        now = time.time()
+        while now == time.time():
+            time.sleep(0.5)
+        self._cv.acquire()
+        packthread = MinimalPackThread(storage)
+        packthread.start()
+        self._cv.wait()
+        # Now that the _mark() has finished, complete the transaction, which
+        # links the object to root.
+        root.obj2 = obj2
+        rootsn = storage.getSerial(ZERO)
+        #rootdata = ObjectWriter(root._p_jar).getState(root)
+        rootsn = storage.store(ZERO, rootsn, zodb_pickle(root), '', t)
+        storage.tpc_vote(t)
+        storage.tpc_finish(t)
+        # And notify the pack thread that it can do the sweep and collect
+        self._cv.notify()
+        self._cv.wait()
+        # We're done with the condvar and the thread
+        self._cv.release()
+        packthread.join()
+        # Now make sure that all the interesting objects are still available
+        rootsn = storage.getSerial(ZERO)
+        obj1sn = storage.getSerial('\0'*7 + '\1')
+        obj2sn = storage.getSerial('\0'*7 + '\2')
+        # obj1 revision was written before the second revision of the root
+        unless(obj1sn < rootsn)
+        unless(rootsn == obj2sn)
+        unless(obj1sn < obj2sn)
+
+
+
 def test_suite():
     suite = unittest.TestSuite()
-    suite.addTest(unittest.makeSuite(TestAutopack, 'check'))
-    suite.addTest(unittest.makeSuite(TestAutomaticClassicPack, 'check'))
-    suite.addTest(unittest.makeSuite(TestMinimalPack, 'check'))
+    suite.level = 2
+    if berkeley_is_available:
+        suite.addTest(unittest.makeSuite(TestAutopack))
+        suite.addTest(unittest.makeSuite(TestAutomaticClassicPack))
+        suite.addTest(unittest.makeSuite(TestMinimalPack))
+        suite.addTest(unittest.makeSuite(TestFullClassicPackRaceCondition))
+        suite.addTest(unittest.makeSuite(TestMinimalClassicPackRaceCondition))
     return suite