[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - invalid.py:1.5

Tim Peters tim.one at comcast.net
Thu Sep 4 16:12:20 EDT 2003


Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv23652/src/zodb/zeo/tests

Modified Files:
	invalid.py 
Log Message:
Port from ZODB3-3_1-branch:  The ZEO testConcurrent...Updates... tests
spin off multiple threads, all of which try to commit.  If some thread
doesn't manage to commit within 15 seconds, the test fails.  Unfortunately,
that isn't rare, and especially not on Windows, and especially not when
Berkeley is the underlying storage.

3.1 attempted to repair this by periodically peeking into the threads'
progress, letting a test consume as long as 5 minutes if at least one
thread still hasn't managed to commit.  I haven't seen one of these tests
fail since then in 3.1 (whose tests I've been running a lot lately), so
am hopeful that people stop seeing them fail in the Zope3 tests too now.


=== Zope3/src/zodb/zeo/tests/invalid.py 1.4 => 1.5 ===
--- Zope3/src/zodb/zeo/tests/invalid.py:1.4	Sat Jun 21 18:04:28 2003
+++ Zope3/src/zodb/zeo/tests/invalid.py	Thu Sep  4 15:12:19 2003
@@ -28,8 +28,6 @@
 
 import logging
 
-# XXX stopped porting here
-
 # The tests here let several threads have a go at one or more database
 # instances simultaneously.  Each thread appends a disjoint (from the
 # other threads) sequence of increasing integers to an OOBTree, one at
@@ -51,8 +49,8 @@
     # to 'tree' until Event stop is set.  If sleep is given, sleep
     # that long after each append.  At the end, instance var .added_keys
     # is a list of the ints the thread believes it added successfully.
-    def __init__(self, testcase, db, stop, threadnum, startnum,
-                 step=2, sleep=None):
+    def __init__(self, testcase, db, stop, threadnum, commitdict,
+                 startnum, step=2, sleep=None):
         TestThread.__init__(self, testcase)
         self.db = db
         self.stop = stop
@@ -61,6 +59,7 @@
         self.step = step
         self.sleep = sleep
         self.added_keys = []
+        self.commitdict = commitdict
 
     def testrun(self):
         cn = self.db.open()
@@ -77,6 +76,7 @@
                 tree[key] = self.threadnum
                 get_transaction().note("add key %s" % key)
                 get_transaction().commit()
+                self.commitdict[self] = 1
                 if self.sleep:
                     time.sleep(self.sleep)
             except (ReadConflictError, ConflictError), msg:
@@ -91,9 +91,85 @@
             key += self.step
         cn.close()
 
+class LargeUpdatesThread(TestThread):
+
+    # A thread that performs a lot of updates.  It attempts to modify
+    # more than 25 objects so that it can test code that runs vote
+    # in a separate thread when it modifies more than 25 objects.
+    # XXX ZODB4 doesn't actually run vote in a separate thread then.
+
+    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
+                 step=2, sleep=None):
+        TestThread.__init__(self, testcase)
+        self.db = db
+        self.stop = stop
+        self.threadnum = threadnum
+        self.startnum = startnum
+        self.step = step
+        self.sleep = sleep
+        self.added_keys = []
+        self.commitdict = commitdict
+
+    def testrun(self):
+        cn = self.db.open()
+        while not self.stop.isSet():
+            try:
+                tree = cn.root()["tree"]
+                break
+            except (ConflictError, KeyError):
+                # print "%d getting tree abort" % self.threadnum
+                get_transaction().abort()
+                cn.sync()
+
+        keys_added = {} # set of keys we commit
+        tkeys = []
+        while not self.stop.isSet():
+
+            # The test picks 50 keys spread across many buckets.
+            # self.startnum and self.step ensure that all threads use
+            # disjoint key sets, to minimize conflict errors.
+
+            nkeys = len(tkeys)
+            if nkeys < 50:
+                tkeys = range(self.startnum, 3000, self.step)
+                nkeys = len(tkeys)
+            step = max(int(nkeys / 50), 1)
+            keys = [tkeys[i] for i in range(0, nkeys, step)]
+            for key in keys:
+                try:
+                    tree[key] = self.threadnum
+                except (ReadConflictError, ConflictError), msg:
+                    # print "%d setting key %s" % (self.threadnum, msg)
+                    get_transaction().abort()
+                    cn.sync()
+                    break
+            else:
+                # print "%d set #%d" % (self.threadnum, len(keys))
+                get_transaction().note("keys %s" % ", ".join(map(str, keys)))
+                try:
+                    get_transaction().commit()
+                    self.commitdict[self] = 1
+                    if self.sleep:
+                        time.sleep(self.sleep)
+                except ConflictError, msg:
+                    # print "%d commit %s" % (self.threadnum, msg)
+                    get_transaction().abort()
+                    cn.sync()
+                    continue
+                for k in keys:
+                    tkeys.remove(k)
+                    keys_added[k] = 1
+                # sync() is necessary here to process invalidations
+                # if we get a read conflict.  In the read conflict case,
+                # no objects were modified so cn never got registered
+                # with the transaction.
+                cn.sync()
+        self.added_keys = keys_added.keys()
+        cn.close()
+
 class VersionStressThread(TestThread):
 
-    def __init__(self, testcase, db, stop, threadnum, startnum,
+    def __init__(self, testcase, db, stop, threadnum, commitdict, startnum,
                  step=2, sleep=None):
         TestThread.__init__(self, testcase)
         self.db = db
@@ -104,6 +180,7 @@
         self.sleep = sleep
         self.added_keys = []
         self.log = logging.getLogger("thread:%s" % get_ident()).info
+        self.commitdict = commitdict
 
     def testrun(self):
         self.log("thread begin")
@@ -116,6 +193,7 @@
                      (key, version, commit))
             if self.oneupdate(version, key, commit):
                 self.added_keys.append(key)
+                self.commitdict[self] = 1
             key += self.step
 
     def oneupdate(self, version, key, commit=1):
@@ -130,6 +208,7 @@
                 tree = cn.root()["tree"]
                 break
             except (ConflictError, KeyError), msg:
+                get_transaction().abort()
                 cn.sync()
         while not self.stop.isSet():
             try:
@@ -173,7 +252,18 @@
 class InvalidationTests(CommonSetupTearDown):
 
     level = 2
-    DELAY = 15  # number of seconds the main thread lets the workers run
+
+    # Minimum # of seconds the main thread lets the workers run.  The
+    # test stops as soon as this much time has elapsed, and all threads
+    # have managed to commit a change.
+    MINTIME = 10
+
+    # Maximum # of seconds the main thread lets the workers run.  We
+    # stop after this long has elapsed regardless of whether all threads
+    # have managed to commit a change.
+    MAXTIME = 300
+
+    StressThread = StressThread
 
     def setUp(self):
         super(InvalidationTests, self).setUp()
@@ -190,7 +280,7 @@
         return db
 
     def _check_tree(self, cn, tree):
-        # Make sure the BTree is sane and that all the updates persisted.
+        # Make sure the BTree is sane at the C level.
         retries = 3
         while retries:
             retries -= 1
@@ -231,40 +321,50 @@
             display(tree)
             self.fail('\n'.join(errormsgs))
 
-    def go(self, stop, *threads):
+    def go(self, stop, commitdict, *threads):
         # Run the threads
         for t in threads:
             t.start()
-        time.sleep(self.DELAY)
+        delay = self.MINTIME
+        start = time.time()
+        while time.time() - start <= self.MAXTIME:
+            time.sleep(delay)
+            delay = 2.0
+            if len(commitdict) >= len(threads):
+                break
+            # Some thread still hasn't managed to commit anything.
         stop.set()
         for t in threads:
             t.cleanup()
 
     def testConcurrentUpdates2Storages(self):
         self._storage = storage1 = self.openClientStorage()
-        storage2 = self.openClientStorage(cache="2")
-        db1 = self.db(storage1)
+        storage2 = self.openClientStorage()
+        db1 = DB(storage1)
+        db2 = DB(storage2)
         stop = threading.Event()
 
         cn = db1.open()
         tree = cn.root()["tree"] = OOBTree()
         get_transaction().commit()
 
-        db2 = self.db(storage2)
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1)
-        t2 = StressThread(self, db2, stop, 2, 2)
-        self.go(stop, t1, t2)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1)
+        t2 = self.StressThread(self, db2, stop, 2, cd, 2)
+        self.go(stop, cd, t1, t2)
 
         cn.sync()
         self._check_tree(cn, tree)
         self._check_threads(tree, t1, t2)
 
         cn.close()
+        db1.close()
+        db2.close()
 
     def testConcurrentUpdates1Storage(self):
         self._storage = storage1 = self.openClientStorage()
-        db1 = self.db(storage1)
+        db1 = DB(storage1)
         stop = threading.Event()
 
         cn = db1.open()
@@ -272,66 +372,111 @@
         get_transaction().commit()
 
         # Run two threads that update the BTree
-        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
-        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
-        self.go(stop, t1, t2)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1, sleep=0.001)
+        t2 = self.StressThread(self, db1, stop, 2, cd, 2, sleep=0.001)
+        self.go(stop, cd, t1, t2)
 
         cn.sync()
         self._check_tree(cn, tree)
         self._check_threads(tree, t1, t2)
 
         cn.close()
+        db1.close()
 
     def testConcurrentUpdates2StoragesMT(self):
         self._storage = storage1 = self.openClientStorage()
-        db1 = self.db(storage1)
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
         stop = threading.Event()
 
         cn = db1.open()
         tree = cn.root()["tree"] = OOBTree()
         get_transaction().commit()
 
-        db2 = self.db(self.openClientStorage(cache="2"))
         # Run three threads that update the BTree.
         # Two of the threads share a single storage so that it
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = StressThread(self, db1, stop, 1, 1, 3)
-        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
-        self.go(stop, t1, t2, t3)
+        cd = {}
+        t1 = self.StressThread(self, db1, stop, 1, cd, 1, 3)
+        t2 = self.StressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = self.StressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
 
         cn.sync()
         self._check_tree(cn, tree)
         self._check_threads(tree, t1, t2, t3)
 
         cn.close()
+        db1.close()
+        db2.close()
 
     def testConcurrentUpdatesInVersions(self):
         self._storage = storage1 = self.openClientStorage()
-        db1 = self.db(storage1)
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
         stop = threading.Event()
 
         cn = db1.open()
         tree = cn.root()["tree"] = OOBTree()
         get_transaction().commit()
-        cn.close()
 
-        db2 = self.db(self.openClientStorage(cache="2"))
         # Run three threads that update the BTree.
         # Two of the threads share a single storage so that it
         # is possible for both threads to read the same object
         # at the same time.
 
-        t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
-        t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
-        t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
-        self.go(stop, t1, t2, t3)
+        cd = {}
+        t1 = VersionStressThread(self, db1, stop, 1, cd, 1, 3)
+        t2 = VersionStressThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = VersionStressThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
 
-        cn = db1.open()
+        cn.sync()
         self._check_tree(cn, tree)
         self._check_threads(tree, t1, t2, t3)
 
         cn.close()
+        db1.close()
+        db2.close()
+
+    def testConcurrentLargeUpdates(self):
+        # Use 3 threads like the 2StorageMT test above.
+        self._storage = storage1 = self.openClientStorage()
+        db1 = DB(storage1)
+        db2 = DB(self.openClientStorage())
+        stop = threading.Event()
 
+        cn = db1.open()
+        tree = cn.root()["tree"] = OOBTree()
+        for i in range(0, 3000, 2):
+            tree[i] = 0
+        get_transaction().commit()
+
+        # Run three threads that update the BTree.
+        # Two of the threads share a single storage so that it
+        # is possible for both threads to read the same object
+        # at the same time.
+
+        cd = {}
+        t1 = LargeUpdatesThread(self, db1, stop, 1, cd, 1, 3, 0.001)
+        t2 = LargeUpdatesThread(self, db2, stop, 2, cd, 2, 3, 0.001)
+        t3 = LargeUpdatesThread(self, db2, stop, 3, cd, 3, 3, 0.001)
+        self.go(stop, cd, t1, t2, t3)
+
+        cn.sync()
+        self._check_tree(cn, tree)
+
+        # Purge the tree of the dummy entries mapping to 0.
+        losers = [k for k, v in tree.items() if v == 0]
+        for k in losers:
+            del tree[k]
+        get_transaction().commit()
+
+        self._check_threads(tree, t1, t2, t3)
+
+        cn.close()
+        db1.close()
+        db2.close()




More information about the Zope3-Checkins mailing list