[Checkins] SVN: zope.interface/trunk/ Merge chrism-componentregistry branch:
Chris McDonough
chrism at plope.com
Thu Sep 15 00:47:54 EST 2011
Log message for revision 122810:
Merge chrism-componentregistry branch:
svn merge -r122752:122809 svn+ssh://svn.zope.org/repos/main/zope.interface/branches/chrism-componentregistry
Changed:
U zope.interface/trunk/CHANGES.txt
U zope.interface/trunk/buildout.cfg
U zope.interface/trunk/setup.py
U zope.interface/trunk/src/zope/interface/interfaces.py
A zope.interface/trunk/src/zope/interface/registry.py
A zope.interface/trunk/src/zope/interface/tests/test_registry.py
U zope.interface/trunk/tox.ini
-=-
Modified: zope.interface/trunk/CHANGES.txt
===================================================================
--- zope.interface/trunk/CHANGES.txt 2011-09-15 05:44:44 UTC (rev 122809)
+++ zope.interface/trunk/CHANGES.txt 2011-09-15 05:47:54 UTC (rev 122810)
@@ -4,6 +4,32 @@
3.7.1 (unreleased)
------------------
+- New module ``zope.interface.registry``. This is code moved from
+ ``zope.component.registry`` which implements a basic nonpersistent component
+ registry as ``zope.interface.registry.Components``. This class was moved
+ from ``zope.component`` to make porting systems (such as Pyramid) that rely
+ only on a basic component registry to Python 3 possible without needing to
+ port the entirety of the ``zope.component`` package. Backwards
+ compatibility import shims have been left behind in ``zope.component``, so
+ this change will not break any existing code.
+
+- New ``tests_require`` dependency: ``zope.event`` to test events sent by
+ Components implementation. The ``zope.interface`` package does not have a
+ hard dependency on ``zope.event``, but if ``zope.event`` is importable, it
+ will send component registration events when methods of an instance of
+ ``zope.interface.registry.Components`` are called.
+
+- New interfaces added to support ``zope.interface.registry.Components``
+ addition: ``ComponentLookupError``, ``Invalid``, ``IObjectEvent``,
+ ``ObjectEvent``, ``IComponentLookup``, ``IRegistration``,
+ ``IUtilityRegistration``, ``IAdapterRegistration``,
+ ``ISubscriptionAdapterRegistration``, ``IHandlerRegistration``,
+ ``IRegistrationEvent``, ``RegistrationEvent``, ``IRegistered``,
+ ``Registered``, ``IUnregistered``, ``Unregistered``,
+ ``IComponentRegistry``, and ``IComponents``.
+
+- No longer Python 2.4 compatible (tested under 2.5, 2.6, 2.7, and 3.2).
+
3.7.0 (2011-08-13)
------------------
Modified: zope.interface/trunk/buildout.cfg
===================================================================
--- zope.interface/trunk/buildout.cfg 2011-09-15 05:44:44 UTC (rev 122809)
+++ zope.interface/trunk/buildout.cfg 2011-09-15 05:47:54 UTC (rev 122810)
@@ -5,10 +5,12 @@
[test]
recipe = zc.recipe.testrunner
eggs = zope.interface
+ zope.event
[python]
recipe = zc.recipe.egg
eggs = zope.interface
+ zope.event
interpreter = python
[docs]
Modified: zope.interface/trunk/setup.py
===================================================================
--- zope.interface/trunk/setup.py 2011-09-15 05:44:44 UTC (rev 122809)
+++ zope.interface/trunk/setup.py 2011-09-15 05:47:54 UTC (rev 122810)
@@ -65,7 +65,7 @@
namespace_packages=["zope"],
include_package_data = True,
zip_safe = False,
- tests_require = [],
+ tests_require = ['zope.event'],
install_requires = ['setuptools'],
extras_require={'docs': ['z3c.recipe.sphinxdoc']},
features = features
Modified: zope.interface/trunk/src/zope/interface/interfaces.py
===================================================================
--- zope.interface/trunk/src/zope/interface/interfaces.py 2011-09-15 05:44:44 UTC (rev 122809)
+++ zope.interface/trunk/src/zope/interface/interfaces.py 2011-09-15 05:47:54 UTC (rev 122810)
@@ -15,9 +15,10 @@
"""
__docformat__ = 'restructuredtext'
-
-from zope.interface import Interface
from zope.interface.interface import Attribute
+from zope.interface.interface import Interface
+from zope.interface.declarations import implements
+from zope.interface.declarations import implementer # required by py3k fixers
class IElement(Interface):
"""Objects that have basic documentation and tagged values.
@@ -744,3 +745,540 @@
def subscribers(objects, provided, name=u''):
"""Get a sequence of subscription adapters
"""
+
+# begin formerly in zope.component
+
+class ComponentLookupError(LookupError):
+ """A component could not be found."""
+
+class Invalid(Exception):
+ """A component doesn't satisfy a promise."""
+
+class IObjectEvent(Interface):
+ """An event related to an object.
+
+ The object that generated this event is not necessarily the object
+ referred to by location.
+ """
+
+ object = Attribute("The subject of the event.")
+
+
+class ObjectEvent(object):
+ implements(IObjectEvent)
+
+ def __init__(self, object):
+ self.object = object
+
+class IComponentLookup(Interface):
+ """Component Manager for a Site
+
+ This object manages the components registered at a particular site. The
+ definition of a site is intentionally vague.
+ """
+
+ adapters = Attribute(
+ "Adapter Registry to manage all registered adapters.")
+
+ utilities = Attribute(
+ "Adapter Registry to manage all registered utilities.")
+
+ def queryAdapter(object, interface, name=u'', default=None):
+ """Look for a named adapter to an interface for an object
+
+ If a matching adapter cannot be found, returns the default.
+ """
+
+ def getAdapter(object, interface, name=u''):
+ """Look for a named adapter to an interface for an object
+
+ If a matching adapter cannot be found, a ComponentLookupError
+ is raised.
+ """
+
+ def queryMultiAdapter(objects, interface, name=u'', default=None):
+ """Look for a multi-adapter to an interface for multiple objects
+
+ If a matching adapter cannot be found, returns the default.
+ """
+
+ def getMultiAdapter(objects, interface, name=u''):
+ """Look for a multi-adapter to an interface for multiple objects
+
+ If a matching adapter cannot be found, a ComponentLookupError
+ is raised.
+ """
+
+ def getAdapters(objects, provided):
+ """Look for all matching adapters to a provided interface for objects
+
+ Return an iterable of name-adapter pairs for adapters that
+ provide the given interface.
+ """
+
+ def subscribers(objects, provided):
+ """Get subscribers
+
+ Subscribers are returned that provide the provided interface
+ and that depend on and are computed from the sequence of
+ required objects.
+ """
+
+ def handle(*objects):
+ """Call handlers for the given objects
+
+ Handlers registered for the given objects are called.
+ """
+
+ def queryUtility(interface, name='', default=None):
+ """Look up a utility that provides an interface.
+
+ If one is not found, returns default.
+ """
+
+ def getUtilitiesFor(interface):
+ """Look up the registered utilities that provide an interface.
+
+ Returns an iterable of name-utility pairs.
+ """
+
+ def getAllUtilitiesRegisteredFor(interface):
+ """Return all registered utilities for an interface
+
+ This includes overridden utilities.
+
+ An iterable of utility instances is returned. No names are
+ returned.
+ """
+
+class IRegistration(Interface):
+ """A registration-information object
+ """
+
+ registry = Attribute("The registry having the registration")
+
+ name = Attribute("The registration name")
+
+ info = Attribute("""Information about the registration
+
+ This is information deemed useful to people browsing the
+ configuration of a system. It could, for example, include
+ commentary or information about the source of the configuration.
+ """)
+
+class IUtilityRegistration(IRegistration):
+ """Information about the registration of a utility
+ """
+
+ factory = Attribute("The factory used to create the utility. Optional.")
+ component = Attribute("The object registered")
+ provided = Attribute("The interface provided by the component")
+
+class _IBaseAdapterRegistration(IRegistration):
+ """Information about the registration of an adapter
+ """
+
+ factory = Attribute("The factory used to create adapters")
+
+ required = Attribute("""The adapted interfaces
+
+ This is a sequence of interfaces adapted by the registered
+ factory. The factory will be called with a sequence of objects, as
+ positional arguments, that provide these interfaces.
+ """)
+
+ provided = Attribute("""The interface provided by the adapters.
+
+ This interface is implemented by the factory
+ """)
+
+class IAdapterRegistration(_IBaseAdapterRegistration):
+ """Information about the registration of an adapter
+ """
+
+class ISubscriptionAdapterRegistration(_IBaseAdapterRegistration):
+ """Information about the registration of a subscription adapter
+ """
+
+class IHandlerRegistration(IRegistration):
+
+ handler = Attribute("An object called to handle an event")
+
+ required = Attribute("""The handled interfaces
+
+ This is a sequence of interfaces handled by the registered
+ handler. The handler will be called with a sequence of objects, as
+ positional arguments, that provide these interfaces.
+ """)
+
+class IRegistrationEvent(IObjectEvent):
+ """An event that involves a registration"""
+
+class RegistrationEvent(ObjectEvent):
+ """There has been a change in a registration
+ """
+ implements(IRegistrationEvent)
+
+ def __repr__(self):
+ return "%s event:\n%r" % (self.__class__.__name__, self.object)
+
+class IRegistered(IRegistrationEvent):
+ """A component or factory was registered
+ """
+
+class Registered(RegistrationEvent):
+ implements(IRegistered)
+
+class IUnregistered(IRegistrationEvent):
+ """A component or factory was unregistered
+ """
+
+class Unregistered(RegistrationEvent):
+ """A component or factory was unregistered
+ """
+ implements(IUnregistered)
+
+class IComponentRegistry(Interface):
+ """Register components
+ """
+
+ def registerUtility(component=None, provided=None, name=u'',
+ info=u'', factory=None):
+ """Register a utility
+
+ factory
+ Factory for the component to be registered.
+
+ component
+ The registered component
+
+ provided
+ This is the interface provided by the utility. If the
+ component provides a single interface, then this
+ argument is optional and the component-implemented
+ interface will be used.
+
+ name
+ The utility name.
+
+ info
+ An object that can be converted to a string to provide
+ information about the registration.
+
+ Only one of component and factory can be used.
+ A Registered event is generated with an IUtilityRegistration.
+ """
+
+ def unregisterUtility(component=None, provided=None, name=u'',
+ factory=None):
+ """Unregister a utility
+
+ A boolean is returned indicating whether the registry was
+ changed. If the given component is None and there is no
+ component registered, or if the given component is not
+ None and is not registered, then the function returns
+ False, otherwise it returns True.
+
+ factory
+ Factory for the component to be unregistered.
+
+ component
+ The registered component. The given component can be
+ None, in which case any component registered to provide
+ the given provided interface with the given name is
+ unregistered.
+
+ provided
+ This is the interface provided by the utility. If the
+ component is not None and provides a single interface,
+ then this argument is optional and the
+ component-implemented interface will be used.
+
+ name
+ The utility name.
+
+ Only one of component and factory can be used.
+ An Unregistered event is generated with an IUtilityRegistration.
+ """
+
+ def registeredUtilities():
+ """Return an iterable of IUtilityRegistration instances.
+
+ These registrations describe the current utility registrations
+ in the object.
+ """
+
+ def registerAdapter(factory, required=None, provided=None, name=u'',
+ info=u''):
+ """Register an adapter factory
+
+ Parameters:
+
+ factory
+ The object used to compute the adapter
+
+ required
+ This is a sequence of specifications for objects to be
+ adapted. If omitted, then the value of the factory's
+ __component_adapts__ attribute will be used. The
+ __component_adapts__ attribute is
+ normally set in class definitions using the adapts
+ function, or for callables using the adapter
+ decorator. If the factory doesn't have a
+ __component_adapts__ attribute, then this
+ argument is required.
+
+ provided
+ This is the interface provided by the adapter and
+ implemented by the factory. If the factory
+ implements a single interface, then this argument is
+ optional and the factory-implemented interface will be
+ used.
+
+ name
+ The adapter name.
+
+ info
+ An object that can be converted to a string to provide
+ information about the registration.
+
+ A Registered event is generated with an IAdapterRegistration.
+ """
+
+ def unregisterAdapter(factory=None, required=None,
+ provided=None, name=u''):
+ """Unregister an adapter factory
+
+ A boolean is returned indicating whether the registry was
+ changed. If the given component is None and there is no
+ component registered, or if the given component is not
+ None and is not registered, then the function returns
+ False, otherwise it returns True.
+
+ Parameters:
+
+ factory
+ This is the object used to compute the adapter. The
+ factory can be None, in which case any factory
+ registered to implement the given provided interface
+ for the given required specifications with the given
+ name is unregistered.
+
+ required
+ This is a sequence of specifications for objects to be
+ adapted. If the factory is not None and the required
+ arguments is omitted, then the value of the factory's
+ __component_adapts__ attribute will be used. The
+ __component_adapts__ attribute is normally
+ set in class definitions using the adapts function, or for
+ callables using the adapter decorator. If the factory
+ is None or doesn't have a __component_adapts__
+ attribute, then this argument is required.
+
+ provided
+ This is the interface provided by the adapter and
+ implemented by the factory. If the factory is not
+ None and implements a single interface, then this
+ argument is optional and the factory-implemented
+ interface will be used.
+
+ name
+ The adapter name.
+
+ An Unregistered event is generated with an IAdapterRegistration.
+ """
+
+ def registeredAdapters():
+ """Return an iterable of IAdapterRegistration instances.
+
+ These registrations describe the current adapter registrations
+ in the object.
+ """
+
+ def registerSubscriptionAdapter(factory, required=None, provides=None,
+ name=u'', info=''):
+ """Register a subscriber factory
+
+ Parameters:
+
+ factory
+ The object used to compute the adapter
+
+ required
+ This is a sequence of specifications for objects to be
+ adapted. If omitted, then the value of the factory's
+ __component_adapts__ attribute will be used. The
+ __component_adapts__ attribute is
+ normally set in class definitions using the adapts
+ function, or for callables using the adapter
+ decorator. If the factory doesn't have a
+ __component_adapts__ attribute, then this
+ argument is required.
+
+ provided
+ This is the interface provided by the adapter and
+ implemented by the factory. If the factory implements
+ a single interface, then this argument is optional and
+ the factory-implemented interface will be used.
+
+ name
+ The adapter name.
+
+ Currently, only the empty string is accepted. Other
+ strings will be accepted in the future when support for
+ named subscribers is added.
+
+ info
+ An object that can be converted to a string to provide
+ information about the registration.
+
+ A Registered event is generated with an
+ ISubscriptionAdapterRegistration.
+ """
+
+ def unregisterSubscriptionAdapter(factory=None, required=None,
+ provides=None, name=u''):
+ """Unregister a subscriber factory.
+
+ A boolean is returned indicating whether the registry was
+ changed. If the given component is None and there is no
+ component registered, or if the given component is not
+ None and is not registered, then the function returns
+ False, otherwise it returns True.
+
+ Parameters:
+
+ factory
+ This is the object used to compute the adapter. The
+ factory can be None, in which case any factories
+ registered to implement the given provided interface
+ for the given required specifications with the given
+ name are unregistered.
+
+ required
+ This is a sequence of specifications for objects to be
+ adapted. If the factory is not None and the required
+ arguments is omitted, then the value of the factory's
+ __component_adapts__ attribute will be used. The
+ __component_adapts__ attribute is normally
+ set in class definitions using the adapts function, or for
+ callables using the adapter decorator. If the factory
+ is None or doesn't have a __component_adapts__
+ attribute, then this argument is required.
+
+ provided
+ This is the interface provided by the adapter and
+ implemented by the factory. If the factory is not
+ None and implements a single interface, then this argument
+ is optional and the factory-implemented interface will
+ be used.
+
+ name
+ The adapter name.
+
+ Currently, only the empty string is accepted. Other
+ strings will be accepted in the future when support for
+ named subscribers is added.
+
+ An Unregistered event is generated with an
+ ISubscriptionAdapterRegistration.
+ """
+
+ def registeredSubscriptionAdapters():
+ """Return an iterable of ISubscriptionAdapterRegistration instances.
+
+ These registrations describe the current subscription adapter
+ registrations in the object.
+ """
+
+ def registerHandler(handler, required=None, name=u'', info=''):
+ """Register a handler.
+
+ A handler is a subscriber that doesn't compute an adapter
+ but performs some function when called.
+
+ Parameters:
+
+ handler
+ The object used to handle some event represented by
+ the objects passed to it.
+
+ required
+ This is a sequence of specifications for objects to be
+ adapted. If omitted, then the value of the factory's
+ __component_adapts__ attribute will be used. The
+ __component_adapts__ attribute is
+ normally set in class definitions using the adapts
+ function, or for callables using the adapter
+ decorator. If the factory doesn't have a
+ __component_adapts__ attribute, then this
+ argument is required.
+
+ name
+ The handler name.
+
+ Currently, only the empty string is accepted. Other
+ strings will be accepted in the future when support for
+ named handlers is added.
+
+ info
+ An object that can be converted to a string to provide
+ information about the registration.
+
+
+ A Registered event is generated with an IHandlerRegistration.
+ """
+
+ def unregisterHandler(handler=None, required=None, name=u''):
+ """Unregister a handler.
+
+ A handler is a subscriber that doesn't compute an adapter
+ but performs some function when called.
+
+ A boolean is returned indicating whether the registry was
+ changed.
+
+ Parameters:
+
+ handler
+ This is the object used to handle some event
+ represented by the objects passed to it. The handler
+ can be None, in which case any handlers registered for
+ the given required specifications with the given name are
+ unregistered.
+
+ required
+ This is a sequence of specifications for objects to be
+ adapted. If omitted, then the value of the factory's
+ __component_adapts__ attribute will be used. The
+ __component_adapts__ attribute is
+ normally set in class definitions using the adapts
+ function, or for callables using the adapter
+ decorator. If the factory doesn't have a
+ __component_adapts__ attribute, then this
+ argument is required.
+
+ name
+ The handler name.
+
+ Currently, only the empty string is accepted. Other
+ strings will be accepted in the future when support for
+ named handlers is added.
+
+ An Unregistered event is generated with an IHandlerRegistration.
+ """
+
+ def registeredHandlers():
+ """Return an iterable of IHandlerRegistration instances.
+
+ These registrations describe the current handler registrations
+ in the object.
+ """
+
+
+class IComponents(IComponentLookup, IComponentRegistry):
+ """Component registration and access
+ """
+
+
+# end formerly in zope.component
Copied: zope.interface/trunk/src/zope/interface/registry.py (from rev 122809, zope.interface/branches/chrism-componentregistry/src/zope/interface/registry.py)
===================================================================
--- zope.interface/trunk/src/zope/interface/registry.py (rev 0)
+++ zope.interface/trunk/src/zope/interface/registry.py 2011-09-15 05:47:54 UTC (rev 122810)
@@ -0,0 +1,545 @@
+##############################################################################
+#
+# Copyright (c) 2006 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Basic components support
+"""
+import sys
+import types
+
+try:
+ from zope.event import notify
+except ImportError:
+ def notify(*arg, **kw): pass
+
+from zope.interface.interfaces import ISpecification
+from zope.interface.interfaces import ComponentLookupError
+from zope.interface.interfaces import IAdapterRegistration
+from zope.interface.interfaces import IComponents
+from zope.interface.interfaces import IHandlerRegistration
+from zope.interface.interfaces import ISubscriptionAdapterRegistration
+from zope.interface.interfaces import IUtilityRegistration
+from zope.interface.interfaces import Registered
+from zope.interface.interfaces import Unregistered
+
+from zope.interface.interface import Interface
+from zope.interface.declarations import implementedBy
+from zope.interface.declarations import implements
+from zope.interface.declarations import implementsOnly
+from zope.interface.declarations import providedBy
+from zope.interface.declarations import implementer # required by py3k fixers
+from zope.interface.declarations import implementer_only # req by py3k fixers
+from zope.interface.adapter import AdapterRegistry
+
+if sys.version_info[0] == 3:
+ def _u(s):
+ return s
+ class_types = type
+ string_types = (str,)
+else:
+ def _u(s):
+ return unicode(s, 'unicode_escape')
+ class_types = (type, types.ClassType)
+ string_types = (basestring,)
+
+class Components(object):
+
+ implements(IComponents)
+
+ def __init__(self, name='', bases=()):
+ assert isinstance(name, string_types)
+ self.__name__ = name
+ self._init_registries()
+ self._init_registrations()
+ self.__bases__ = tuple(bases)
+
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.__name__)
+
+ def _init_registries(self):
+ self.adapters = AdapterRegistry()
+ self.utilities = AdapterRegistry()
+
+ def _init_registrations(self):
+ self._utility_registrations = {}
+ self._adapter_registrations = {}
+ self._subscription_registrations = []
+ self._handler_registrations = []
+
+ def _getBases(self):
+ # Subclasses might override
+ return self.__dict__.get('__bases__', ())
+
+ def _setBases(self, bases):
+ # Subclasses might override
+ self.adapters.__bases__ = tuple([
+ base.adapters for base in bases])
+ self.utilities.__bases__ = tuple([
+ base.utilities for base in bases])
+ self.__dict__['__bases__'] = tuple(bases)
+
+ __bases__ = property(
+ lambda self: self._getBases(),
+ lambda self, bases: self._setBases(bases),
+ )
+
+ def registerUtility(self, component=None, provided=None, name=_u(''),
+ info=_u(''), event=True, factory=None):
+ if factory:
+ if component:
+ raise TypeError("Can't specify factory and component.")
+ component = factory()
+
+ if provided is None:
+ provided = _getUtilityProvided(component)
+
+ reg = self._utility_registrations.get((provided, name))
+ if reg is not None:
+ if reg[:2] == (component, info):
+ # already registered
+ return
+ self.unregisterUtility(reg[0], provided, name)
+
+ subscribed = False
+ for ((p, _), data) in iter(self._utility_registrations.items()):
+ if p == provided and data[0] == component:
+ subscribed = True
+ break
+
+ self._utility_registrations[(provided, name)] = component, info, factory
+ self.utilities.register((), provided, name, component)
+
+ if not subscribed:
+ self.utilities.subscribe((), provided, component)
+
+ if event:
+ notify(Registered(
+ UtilityRegistration(self, provided, name, component, info,
+ factory)
+ ))
+
+ def unregisterUtility(self, component=None, provided=None, name=_u(''),
+ factory=None):
+ if factory:
+ if component:
+ raise TypeError("Can't specify factory and component.")
+ component = factory()
+
+ if provided is None:
+ if component is None:
+ raise TypeError("Must specify one of component, factory and "
+ "provided")
+ provided = _getUtilityProvided(component)
+
+ old = self._utility_registrations.get((provided, name))
+ if (old is None) or ((component is not None) and
+ (component != old[0])):
+ return False
+
+ if component is None:
+ component = old[0]
+
+ # Note that component is now the old thing registered
+
+ del self._utility_registrations[(provided, name)]
+ self.utilities.unregister((), provided, name)
+
+ subscribed = False
+ for ((p, _), data) in iter(self._utility_registrations.items()):
+ if p == provided and data[0] == component:
+ subscribed = True
+ break
+
+ if not subscribed:
+ self.utilities.unsubscribe((), provided, component)
+
+ notify(Unregistered(
+ UtilityRegistration(self, provided, name, component, *old[1:])
+ ))
+
+ return True
+
+ def registeredUtilities(self):
+ for ((provided, name), data
+ ) in iter(self._utility_registrations.items()):
+ yield UtilityRegistration(self, provided, name, *data)
+
+ def queryUtility(self, provided, name=_u(''), default=None):
+ return self.utilities.lookup((), provided, name, default)
+
+ def getUtility(self, provided, name=_u('')):
+ utility = self.utilities.lookup((), provided, name)
+ if utility is None:
+ raise ComponentLookupError(provided, name)
+ return utility
+
+ def getUtilitiesFor(self, interface):
+ for name, utility in self.utilities.lookupAll((), interface):
+ yield name, utility
+
+ def getAllUtilitiesRegisteredFor(self, interface):
+ return self.utilities.subscriptions((), interface)
+
+ def registerAdapter(self, factory, required=None, provided=None,
+ name=_u(''), info=_u(''), event=True):
+ if provided is None:
+ provided = _getAdapterProvided(factory)
+ required = _getAdapterRequired(factory, required)
+ self._adapter_registrations[(required, provided, name)
+ ] = factory, info
+ self.adapters.register(required, provided, name, factory)
+
+ if event:
+ notify(Registered(
+ AdapterRegistration(self, required, provided, name,
+ factory, info)
+ ))
+
+
+ def unregisterAdapter(self, factory=None,
+ required=None, provided=None, name=_u(''),
+ ):
+ if provided is None:
+ if factory is None:
+ raise TypeError("Must specify one of factory and provided")
+ provided = _getAdapterProvided(factory)
+
+ if (required is None) and (factory is None):
+ raise TypeError("Must specify one of factory and required")
+
+ required = _getAdapterRequired(factory, required)
+ old = self._adapter_registrations.get((required, provided, name))
+ if (old is None) or ((factory is not None) and
+ (factory != old[0])):
+ return False
+
+ del self._adapter_registrations[(required, provided, name)]
+ self.adapters.unregister(required, provided, name)
+
+ notify(Unregistered(
+ AdapterRegistration(self, required, provided, name,
+ *old)
+ ))
+
+ return True
+
+ def registeredAdapters(self):
+ for ((required, provided, name), (component, info)
+ ) in iter(self._adapter_registrations.items()):
+ yield AdapterRegistration(self, required, provided, name,
+ component, info)
+
+ def queryAdapter(self, object, interface, name=_u(''), default=None):
+ return self.adapters.queryAdapter(object, interface, name, default)
+
+ def getAdapter(self, object, interface, name=_u('')):
+ adapter = self.adapters.queryAdapter(object, interface, name)
+ if adapter is None:
+ raise ComponentLookupError(object, interface, name)
+ return adapter
+
+ def queryMultiAdapter(self, objects, interface, name=_u(''),
+ default=None):
+ return self.adapters.queryMultiAdapter(
+ objects, interface, name, default)
+
+ def getMultiAdapter(self, objects, interface, name=_u('')):
+ adapter = self.adapters.queryMultiAdapter(objects, interface, name)
+ if adapter is None:
+ raise ComponentLookupError(objects, interface, name)
+ return adapter
+
+ def getAdapters(self, objects, provided):
+ for name, factory in self.adapters.lookupAll(
+ list(map(providedBy, objects)),
+ provided):
+ adapter = factory(*objects)
+ if adapter is not None:
+ yield name, adapter
+
+ def registerSubscriptionAdapter(self,
+ factory, required=None, provided=None,
+ name=_u(''), info=_u(''),
+ event=True):
+ if name:
+ raise TypeError("Named subscribers are not yet supported")
+ if provided is None:
+ provided = _getAdapterProvided(factory)
+ required = _getAdapterRequired(factory, required)
+ self._subscription_registrations.append(
+ (required, provided, name, factory, info)
+ )
+ self.adapters.subscribe(required, provided, factory)
+
+ if event:
+ notify(Registered(
+ SubscriptionRegistration(self, required, provided, name,
+ factory, info)
+ ))
+
+ def registeredSubscriptionAdapters(self):
+ for data in self._subscription_registrations:
+ yield SubscriptionRegistration(self, *data)
+
+ def unregisterSubscriptionAdapter(self, factory=None,
+ required=None, provided=None, name=_u(''),
+ ):
+ if name:
+ raise TypeError("Named subscribers are not yet supported")
+ if provided is None:
+ if factory is None:
+ raise TypeError("Must specify one of factory and provided")
+ provided = _getAdapterProvided(factory)
+
+ if (required is None) and (factory is None):
+ raise TypeError("Must specify one of factory and required")
+
+ required = _getAdapterRequired(factory, required)
+
+ if factory is None:
+ new = [(r, p, n, f, i)
+ for (r, p, n, f, i)
+ in self._subscription_registrations
+ if not (r == required and p == provided)
+ ]
+ else:
+ new = [(r, p, n, f, i)
+ for (r, p, n, f, i)
+ in self._subscription_registrations
+ if not (r == required and p == provided and f == factory)
+ ]
+
+ if len(new) == len(self._subscription_registrations):
+ return False
+
+
+ self._subscription_registrations[:] = new
+ self.adapters.unsubscribe(required, provided, factory)
+
+ notify(Unregistered(
+ SubscriptionRegistration(self, required, provided, name,
+ factory, '')
+ ))
+
+ return True
+
+ def subscribers(self, objects, provided):
+ return self.adapters.subscribers(objects, provided)
+
+ def registerHandler(self,
+ factory, required=None,
+ name=_u(''), info=_u(''),
+ event=True):
+ if name:
+ raise TypeError("Named handlers are not yet supported")
+ required = _getAdapterRequired(factory, required)
+ self._handler_registrations.append(
+ (required, name, factory, info)
+ )
+ self.adapters.subscribe(required, None, factory)
+
+ if event:
+ notify(Registered(
+ HandlerRegistration(self, required, name, factory, info)
+ ))
+
+ def registeredHandlers(self):
+ for data in self._handler_registrations:
+ yield HandlerRegistration(self, *data)
+
+    def unregisterHandler(self, factory=None, required=None, name=_u('')):
+        """Remove handler registrations; report whether any were removed.
+
+        ``required`` is normalised by ``_getAdapterRequired`` (which falls
+        back to ``factory.__component_adapts__`` when it is ``None``).  When
+        ``factory`` is ``None`` every handler recorded for ``required`` is
+        dropped; otherwise only entries matching both ``required`` and
+        ``factory`` are dropped.
+
+        Returns ``False`` when nothing matched.  Otherwise the adapter
+        registry is unsubscribed, an ``Unregistered`` event is notified and
+        ``True`` is returned.
+
+        Raises ``TypeError`` when ``name`` is non-empty or when neither
+        ``factory`` nor ``required`` is given.
+        """
+        if name:
+            # This method deals with handlers, so use the same wording as
+            # registerHandler rather than the subscription-adapter message.
+            raise TypeError("Named handlers are not yet supported")
+
+        if (required is None) and (factory is None):
+            raise TypeError("Must specify one of factory and required")
+
+        required = _getAdapterRequired(factory, required)
+
+        if factory is None:
+            new = [(r, n, f, i)
+                   for (r, n, f, i)
+                   in self._handler_registrations
+                   if r != required
+                   ]
+        else:
+            new = [(r, n, f, i)
+                   for (r, n, f, i)
+                   in self._handler_registrations
+                   if not (r == required and f == factory)
+                   ]
+
+        if len(new) == len(self._handler_registrations):
+            # Nothing was filtered out, so there is nothing to unregister.
+            return False
+
+        # Slice-assign so the existing list object is updated in place.
+        self._handler_registrations[:] = new
+        self.adapters.unsubscribe(required, None, factory)
+
+        notify(Unregistered(
+            HandlerRegistration(self, required, name, factory, '')
+            ))
+
+        return True
+
+    def handle(self, *objects):
+        """Pass ``objects`` to ``self.adapters.subscribers`` with no
+        provided interface (``None``).
+
+        The result of the lookup is discarded; this method returns ``None``.
+        """
+        lookup = self.adapters.subscribers
+        lookup(objects, None)
+
+
+def _getUtilityProvided(component):
+    """Return the one interface ``component`` provides.
+
+    Raises ``TypeError`` unless ``providedBy(component)`` yields exactly
+    one interface.
+    """
+    candidates = list(providedBy(component))
+    if len(candidates) != 1:
+        raise TypeError(
+            "The utility doesn't provide a single interface "
+            "and no provided interface was specified.")
+    return candidates[0]
+
+def _getAdapterProvided(factory):
+    """Return the one interface ``factory`` implements.
+
+    Raises ``TypeError`` unless ``implementedBy(factory)`` yields exactly
+    one interface.
+    """
+    candidates = list(implementedBy(factory))
+    if len(candidates) != 1:
+        raise TypeError(
+            "The adapter factory doesn't implement a single interface "
+            "and no provided interface was specified.")
+    return candidates[0]
+
+def _getAdapterRequired(factory, required):
+    """Return the required specifications as a normalised tuple.
+
+    When ``required`` is ``None`` it is read from
+    ``factory.__component_adapts__``.  Each item is then normalised:
+    ``None`` becomes ``Interface``, a specification is kept as is, and a
+    class is replaced by ``implementedBy(cls)``.
+
+    Raises ``TypeError`` when no required specifications can be found, when
+    ``required`` is itself a single specification instead of a sequence, or
+    when an item is neither a specification nor a class.
+    """
+    if required is None:
+        try:
+            required = factory.__component_adapts__
+        except AttributeError:
+            raise TypeError(
+                "The adapter factory doesn't have a __component_adapts__ "
+                "attribute and no required specifications were specified"
+                )
+    elif ISpecification.providedBy(required):
+        raise TypeError("the required argument should be a list of "
+                        "interfaces, not a single interface")
+
+    def normalise(spec):
+        # Map one entry of ``required`` onto a specification object.
+        if spec is None:
+            return Interface
+        if ISpecification.providedBy(spec):
+            return spec
+        if isinstance(spec, class_types):
+            return implementedBy(spec)
+        raise TypeError("Required specification must be a "
+                        "specification or class."
+                        )
+
+    return tuple([normalise(spec) for spec in required])
+
+
+class UtilityRegistration(object):
+    """Record describing one utility registration.
+
+    Attributes set by the constructor: ``registry``, ``provided``, ``name``,
+    ``component``, ``info`` (from the ``doc`` argument) and ``factory``.
+
+    Equality, ordering and hashing are all derived from ``repr(self)``.
+    """
+
+    implements(IUtilityRegistration)
+
+    def __init__(self, registry, provided, name, component, doc, factory=None):
+        (self.registry, self.provided, self.name, self.component, self.info,
+         self.factory
+         ) = registry, provided, name, component, doc, factory
+
+    def __repr__(self):
+        return '%s(%r, %s, %r, %s, %r, %r)' % (
+            self.__class__.__name__,
+            self.registry,
+            getattr(self.provided, '__name__', None), self.name,
+            getattr(self.component, '__name__', repr(self.component)),
+            self.factory, self.info,
+            )
+
+    def __hash__(self):
+        # __eq__ compares repr(), so the hash must be derived from the same
+        # value: objects that compare equal are required to hash equal.
+        return hash(repr(self))
+
+    def __eq__(self, other):
+        return repr(self) == repr(other)
+
+    def __ne__(self, other):
+        return repr(self) != repr(other)
+
+    def __lt__(self, other):
+        return repr(self) < repr(other)
+
+    def __le__(self, other):
+        return repr(self) <= repr(other)
+
+    def __gt__(self, other):
+        return repr(self) > repr(other)
+
+    def __ge__(self, other):
+        return repr(self) >= repr(other)
+
+class AdapterRegistration(object):
+    """Record describing one adapter registration.
+
+    Attributes set by the constructor: ``registry``, ``required``,
+    ``provided``, ``name``, ``factory`` (from the ``component`` argument)
+    and ``info`` (from the ``doc`` argument).
+
+    Equality, ordering and hashing are all derived from ``repr(self)``.
+    """
+
+    implements(IAdapterRegistration)
+
+    def __init__(self, registry, required, provided, name, component, doc):
+        (self.registry, self.required, self.provided, self.name,
+         self.factory, self.info
+         ) = registry, required, provided, name, component, doc
+
+    def __repr__(self):
+        return '%s(%r, %s, %s, %r, %s, %r)' % (
+            self.__class__.__name__,
+            self.registry,
+            '[' + ", ".join([r.__name__ for r in self.required]) + ']',
+            getattr(self.provided, '__name__', None), self.name,
+            getattr(self.factory, '__name__', repr(self.factory)), self.info,
+            )
+
+    def __hash__(self):
+        # __eq__ compares repr(), so the hash must be derived from the same
+        # value: objects that compare equal are required to hash equal.
+        return hash(repr(self))
+
+    def __eq__(self, other):
+        return repr(self) == repr(other)
+
+    def __ne__(self, other):
+        return repr(self) != repr(other)
+
+    def __lt__(self, other):
+        return repr(self) < repr(other)
+
+    def __le__(self, other):
+        return repr(self) <= repr(other)
+
+    def __gt__(self, other):
+        return repr(self) > repr(other)
+
+    def __ge__(self, other):
+        return repr(self) >= repr(other)
+
+# Registration record for subscription adapters: inherits everything from
+# AdapterRegistration and only replaces the declared interface with
+# ISubscriptionAdapterRegistration.
+class SubscriptionRegistration(AdapterRegistration):
+
+ implementsOnly(ISubscriptionAdapterRegistration)
+
+class HandlerRegistration(AdapterRegistration):
+    """Registration record for a handler.
+
+    The callable is stored as ``handler`` and also exposed read-only as
+    ``factory``; ``provided`` is always ``None``.
+    """
+
+    implementsOnly(IHandlerRegistration)
+
+    provided = None
+
+    def __init__(self, registry, required, name, handler, doc):
+        self.registry = registry
+        self.required = required
+        self.name = name
+        self.handler = handler
+        self.info = doc
+
+    @property
+    def factory(self):
+        return self.handler
+
+    def __repr__(self):
+        required_names = ", ".join([r.__name__ for r in self.required])
+        factory = self.factory
+        return '%s(%r, %s, %r, %s, %r)' % (
+            self.__class__.__name__,
+            self.registry,
+            '[' + required_names + ']',
+            self.name,
+            getattr(factory, '__name__', repr(factory)),
+            self.info,
+            )
+
Copied: zope.interface/trunk/src/zope/interface/tests/test_registry.py (from rev 122809, zope.interface/branches/chrism-componentregistry/src/zope/interface/tests/test_registry.py)
===================================================================
--- zope.interface/trunk/src/zope/interface/tests/test_registry.py (rev 0)
+++ zope.interface/trunk/src/zope/interface/tests/test_registry.py 2011-09-15 05:47:54 UTC (rev 122810)
@@ -0,0 +1,950 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002, 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Component Registry Tests"""
+
+import types
+import unittest
+
+from zope import interface
+from zope.interface import implementedBy
+from zope.interface.interfaces import ComponentLookupError
+from zope.interface.registry import Components
+
+import sys
+
+# fixtures
+
+# ``_class_types`` is what ``isinstance`` is given to recognise a class:
+# just ``type`` on Python 3, ``type`` plus ``types.ClassType`` otherwise.
+if sys.version_info[0] == 3:
+ _class_types = type
+else:
+ _class_types = (type, types.ClassType)
+
+class adapter:
+    """Callable that stamps ``__component_adapts__`` onto an object.
+
+    Used as ``@adapter(I1, I2)``.  Classes receive the interfaces wrapped
+    in ``_adapts_descr``; any other object receives the plain tuple.
+    """
+
+    def __init__(self, *interfaces):
+        self.interfaces = interfaces
+
+    def __call__(self, ob):
+        adapted = self.interfaces
+        if isinstance(ob, _class_types):
+            adapted = _adapts_descr(adapted)
+        ob.__component_adapts__ = adapted
+        return ob
+
+
+def adapts(*interfaces):
+ # Class-body helper: stores ``_adapts_descr(interfaces)`` under
+ # ``__component_adapts__`` in the local namespace of the calling frame.
+ # Raises TypeError when the caller does not look like a class body or
+ # when ``__component_adapts__`` is already present there.
+ frame = sys._getframe(1)
+ locals = frame.f_locals
+
+ # Try to make sure we were called from a class def. In 2.2.0 we can't
+ # check for __module__ since it doesn't seem to be added to the locals
+ # until later on.
+ if (locals is frame.f_globals) or (
+ ('__module__' not in locals) and sys.version_info[:3] > (2, 2, 0)):
+ raise TypeError("adapts can be used only from a class definition.")
+
+ if '__component_adapts__' in locals:
+ raise TypeError("adapts can be used only once in a class definition.")
+
+ locals['__component_adapts__'] = _adapts_descr(interfaces)
+
+class _adapts_descr(object):
+    """Descriptor exposing ``interfaces`` on the class only.
+
+    Access through the class returns the stored interfaces; access through
+    an instance raises ``AttributeError('__component_adapts__')``.
+    """
+
+    def __init__(self, interfaces):
+        self.interfaces = interfaces
+
+    def __get__(self, inst, cls):
+        if inst is not None:
+            raise AttributeError('__component_adapts__')
+        return self.interfaces
+
+# Marker interfaces used by the fixtures and tests below; none of them
+# declares any attributes.  I2e extends I2, and ITestType derives from
+# IInterface rather than Interface.
+class I1(interface.Interface):
+ pass
+class I2(interface.Interface):
+ pass
+class I2e(I2):
+ pass
+class I3(interface.Interface):
+ pass
+class IC(interface.Interface):
+ pass
+
+class ITestType(interface.interfaces.IInterface):
+ pass
+
+class U:
+    """Test object whose ``repr`` is ``ClassName(name)``."""
+
+    def __init__(self, name):
+        self.__name__ = name
+
+    def __repr__(self):
+        class_name = self.__class__.__name__
+        return "%s(%s)" % (class_name, self.__name__)
+
+# Utility fixtures: U1 declares I1 only, U12 declares both I1 and I2.
+class U1(U):
+ interface.implements(I1)
+
+class U12(U):
+ interface.implements(I1, I2)
+
+# Marker interfaces that the adapter fixtures below declare as implemented.
+class IA1(interface.Interface):
+ pass
+
+class IA2(interface.Interface):
+ pass
+
+class IA3(interface.Interface):
+ pass
+
+class A:
+    """Adapter fixture that keeps the adapted objects in ``context``.
+
+    ``repr`` is the class name followed by the ``repr`` of the ``context``
+    tuple.
+    """
+
+    def __init__(self, *context):
+        self.context = context
+
+    def __repr__(self):
+        class_name = self.__class__.__name__
+        return "%s%r" % (class_name, self.context)
+
+# Adapter fixtures.  ``adapts(...)`` records the required interfaces and
+# ``interface.implements(...)`` the implemented ones; each class below
+# uses one or both of these declarations.
+class A12_1(A):
+ adapts(I1, I2)
+ interface.implements(IA1)
+
+# Declares what it adapts but no implemented interface.
+class A12_(A):
+ adapts(I1, I2)
+
+# Declares an implemented interface but nothing adapted.
+class A_2(A):
+ interface.implements(IA2)
+
+class A_3(A):
+ interface.implements(IA3)
+
+# The remaining fixtures derive from U (single ``name`` argument), not A.
+class A1_12(U):
+ adapts(I1)
+ interface.implements(IA1, IA2)
+
+class A1_2(U):
+ adapts(I1)
+ interface.implements(IA2)
+
+class A1_23(U):
+ adapts(I1)
+ interface.implements(IA1, IA3)
+
+def noop(*args):
+    """Accept any positional arguments, do nothing and return ``None``."""
+    return None
+
+
+# tests
+
+class TestAdapter(unittest.TestCase):
+    # Registration, lookup and unregistration of adapters on a fresh
+    # ``Components`` registry.  Attribute lists are built with list
+    # comprehensions (not ``map``) so the list comparisons also hold on
+    # Python 3, where ``map`` returns a lazy iterator.
+
+    def setUp(self):
+        self.components = Components('comps')
+
+    def test_register_and_unregister_adapter(self):
+        self.components.registerAdapter(A12_1)
+
+        multi_adapter = self.components.getMultiAdapter(
+            (U1(1), U12(2)), IA1)
+        self.assertEqual(multi_adapter.__class__, A12_1)
+        self.assertEqual(repr(multi_adapter), 'A12_1(U1(1), U12(2))')
+
+        self.assertTrue(self.components.unregisterAdapter(A12_1))
+        self.assertRaises(
+            ComponentLookupError,
+            self.components.getMultiAdapter,
+            (U1(1), U12(2)),
+            IA1
+            )
+
+    def test_register_and_unregister_adapter_with_two_interfaces(self):
+        # A1_12 implements two interfaces, so ``provided`` must be explicit.
+        self.assertRaises(TypeError, self.components.registerAdapter,
+                          A1_12)
+        self.components.registerAdapter(A1_12,
+                                        provided=IA2)
+
+        multi_adapter = self.components.getMultiAdapter((U1(1),), IA2)
+        self.assertEqual(multi_adapter.__class__, A1_12)
+        self.assertEqual(repr(multi_adapter), 'A1_12(U1(1))')
+
+        self.assertRaises(TypeError, self.components.unregisterAdapter, A1_12)
+        self.assertTrue(self.components.unregisterAdapter(A1_12, provided=IA2))
+        self.assertRaises(ComponentLookupError,
+                          self.components.getMultiAdapter, (U1(1),), IA2)
+
+    def test_register_and_unregister_adapter_with_no_interfaces(self):
+        # A12_ implements no interface, so ``provided`` must be explicit.
+        self.assertRaises(TypeError, self.components.registerAdapter, A12_)
+
+        self.components.registerAdapter(A12_, provided=IA2)
+        multi_adapter = self.components.getMultiAdapter((U1(1), U12(2)), IA2)
+        self.assertEqual(multi_adapter.__class__, A12_)
+        self.assertEqual(repr(multi_adapter), 'A12_(U1(1), U12(2))')
+
+        self.assertRaises(TypeError, self.components.unregisterAdapter, A12_)
+        self.assertTrue(self.components.unregisterAdapter(A12_, provided=IA2))
+        self.assertRaises(ComponentLookupError,
+                          self.components.getMultiAdapter,
+                          (U1(1), U12(2)), IA2)
+
+    def test_reg_and_unreg_adp_with_no___component_adapts___attribute(self):
+        self.assertRaises(TypeError, self.components.registerAdapter, A_2)
+        self.components.registerAdapter(A_2, required=[I3])
+        self.assertTrue(self.components.unregisterAdapter(A_2, required=[I3]))
+
+    def test_register_and_unregister_class_specific(self):
+        self.components.registerAdapter(A_3, required=[U],
+                                        info=u'Really class specific')
+        self.assertTrue(self.components.unregisterAdapter(required=[U],
+                                                          provided=IA3))
+
+    def test_registered_adapters_and_sorting(self):
+        self.components.registerAdapter(A12_1)
+        self.components.registerAdapter(A1_12, provided=IA2)
+        self.components.registerAdapter(A12_, provided=IA2)
+        self.components.registerAdapter(A_2, required=[I3])
+        self.components.registerAdapter(A_3, required=[U],
+                                        info=u'Really class specific')
+
+        sorted_adapters = sorted(self.components.registeredAdapters())
+        sorted_adapters_name = [reg.name for reg in sorted_adapters]
+        sorted_adapters_provided = [reg.provided for reg in sorted_adapters]
+        sorted_adapters_required = [reg.required for reg in sorted_adapters]
+        sorted_adapters_info = [reg.info for reg in sorted_adapters]
+
+        self.assertEqual(len(sorted_adapters), 5)
+        self.assertEqual(sorted_adapters_name, [u'', u'', u'', u'', u''])
+        self.assertEqual(sorted_adapters_provided, [IA1,
+                                                    IA2,
+                                                    IA2,
+                                                    IA2,
+                                                    IA3])
+
+        self.assertEqual(sorted_adapters_required, [(I1, I2),
+                                                    (I1, I2),
+                                                    (I1,),
+                                                    (I3,),
+                                                    (implementedBy(U),)])
+        self.assertEqual(sorted_adapters_info,
+                         [u'', u'', u'', u'', u'Really class specific'])
+
+    def test_get_none_existing_adapter(self):
+        self.assertRaises(ComponentLookupError,
+                          self.components.getMultiAdapter, (U(1),), IA1)
+
+    def test_query_none_existing_adapter(self):
+        self.assertTrue(
+            self.components.queryMultiAdapter((U(1),), IA1) is None)
+        self.assertEqual(self.components.queryMultiAdapter((U(1),), IA1,
+                                                           default=42), 42)
+
+    def test_unregister_none_existing_adapter(self):
+        self.assertFalse(
+            self.components.unregisterAdapter(A_2, required=[I3]))
+        self.assertFalse(
+            self.components.unregisterAdapter(A12_1, required=[U]))
+
+    def test_unregister_adapter(self):
+        self.components.registerAdapter(A12_1)
+        self.components.registerAdapter(A1_12, provided=IA2)
+        self.components.registerAdapter(A12_, provided=IA2)
+        self.components.registerAdapter(A_2, required=[I3])
+        self.components.registerAdapter(A_3, required=[U],
+                                        info=u'Really class specific')
+
+        self.assertTrue(self.components.unregisterAdapter(A12_1))
+        self.assertTrue(self.components.unregisterAdapter(
+            required=[U], provided=IA3))
+
+        sorted_adapters = sorted(self.components.registeredAdapters())
+        sorted_adapters_name = [reg.name for reg in sorted_adapters]
+        sorted_adapters_provided = [reg.provided for reg in sorted_adapters]
+        sorted_adapters_required = [reg.required for reg in sorted_adapters]
+        sorted_adapters_info = [reg.info for reg in sorted_adapters]
+
+        self.assertEqual(len(sorted_adapters), 3)
+        self.assertEqual(sorted_adapters_name, [u'', u'', u''])
+        self.assertEqual(sorted_adapters_provided, [IA2,
+                                                    IA2,
+                                                    IA2])
+        self.assertEqual(sorted_adapters_required, [(I1, I2),
+                                                    (I1,),
+                                                    (I3,)])
+        self.assertEqual(sorted_adapters_info, [u'', u'', u''])
+
+    def test_register_named_adapter(self):
+        self.components.registerAdapter(A1_12, provided=IA2, name=u'test')
+        self.assertTrue(
+            self.components.queryMultiAdapter((U1(1),), IA2) is None)
+        self.assertEqual(
+            repr(self.components.queryMultiAdapter(
+                (U1(1),), IA2, name=u'test')),
+            'A1_12(U1(1))')
+
+        self.assertTrue(self.components.queryAdapter(U1(1), IA2) is None)
+        self.assertEqual(
+            repr(self.components.queryAdapter(U1(1), IA2, name=u'test')),
+            'A1_12(U1(1))')
+        self.assertEqual(
+            repr(self.components.getAdapter(U1(1), IA2, name=u'test')),
+            'A1_12(U1(1))')
+
+    def test_get_adapters(self):
+        self.components.registerAdapter(A1_12, provided=IA1, name=u'test 1')
+        self.components.registerAdapter(A1_23, provided=IA2, name=u'test 2')
+        # The same unnamed registration is made twice on purpose; only two
+        # adapters are expected for IA2 below.
+        self.components.registerAdapter(A1_12, provided=IA2)
+        self.components.registerAdapter(A1_12, provided=IA2)
+
+        adapters = list(self.components.getAdapters((U1(1),), IA2))
+        self.assertEqual(len(adapters), 2)
+        self.assertEqual(adapters[0][0], u'test 2')
+        self.assertEqual(adapters[1][0], u'')
+        self.assertEqual(repr(adapters[0][1]), 'A1_23(U1(1))')
+        self.assertEqual(repr(adapters[1][1]), 'A1_12(U1(1))')
+
+    def test_register_no_factory(self):
+        self.components.registerAdapter(A1_12, provided=IA2)
+        self.components.registerAdapter(noop,
+                                        required=[IA1], provided=IA2,
+                                        name=u'test noop')
+
+        self.assertTrue(
+            self.components.queryAdapter(
+                U1(9), IA2, name=u'test noop') is None)
+        adapters = list(self.components.getAdapters((U1(1),), IA2))
+        self.assertEqual(len(adapters), 1)
+        self.assertEqual(adapters[0][0], u'')
+        self.assertEqual(repr(adapters[0][1]), 'A1_12(U1(1))')
+
+        self.assertTrue(self.components.unregisterAdapter(A1_12, provided=IA2))
+
+        sorted_adapters = sorted(self.components.registeredAdapters())
+        sorted_adapters_name = [reg.name for reg in sorted_adapters]
+        sorted_adapters_provided = [reg.provided for reg in sorted_adapters]
+        sorted_adapters_required = [reg.required for reg in sorted_adapters]
+        sorted_adapters_info = [reg.info for reg in sorted_adapters]
+
+        self.assertEqual(len(sorted_adapters), 1)
+        self.assertEqual(sorted_adapters_name, [u'test noop'])
+        self.assertEqual(sorted_adapters_provided, [IA2])
+        self.assertEqual(sorted_adapters_required, [(IA1,)])
+        self.assertEqual(sorted_adapters_info, [u''])
+
+
+# Checks that registries built on other registries (second constructor
+# argument, exposed as ``__bases__``) see the registrations of their bases.
+# NOTE(review): another ``class TestExtending`` is declared later in this
+# module; the later definition rebinds the name, so confirm which copy is
+# meant to run.
+class TestExtending(unittest.TestCase):
+
+ def test_extendning(self):
+ c1 = Components('1')
+ self.assertEqual(c1.__bases__, ())
+
+ c2 = Components('2', (c1, ))
+ self.assertTrue(c2.__bases__ == (c1, ))
+
+ test_object1 = U1(1)
+ test_object2 = U1(2)
+ test_object3 = U12(1)
+ test_object4 = U12(3)
+
+ self.assertEqual(len(list(c1.registeredUtilities())), 0)
+ self.assertEqual(len(list(c2.registeredUtilities())), 0)
+
+ # A utility registered on c1 is found through c2 but is not listed
+ # among c2's own registrations.
+ c1.registerUtility(test_object1)
+ self.assertEqual(len(list(c1.registeredUtilities())), 1)
+ self.assertEqual(len(list(c2.registeredUtilities())), 0)
+ self.assertEqual(c1.queryUtility(I1), test_object1)
+ self.assertEqual(c2.queryUtility(I1), test_object1)
+
+ # Registering a second I1 utility replaces the first one.
+ c1.registerUtility(test_object2)
+ self.assertEqual(len(list(c1.registeredUtilities())), 1)
+ self.assertEqual(len(list(c2.registeredUtilities())), 0)
+ self.assertEqual(c1.queryUtility(I1), test_object2)
+ self.assertEqual(c2.queryUtility(I1), test_object2)
+
+
+ # c4 has two bases (c2 and c3), both of which are based on c1.
+ c3 = Components('3', (c1, ))
+ c4 = Components('4', (c2, c3))
+ self.assertEqual(c4.queryUtility(I1), test_object2)
+
+ c1.registerUtility(test_object3, I2)
+ self.assertEqual(c4.queryUtility(I2), test_object3)
+
+ c3.registerUtility(test_object4, I2)
+ self.assertEqual(c4.queryUtility(I2), test_object4)
+
+ @adapter(I1)
+ def handle1(x):
+ self.assertEqual(x, test_object1)
+
+ def handle(*objects):
+ self.assertEqual(objects, (test_object1,))
+
+ @adapter(I1)
+ def handle3(x):
+ self.assertEqual(x, test_object1)
+
+ @adapter(I1)
+ def handle4(x):
+ self.assertEqual(x, test_object1)
+
+ # One handler per registry; the handlers themselves assert on the
+ # object they are called with.
+ c1.registerHandler(handle1, info=u'First handler')
+ c2.registerHandler(handle, required=[U])
+ c3.registerHandler(handle3)
+ c4.registerHandler(handle4)
+
+ c4.handle(test_object1)
+
+class TestHandler(unittest.TestCase):
+    # Registration, listing, invocation and unregistration of handlers on a
+    # fresh ``Components`` registry.
+
+    def setUp(self):
+        self.components = Components('comps')
+
+    def test_register_handler(self):
+        test_object1 = U1(1)
+        test_object2 = U12(2)
+
+        @adapter(I1)
+        def handle1(x):
+            self.assertEqual(x, test_object1)
+
+        self.components.registerHandler(handle1, info=u'First handler')
+        self.components.handle(test_object1)
+
+        @adapter(I1, I2)
+        def handle12(x, y):
+            self.assertEqual(x, test_object1)
+            self.assertEqual(y, test_object2)
+
+        self.components.registerHandler(handle12)
+        self.components.handle(test_object1, test_object2)
+
+    def test_register_noncompliant_handler(self):
+        handle_calls = []
+
+        def handle(*objects):
+            handle_calls.append(objects)
+
+        # ``handle`` has no __component_adapts__, so ``required`` is needed.
+        self.assertRaises(TypeError, self.components.registerHandler, handle)
+        self.components.registerHandler(
+            handle, required=[I1], info=u'a comment')
+        self.components.registerHandler(
+            handle, required=[U], info=u'handle a class')
+
+        test_object = U1(1)
+        self.components.handle(test_object)
+        self.assertEqual(len(handle_calls), 2)
+        # Explicit loop: ``map`` is lazy on Python 3, so using it for its
+        # side effects would silently skip these assertions.
+        for call in handle_calls:
+            self.assertEqual(call, (test_object,))
+
+    def test_list_handlers(self):
+        test_object1 = U1(1)
+        test_object2 = U12(2)
+
+        @adapter(I1)
+        def handle1(x):
+            self.assertEqual(x, test_object1)
+
+        @adapter(I1, I2)
+        def handle12(x, y):
+            self.assertEqual(x, test_object1)
+            self.assertEqual(y, test_object2)
+
+        handle_calls = []
+
+        def handle(*objects):
+            handle_calls.append(objects)
+
+        self.components.registerHandler(handle1, info=u'First handler')
+        self.components.registerHandler(handle12)
+        self.components.registerHandler(
+            handle, required=[I1], info=u'a comment')
+        self.components.registerHandler(
+            handle, required=[U], info=u'handle a class')
+
+        handlers = list(self.components.registeredHandlers())
+        # List comprehensions so the comparisons below are list-to-list on
+        # Python 3 as well.
+        handlers_required = [reg.required for reg in handlers]
+        handlers_handler = [reg.handler for reg in handlers]
+        handlers_info = [reg.info for reg in handlers]
+
+        self.assertEqual(len(handlers), 4)
+        self.assertEqual(handlers_required,
+                         [(I1,), (I1, I2), (I1,), (implementedBy(U),)])
+        self.assertEqual(handlers_handler,
+                         [handle1, handle12, handle, handle])
+        self.assertEqual(
+            handlers_info,
+            [u'First handler', u'', u'a comment', u'handle a class'])
+
+    def test_unregister_handler(self):
+        test_object1 = U1(1)
+        test_object2 = U12(2)
+
+        @adapter(I1)
+        def handle1(x):
+            self.assertEqual(x, test_object1)
+
+        @adapter(I1, I2)
+        def handle12(x, y):
+            self.assertEqual(x, test_object1)
+            self.assertEqual(y, test_object2)
+
+        handle_calls = []
+
+        def handle(*objects):
+            handle_calls.append(objects)
+
+        self.components.registerHandler(handle1, info=u'First handler')
+        self.components.registerHandler(handle12)
+        self.components.registerHandler(
+            handle, required=[I1], info=u'a comment')
+        self.components.registerHandler(
+            handle, required=[U], info=u'handle a class')
+
+        self.assertEqual(len(list(self.components.registeredHandlers())), 4)
+        self.assertTrue(self.components.unregisterHandler(handle12))
+        self.assertEqual(len(list(self.components.registeredHandlers())), 3)
+        # A second attempt finds nothing and changes nothing.
+        self.assertFalse(self.components.unregisterHandler(handle12))
+        self.assertEqual(len(list(self.components.registeredHandlers())), 3)
+        self.assertRaises(TypeError, self.components.unregisterHandler)
+        self.assertEqual(len(list(self.components.registeredHandlers())), 3)
+        self.assertTrue(
+            self.components.unregisterHandler(handle, required=[I1]))
+        self.assertEqual(len(list(self.components.registeredHandlers())), 2)
+        self.assertTrue(
+            self.components.unregisterHandler(handle, required=[U]))
+        self.assertEqual(len(list(self.components.registeredHandlers())), 1)
+
+    def test_multi_handler_unregistration(self):
+        """
+        There was a bug where multiple handlers for the same required
+        specification would all be removed when one of them was
+        unregistered.
+
+        """
+        calls = []
+
+        class I(interface.Interface):
+            pass
+
+        def factory1(event):
+            calls.append(2)
+
+        def factory2(event):
+            calls.append(3)
+
+        class Event(object):
+            interface.implements(I)
+
+        self.components.registerHandler(factory1, [I,])
+        self.components.registerHandler(factory2, [I,])
+        self.components.handle(Event())
+        self.assertEqual(sum(calls), 5)
+        self.assertTrue(self.components.unregisterHandler(factory1, [I,]))
+        # Rebinding is seen by factory1/factory2, which close over ``calls``.
+        calls = []
+        self.components.handle(Event())
+        self.assertEqual(sum(calls), 3)
+
+class TestSubscriber(unittest.TestCase):
+    # Registration, lookup and unregistration of subscription adapters on a
+    # fresh ``Components`` registry.
+
+    def setUp(self):
+        self.components = Components('comps')
+
+    def test_register_subscriber(self):
+        self.components.registerSubscriptionAdapter(A1_2)
+        self.components.registerSubscriptionAdapter(A1_12, provided=IA2)
+        self.components.registerSubscriptionAdapter(
+            A, [I1], IA2, info='a sample comment')
+        subscribers = self.components.subscribers((U1(1),), IA2)
+        self.assertEqual(len(subscribers), 3)
+        self.assertEqual(repr(subscribers[0]), 'A1_2(U1(1))')
+        self.assertEqual(repr(subscribers[1]), 'A1_12(U1(1))')
+        self.assertEqual(repr(subscribers[2]), 'A(U1(1),)')
+
+    def test_register_noncompliant_subscriber(self):
+        self.assertRaises(TypeError,
+                          self.components.registerSubscriptionAdapter, A1_12)
+        self.assertRaises(TypeError,
+                          self.components.registerSubscriptionAdapter, A)
+        self.assertRaises(
+            TypeError,
+            self.components.registerSubscriptionAdapter, A, required=[IA1])
+
+    def test_register_named_subscriber(self):
+        self.components.registerSubscriptionAdapter(
+            A, [I1], IA2, u'', u'a sample comment')
+        # A non-empty name is rejected.
+        self.assertRaises(TypeError,
+                          self.components.registerSubscriptionAdapter,
+                          A, [I1], IA2, u'oops', u'a sample comment')
+        subscribers = self.components.subscribers((U1(1),), IA2)
+        self.assertEqual(len(subscribers), 1)
+        self.assertEqual(repr(subscribers[0]), 'A(U1(1),)')
+
+    def test_register_no_factory(self):
+        # ``noop`` returns None, and no subscriber is reported for it.
+        self.components.registerSubscriptionAdapter(noop, [I1], IA2)
+        subscribers = self.components.subscribers((U1(1),), IA2)
+        self.assertEqual(len(subscribers), 0)
+
+    def test_sorting_registered_subscription_adapters(self):
+        self.components.registerSubscriptionAdapter(A1_2)
+        self.components.registerSubscriptionAdapter(A1_12, provided=IA2)
+        self.components.registerSubscriptionAdapter(
+            A, [I1], IA2, info=u'a sample comment')
+        self.components.registerSubscriptionAdapter(
+            A, [I1], IA2, u'', u'a sample comment')
+        self.components.registerSubscriptionAdapter(noop, [I1], IA2)
+
+        sorted_subscribers = sorted(
+            self.components.registeredSubscriptionAdapters())
+        # List comprehensions so the comparisons below are list-to-list on
+        # Python 3 as well.
+        sorted_subscribers_name = [reg.name for reg in sorted_subscribers]
+        sorted_subscribers_provided = [
+            reg.provided for reg in sorted_subscribers]
+        sorted_subscribers_required = [
+            reg.required for reg in sorted_subscribers]
+        sorted_subscribers_factory = [
+            reg.factory for reg in sorted_subscribers]
+        sorted_subscribers_info = [reg.info for reg in sorted_subscribers]
+
+        self.assertEqual(len(sorted_subscribers), 5)
+        self.assertEqual(sorted_subscribers_name, [u'', u'', u'', u'', u''])
+        self.assertEqual(sorted_subscribers_provided,
+                         [IA2, IA2, IA2, IA2, IA2])
+        self.assertEqual(sorted_subscribers_required,
+                         [(I1,), (I1,), (I1,), (I1,), (I1,)])
+        self.assertEqual(sorted_subscribers_factory,
+                         [A, A, A1_12, A1_2, noop])
+        self.assertEqual(
+            sorted_subscribers_info,
+            [u'a sample comment', u'a sample comment', u'', u'', u''])
+
+    def test_unregister(self):
+        self.components.registerSubscriptionAdapter(A1_2)
+        self.assertEqual(len(self.components.subscribers((U1(1),), IA2)), 1)
+        self.assertTrue(self.components.unregisterSubscriptionAdapter(A1_2))
+        self.assertEqual(len(self.components.subscribers((U1(1),), IA2)), 0)
+
+    def test_unregister_multiple(self):
+        self.components.registerSubscriptionAdapter(A1_2)
+        self.components.registerSubscriptionAdapter(A1_12, provided=IA2)
+        self.components.registerSubscriptionAdapter(
+            A, [I1], IA2, info=u'a sample comment')
+        self.components.registerSubscriptionAdapter(
+            A, [I1], IA2, u'', u'a sample comment')
+        self.components.registerSubscriptionAdapter(noop, [I1], IA2)
+        self.assertEqual(len(self.components.subscribers((U1(1),), IA2)), 4)
+        self.assertEqual(
+            len(list(self.components.registeredSubscriptionAdapters())), 5)
+
+        # One call removes both registrations of ``A``.
+        self.assertTrue(
+            self.components.unregisterSubscriptionAdapter(A, [I1], IA2))
+        self.assertEqual(len(self.components.subscribers((U1(1),), IA2)), 2)
+        self.assertEqual(
+            len(list(self.components.registeredSubscriptionAdapters())), 3)
+
+    def test_unregister_no_factory(self):
+        self.components.registerSubscriptionAdapter(A1_2)
+        self.components.registerSubscriptionAdapter(A1_12, provided=IA2)
+        self.components.registerSubscriptionAdapter(noop, [I1], IA2)
+        self.assertEqual(len(self.components.subscribers((U1(1),), IA2)), 2)
+        self.assertEqual(
+            len(list(self.components.registeredSubscriptionAdapters())), 3)
+
+        # Without a factory, both ``required`` and ``provided`` are needed.
+        self.assertRaises(
+            TypeError,
+            self.components.unregisterSubscriptionAdapter, required=[I1])
+        self.assertRaises(
+            TypeError,
+            self.components.unregisterSubscriptionAdapter, provided=IA2)
+        self.assertTrue(
+            self.components.unregisterSubscriptionAdapter(
+                required=[I1], provided=IA2))
+        self.assertEqual(len(self.components.subscribers((U1(1),), IA2)), 0)
+        self.assertEqual(
+            len(list(self.components.registeredSubscriptionAdapters())), 0)
+
+    def test_unregister_noncompliant_subscriber(self):
+        self.assertRaises(
+            TypeError,
+            self.components.unregisterSubscriptionAdapter, A1_12)
+        self.assertRaises(
+            TypeError,
+            self.components.unregisterSubscriptionAdapter, A)
+        self.assertRaises(
+            TypeError,
+            self.components.unregisterSubscriptionAdapter, A, required=[IA1])
+
+    def test_unregister_nonexistent_subscriber(self):
+        self.assertFalse(
+            self.components.unregisterSubscriptionAdapter(required=[I1],
+                                                          provided=IA2))
+
+class TestUtility(unittest.TestCase):
+
+ def setUp(self):
+ self.components = Components('comps')
+
+ def test_register_utility(self):
+ test_object = U1(1)
+ self.components.registerUtility(test_object)
+ self.assertEqual(self.components.getUtility(I1), test_object)
+
+ def test_register_utility_with_factory(self):
+ test_object = U1(1)
+ def factory():
+ return test_object
+ self.components.registerUtility(factory=factory)
+ self.assertEqual(self.components.getUtility(I1), test_object)
+ self.assertTrue(self.components.unregisterUtility(factory=factory))
+
+ def test_register_utility_with_component_and_factory(self):
+ def factory():
+ return U1(1)
+ self.assertRaises(
+ TypeError,
+ self.components.registerUtility, U1(1), factory=factory)
+
+ def test_unregister_utility_with_and_without_component_and_factory(self):
+ def factory():
+ return U1(1)
+ self.assertRaises(
+ TypeError,
+ self.components.unregisterUtility, U1(1), factory=factory)
+ self.assertRaises(TypeError, self.components.unregisterUtility)
+
+ def test_register_utility_with_no_interfaces(self):
+ self.assertRaises(TypeError, self.components.registerUtility, A)
+
+ def test_register_utility_with_two_interfaces(self):
+ self.assertRaises(TypeError, self.components.registerUtility, U12(1))
+
+ def test_register_utility_with_arguments(self):
+ test_object1 = U12(1)
+ test_object2 = U12(2)
+ self.components.registerUtility(test_object1, I2)
+ self.components.registerUtility(test_object2, I2, 'name')
+ self.assertEqual(self.components.getUtility(I2), test_object1)
+ self.assertEqual(self.components.getUtility(I2, 'name'), test_object2)
+
+ def test_get_none_existing_utility(self):
+ from zope.interface.interfaces import ComponentLookupError
+ self.assertRaises(ComponentLookupError, self.components.getUtility, I3)
+
+ def test_query_none_existing_utility(self):
+ self.assertTrue(self.components.queryUtility(I3) is None)
+ self.assertEqual(self.components.queryUtility(I3, default=42), 42)
+
+ def test_registered_utilities_and_sorting(self):
+ test_object1 = U1(1)
+ test_object2 = U12(2)
+ test_object3 = U12(3)
+ self.components.registerUtility(test_object1)
+ self.components.registerUtility(test_object3, I2, u'name')
+ self.components.registerUtility(test_object2, I2)
+
+ sorted_utilities = sorted(self.components.registeredUtilities())
+ sorted_utilities_name = map(lambda x: getattr(x, 'name'),
+ sorted_utilities)
+ sorted_utilities_component = map(lambda x: getattr(x, 'component'),
+ sorted_utilities)
+ sorted_utilities_provided = map(lambda x: getattr(x, 'provided'),
+ sorted_utilities)
+
+ self.assertEqual(len(sorted_utilities), 3)
+ self.assertEqual(sorted_utilities_name, [u'', u'', u'name'])
+ self.assertEqual(
+ sorted_utilities_component,
+ [test_object1, test_object2, test_object3])
+ self.assertEqual(sorted_utilities_provided, [I1, I2, I2])
+
+ def test_duplicate_utility(self):
+ test_object1 = U1(1)
+ test_object2 = U12(2)
+ test_object3 = U12(3)
+ test_object4 = U1(4)
+ self.components.registerUtility(test_object1)
+ self.components.registerUtility(test_object2, I2)
+ self.components.registerUtility(test_object3, I2, u'name')
+ self.assertEqual(self.components.getUtility(I1), test_object1)
+
+ self.components.registerUtility(test_object4, info=u'use 4 now')
+ self.assertEqual(self.components.getUtility(I1), test_object4)
+
+ def test_unregister_utility(self):
+ test_object = U1(1)
+ self.components.registerUtility(test_object)
+ self.assertEqual(self.components.getUtility(I1), test_object)
+ self.assertTrue(self.components.unregisterUtility(provided=I1))
+ self.assertFalse(self.components.unregisterUtility(provided=I1))
+
+ def test_unregister_utility_extended(self):
+ test_object = U1(1)
+ self.components.registerUtility(test_object)
+ self.assertFalse(self.components.unregisterUtility(U1(1)))
+ self.assertEqual(self.components.queryUtility(I1), test_object)
+ self.assertTrue(self.components.unregisterUtility(test_object))
+ self.assertTrue(self.components.queryUtility(I1) is None)
+
+ def test_get_utilities_for(self):
+ test_object1 = U1(1)
+ test_object2 = U12(2)
+ test_object3 = U12(3)
+ self.components.registerUtility(test_object1)
+ self.components.registerUtility(test_object2, I2)
+ self.components.registerUtility(test_object3, I2, u'name')
+
+ sorted_utilities = sorted(self.components.getUtilitiesFor(I2))
+ self.assertEqual(len(sorted_utilities), 2)
+ self.assertEqual(sorted_utilities[0], (u'', test_object2))
+ self.assertEqual(sorted_utilities[1], (u'name', test_object3))
+
+    def test_get_all_utilities_registered_for(self):
+        registry = self.components
+        plain = U1(1)
+        unnamed = U12(2)
+        named = U12(3)
+        extended = U('ext')
+        registry.registerUtility(plain)
+        registry.registerUtility(unnamed, I2)
+        registry.registerUtility(named, I2, u'name')
+        registry.registerUtility(extended, I2e)
+
+        # getUtilitiesFor(I2) still reports just two (name, utility) pairs.
+        pairs = list(registry.getUtilitiesFor(I2))
+        pairs.sort()
+        self.assertEqual(len(pairs), 2)
+        self.assertEqual(pairs[0], (u'', unnamed))
+        self.assertEqual(pairs[1], (u'name', named))
+
+        # getAllUtilitiesRegisteredFor(I2) also returns the utility that
+        # was registered for I2e.
+        everything = registry.getAllUtilitiesRegisteredFor(I2)
+        self.assertEqual(len(everything), 3)
+        for expected in (unnamed, named, extended):
+            self.assertTrue(expected in everything)
+
+        self.assertTrue(registry.unregisterUtility(extended, I2e))
+        self.assertEqual(registry.getAllUtilitiesRegisteredFor(I2e), [])
+
+    def test_utility_events(self):
+        # Registering a utility must notify zope.event subscribers with an
+        # event whose ``object.component`` is the registered utility.
+        from zope.event import subscribers
+        old_subscribers = subscribers[:]
+
+        test_object = U1(1)
+        seen = []
+        def log_event(event):
+            seen.append(event)
+            self.assertEqual(event.object.component, test_object)
+
+        try:
+            # Swap the global subscriber list for our probe alone.
+            subscribers[:] = [log_event]
+            self.components.registerUtility(test_object)
+        finally:
+            # Restore the global list even when an assertion fails, so
+            # ``log_event`` cannot leak into other tests.
+            subscribers[:] = old_subscribers
+
+        # Without this check the test would pass if no event were sent.
+        self.assertTrue(seen)
+
+    def test_dont_leak_utility_registrations_in__subscribers(self):
+        """
+        Regression test: utilities were observed lingering in _subscribers
+        after they had been unregistered.
+
+        """
+        class C:
+            def __init__(self, name):
+                self.name = name
+            def __repr__(self):
+                return "C(%s)" % self.name
+
+        registry = self.components
+
+        def registered():
+            return list(registry.getAllUtilitiesRegisteredFor(I1))
+
+        first = C(1)
+        second = C(2)
+
+        # Registering the same component twice leaves a single entry.
+        registry.registerUtility(first, I1)
+        registry.registerUtility(first, I1)
+        found = registered()
+        self.assertEqual(len(found), 1)
+        self.assertEqual(found[0], first)
+
+        self.assertTrue(registry.unregisterUtility(provided=I1))
+        found = registered()
+        self.assertEqual(len(found), 0)
+
+        # Registering a different component for the same interface must
+        # replace the earlier one rather than accumulate.
+        registry.registerUtility(first, I1)
+        registry.registerUtility(second, I1)
+
+        found = registered()
+        self.assertEqual(len(found), 1)
+        self.assertEqual(found[0], second)
+
+
+class TestExtending(unittest.TestCase):
+
+    def test_extendning(self):
+        # Registries chained through ``__bases__``: a derived registry has
+        # no registrations of its own, yet its lookups find what its bases
+        # hold.
+        def utility_count(registry):
+            return len(list(registry.registeredUtilities()))
+
+        c1 = Components('1')
+        self.assertEqual(c1.__bases__, ())
+
+        c2 = Components('2', (c1, ))
+        self.assertEqual(c2.__bases__, (c1, ))
+
+        test_object1 = U1(1)
+        test_object2 = U1(2)
+        test_object3 = U12(1)
+        test_object4 = U12(3)
+
+        self.assertEqual(utility_count(c1), 0)
+        self.assertEqual(utility_count(c2), 0)
+
+        # Registered on c1 only, but visible through c2 as well.
+        c1.registerUtility(test_object1)
+        self.assertEqual(utility_count(c1), 1)
+        self.assertEqual(utility_count(c2), 0)
+        for registry in (c1, c2):
+            self.assertEqual(registry.queryUtility(I1), test_object1)
+
+        # Re-registering for the same interface replaces the utility.
+        c1.registerUtility(test_object2)
+        self.assertEqual(utility_count(c1), 1)
+        self.assertEqual(utility_count(c2), 0)
+        for registry in (c1, c2):
+            self.assertEqual(registry.queryUtility(I1), test_object2)
+
+        # c4 reaches c1 through both of its bases, c2 and c3.
+        c3 = Components('3', (c1, ))
+        c4 = Components('4', (c2, c3))
+        self.assertEqual(c4.queryUtility(I1), test_object2)
+
+        c1.registerUtility(test_object3, I2)
+        self.assertEqual(c4.queryUtility(I2), test_object3)
+
+        # A registration on c3 wins over c1's when looked up from c4.
+        c3.registerUtility(test_object4, I2)
+        self.assertEqual(c4.queryUtility(I2), test_object4)
+
+        # Each handler checks the object it is handed by c4.handle().
+        @adapter(I1)
+        def handle1(x):
+            self.assertEqual(x, test_object1)
+
+        def handle(*objects):
+            self.assertEqual(objects, (test_object1,))
+
+        @adapter(I1)
+        def handle3(x):
+            self.assertEqual(x, test_object1)
+
+        @adapter(I1)
+        def handle4(x):
+            self.assertEqual(x, test_object1)
+
+        c1.registerHandler(handle1, info=u'First handler')
+        c2.registerHandler(handle, required=[U])
+        c3.registerHandler(handle3)
+        c4.registerHandler(handle4)
+
+        c4.handle(test_object1)
+
+def test_suite():
+    # Bundle every TestCase class of this module into a single suite.
+    case_classes = (
+        TestUtility,
+        TestAdapter,
+        TestSubscriber,
+        TestHandler,
+        TestExtending,
+    )
+    suite = unittest.TestSuite()
+    for case_class in case_classes:
+        suite.addTest(unittest.makeSuite(case_class))
+    return suite
Modified: zope.interface/trunk/tox.ini
===================================================================
--- zope.interface/trunk/tox.ini 2011-09-15 05:44:44 UTC (rev 122809)
+++ zope.interface/trunk/tox.ini 2011-09-15 05:47:54 UTC (rev 122810)
@@ -1,11 +1,17 @@
[tox]
envlist =
- py24,py25,py26,py27,py32,jython,pypy
+ py25,py26,py27,py32,jython,pypy
[testenv]
commands =
python setup.py test -q
+deps = zope.event
[testenv:jython]
commands =
jython setup.py test -q
+
+[testenv:py32]
+deps = zope.event
+ zope.fixers
+
More information about the checkins
mailing list