[Zope-Checkins] CVS: Zope/lib/python/Products/Sessions/tests - testSessionDataManager.py:1.7

Chris McDonough chrism@zope.com
Sat, 17 Nov 2001 17:46:36 -0500


Update of /cvs-repository/Zope/lib/python/Products/Sessions/tests
In directory cvs.zope.org:/tmp/cvs-serv14333

Modified Files:
	testSessionDataManager.py 
Log Message:
Broke stress tests out from unit tests.


=== Zope/lib/python/Products/Sessions/tests/testSessionDataManager.py 1.6 => 1.7 ===
 
     def testBadExternalSDCPath(self):
-        self.app.session_data_manager.REQUEST['REQUEST_METHOD'] = 'GET' # fake out webdav
+        # fake out webdav
+        self.app.session_data_manager.REQUEST['REQUEST_METHOD'] = 'GET'
         self.app.session_data_manager.setContainerPath('/fudgeffoloo')
         try:
             self.app.session_data_manager.getSessionData()
@@ -260,123 +261,9 @@
         sd.set('foo', aq_wrapped)
         self.assertRaises(UnpickleableError, get_transaction().commit)
 
-class TestMultiThread(TestCase):
-    def testNonOverlappingSids(self):
-        readers = []
-        writers = []
-        readiters = 20
-        writeiters = 5
-        readout = []
-        writeout = []
-        numreaders = 20
-        numwriters = 5
-        sdm_name = 'session_data_manager'
-        db = _getDB()
-        for i in range(numreaders):
-            thread = ReaderThread(db, readiters, sdm_name)
-            readers.append(thread)
-        for i in range(numwriters):
-            thread = WriterThread(db, writeiters, sdm_name)
-            writers.append(thread)
-        for thread in readers:
-            thread.start()
-            time.sleep(0.1)
-        for thread in writers:
-            thread.start()
-            time.sleep(0.1)
-        while threading.activeCount() > 1:
-            time.sleep(1)
-        
-        for thread in readers:
-            assert thread.out == [], thread.out
-
-    def testOverlappingSids(self):
-        readers = []
-        writers = []
-        readiters = 20
-        writeiters = 5
-        readout = []
-        writeout = []
-        numreaders = 20
-        numwriters = 5
-        sdm_name = 'session_data_manager'
-        db = _getDB()
-        for i in range(numreaders):
-            thread = ReaderThread(db, readiters, sdm_name)
-            readers.append(thread)
-        for i in range(numwriters):
-            thread = WriterThread(db, writeiters, sdm_name)
-            writers.append(thread)
-        for thread in readers:
-            thread.start()
-            time.sleep(0.1)
-        for thread in writers:
-            thread.start()
-            time.sleep(0.1)
-        while threading.activeCount() > 1:
-            time.sleep(1)
-        
-        for thread in readers:
-            assert thread.out == [], thread.out
-
-class BaseReaderWriter(threading.Thread):
-    def __init__(self, db, iters, sdm_name):
-        self.conn = db.open()
-        self.app = self.conn.root()['Application']
-        self.app = makerequest.makerequest(self.app)
-        token = self.app.browser_id_manager._getNewBrowserId()
-        self.app.REQUEST.browser_id_ = token
-        self.iters = iters
-        self.sdm_name = sdm_name
-        self.out = []
-        threading.Thread.__init__(self)
-
-    def run(self):
-        i = 0
-        try:
-            while 1:
-                try:
-                    self.run1()
-                    return
-                except ConflictError:
-                    i = i + 1
-                    #print "conflict %d" % i
-                    if i > 3: raise
-        finally:
-            self.conn.close()
-            del self.app
-            
-class ReaderThread(BaseReaderWriter):
-    def run1(self):
-        session_data_manager = getattr(self.app, self.sdm_name)
-        data = session_data_manager.getSessionData(create=1)
-        t = time.time()
-        data[t] = 1
-        get_transaction().commit()
-        for i in range(self.iters):
-            data = session_data_manager.getSessionData()
-            if not data.has_key(t):
-                self.out.append(1)
-            time.sleep(whrandom.choice(range(3)))
-            get_transaction().commit()
-
-class WriterThread(BaseReaderWriter):
-    def run1(self):
-        session_data_manager = getattr(self.app, self.sdm_name)
-        for i in range(self.iters):
-            data = session_data_manager.getSessionData()
-            data[time.time()] = 1
-            n = whrandom.choice(range(3))
-            time.sleep(n)
-            if n % 2 == 0:
-                get_transaction().commit()
-            else:
-                get_transaction().abort()
-
 def test_suite():
     test_datamgr = makeSuite(TestSessionManager, 'test')
-    test_multithread = makeSuite(TestMultiThread, 'test')
-    suite = TestSuite((test_datamgr, test_multithread))
+    suite = TestSuite((test_datamgr,))
     return suite
 
 if __name__ == '__main__':