[Zodb-checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:1.4.2.2

Jeremy Hylton jeremy at zope.com
Fri Sep 5 17:21:02 EDT 2003


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv1078/ZEO/tests

Modified Files:
      Tag: ZODB3-3_2-branch
	InvalidationTests.py 
Log Message:
Port the modern, adaptive concurrent update tests from ZODB3-3_1-branch.


=== ZODB3/ZEO/tests/InvalidationTests.py 1.4.2.1 => 1.4.2.2 ===
--- ZODB3/ZEO/tests/InvalidationTests.py:1.4.2.1	Thu Jun 26 11:38:38 2003
+++ ZODB3/ZEO/tests/InvalidationTests.py	Fri Sep  5 16:21:00 2003
@@ -12,7 +12,6 @@
 #
 ##############################################################################
 
-from thread import get_ident
 import threading
 import time
 
@@ -20,12 +19,10 @@
 from BTrees.OOBTree import OOBTree
 
 from ZEO.tests.TestThread import TestThread
-from ZEO.tests.ConnectionTests import CommonSetupTearDown
 
 from ZODB.DB import DB
 from ZODB.POSException \
      import ReadConflictError, ConflictError, VersionLockError
-import zLOG
 
 # The tests here let several threads have a go at one or more database
 # instances simultaneously.  Each thread appends a disjoint (from the
@@ -48,8 +45,8 @@
     # to 'tree' until Event stop is set.  If sleep is given, sleep
     # that long after each append.  At the end, instance var .added_keys
     # is a list of the ints the thread believes it added successfully.
-    def __init__(self, testcase, db, stop, threadnum, startnum,
-                 step=2, sleep=None):
+    def __init__(self, testcase, db, stop, threadnum, commitdict,
+                 startnum, step=2, sleep=None):
         TestThread.__init__(self, testcase)
         self.db = db
         self.stop = stop
@@ -58,6 +55,7 @@
         self.step = step
         self.sleep = sleep
         self.added_keys = []
+        self.commitdict = commitdict
 
     def testrun(self):
         cn = self.db.open()
@@ -74,6 +72,7 @@
                 tree[key] = self.threadnum
                 get_transaction().note("add key %s" % key)
                 get_transaction().commit()
+                self.commitdict[self] = 1
                 if self.sleep:
                     time.sleep(self.sleep)
             except (ReadConflictError, ConflictError), msg:
@@ -88,9 +87,13 @@
             key += self.step
         cn.close()
 
-class VersionStressThread(TestThread):
+class LargeUpdatesThread(TestThread):
+
+    # A thread that performs a lot of updates.  It attempts to modify
+    # more than 25 objects so that it can test code that runs vote
+    # in a separate thread when it modifies more than 25 objects.
 
-    def __init__(self, testcase, db, stop, threadnum, startnum,
+    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
                  step=2, sleep=None):
         TestThread.__init__(self, testcase)
         self.db = db
@@ -100,21 +103,88 @@
         self.step = step
         self.sleep = sleep
         self.added_keys = []
+        self.commitdict = commitdict
 
-    def log(self, msg):
-        zLOG.LOG("thread %d" % get_ident(), 0, msg)
+    def testrun(self):
+        cn = self.db.open()
+        while not self.stop.isSet():
+            try:
+                tree = cn.root()["tree"]
+                break
+            except (ConflictError, KeyError):
+                # print "%d getting tree abort" % self.threadnum
+                get_transaction().abort()
+                cn.sync()
+
+        keys_added = {} # set of keys we commit
+        tkeys = []
+        while not self.stop.isSet():
+
+            # The test picks 50 keys spread across many buckets.
+            # self.startnum and self.step ensure that all threads use
+            # disjoint key sets, to minimize conflict errors.
+
+            nkeys = len(tkeys)
+            if nkeys < 50:
+                tkeys = range(self.startnum, 3000, self.step)
+                nkeys = len(tkeys)
+            step = max(int(nkeys / 50), 1)
+            keys = [tkeys[i] for i in range(0, nkeys, step)]
+            for key in keys:
+                try:
+                    tree[key] = self.threadnum
+                except (ReadConflictError, ConflictError), msg:
+                    # print "%d setting key %s" % (self.threadnum, msg)
+                    get_transaction().abort()
+                    cn.sync()
+                    break
+            else:
+                # print "%d set #%d" % (self.threadnum, len(keys))
+                get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+                try:
+                    get_transaction().commit()
+                    self.commitdict[self] = 1
+                    if self.sleep:
+                        time.sleep(self.sleep)
+                except ConflictError, msg:
+                    # print "%d commit %s" % (self.threadnum, msg)
+                    get_transaction().abort()
+                    cn.sync()
+                    continue
+                for k in keys:
+                    tkeys.remove(k)
+                    keys_added[k] = 1
+                # sync() is necessary here to process invalidations
+                # if we get a read conflict.  In the read conflict case,
+                # no objects were modified so cn never got registered
+                # with the transaction.
+                cn.sync()
+        self.added_keys = keys_added.keys()
+        cn.close()
+
+class VersionStressThread(TestThread):
+
+    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+                 step=2, sleep=None):
+        TestThread.__init__(self, testcase)
+        self.db = db
+        self.stop = stop
+        self.threadnum = threadnum
+        self.startnum = startnum
+        self.step = step
+        self.sleep = sleep
+        self.added_keys = []
+        self.commitdict = commitdict
 
     def testrun(self):
-        self.log("thread begin")
         commit = 0
         key = self.startnum
         while not self.stop.isSet():
             version = "%s:%s" % (self.threadnum, key)
             commit = not commit
-            self.log("attempt to add key=%s version=%s commit=%d" %
-                     (key, version, commit))
             if self.oneupdate(version, key, commit):
                 self.added_keys.append(key)
+                self.commitdict[self] = 1
             key += self.step
 
     def oneupdate(self, version, key, commit=1):
@@ -134,11 +204,11 @@
         while not self.stop.isSet():
             try:
                 tree[key] = self.threadnum
-                get_transaction().note("add key %d" % key)
                 get_transaction().commit()
+                if self.sleep:
+                    time.sleep(self.sleep)
                 break
             except (VersionLockError, ReadConflictError, ConflictError), msg:
-                self.log(msg)
                 get_transaction().abort()
                 # sync() is necessary here to process invalidations
                 # if we get a read conflict.  In the read conflict case,
@@ -161,20 +231,30 @@
                         time.sleep(self.sleep)
                     return commit
                 except ConflictError, msg:
-                    self.log(msg)
                     get_transaction().abort()
                     cn.sync()
         finally:
             cn.close()
         return 0
 
-class InvalidationTests(CommonSetupTearDown):
+class InvalidationTests:
 
     level = 2
-    DELAY = 15  # number of seconds the main thread lets the workers run
+
+    # Minimum # of seconds the main thread lets the workers run.  The
+    # test stops as soon as this much time has elapsed, and all threads
+    # have managed to commit a change.
+    MINTIME = 10
+
+    # Maximum # of seconds the main thread lets the workers run.  We
+    # stop after this long has elapsed regardless of whether all threads
+    # have managed to commit a change.
+    MAXTIME = 300
+
+    StressThread = StressThread
 
     def _check_tree(self, cn, tree):
-        # Make sure the BTree is sane and that all the updates persisted.
+        # Make sure the BTree is sane at the C level.
         retries = 3
         while retries:
             retries -= 1
@@ -194,28 +274,46 @@
     def _check_threads(self, tree, *threads):
         # Make sure the thread's view of the world is consistent with
         # the actual database state.
-        all_keys = []
+        expected_keys = []
+        errormsgs = []
+        err = errormsgs.append
         for t in threads:
-            # If the test didn't add any keys, it didn't do what we expected.
-            self.assert_(t.added_keys)
-            for key in t.added_keys:
-                self.assert_(tree.has_key(key), key)
-            all_keys.extend(t.added_keys)
-        all_keys.sort()
-        self.assertEqual(all_keys, list(tree.keys()))
+            if not t.added_keys:
+                err("thread %d didn't add any keys" % t.threadnum)
+            expected_keys.extend(t.added_keys)
+        expected_keys.sort()
+        actual_keys = list(tree.keys())
+        if expected_keys != actual_keys:
+            err("expected keys != actual keys")
+            for k in expected_keys:
+                if k not in actual_keys:
+                    err("key %s expected but not in tree" % k)
+            for k in actual_keys:
+                if k not in expected_keys:
+                    err("key %s in tree but not expected" % k)
+        if errormsgs:
+            display(tree)
+            self.fail('\n'.join(errormsgs))
 
-    def go(self, stop, *threads):
+    def go(self, stop, commitdict, *threads):
         # Run the threads
         for t in threads:
             t.start()
-        time.sleep(self.DELAY)
+        delay = self.MINTIME
+        start = time.time()
+        while time.time() - start <= self.MAXTIME:
+            time.sleep(delay)
+            delay = 2.0
+            if len(commitdict) >= len(threads):
+                break
+            # Some thread still hasn't managed to commit anything.
         stop.set()
         for t in threads:
             t.cleanup()
 
     def checkConcurrentUpdates2Storages(self):
         self._storage = storage1 = self.openClientStorage()
-        storage2 = self.openClientStorage(cache="2")
+        storage2 = self.openClientStorage()
         db1 = DB(storage1)
         db2 = DB(storage2)
         stop = threading.Event()
@@ -225,9 +323,10 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1)
-        t2 = StressThread(self, db2, stop, 2, 2)
-        self.go(stop, t1, t2)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1)
+        t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+        self.go(stop, cd, t1, t2)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -247,9 +346,10 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
-        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
-        self.go(stop, t1, t2)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
+        t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
+        self.go(stop, cd, t1, t2)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -261,22 +361,23 @@
     def checkConcurrentUpdates2StoragesMT(self):
         self._storage = storage1 = self.openClientStorage()
         db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
         stop = threading.Event()
 
         cn = db1.open()
         tree = cn.root()["tree"] = OOBTree()
         get_transaction().commit()
 
-        db2 = DB(self.openClientStorage(cache="2"))
         # Run three threads that update the BTree.
         # Two of the threads share a single storage so that it
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = StressThread(self, db1, stop, 1, 1, 3)
-        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
-        self.go(stop, t1, t2, t3)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
+        t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -289,7 +390,7 @@
     def checkConcurrentUpdatesInVersions(self):
         self._storage = storage1 = self.openClientStorage()
         db1 = DB(storage1)
-        db2 = DB(self.openClientStorage(cache="2"))
+        db2 = DB(self.openClientStorage())
         stop = threading.Event()
 
         cn = db1.open()
@@ -301,10 +402,11 @@
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
-        t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
-        self.go(stop, t1, t2, t3)
+        cd = {}
+        t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
+        t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
 
         cn.sync()
         self._check_tree(cn, tree)
@@ -314,3 +416,41 @@
         db1.close()
         db2.close()
 
+    def checkConcurrentLargeUpdates(self):
+        # Use 3 threads like the 2StorageMT test above.
+        self._storage = storage1 = self.openClientStorage()
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
+        stop = threading.Event()
+
+        cn = db1.open()
+        tree = cn.root()["tree"] = OOBTree()
+        for i in range(0, 3000, 2):
+            tree[i] = 0
+        get_transaction().commit()
+
+        # Run three threads that update the BTree.
+        # Two of the threads share a single storage so that it
+        # is possible for both threads to read the same object
+        # at the same time.
+
+        cd = {}
+        t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
+        t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
+
+        cn.sync()
+        self._check_tree(cn, tree)
+
+        # Purge the tree of the dummy entries mapping to 0.
+        losers = [k for k, v in tree.items() if v == 0]
+        for k in losers:
+            del tree[k]
+        get_transaction().commit()
+
+        self._check_threads(tree, t1, t2, t3)
+
+        cn.close()
+        db1.close()
+        db2.close()




More information about the Zodb-checkins mailing list