[CMF-checkins] CVS: CMF - test_Discussions.py:1.3

tseaver@digicool.com tseaver@digicool.com
Sun, 17 Jun 2001 15:21:07 -0400 (EDT)


Update of /cvs-repository/CMF/CMFDefault/tests
In directory korak.digicool.com:/tmp/cvs-serv28068/CMFDefault/tests

Modified Files:
	test_Discussions.py 
Log Message:
 - Merge 'discussiongeddon' patch.


--- Updated File test_Discussions.py in package CMF --
--- test_Discussions.py	2001/06/04 19:16:04	1.2
+++ test_Discussions.py	2001/06/17 19:21:06	1.3
@@ -1,18 +1,19 @@
 import Zope
-import unittest
-import re, new
-import Globals
-from Globals import Persistent 
-from Acquisition import Implicit
+import unittest, string
+
 from AccessControl import SecurityManager
-from Products.CMFDefault.Document import Document
+from Acquisition import aq_base, aq_inner
+
 from Products.CMFCore.CatalogTool import CatalogTool
-from Products.CMFDefault.Discussions import DiscussionResponse
-from Products.CMFDefault.DiscussionTool import DiscussionTool
-from Products.CMFDefault.URLTool import URLTool
+from Products.CMFCore.TypesTool import TypesTool, FactoryTypeInformation
 from Products.CMFCore.WorkflowTool import WorkflowTool
-from Products.CMFDefault.DiscussionItem import *
 
+from Products.CMFDefault.DiscussionTool import DiscussionTool\
+                                             , DiscussionNotAllowed
+
+from Products.CMFDefault.Document import Document
+from Products.CMFDefault.URLTool import URLTool
+
 class UnitTestSecurityPolicy:
     """
         Stub out the existing security policy for unit testing purposes.
@@ -20,75 +21,195 @@
     #
     #   Standard SecurityPolicy interface
     #
-    def validate(self, accessed, container, name, value, context, roles,
-                 *args, **kw):
+    def validate( self, accessed, container, name, value, context, roles,
+                 *args, **kw ):
         return 1
     
-    def checkPermission( self, permission, object, context) :
+    def checkPermission( self, permission, object, context ) :
         return 1
 
-
-class DiscussionItem(Document, DiscussionResponse):
-    """
-    """
-    meta_type = 'DItem'
-    after_add_called = before_delete_called = 0
-
-    def __init__( self, id, catalog=0 ):
-        self.id = id
-        self.reset()
-        self.catalog = catalog
-
-    def manage_afterAdd( self, item, container ):
-        self.after_add_called = 1
-        if self.catalog:
-            Document.manage_afterAdd( self, item, container )
-
-    def manage_beforeDelete( self, item, container ):
-        self.before_delete_called = 1
-        if self.catalog:
-            DiscussionItem.DiscussionItemContainer.manage_beforeDelete( self, item, container )
-    
-    def reset( self ):
-        self.after_add_called = self.before_delete_called = 0
-
 
-class DiscussionTests(unittest.TestCase):
+class DiscussionTests( unittest.TestCase ):
 
-    def setUp(self):
+    def setUp( self ):
         get_transaction().begin()
         self._policy = UnitTestSecurityPolicy()
-        SecurityManager.setSecurityPolicy(self._policy)
+        SecurityManager.setSecurityPolicy( self._policy )
         self.root = Zope.app()
+        self.root._setObject( 'portal_discussion', DiscussionTool() )
+        self.discussion_tool = self.root.portal_discussion
+        self.root._setObject( 'portal_catalog', CatalogTool() )
+        self.catalog_tool = self.root.portal_catalog
+        self.root._setObject( 'portal_url', URLTool() )
+        self.url_tool = self.root.portal_url
+        self.root._setObject( 'portal_workflow', WorkflowTool() ) 
+        self.workflow_tool = self.root.portal_workflow
+        self.root._setObject( 'portal_types', TypesTool() )
+        types_tool = self.types_tool = self.root.portal_types
     
-    def tearDown(self):
+    def tearDown( self ):
+        del self.root
+        del self.discussion_tool
+        del self.catalog_tool
+        del self.url_tool
+        del self.workflow_tool
+        del self.types_tool
         get_transaction().abort()
 
-    def test_deletePropagation(self):
-        portal_catalog = CatalogTool()
-        self.root._setObject('portal_catalog', portal_catalog)
-        catalog = self.root.portal_catalog
-        portal_discussion = DiscussionTool()
-        self.root._setObject('portal_discussion', portal_discussion)
-        portal_url = URLTool()
-        self.root._setObject('portal_url', portal_url)
-        portal_workflow = WorkflowTool()
-        self.root._setObject('portal_workflow', portal_workflow) 
-        test = Document('test')
-        self.root._setObject('test', test)
+    def test_policy( self ):
+
+        self.root._setObject( 'test', Document( 'test' ) )
         test = self.root.test
+        self.assertRaises( DiscussionNotAllowed
+                         , self.discussion_tool.getDiscussionFor
+                         , test
+                         )
+        assert getattr( test, 'talkback', None ) is None
+
         test.allow_discussion = 1
-        assert len(catalog) == 1
-        portal_discussion.createDiscussionFor(test)
-        talkback = test.talkback
-        talkback.createReply(title='test'
-                             , text='blah'
+        assert self.discussion_tool.getDiscussionFor( test )
+        assert test.talkback
+
+        del test.talkback
+        del test.allow_discussion
+        FTI = FactoryTypeInformation
+        self.types_tool._setObject( 'Document'
+                                  , FTI( 'Document'
+                                       , meta_type=Document.meta_type
+                                       , product='CMFDefault'
+                                       , factory='addDocument'
+                                       )
+                                  )
+        self.assertRaises( DiscussionNotAllowed
+                         , self.discussion_tool.getDiscussionFor
+                         , test
+                         )
+        assert getattr( test, 'talkback', None ) is None
+
+        self.types_tool.Document.allow_discussion = 1
+        assert self.discussion_tool.getDiscussionFor( test )
+        assert test.talkback
+
+        del test.talkback
+        self.types_tool.Document.allow_discussion = 0
+        self.assertRaises( DiscussionNotAllowed
+                         , self.discussion_tool.getDiscussionFor
+                         , test
+                         )
+        assert getattr( test, 'talkback', None ) is None
+
+        test.allow_discussion = 1
+        assert self.discussion_tool.getDiscussionFor( test )
+        assert test.talkback
+    
+    def test_nestedReplies( self ):
+        self.root._setObject( 'test', Document( 'test' ) )
+        test = self.root.test
+        test.allow_discussion = 1
+        talkback = self.discussion_tool.getDiscussionFor( test )
+        assert talkback._getDiscussable() == test
+        assert talkback._getDiscussable( outer=1 ) == test
+        assert not talkback.hasReplies()
+        assert len( talkback.getReplies() ) == 0
+
+        talkback.createReply( title='test'
+                            , text='blah'
+                            )
+        assert talkback.hasReplies()
+        assert len( talkback.getReplies() ) == 1
+
+        reply1 = talkback.getReplies()[0]
+        items = talkback._container.items()
+        assert items[0][0] == reply1.getId()
+        assert reply1.inReplyTo() == test
+
+        parents = reply1.parentsInThread()
+        assert len( parents ) == 1
+        assert test in parents
+
+        talkback1 = self.discussion_tool.getDiscussionFor( reply1 )
+        assert talkback == talkback1
+        assert len( talkback1.getReplies() ) == 0
+        assert len( talkback.getReplies() ) == 1
+
+        talkback1.createReply( title='test2'
+                             , text='blah2'
                              )
-        foo = talkback.getReplies()[0]
-        assert len(catalog) == 2
-        self.root._delObject('test')
-        assert len(catalog) == 0, len(catalog)
+        assert len( talkback._container ) == 2
+        assert talkback1.hasReplies()
+        assert len( talkback1.getReplies() ) == 1
+        assert len( talkback.getReplies() ) == 1
+
+        reply2 = talkback1.getReplies()[0]
+        assert reply2.inReplyTo() == reply1
+
+        parents = reply2.parentsInThread()
+        assert len( parents ) == 2
+        assert parents[ 0 ] == test
+        assert parents[ 1 ] == reply1
+
+        parents = reply2.parentsInThread( 1 )
+        assert len( parents ) == 1
+        assert parents[ 0 ] == reply1
 
+    def test_itemCatloguing( self ):
+
+        self.root._setObject( 'test', Document( 'test' ) )
+        test = self.root.test
+        catalog = self.catalog_tool._catalog
+        test.allow_discussion = 1
+        assert len( self.catalog_tool ) == 1
+        assert has_path( catalog, test.getPhysicalPath() )
+        talkback = self.discussion_tool.getDiscussionFor( test )
+        assert talkback.getPhysicalPath() == ( '', 'test', 'talkback' ), \
+            talkback.getPhysicalPath()
+        talkback.createReply( title='test'
+                            , text='blah'
+                            )
+        assert len( self.catalog_tool ) == 2
+        for reply in talkback.getReplies():
+            assert has_path( catalog, reply.getPhysicalPath() )
+            assert has_path( catalog
+                           , '/test/talkback/%s' % reply.getId() )
+
+        reply1 = talkback.getReplies()[0]
+        talkback1 = self.discussion_tool.getDiscussionFor( reply1 )
+        talkback1.createReply( title='test2'
+                             , text='blah2'
+                             )
+        for reply in talkback.getReplies():
+            assert has_path( catalog, reply.getPhysicalPath() )
+            assert has_path( catalog
+                           , '/test/talkback/%s' % reply.getId() )
+        for reply in talkback1.getReplies():
+            assert has_path( catalog, reply.getPhysicalPath() )
+            assert has_path( catalog
+                           , '/test/talkback/%s' % reply.getId() )
+
+    def test_deletePropagation( self ):
+
+        self.root._setObject( 'test', Document( 'test' ) )
+        test = self.root.test
+
+        test.allow_discussion = 1
+        talkback = self.discussion_tool.getDiscussionFor( test )
+        talkback.createReply( title='test'
+                            , text='blah'
+                            )
+        self.root._delObject( 'test' )
+        assert len( self.catalog_tool ) == 0
+
+def has_path( catalog, path ):
+    """
+        Verify that catalog has an object at path.
+    """
+    if type( path ) is type( () ):
+        path = string.join( path, '/' )
+    rids = map( lambda x: x.data_record_id_, catalog.searchResults() )
+    for rid in rids:
+        if catalog.getpath( rid ) == path:
+            return 1
+    return 0
 
 def test_suite():
     suite = unittest.TestSuite()
@@ -96,7 +217,7 @@
     return suite
 
 def run():
-    unittest.TextTestRunner().run(test_suite())
+    unittest.TextTestRunner().run( test_suite() )
 
 if __name__ == '__main__':
     run()