[Zope-CVS] CVS: Products/PluggableAuthService/plugins - ChallengeProtocolChooser.py:1.4 RequestTypeSniffer.py:1.4

Sidnei da Silva sidnei at enfoldsystems.com
Wed Aug 17 16:53:45 EDT 2005


Update of /cvs-repository/Products/PluggableAuthService/plugins
In directory cvs.zope.org:/tmp/cvs-serv13697/plugins

Added Files:
	ChallengeProtocolChooser.py RequestTypeSniffer.py 
Log Message:

Merge changes from sidnei-challenge-protocol-chooser:

    - Added two new interfaces, IChallengeProtocolChooser and
      IRequestTypeSniffer. Those are used to select the 'authorization
      protocol' or 'challenger protocol' to be used for challenging
      according to the incoming request type.

    - Fixed a couple more places where Zope 2-style __implements__
      was being used, to standardize on using classImplements.

    - Fixed fallback implementations of providedBy and
      implementedBy to always return a tuple.

    - Make sure challenge doesn't break if existing instances of the
      PluginRegistry don't yet have IChallengeProtocolChooser as a
      registered interface. (Would be nice to have some sort of
      migration for the PluginRegistry between PAS releases)

    - Don't assume that Five is present just because zope.interface
      can be imported.


=== Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py 1.3 => 1.4 ===
--- /dev/null	Wed Aug 17 16:53:44 2005
+++ Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py	Wed Aug 17 16:53:14 2005
@@ -0,0 +1,190 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
+# Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+""" Classes: ChallengeProtocolChooser
+
+$Id$
+"""
+
+from Acquisition import aq_parent
+from AccessControl import ClassSecurityInfo
+from BTrees.OOBTree import OOBTree
+from Globals import InitializeClass
+
+from Products.PageTemplates.PageTemplateFile import PageTemplateFile
+
+from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
+from Products.PluggableAuthService.interfaces.plugins \
+    import IRequestTypeSniffer
+from Products.PluggableAuthService.interfaces.plugins \
+    import IChallengeProtocolChooser
+from Products.PluggableAuthService.interfaces.plugins \
+     import IChallengePlugin
+
+from Products.PluggableAuthService.interfaces.request \
+    import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
+
+# Ordered registry of (label, interface) pairs; registerRequestType()
+# replaces it with a new, longer tuple on every registration.
+_request_types = ()
+# Lookup from request-type interface to its label, filled in by
+# registerRequestType().
+_request_type_bmap = {}
+
+def registerRequestType(label, iface):
+    global _request_types
+    registry = list(_request_types)
+    registry.append((label, iface))
+    _request_types = tuple(registry)
+    _request_type_bmap[iface] = label
+
+def listRequestTypesLabels():
+    return _request_type_bmap.values()
+
+# Page template loaded from 'www/cpcAdd'; presumably the management-screen
+# add form for this plugin (confirm against the product registration).
+manage_addChallengeProtocolChooserForm = PageTemplateFile(
+    'www/cpcAdd', globals(), __name__='manage_addChallengeProtocolChooserForm' )
+
+def addChallengeProtocolChooserPlugin( dispatcher, id, title=None,
+                                       mapping=None, REQUEST=None ):
+    """ Add a ChallengeProtocolChooserPlugin to a Pluggable Auth Service. """
+
+    cpc = ChallengeProtocolChooser(id, title=title, mapping=mapping)
+    dispatcher._setObject(cpc.getId(), cpc)
+
+    if REQUEST is not None:
+        REQUEST['RESPONSE'].redirect(
+                                '%s/manage_workspace'
+                                '?manage_tabs_message='
+                                'ChallengeProtocolChooser+added.'
+                            % dispatcher.absolute_url())
+
+
+class ChallengeProtocolChooser( BasePlugin ):
+
+    """ PAS plugin for choosing challenger protocol based on request
+    """
+    meta_type = 'Challenge Protocol Chooser Plugin'
+
+    security = ClassSecurityInfo()
+
+    manage_options = (({'label': 'Mapping',
+                        'action': 'manage_editProtocolMapping'
+                        },
+                       )
+                      + BasePlugin.manage_options
+                      )
+
+    def __init__(self, id, title=None, mapping=None):
+        self._id = self.id = id
+        self.title = title
+        self._map = OOBTree()
+        if mapping is not None:
+            self.manage_updateProtocolMapping(mapping=mapping)
+
+    security.declarePrivate('chooseProtocol')
+    def chooseProtocols(self, request):
+        pas_instance = self._getPAS()
+        plugins = pas_instance._getOb('plugins')
+
+        sniffers = plugins.listPlugins( IRequestTypeSniffer )
+
+        for sniffer_id, sniffer in sniffers:
+            request_type = sniffer.sniffRequestType(request)
+            if request_type is not None:
+                return self._getProtocolsFor(request_type)
+
+    def _getProtocolsFor(self, request_type):
+        label = _request_type_bmap.get(request_type, None)
+        if label is None:
+            return
+        return self._map.get(label, None)
+
+
+    def _listProtocols(self):
+        pas_instance = self._getPAS()
+        plugins = pas_instance._getOb('plugins')
+
+        challengers = plugins.listPlugins( IChallengePlugin )
+        found = []
+
+        for challenger_id, challenger in challengers:
+            protocol = getattr(challenger, 'protocol', challenger_id)
+            if protocol not in found:
+                found.append(protocol)
+
+        return found
+
+    manage_editProtocolMappingForm = PageTemplateFile(
+        'www/cpcEdit', globals(),
+        __name__='manage_editProtocolMappingForm')
+
+    def manage_editProtocolMapping(self, REQUEST=None):
+        """ Edit Protocol Mapping
+        """
+        info = []
+        available_protocols = self._listProtocols()
+
+        request_types = listRequestTypesLabels()
+        request_types.sort()
+
+        for label in request_types:
+            settings = []
+            select_any = False
+            info.append(
+                {'label': label,
+                 'settings': settings
+                 })
+            protocols = self._map.get(label, None)
+            if not protocols:
+                select_any = True
+            for protocol in available_protocols:
+                selected = False
+                if protocols and protocol in protocols:
+                    selected = True
+                settings.append({'label': protocol,
+                                 'selected': selected,
+                                 'value': protocol,
+                                 })
+
+            settings.insert(0, {'label': '(any)',
+                                'selected': select_any,
+                                'value': '',
+                                })
+        return self.manage_editProtocolMappingForm(info=info, REQUEST=REQUEST)
+
+    def manage_updateProtocolMapping(self, mapping, REQUEST=None):
+        """ Update mapping of Request Type to Protocols
+        """
+        for key, value in mapping.items():
+            value = filter(None, value)
+            if not value:
+                if self._map.has_key(key):
+                    del self._map[key]
+            else:
+                self._map[key] = value
+
+        if REQUEST is not None:
+            REQUEST['RESPONSE'].redirect(
+                '%s/manage_editProtocolMapping'
+                '?manage_tabs_message='
+                'Protocol+Mappings+Changed.'
+                % self.absolute_url())
+
+# Declare that ChallengeProtocolChooser implements
+# IChallengeProtocolChooser.
+classImplements(ChallengeProtocolChooser,
+                IChallengeProtocolChooser)
+
+InitializeClass(ChallengeProtocolChooser)
+
+# Register the request types known to this module at import time.
+for label, iface in (('Browser', IBrowserRequest),
+                     ('WebDAV', IWebDAVRequest),
+                     ('FTP', IFTPRequest),
+                     ('XML-RPC', IXMLRPCRequest)):
+    registerRequestType(label, iface)


=== Products/PluggableAuthService/plugins/RequestTypeSniffer.py 1.3 => 1.4 ===
--- /dev/null	Wed Aug 17 16:53:44 2005
+++ Products/PluggableAuthService/plugins/RequestTypeSniffer.py	Wed Aug 17 16:53:14 2005
@@ -0,0 +1,129 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
+# Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
+# distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+""" Classes: RequestTypeSniffer
+
+$Id$
+"""
+
+from Acquisition import aq_parent
+from AccessControl import ClassSecurityInfo
+from Globals import InitializeClass
+from ZServer.FTPRequest import FTPRequest
+from ZPublisher import xmlrpc
+
+from Products.PageTemplates.PageTemplateFile import PageTemplateFile
+
+from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
+from Products.PluggableAuthService.interfaces.plugins \
+    import IRequestTypeSniffer
+
+from Products.PluggableAuthService.interfaces.request \
+    import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest
+
+# Ordered registry of (interface, sniffer function) pairs; registerSniffer()
+# replaces it with a new, longer tuple on every registration.
+_sniffers = ()
+
+def registerSniffer(iface, func):
+    global _sniffers
+    registry = list(_sniffers)
+    registry.append((iface, func))
+    _sniffers = tuple(registry)
+
+# Page template loaded from 'www/rtsAdd'; presumably the management-screen
+# add form for this plugin (confirm against the product registration).
+manage_addRequestTypeSnifferForm = PageTemplateFile(
+    'www/rtsAdd', globals(), __name__='manage_addRequestTypeSnifferForm' )
+
+def addRequestTypeSnifferPlugin( dispatcher, id, title=None, REQUEST=None ):
+    """ Add a RequestTypeSnifferPlugin to a Pluggable Auth Service. """
+
+    rts = RequestTypeSniffer(id, title)
+    dispatcher._setObject(rts.getId(), rts)
+
+    if REQUEST is not None:
+        REQUEST['RESPONSE'].redirect(
+                                '%s/manage_workspace'
+                                '?manage_tabs_message='
+                                'RequestTypeSniffer+added.'
+                            % dispatcher.absolute_url())
+
+
+class RequestTypeSniffer( BasePlugin ):
+
+    """ PAS plugin for detecting a Request's type
+    """
+    meta_type = 'Request Type Sniffer Plugin'
+
+    security = ClassSecurityInfo()
+
+    def __init__(self, id, title=None):
+
+        self._id = self.id = id
+        self.title = title
+
+    security.declarePrivate('sniffRequestType')
+    def sniffRequestType(self, request):
+        found = None
+        for iface, func in _sniffers:
+            if func( request ):
+                found = iface
+
+        if found is not None:
+            return found
+
+# Declare that RequestTypeSniffer implements IRequestTypeSniffer.
+classImplements(RequestTypeSniffer,
+                IRequestTypeSniffer)
+
+InitializeClass(RequestTypeSniffer)
+
+# Most of the sniffing code below has been inspired by
+# similar tests found in BaseRequest, HTTPRequest and ZServer
+def webdavSniffer(request):
+    dav_src = request.get('WEBDAV_SOURCE_PORT', None)
+    method = request.get('REQUEST_METHOD', 'GET').upper()
+    path_info = request.get('PATH_INFO', '')
+
+    if dav_src:
+        return True
+
+    if method not in ('GET', 'POST'):
+        return True
+
+    if method in ('GET',) and path_info.endswith('manage_DAVget'):
+        return True
+
+registerSniffer(IWebDAVRequest, webdavSniffer)
+
+def xmlrpcSniffer(request):
+    response = request['RESPONSE']
+    method = request.get('REQUEST_METHOD', 'GET').upper()
+
+    if method in ('GET', 'POST') and isinstance(response, xmlrpc.Response):
+        return True
+
+registerSniffer(IXMLRPCRequest, xmlrpcSniffer)
+
+def ftpSniffer(request):
+    if isinstance(request, FTPRequest):
+        return True
+
+registerSniffer(IFTPRequest, ftpSniffer)
+
+def browserSniffer(request):
+    # If it's none of the above, it's very likely a browser request.
+    for sniffer in (webdavSniffer, ftpSniffer, xmlrpcSniffer):
+        if sniffer(request):
+            return False
+    return True
+
+registerSniffer(IBrowserRequest, browserSniffer)



More information about the Zope-CVS mailing list