[Zope-CVS] CVS: Products/Ape/lib/apelib/tests - testscanner.py:1.2.2.2

Shane Hathaway shane at zope.com
Sat Jan 3 15:36:40 EST 2004


Update of /cvs-repository/Products/Ape/lib/apelib/tests
In directory cvs.zope.org:/tmp/cvs-serv4617/tests

Modified Files:
      Tag: ape-0_8-branch
	testscanner.py 
Log Message:
Changed names in the scanner to make it a little easier to understand.


=== Products/Ape/lib/apelib/tests/testscanner.py 1.2.2.1 => 1.2.2.2 ===
--- Products/Ape/lib/apelib/tests/testscanner.py:1.2.2.1	Sat Dec 20 02:31:07 2003
+++ Products/Ape/lib/apelib/tests/testscanner.py	Sat Jan  3 15:36:39 2004
@@ -19,7 +19,7 @@
 import unittest
 from time import time
 
-from apelib.zodb3.scanner import ScanControl
+from apelib.zodb3.scanner import PoolScanControl, Scanner
 
 
 class FakeRepository:
@@ -31,7 +31,8 @@
             if repo is not self:
                 raise AssertionError, "repo must be self"
             if str(location) != location:
-                raise AssertionError, "location is expected to be a string"
+                raise AssertionError(
+                    "location %s is not a string" % repr(location))
             # Always report a change
             res[source] = 1001
         return res
@@ -42,13 +43,17 @@
     repo = FakeRepository()
 
     def getPollSources(self, oid):
-        return {(self.repo, oid): 10}
+        return {(self.repo, str(oid)): 10}
 
 
 class ScanControlTests(unittest.TestCase):
 
     def setUp(self):
-        ctl = self.ctl = ScanControl()
+        storage = self.storage = FakeStorage()
+        scanner = self.scanner = Scanner()
+        storage.scanner = scanner
+        scanner.storage = storage
+        ctl = self.ctl = PoolScanControl(storage)
         self.conn1 = ctl.newConnection()
         self.conn2 = ctl.newConnection()
 
@@ -84,32 +89,29 @@
 class ScannerTests(unittest.TestCase):
 
     def setUp(self):
-        ctl = self.ctl = ScanControl()
+        storage = self.storage = FakeStorage()
+        scanner = self.scanner = Scanner()
+        storage.scanner = scanner
+        scanner.storage = storage
+        ctl = self.ctl = PoolScanControl(storage)
         self.conn1 = ctl.newConnection()
         self.conn2 = ctl.newConnection()
-        self.scanner = ctl.scanner
         self.repo = FakeRepository()
 
     def testAddSource(self):
         new_sources = {(self.repo, '5'): 0}
-        self.scanner.setPollSources(5, new_sources)
+        self.scanner.afterLoad(5, new_sources)
         self.assertEqual(len(self.scanner.future), 1)
         self.assertEqual(self.scanner.future[5][0], new_sources)
 
-    def testChangeSources(self):
+    def testNoUpdatesWhenNotInvalidating(self):
+        # Don't change current except in scan(), where invalidation
+        # messages are possible.
         self.conn1.setOIDs([5])
 
-        old_sources = {(self.repo, '5'): 0}
-        self.scanner.setPollSources(5, old_sources)
-        self.assertEqual(len(self.scanner.future), 0)
-        self.assertEqual(len(self.scanner.current), 1)
-        self.assertEqual(self.scanner.current[5], old_sources)
-
-        new_sources = {(self.repo, '6'): 0, (self.repo, '7'): 0, }
-        self.scanner.setPollSources(5, new_sources)
-        self.assertEqual(len(self.scanner.future), 0)
-        self.assertEqual(len(self.scanner.current), 1)
-        self.assertEqual(self.scanner.current[5], new_sources)
+        sources = {(self.repo, '5'): 0}
+        self.scanner.afterLoad(5, sources)
+        self.assertNotEqual(self.scanner.current[5], sources)
 
     def testRemoveOID(self):
         self.conn1.setOIDs([5])
@@ -120,7 +122,7 @@
     def testScan(self):
         self.conn1.setOIDs([5])
         new_sources = {(self.repo, '6'): 0, (self.repo, '7'): 0, }
-        self.scanner.setPollSources(5, new_sources)
+        self.scanner.afterLoad(5, new_sources)
         to_invalidate = self.scanner.scan()
         self.assertEqual(len(to_invalidate), 1)
 
@@ -134,17 +136,15 @@
 
     def testFindNewSources(self):
         # Verify the scanner calls storage.getSources() and saves the result.
-        storage = FakeStorage()
-        self.scanner.setStorage(storage)
         self.conn1.setOIDs([5])
-        expect_sources = storage.getPollSources(5)
+        expect_sources = self.storage.getPollSources(5)
         self.assertEqual(self.scanner.current[5], expect_sources)
 
     def testUseCachedSources(self):
         # Verify the scanner uses previously cached sources when available.
         repo = FakeRepository()
         sources = {(repo, '999'): -1}
-        self.scanner.setPollSources(5, sources)
+        self.scanner.afterLoad(5, sources)
         self.conn1.setOIDs([5])
         self.assertEqual(self.scanner.current[5], sources)
 
@@ -152,10 +152,10 @@
         # Verify the scanner sees sources according to transactions.
         repo = FakeRepository()
         sources = {(repo, '999'): -1}
-        self.scanner.setPollSources(5, sources)
+        self.scanner.afterLoad(5, sources)
         self.conn1.setOIDs([5])
         sources_2 = {(repo, '999'): -2}
-        self.scanner.setUncommittedSources(123, 5, sources_2)
+        self.scanner.afterStore(5, 123, sources_2)
         # Uncommitted data should have no effect on sources
         self.assertEqual(self.scanner.current[5], sources)
         self.scanner.afterCommit(123)
@@ -170,7 +170,7 @@
         # Verify the scanner ignores sources on transaction abort.
         repo = FakeRepository()
         sources = {(repo, '999'): -2}
-        self.scanner.setUncommittedSources(123, 5, sources)
+        self.scanner.afterStore(5, 123, sources)
         self.assertEqual(self.scanner.uncommitted[123][5], sources)
         self.scanner.afterAbort(123)
         self.assert_(not self.scanner.uncommitted.has_key(123))




More information about the Zope-CVS mailing list