[Checkins] SVN: zope.component/branches/wosc-test-stacking/src/zope/component/test Implement stacking registries by chaining them with their __bases__

Wolfgang Schnerring wosc at wosc.de
Thu Mar 3 02:27:45 EST 2011


Log message for revision 120720:
  Implement stacking registries by chaining them with their __bases__
  
  There's quite some nasty business going on with redirecting getSiteManager.sethook, so this is probably missing tests for edge cases I haven't thought of yet.
  

Changed:
  U   zope.component/branches/wosc-test-stacking/src/zope/component/testing.py
  U   zope.component/branches/wosc-test-stacking/src/zope/component/tests.py

-=-
Modified: zope.component/branches/wosc-test-stacking/src/zope/component/testing.py
===================================================================
--- zope.component/branches/wosc-test-stacking/src/zope/component/testing.py	2011-03-03 07:25:28 UTC (rev 120719)
+++ zope.component/branches/wosc-test-stacking/src/zope/component/testing.py	2011-03-03 07:27:45 UTC (rev 120720)
@@ -15,13 +15,130 @@
 """
 
 # HACK to make sure basicmost event subscriber is installed
+import zope.component
 import zope.component.event
+import zope.component.registry
 
 # we really don't need special setup now:
 from zope.testing.cleanup import CleanUp as PlacelessSetup
 
+
 def setUp(test=None):
     PlacelessSetup().setUp()
 
+
 def tearDown(test=None):
     PlacelessSetup().tearDown()
+
+
+class RegistryStack(object):
+
+    def __init__(self):
+        self.stack = []
+        self.original_hook = None
+
+    def __call__(self, context=None):
+        new = self.stack[-1]
+        if self.original_hook:
+            original = self.original_hook(context)
+        else:
+            original = zope.component.getGlobalSiteManager()
+
+        if not self.find_in_bases(new, original):
+            bases = (new, original)
+            new = zope.component.registry.Components(
+                name='wraps %r' % list(bases), bases=bases)
+        return new
+
+    def sethook(self, newimplementation):
+        self.original_hook = newimplementation
+        self.original_sethook(self)
+
+    def find_in_bases(self, haystack, needle):
+        if needle in haystack.__bases__:
+            return True
+        for base in haystack.__bases__:
+            if self.find_in_bases(base, needle):
+                return True
+        return False
+
+    def push(self, item):
+        self.stack.append(item)
+
+    def pop(self):
+        return self.stack.pop()
+
+    def __len__(self):
+        return len(self.stack)
+
+_registry_stack = RegistryStack()
+
+
+def push_registry():
+    if not hasattr(_registry_stack, 'original_sethook'):
+        _registry_stack.original_sethook = (
+            zope.component.getSiteManager.sethook)
+    _make_sethook_overridable()
+
+    current = zope.component.getSiteManager()
+    new = zope.component.registry.Components(
+        name='wraps %r' % current, bases=(current,))
+    _registry_stack.push(new)
+
+    if zope.component.getSiteManager.sethook != _registry_stack.sethook:
+        current_hook = zope.component.getSiteManager.sethook(_registry_stack)
+        _registry_stack.sethook(current_hook)
+        zope.component.getSiteManager.sethook = _registry_stack.sethook
+
+
+def pop_registry():
+    _registry_stack.pop()
+
+    if not _registry_stack:
+        zope.component.getSiteManager.sethook(_registry_stack.original_hook)
+        zope.component.getSiteManager = _original_getsitemanager
+        zope.component.getSiteManager.sethook(_registry_stack.original_hook)
+        del _registry_stack.original_sethook
+
+
+_original_getsitemanager = zope.component.getSiteManager
+
+
+def _make_sethook_overridable():
+    # XXX Why oh why does everything have to be locked down so much?? We need
+    # to override sethook for the registry stacking, but neither zope.hookable
+    # nor zope.component.hookable allow us to do so. Herewith, a copy&pasted
+    # zope.component.hookable, only without the slots. *sigh*
+
+    class overridably_hookable(object):
+        original = property(lambda self: self.__original,)
+        implementation = property(lambda self: self.__implementation,)
+
+        def __init__(self, implementation):
+            self.__original = self.__implementation = implementation
+
+        def sethook(self, newimplementation):
+            (old, self.__implementation) = (
+                self.__implementation, newimplementation)
+            return old
+
+        def reset(self):
+            self.__implementation = self.__original
+
+        def __call__(self, *args, **kw):
+            return self.__implementation(*args, **kw)
+
+    try:
+        import zope.hookable
+        hook_module = zope.hookable
+    except ImportError:
+        import zope.component.hookable
+        hook_module = zope.component.hookable
+
+    if isinstance(zope.component.getSiteManager,
+                  getattr(hook_module, 'hookable')):
+        global _original_getsitemanager
+        _original_getsitemanager = zope.component.getSiteManager
+        zope.component.getSiteManager = overridably_hookable(
+            zope.component.getSiteManager.implementation)
+

Modified: zope.component/branches/wosc-test-stacking/src/zope/component/tests.py
===================================================================
--- zope.component/branches/wosc-test-stacking/src/zope/component/tests.py	2011-03-03 07:25:28 UTC (rev 120719)
+++ zope.component/branches/wosc-test-stacking/src/zope/component/tests.py	2011-03-03 07:27:45 UTC (rev 120720)
@@ -1681,6 +1681,67 @@
         reload(zope.component.zcml)
 
 
+class StackingTests(unittest.TestCase):
+
+    def test_pushed_registry_delegates_to_current(self):
+        from zope.interface import Interface
+        from zope.component import queryUtility
+
+        self.assertEqual(None, queryUtility(Interface, name='foo'))
+        self.assertEqual(None, queryUtility(Interface, name='bar'))
+        self.assertEqual(None, queryUtility(Interface, name='qux'))
+
+        zope.component.getSiteManager().registerUtility(
+            'foo', Interface, name='foo')
+        self.assertEqual('foo', queryUtility(Interface, name='foo'))
+        self.assertEqual(None, queryUtility(Interface, name='bar'))
+        self.assertEqual(None, queryUtility(Interface, name='qux'))
+
+        zope.component.testing.push_registry()
+        zope.component.getSiteManager().registerUtility(
+            'bar', Interface, name='bar')
+        self.assertEqual('foo', queryUtility(Interface, name='foo'))
+        self.assertEqual('bar', queryUtility(Interface, name='bar'))
+        self.assertEqual(None, queryUtility(Interface, name='qux'))
+
+        zope.component.testing.push_registry()
+        zope.component.getSiteManager().registerUtility(
+            'qux', Interface, name='qux')
+        self.assertEqual('foo', queryUtility(Interface, name='foo'))
+        self.assertEqual('bar', queryUtility(Interface, name='bar'))
+        self.assertEqual('qux', queryUtility(Interface, name='qux'))
+
+        zope.component.testing.pop_registry()
+        self.assertEqual('foo', queryUtility(Interface, name='foo'))
+        self.assertEqual('bar', queryUtility(Interface, name='bar'))
+        self.assertEqual(None, queryUtility(Interface, name='qux'))
+
+        zope.component.testing.pop_registry()
+        self.assertEqual('foo', queryUtility(Interface, name='foo'))
+        self.assertEqual(None, queryUtility(Interface, name='bar'))
+        self.assertEqual(None, queryUtility(Interface, name='qux'))
+
+    def test_changing_sitemanager_hook_does_not_break_stack(self):
+        from zope.interface import Interface
+        from zope.component import queryUtility
+
+        zope.component.testing.push_registry()
+        zope.component.getSiteManager().registerUtility(
+            'foo', Interface, name='foo')
+
+        site_registry = zope.component.registry.Components(name='site')
+        site_registry.registerUtility('bar', Interface, name='bar')
+        self.assertEqual(None, queryUtility(Interface, name='bar'))
+
+        zope.component.getSiteManager.sethook(lambda x=None: site_registry)
+        self.assertEqual('foo', queryUtility(Interface, name='foo'))
+        self.assertEqual('bar', queryUtility(Interface, name='bar'))
+
+        zope.component.testing.pop_registry()
+        self.assertEqual(site_registry, zope.component.getSiteManager())
+        zope.component.getSiteManager.reset() # XXX should be done by tearDown
+
+
 def setUpRegistryTests(tests):
     setUp()
 
@@ -1739,6 +1800,7 @@
         hooks_conditional,
         unittest.makeSuite(StandaloneTests),
         unittest.makeSuite(ResourceViewTests),
+        unittest.makeSuite(StackingTests),
         ))
 
 if __name__ == "__main__":



More information about the checkins mailing list