[Zope-Checkins] CVS: Products/EventListenerTool/tests - __init__.py:1.1 testListenerTool.py:1.1

Chris McDonough chrism@zope.com
Tue, 7 Jan 2003 13:03:59 -0500


Update of /cvs-repository/Products/EventListenerTool/tests
In directory cvs.zope.org:/tmp/cvs-serv6077/tests

Added Files:
	__init__.py testListenerTool.py 
Log Message:
Add tests.


=== Added File Products/EventListenerTool/tests/__init__.py ===
# placeholder


=== Added File Products/EventListenerTool/tests/testListenerTool.py ===
import unittest
import Zope

from OFS.SimpleItem import SimpleItem

from Interface import Interface

from Products.CMFCore.tests.base.testcase import SecurityRequestTest

class IDummyEvent( Interface ):
    """ Marker interface declared by the DummyEvent test double.
    """

class DummyEvent:
    """ Bare-bones event whose 'handled_by' mapping records the actions
        that processed it.
    """

    # NOTE(review):  presumably needed so that restricted code (e.g. the
    # rule conditions such as 'event/should_pass') may read attributes of
    # the event -- confirm against the Zope security machinery.
    __allow_access_to_unprotected_subobjects__ = 1

    # Old-style interface declaration.
    __implements__ = ( IDummyEvent, )

    def __init__( self ):
        # Starts empty;  the test actions add their own name as a key.
        self.handled_by = {}

class DebugError( Exception ):
    """ Raised on purpose by a test action, so that the tests can tell it
        apart from any genuine failure.
    """

class Counter( SimpleItem ):
    """ Callable test double which tallies how often it has been called.
    """

    # Class-level default;  the first call shadows it on the instance.
    _count = 0

    def __call__( self, event ):
        # The event is ignored:  only the fact of being called counts.
        self._count = self._count + 1

    def getCount( self ):
        # Number of calls made to this object so far.
        return self._count

class EventSetupTestBase:
    """ Mix-in holding the fixtures shared by the listener test cases.

    o The '_prep*' methods swap a hook of the ListenerTool module for a
      test double and remember whatever the setter returned;  the
      matching '_restore*' methods hand that value back to the setter.

    o NOTE(review):  the hooks are imported from Products.BCWorkbench,
      although these tests were checked in under
      Products/EventListenerTool -- confirm that the path is still right.
    """

    _old_event_registry_lookup = None   # saved by '_prepEventRegistry'
    _old_logger = None                  # saved by '_prepLogger'
    _logged = None                      # list of dicts, built by '_fauxLog'

    def _prepEventRegistry( self ):
        """ Install a fake registry lookup;  return the moniker it knows.
        """
        _MONIKER = 'IDummyEvent'

        # The default argument binds the moniker for interpreters
        # without nested scopes.
        def _lookup( moniker, _MONIKER=_MONIKER ):
            # An explicit test, rather than 'x and y or None', which
            # would answer None if the interface ever tested false.
            if moniker == _MONIKER:
                return IDummyEvent
            return None

        from Products.BCWorkbench.Framework.ListenerTool \
            import _setEventRegistryLookup

        self._old_event_registry_lookup = _setEventRegistryLookup( _lookup )

        return _MONIKER

    def _restoreEventRegistry( self ):
        """ Put back the lookup saved by '_prepEventRegistry', if any.
        """
        from Products.BCWorkbench.Framework.ListenerTool \
            import _setEventRegistryLookup

        if self._old_event_registry_lookup is not None:
            _setEventRegistryLookup( self._old_event_registry_lookup )

    def _prepLogger( self ):
        """ Route the module's log calls to '_fauxLog'.
        """
        from Products.BCWorkbench.Framework.ListenerTool \
            import _setLogger

        self._old_logger = _setLogger( self._fauxLog )

    def _restoreLogger( self ):
        """ Put back the logger saved by '_prepLogger', if any.
        """
        from Products.BCWorkbench.Framework.ListenerTool \
            import _setLogger

        if self._old_logger is not None:
            _setLogger( self._old_logger )

    def _fauxLog( self
                , subsystem
                , severity
                , summary
                , detail=''
                , error=None
                , reraise=None
                ):
        """ Record the arguments of a log call in 'self._logged'.

        o Each call appends one dict, keyed by parameter name.

        o NOTE(review):  the signature presumably mirrors the real
          logging function -- confirm.
        """
        # Create the list lazily, on the instance, so that the class-level
        # 'None' default is never shared between test cases.
        if self._logged is None:
            self._logged = []

        self._logged.append( { 'subsystem' : subsystem
                             , 'severity' : severity
                             , 'summary' : summary
                             , 'detail' : detail
                             , 'error' : error
                             , 'reraise' : reraise
                             } )

    def _prepRootObject( self ):
        """ Return a fresh object exposing the actions named by the tests.
        """
        class Root( SimpleItem ):

            # Not read by any test visible in this module.
            first_action_called = second_action_called = 0

            def first_action( self, event ):
                event.handled_by[ 'first_action' ] = 1

            def second_action( self, event ):
                event.handled_by[ 'second_action' ] = 1

            def raises_error( self, event ):
                # Call form of 'raise':  same exception as the old
                # 'raise DebugError, event', but not Python-2-only.
                raise DebugError( event )

        return Root()

class ListenerRuleTests( unittest.TestCase
                       , EventSetupTestBase
                       ):
    """ Unit tests for ListenerRule:  interface conformance, constructor
        accessors, and how 'notify' runs actions and logs problems.
    """

    # Test methods carry comments rather than docstrings, so that the
    # test runner keeps reporting them by method name.

    _BOTH_ACTIONS = ( 'first_action', 'second_action' )

    def tearDown( self ):

        # Undo the '_prep*' fixtures;  each '_restore*' skips the reset
        # when its '_prep*' counterpart never ran.
        self._restoreEventRegistry()
        self._restoreLogger()

    def _getTargetClass( self ):

        from Products.BCWorkbench.Framework.ListenerTool import ListenerRule

        return ListenerRule

    def _makeOne( self, *args, **kw ):

        klass = self._getTargetClass()
        return klass( *args, **kw )

    def _makeRuleIn( self, root, moniker, condition, actions ):

        # Build a rule and wrap it in the context of 'root' (presumably
        # so that the named actions can be found there -- confirm).
        rule = self._makeOne( moniker=moniker
                            , condition=condition
                            , actions=actions
                            )
        return rule.__of__( root )

    def _checkHandled( self, event, first, second ):

        # Compare the markers which Root's actions leave on the event.
        handled = event.handled_by
        self.assertEqual( handled.get( 'first_action', 0 ), first )
        self.assertEqual( handled.get( 'second_action', 0 ), second )

    def test_verify_IEventListenerRule( self ):

        from Interface.Verify import verifyClass
        from Products.BCWorkbench import IEventListenerRule

        klass = self._getTargetClass()
        verifyClass( IEventListenerRule, klass )

    def test_verify_ISubscriber( self ):

        from Interface.Verify import verifyClass
        from Products.Event.ISubscriber import ISubscriber

        klass = self._getTargetClass()
        verifyClass( ISubscriber, klass )

    def test_ctor( self ):

        # The accessors must hand back exactly what the constructor got.
        moniker = 'foo.bar.baz'
        condition = 'python: 1'
        actions = ( 'foo', 'bar' )

        rule = self._makeOne( moniker=moniker
                            , condition=condition
                            , actions=actions
                            )

        self.assertEqual( rule.getMoniker(), moniker )
        self.assertEqual( rule.getCondition(), condition )
        self.assertEqual( rule.listActions(), actions )

    def test_notify_passall( self ):

        # Condition always true:  both listed actions mark the event.
        moniker = self._prepEventRegistry()
        root = self._prepRootObject()
        event = DummyEvent()
        rule = self._makeRuleIn( root, moniker, 'python: 1'
                               , self._BOTH_ACTIONS )

        rule.notify( event )

        self._checkHandled( event, 1, 1 )
        self.assertEqual( event.handled_by.get( 'other_action', 0 ), 0 )

    def test_notify_passnone( self ):

        # Condition always false:  neither action marks the event.
        moniker = self._prepEventRegistry()
        root = self._prepRootObject()
        event = DummyEvent()
        rule = self._makeRuleIn( root, moniker, 'python: 0'
                               , self._BOTH_ACTIONS )

        rule.notify( event )

        self._checkHandled( event, 0, 0 )

    def test_notify_passwhen( self ):

        # Condition reads a flag from the event itself.
        moniker = self._prepEventRegistry()
        root = self._prepRootObject()
        event = DummyEvent()
        rule = self._makeRuleIn( root, moniker, 'event/should_pass'
                               , self._BOTH_ACTIONS )

        event.should_pass = 0
        rule.notify( event )
        self._checkHandled( event, 0, 0 )

        event.should_pass = 1
        rule.notify( event )
        self._checkHandled( event, 1, 1 )

    def test_notify_invalidaction( self ):

        # An unknown action name is logged;  the valid one still runs.
        from zLOG import PROBLEM

        moniker = self._prepEventRegistry()
        root = self._prepRootObject()
        self._prepLogger()
        event = DummyEvent()
        rule = self._makeRuleIn( root, moniker, 'python: 1'
                               , ( 'first_action', 'nonesuch' ) )

        rule._setId( 'invalidaction' )
        rule.notify( event )

        self._checkHandled( event, 1, 0 )

        self.assertEqual( len( self._logged ), 1 )
        entry = self._logged[0]
        expected = ( ( 'subsystem', 'Listener' )
                   , ( 'severity', PROBLEM )
                   , ( 'summary', "Rule invalidaction: "
                                  "invalid action 'nonesuch'" )
                   , ( 'detail', '' )
                   , ( 'error', None )
                   , ( 'reraise', None )
                   )
        for key, value in expected:
            self.assertEqual( entry[ key ], value )

    def test_notify_witherror( self ):

        # An action which raises is logged along with its error info.
        from zLOG import PROBLEM

        moniker = self._prepEventRegistry()
        root = self._prepRootObject()
        self._prepLogger()
        event = DummyEvent()
        rule = self._makeRuleIn( root, moniker, 'python: 1'
                               , ( 'first_action', 'raises_error' ) )

        rule._setId( 'witherror' )
        rule.notify( event )

        self._checkHandled( event, 1, 0 )

        self.assertEqual( len( self._logged ), 1 )
        entry = self._logged[0]
        expected = ( ( 'subsystem', 'Listener' )
                   , ( 'severity', PROBLEM )
                   , ( 'summary', "Rule witherror: "
                                  "action 'raises_error' "
                                  "raised error" )
                   )
        for key, value in expected:
            self.assertEqual( entry[ key ], value )

        # First item of the logged error is the exception class.
        self.assertEqual( entry[ 'error' ][0], DebugError )
        self.assertEqual( entry[ 'reraise' ], None )


class ListenerToolTests( unittest.TestCase
                       , EventSetupTestBase
                       ):
    """ Unit tests for ListenerTool used without any surrounding context:
        interface conformance, rule bookkeeping, and event dispatch.
    """

    # Test methods carry comments rather than docstrings, so that the
    # test runner keeps reporting them by method name.

    def tearDown( self ):

        # Several tests below install the fake registry lookup;  without
        # this it would stay in place for whatever test runs next.  The
        # logger is restored as well, to match ListenerRuleTests;  both
        # calls skip the reset if the matching '_prep*' never ran.
        self._restoreEventRegistry()
        self._restoreLogger()

    def _getTargetClass( self ):

        from Products.BCWorkbench.Framework.ListenerTool \
            import ListenerTool

        return ListenerTool

    def _makeOne( self, *args, **kw ):

        return self._getTargetClass()( *args, **kw )

    def test_verify_IEventListenerTool( self ):

        from Interface.Verify import verifyClass
        from Products.BCWorkbench import IEventListenerTool

        verifyClass( IEventListenerTool, self._getTargetClass() )

    def test_verify_ISubscriber( self ):

        from Interface.Verify import verifyClass
        from Products.Event.ISubscriber import ISubscriber

        verifyClass( ISubscriber, self._getTargetClass() )

    def test_empty( self ):

        # A new tool has no rules, is inactive, and cannot be switched
        # on or off while no event service is reachable.
        tool = self._makeOne()

        self.assertEqual( len( tool.listRuleIds() ), 0 )
        self.assertRaises( KeyError, tool.getRule, 'foo' )
        self.assertRaises( KeyError, tool.deleteRule, 'foo' )

        self.failIf( tool.isActive() )
        self.assertRaises( ValueError, tool.activate ) # no event service
        self.assertRaises( ValueError, tool.deactivate ) # no event service

    def test_addRule( self ):

        from Products.BCWorkbench import IEventListenerRule

        tool = self._makeOne()
        MONIKER = self._prepEventRegistry()

        tool.addRule( 'foo', MONIKER, 'python: 1', () )

        self.assertEqual( len( tool.listRuleIds() ), 1 )
        self.failUnless( 'foo' in tool.listRuleIds() )

        rule = tool.getRule( 'foo' )

        self.failUnless( IEventListenerRule.isImplementedBy( rule ) )

    def test_deleteRule( self ):

        # Deleting one rule must leave the other one alone.
        tool = self._makeOne()
        MONIKER = self._prepEventRegistry()

        tool.addRule( 'foo', MONIKER, 'python: 1', () )
        tool.addRule( 'bar', MONIKER, 'python: 0', () )
        tool.deleteRule( 'foo' )

        self.assertEqual( len( tool.listRuleIds() ), 1 )
        self.failIf( 'foo' in tool.listRuleIds() )
        self.failUnless( 'bar' in tool.listRuleIds() )

    def test_notify( self ):

        # Each rule fires only when "its" flag is set on the event;  the
        # Counter stored under the action's name tallies the firings.
        tool = self._makeOne()
        MONIKER = self._prepEventRegistry()

        tool.addRule( 'foo', MONIKER, 'event/for_foo | nothing'
                    , ( 'foo_called', ) )

        tool.addRule( 'bar', MONIKER, 'event/for_bar | nothing'
                    , ( 'bar_called', ) )

        foo_counter = Counter()
        tool._setObject( 'foo_called', foo_counter )

        bar_counter = Counter()
        tool._setObject( 'bar_called', bar_counter )

        event = DummyEvent()

        # Only 'foo' applies.
        event.for_foo = 1
        event.for_bar = 0

        tool.notify( event )

        self.assertEqual( foo_counter.getCount(), 1 )
        self.assertEqual( bar_counter.getCount(), 0 )

        # Both rules apply.
        event.for_bar = 1

        tool.notify( event )

        self.assertEqual( foo_counter.getCount(), 2 )
        self.assertEqual( bar_counter.getCount(), 1 )

        # Neither rule applies:  the counts must not move.
        event.for_foo = event.for_bar = 0

        tool.notify( event )

        self.assertEqual( foo_counter.getCount(), 2 )
        self.assertEqual( bar_counter.getCount(), 1 )


class DummyEventsTool( SimpleItem ):
    """ In-memory stand-in which the tests install as 'portal_events'.

    o Subscriptions are kept as a mapping from event type to a list of
      ( subscriber, filter ) pairs.
    """

    def __init__( self ):

        self._subscriptions = {}

    def subscribe( self, subscriber, event_type, filter=None ):

        # 'setdefault' creates the list for an event type on first use.
        pairs = self._subscriptions.setdefault( event_type, [] )
        pairs.append( ( subscriber, filter ) )

    def unsubscribe( self, subscriber, event_type=None, filter=None ):

        # 'list.remove' raises ValueError for a pair never subscribed.
        pairs = self._subscriptions.setdefault( event_type, [] )
        pairs.remove( ( subscriber, filter ) )

    def listSubscriptions( self, subscriber, event_type=None ):

        # Subscribers are matched by identity, not by equality.
        pairs = self._subscriptions.setdefault( event_type, [] )
        found = []
        for pair in pairs:
            if pair[0] is subscriber:
                found.append( pair )
        return found

class ListenerToolTests_with_context( SecurityRequestTest ):
    """ Tests for (de)activating a ListenerTool which is wrapped in a root
        object holding a dummy 'portal_events' tool.
    """

    # Test methods carry comments rather than docstrings, so that the
    # test runner keeps reporting them by method name.

    def _getTargetClass( self ):

        from Products.BCWorkbench.Framework.ListenerTool import ListenerTool

        return ListenerTool

    def _makeOne( self, *args, **kw ):

        klass = self._getTargetClass()
        return klass( *args, **kw )

    def _initEventTool( self ):

        # Store the dummy on the root, then fetch it back through the
        # root so that callers get it the way the root hands it out.
        root = self.root
        root.portal_events = DummyEventsTool()
        return root.portal_events

    def _makeWrapped( self ):

        # A fresh tool, wrapped in the context of the test root.
        return self._makeOne().__of__( self.root )

    def _countSubscriptions( self, evtool ):

        # These tests look only at the subscriptions filed under None.
        return len( evtool._subscriptions[ None ] )

    def test_activate( self ):

        evtool = self._initEventTool()
        tool = self._makeWrapped()

        tool.activate()
        self.failUnless( tool.isActive() )
        self.assertEqual( self._countSubscriptions( evtool ), 1 )

        # Activating a second time must not subscribe a second time.
        tool.activate()
        self.assertEqual( self._countSubscriptions( evtool ), 1 )

    def test_deactivate( self ):

        evtool = self._initEventTool()
        tool = self._makeWrapped()

        tool.activate()
        tool.deactivate()
        self.assertEqual( self._countSubscriptions( evtool ), 0 )