[Zope-CVS] CVS: Products/PluggableAuthService/tests - test_Caching.py:1.2 test_PluggableAuthService.py:1.14

Jens Vagelpohl jens at dataflake.org
Mon Nov 8 17:35:45 EST 2004


Update of /cvs-repository/Products/PluggableAuthService/tests
In directory cvs.zope.org:/tmp/cvs-serv5460/tests

Modified Files:
	test_PluggableAuthService.py 
Added Files:
	test_Caching.py 
Log Message:
- merge the jens-implement_caching_branch . For some details please
  see doc/caching.stx.



=== Products/PluggableAuthService/tests/test_Caching.py 1.1 => 1.2 ===
--- /dev/null	Mon Nov  8 17:35:45 2004
+++ Products/PluggableAuthService/tests/test_Caching.py	Mon Nov  8 17:35:45 2004
@@ -0,0 +1,205 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
+# Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import unittest
+
+from Acquisition import aq_base
+from OFS.Cache import isCacheable
+
+from Products.StandardCacheManagers.RAMCacheManager import RAMCacheManager
+
+class FauxRequest:
+
+    def __init__( self, steps=(), **kw ):
+
+        self.steps = steps
+        self._dict = {}
+        self._dict.update( kw )
+
+    def get( self, key, default=None ):
+
+        return self._dict.get( key, default )
+
+    def _authUserPW( self ):
+        form = self.get( 'form' )
+        return ( form.get( 'login' ), form.get( 'password' ) )
+
+    def __getitem__( self, key ):
+
+        return self._dict[ key ]
+
+    def __setitem__( self, key, value ):
+
+        self._dict[ key ] = value
+
+class PluggableAuthServiceCachingTests( unittest.TestCase ):
+
+    def tearDown( self ):
+        pass
+
+
+    def _getTargetClass( self ):
+
+        from Products.PluggableAuthService.PluggableAuthService \
+            import PluggableAuthService
+
+        return PluggableAuthService
+
+    def _makeOne( self, plugins=None, *args, **kw ):
+
+        zcuf = self._getTargetClass()( *args, **kw )
+
+        if plugins is not None:
+            zcuf._setObject( 'plugins', plugins )
+
+        rcm = RAMCacheManager('ramcache')
+        zcuf._setObject('ramcache', rcm)
+
+        return zcuf
+
+    def _makePlugins( self, plugin_type_info=None ):
+
+        from Products.PluggableAuthService.PluggableAuthService \
+            import _PLUGIN_TYPE_INFO
+        from Products.PluginRegistry.PluginRegistry import PluginRegistry
+
+        if plugin_type_info is None:
+            plugin_type_info = _PLUGIN_TYPE_INFO
+
+        reg = PluginRegistry( plugin_type_info=plugin_type_info )
+        reg._setId( 'plugins' )
+        reg._plugins = {}
+
+        return reg
+
+    def _makeAndFill(self):
+
+        from Products.PluggableAuthService.plugins import ZODBUserManager
+        from Products.PluggableAuthService.plugins import ZODBRoleManager
+
+        plugin_registry = self._makePlugins()
+        user_source = ZODBUserManager.ZODBUserManager('zodb_users')
+        roles_source = ZODBRoleManager.ZODBRoleManager('zodb_roles')
+        pas_instance = self._makeOne(plugins=plugin_registry)
+        pas_instance._setObject('zodb_users', user_source)
+        pas_instance._setObject('zodb_roles', roles_source)
+
+        return pas_instance
+
+    def test_empty( self ):
+        zcuf = self._makeOne()
+        rcm = getattr(zcuf, 'ramcache')
+
+        # This is needed because some underlying ZCacheable code wants to
+        # use self.REQUEST :/
+        setattr(rcm, 'REQUEST', FauxRequest())
+
+        # Make sure the PAS instance itself is Cacheable
+        self.assert_(isCacheable(zcuf))
+
+        # Make sure the PAS instance is not associated with any cache manager
+        # by default
+        self.assert_(zcuf.ZCacheable_getManager() is None)
+
+        # Make sure the RAMCacheManager is empty
+        self.assert_(len(rcm.getCacheReport()) == 0)
+
+    def test_caching_in_PAS(self):
+        zcuf = self._makeAndFill()
+        rcm = getattr(zcuf, 'ramcache')
+        plugin_registry = getattr(zcuf, 'plugins')
+        user_source = getattr(zcuf, 'zodb_users')
+        roles_source = getattr(zcuf, 'zodb_roles')
+
+        # This is needed because some underlying ZCacheable code wants to
+        # use self.REQUEST :/
+        setattr(zcuf, 'REQUEST', FauxRequest())
+
+        # First, we register the ZODBUserManager as a plugin suitable
+        # for storing and returning user objects and the ZODBRoleManager
+        # for roles. Basic scaffolding to be able to store and retrieve users.
+        from Products.PluggableAuthService.interfaces import plugins
+
+        plugin_registry.activatePlugin( plugins.IUserEnumerationPlugin
+                                      , user_source.getId()
+                                      )
+        plugin_registry.activatePlugin( plugins.IUserAdderPlugin
+                                      , user_source.getId()
+                                      )
+        plugin_registry.activatePlugin( plugins.IRolesPlugin
+                                      , roles_source.getId()
+                                      )
+        plugin_registry.activatePlugin( plugins.IRoleEnumerationPlugin
+                                      , roles_source.getId()
+                                      )
+        plugin_registry.activatePlugin( plugins.IRoleAssignerPlugin
+                                      , roles_source.getId()
+                                      )
+
+        # Now add a user and make sure it's there
+        zcuf._doAddUser('testlogin', 'secret', ['Member', 'Anonymous'], [])
+        self.failIf(zcuf.getUser('testlogin') is None)
+
+        # Then we activate caching for the PAS instance itself
+        zcuf.ZCacheable_setManagerId(rcm.getId())
+
+        # Make sure the PAS instance is associated with the cache
+        self.failUnless(aq_base(zcuf.ZCacheable_getManager()) is aq_base(rcm))
+
+        # Now we can see if the cache is getting used. Test for emptiness
+        # first, then retrieve a user, and the cache should have content.
+        # Then test again to see if the cache entries are being used.
+        # This is a bit nasty because I am relying on knowing the structure
+        # of the cache report, which is really an internal implementation
+        # detail.
+
+        # First check: The cache must be empty
+        report = rcm.getCacheReport()
+        self.failUnless(len(report) == 0)
+
+        # The user is being requested once. At this point there must be one
+        # entry for the PAS instance. The number of "misses" must be >0 because
+        # the first cache check will have failed. The number of cache hits must
+        # be zero.
+        zcuf.getUser('testlogin')
+        report = rcm.getCacheReport()
+        self.failUnless(len(report) == 1)
+        report_item = report[0]
+        firstpass_misses = report_item.get('misses')
+        firstpass_hits = report_item.get('hits')
+        firstpass_entries = report_item.get('entries')
+        self.failUnless(firstpass_misses > 0)
+        self.failUnless(firstpass_hits == 0)
+
+        # The user is requested again. This request should produce a cache hit,
+        # so the number of "misses" must have stayed the same as after the
+        # first pass, but the number of hits must now be >0. Also, the number
+        # of in-memory entries must have remained the same to prove that we are
+        # reusing the same cache entries.
+        zcuf.getUser('testlogin')
+        report = rcm.getCacheReport()
+        self.failUnless(len(report) == 1)
+        report_item = report[0]
+        self.failIf(report_item.get('misses') != firstpass_misses)
+        self.failUnless(report_item.get('hits') > firstpass_hits)
+        self.failIf(report_item.get('entries') != firstpass_entries)
+
+
+if __name__ == "__main__":
+    unittest.main()
+
+def test_suite():
+    return unittest.TestSuite((
+        unittest.makeSuite( PluggableAuthServiceCachingTests ),
+        ))


=== Products/PluggableAuthService/tests/test_PluggableAuthService.py 1.13 => 1.14 ===
--- Products/PluggableAuthService/tests/test_PluggableAuthService.py:1.13	Sat Oct 16 16:15:46 2004
+++ Products/PluggableAuthService/tests/test_PluggableAuthService.py	Mon Nov  8 17:35:45 2004
@@ -632,68 +632,6 @@
         self.assertEqual( len( user_ids ), 1 )
         self.assertEqual( user_ids[ 0 ][0], 'login__foo' )
 
-    def test__extractUserIds_cache( self ):
-
-        from Products.PluggableAuthService.interfaces.plugins \
-            import IExtractionPlugin, IAuthenticationPlugin
-
-        plugins = self._makePlugins()
-        zcuf = self._makeOne( plugins )
-
-        login = DummyPlugin()
-        directlyProvides( login, ( IExtractionPlugin, IAuthenticationPlugin ) )
-        login.extractCredentials = _extractLogin
-        login.authenticateCredentials = _authLogin
-
-        zcuf._setObject( 'login', login )
-
-        extra = DummyPlugin()
-        directlyProvides( extra, ( IExtractionPlugin, IAuthenticationPlugin ) )
-        extra.extractCredentials = _extractExtra
-        extra.authenticateCredentials = _authExtra
-
-        zcuf._setObject( 'extra', extra )
-
-        plugins = zcuf._getOb( 'plugins' )
-
-        plugins.activatePlugin( IExtractionPlugin, 'extra' )
-        plugins.activatePlugin( IExtractionPlugin, 'login' )
-        plugins.activatePlugin( IAuthenticationPlugin, 'extra' )
-        plugins.activatePlugin( IAuthenticationPlugin, 'login' )
-
-        cache = {}
-        request = FauxRequest( form={ 'login' : 'foo' , 'password' : 'bar' }
-                             , extra='qux'
-                             )
-
-
-        user_ids = zcuf._extractUserIds( request=request
-                                       , plugins=zcuf.plugins
-                                       , cache=cache
-                                       )
-
-        self.assertEqual( len( user_ids ), 2 )
-        self.assertEqual( user_ids[ 0 ][0], 'extra__qux' )
-        self.assertEqual( user_ids[ 1 ][0], 'login__foo' )
-
-        self.assertEqual( len( cache ), 2 )
-        self.failUnless( [ ('login__foo', 'foo') ] in cache.values() )
-        self.failUnless( [ ('extra__qux', 'qux') ] in cache.values() )
-
-        key = [ x[0] for x in cache.items()
-                      if x[1] == [('login__foo', 'foo')] ][0]
-        cache[ key ].append( ('forced__baz', 'baz' ) )
-
-        user_ids = zcuf._extractUserIds( request=request
-                                       , plugins=zcuf.plugins
-                                       , cache=cache
-                                       )
-
-        self.assertEqual( len( user_ids ), 3, user_ids )
-        self.assertEqual( user_ids[ 0 ][0], 'extra__qux' )
-        self.assertEqual( user_ids[ 1 ][0], 'login__foo' )
-        self.assertEqual( user_ids[ 2 ][0], 'forced__baz' )
-
     def test__getObjectContext_no_steps( self ):
 
         zcuf = self._makeOne()
@@ -1026,28 +964,6 @@
         self.assertEqual( len( groups ), 2 )
         self.failIf( 'bar:group3' in groups )
         self.failIf( 'bar:group4' in groups )
-
-    def test__findUser_from_cache( self ):
-
-        plugins = self._makePlugins()
-        zcuf = self._makeOne(plugins)
-        faux = FauxUser( 'faux' )
-        cache = { 'faux' : faux }
-
-        user = zcuf._findUser( plugins, 'faux', 'faux', cache )
-
-        self.failUnless( aq_base( user ) is faux )
-        self.failUnless( aq_parent( user ) is zcuf )
-
-    def test__findUser_loads_cache( self ):
-
-        plugins = self._makePlugins()
-
-        zcuf = self._makeOne(plugins)
-        cache = {}
-        user = zcuf._findUser( plugins, 'someone', 'someone', cache )
-
-        self.failUnless( aq_base(cache[ 'someone' ]) is aq_base( user ) )
 
     def test__authorizeUser_force_ok( self ):
 



More information about the Zope-CVS mailing list