[Zope-CVS] CVS: Packages/pypes/pypes/tests - test_graph.py:1.5

Casey Duncan casey at zope.com
Mon Aug 11 00:17:43 EDT 2003


Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv12515/tests

Modified Files:
	test_graph.py 
Log Message:
Add persistence and concurrency tests for graph


=== Packages/pypes/pypes/tests/test_graph.py 1.4 => 1.5 ===
--- Packages/pypes/pypes/tests/test_graph.py:1.4	Sun Aug 10 22:32:19 2003
+++ Packages/pypes/pypes/tests/test_graph.py	Sun Aug 10 23:17:38 2003
@@ -16,10 +16,11 @@
 $Id$"""
 
 import unittest
+import ZODB
 from random import randint
 from zope.interface.verify import verifyObject
 from Persistence import Persistent
-from common import PypesTestCase
+from common import PypesTestCase, PypesPersistentTest
 from pypes import graph
 
 
@@ -460,7 +461,53 @@
             self.assertEqual(k, str(v))
             self.assertEqual(m, v)
 
-
+class TestDirectedIdGraphPersistence(PypesPersistentTest):
+    
+    def setUp(self):
+        from pypes.graph import DirectedIdGraph
+        super(TestDirectedIdGraphPersistence, self).setUp()
+        self.root = self.db.open().root()
+        self.root['g'] = self.graph = DirectedIdGraph()
+        get_transaction().commit()
+    
+    def testBasicPersistence(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob2, ob1)
+        self.graph.edges.add(ob2, ob3)
+        get_transaction().commit()
+        
+        g2 = self.db.open().root()['g']
+        self.assertEqual(len(g2.nodes), 3)
+        self.assertEqual(len(g2.edges), 4)
+        self.failUnless((ob1, ob2) in g2.edges)
+        self.failUnless((ob1, ob3) in g2.edges)
+        self.failUnless((ob2, ob1) in g2.edges)
+        self.failUnless((ob2, ob3) in g2.edges)
+        
+    def testConcurrentAddNodes(self):
+        ob1, ob2= self._newObj(), self._newObj()
+        g2 = self.db.open().root()['g']
+        list(g2.nodes); list(g2.edges)
+        
+        self.graph.nodes.add(ob1)
+        get_transaction().commit()
+        
+        g2.nodes.add(ob2)
+        get_transaction().commit()
+    
+    def testConcurrentAddEdges(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        g2 = self.db.open().root()['g']
+        list(g2.nodes); list(g2.edges)
+        
+        self.graph.edges.add(ob1, ob2)
+        g2.edges.add(ob1, ob2)
+        get_transaction().commit()
+        
+        g2.edges.add(ob2, ob3)
+        get_transaction().commit()     
         
 if __name__ == '__main__':
     unittest.main()




More information about the Zope-CVS mailing list