[Zope-CVS] CVS: Packages/pypes/pypes/tests - test_event.py:1.6 test_identity.py:1.12

Casey Duncan casey at zope.com
Wed Mar 3 23:29:47 EST 2004


Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv12779/tests

Modified Files:
	test_event.py test_identity.py 
Log Message:
Use persistent weakrefs for event listener registrations
Add event service persistence tests


=== Packages/pypes/pypes/tests/test_event.py 1.5 => 1.6 ===
--- Packages/pypes/pypes/tests/test_event.py:1.5	Mon Feb  9 16:27:35 2004
+++ Packages/pypes/pypes/tests/test_event.py	Wed Mar  3 23:29:46 2004
@@ -20,6 +20,7 @@
 from persistent import Persistent
 from pypes.exceptions import VetoEvent
 from random import choice
+from pypes.tests.common import PersistenceTest
     
 class TestListener(Persistent):
     notifies = 0
@@ -108,6 +109,10 @@
         o = TestListener()
         self.assertRaises(
             TypeError, self.event.registerListener, o, 'notify', [])
+    
+    def testRegisterNotPersistent(self):
+        self.assertRaises(
+            TypeError, self.event.registerListener, {}, 'pop', [])
         
     def testUnregister(self):
         o = TestListener()
@@ -309,6 +314,36 @@
         self.event.unregisterListener(o, 'notify', MessageA)
         self.assertRaises(
             StopIteration, self.event.iterMessageTypes().next)        
+
+class TestEventServicePersistence(PersistenceTest):
+    
+    def testListenerRegistrationsPersist(self):
+        from pypes.event import EventService
+        r1 = self.db.open().root()
+        event = r1['event'] = EventService()
+        test = r1['test'] = TestListener()
+        event.registerListener(test, 'notify', MessageA)
+        get_transaction().commit()
+        
+        r2 = self.db.open().root()
+        event = r2['event']
+        event.send(MessageA())
+        event.send(MessageB())
+        self.assertEqual(r2['test'].notifies, 1)
+        get_transaction().abort()
+        
+    def testListenersWeaklyReferenced(self):
+        self.testListenerRegistrationsPersist()
+        r1 = self.db.open().root()
+        test = r1['test']
+        del r1['test']
+        get_transaction().commit()
+        
+        r2 = self.db.open().root()
+        event = r2['event']
+        event.send(MessageA())
+        self.assertEqual(test.notifies, 0)
+        get_transaction().abort()
         
 class TestMessages(unittest.TestCase):
     


=== Packages/pypes/pypes/tests/test_identity.py 1.11 => 1.12 ===
--- Packages/pypes/pypes/tests/test_identity.py:1.11	Mon Mar  1 23:14:27 2004
+++ Packages/pypes/pypes/tests/test_identity.py	Wed Mar  3 23:29:46 2004
@@ -203,7 +203,7 @@
             ids.insert(register(TestClass()))
         self.failIf(difference(OOTreeSet(self.identity.idSet()), ids))
         
-class TestListener(object):
+class TestListener(Persistent):
     
     def __init__(self):
         self.received = []
@@ -211,7 +211,7 @@
     def receive(self, msg):
         self.received.append(msg)
         
-class TestIdentityServiceEvents(PypesTestCase):
+class TestIdentityServiceEvents(PypesTestCase, Persistent):
     
     def testIdRegisteredEvent(self):
         from pypes import services




More information about the Zope-CVS mailing list