[Zope-Checkins] CVS: Zope3/lib/python/Zope/App/Security - ZopeSecurityPolicy.py:1.1.2.2

Tres Seaver tseaver@zope.com
Wed, 28 Nov 2001 14:03:17 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/App/Security
In directory cvs.zope.org:/tmp/cvs-serv3487

Modified Files:
      Tag: Zope-3x-branch
	ZopeSecurityPolicy.py 
Log Message:


 - Implement ZSP based on '__permission__' (partial on public methods).


=== Zope3/lib/python/Zope/App/Security/ZopeSecurityPolicy.py 1.1.2.1 => 1.1.2.2 ===
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
 # FOR A PARTICULAR PURPOSE.
-__doc__='''Define Zope\'s default security policy
+'''Define Zope\'s default security policy
 
 
 $Id$'''
 __version__='$Revision$'[11:-2]
 
-
+import string
 from types import StringType, TupleType
 
-import SimpleObjectPolicies
 from Zope.Exceptions import Unauthorized
-_noroles=SimpleObjectPolicies._noroles
-from zLOG import LOG, PROBLEM
-from Acquisition import aq_base
+from Zope.ContextWrapper import getinnercontext
+from PermissionRegistry import rolesForPermission
 
 _notfound = []
-_what_not_even_god_should_do = []
-AnonymousRole = 'Anonymous',
-
-#from PermissionRole import _what_not_even_god_should_do, \
-#     rolesForPermissionOn
+_what_not_even_god_should_do = ()
 
+AnonymousRole = ( 'Anonymous', )
 
 class ZopeSecurityPolicy:
 
-    def __init__(self, ownerous=1, authenticated=1):
-        """Create a Zope security policy.
-
-        Two optional keyword arguments may be provided:
+    def __init__( self, ownerous=1, authenticated=1 ):
+        """
+            Two optional keyword arguments may be provided:
 
-        ownerous -- Untrusted users can create code
-                    (e.g. Python scripts or templates),
-                    so check that code owners can access resources.
-                    The argument must have a truth value.
-                    The default is true.
-
-        authenticated -- Allow access to resources based on the
-                    privaledges of the authenticated user.  
-                    The argument must have a truth value.
-                    The default is true.
-
-                    This (somewhat experimental) option can be set
-                    to false on sites that allow only public
-                    (unauthenticated) access. An anticipated
-                    scenario is a ZEO configuration in which some
-                    clients allow only public access and other
-                    clients allow full management.
+            ownerous -- Untrusted users can create code
+                        (e.g. Python scripts or templates),
+                        so check that code owners can access resources.
+                        The argument must have a truth value.
+                        The default is true.
+
+            authenticated -- Allow access to resources based on the
+                        privaledges of the authenticated user.  
+                        The argument must have a truth value.
+                        The default is true.
+
+                        This (somewhat experimental) option can be set
+                        to false on sites that allow only public
+                        (unauthenticated) access. An anticipated
+                        scenario is a ZEO configuration in which some
+                        clients allow only public access and other
+                        clients allow full management.
         """
         
         self._ownerous=ownerous
         self._authenticated=authenticated
 
-    def validate(self, accessed, container, name, value, context,
-                 roles=_noroles, None=None, type=type, IntType=type(0),
-                 DictType=type({}), getattr=getattr, _noroles=_noroles,
-                 StringType=type(''),
-                 Containers=SimpleObjectPolicies.Containers,
-                 valid_aq_=('aq_parent','aq_explicit')):
-
-
-        ############################################################
-        # Provide special rules for the acquisition attributes
-        if type(name) is StringType:
-            if name[:3]=='aq_' and name not in valid_aq_:
-                return 0
-
-        containerbase = aq_base(container)
-        accessedbase=getattr(accessed, 'aq_base', container)
-
-        ############################################################
-        # If roles weren't passed in, we'll try to get them from the object
-
-        if roles is _noroles:
-            roles=getattr(value, '__roles__', _noroles)
-
-        ############################################################
-        # We still might not have any roles
-
-        if roles is _noroles:
-
-            ############################################################
-            # We have an object without roles and we didn't get a list
-            # of roles passed in. Presumably, the value is some simple
-            # object like a string or a list.  We'll try to get roles
-            # from its container.
-            if container is None: return 0 # Bail if no container
-
-            roles=getattr(container, '__roles__', _noroles)
-            if roles is _noroles:
-                aq=getattr(container, 'aq_acquire', None)
-                if aq is None:
-                    roles=_noroles
-                    if containerbase is not accessedbase: return 0
-                else:
-                    # Try to acquire roles
-                    try: roles=aq('__roles__')
-                    except AttributeError:
-                        roles=_noroles
-                        if containerbase is not accessedbase: return 0
-
-            # We need to make sure that we are allowed to
-            # get unprotected attributes from the container. We are
-            # allowed for certain simple containers and if the
-            # container says we can. Simple containers
-            # may also impose name restrictions.
-            p=Containers(type(container), None)
-            if p is None:
-                p=getattr(container,
-                          '__allow_access_to_unprotected_subobjects__', None)
-
-            if p is not None:
-                tp=type(p)
-                if tp is not IntType:
-                    if tp is DictType:
-                        p=p.get(name, None)
-                    else:
-                        p=p(name, value)
-
-            if not p:
-                if (containerbase is accessedbase):
-                    raise Unauthorized(name, value)
-                else:
-                    return 0
-
-            if roles is _noroles: return 1
-
-            # We are going to need a security-aware object to pass
-            # to allowed(). We'll use the container.
-            value=container
-
-        # Short-circuit tests if we can:
-        try:
-            if roles is None or 'Anonymous' in roles: return 1
-        except TypeError:
-            # 'roles' isn't a sequence
-            LOG('Zope Security Policy', PROBLEM, "'%s' passed as roles"
-                " during validation of '%s' is not a sequence." % (
-                `roles`, name))
-            raise
-
-        # Check executable security
-        stack=context.stack
-        if stack:
-            eo=stack[-1]
-
-            # If the executable had an owner, can it execute?
-            if self._ownerous:
-                owner=eo.getOwner()
-                if (owner is not None) and not owner.allowed(value, roles):
-                    # We don't want someone to acquire if they can't
-                    # get an unacquired!
-                    if accessedbase is containerbase:
-                        raise Unauthorized(name, value)
-                    return 0
-
-            # Proxy roles, which are a lot safer now.
-            proxy_roles=getattr(eo, '_proxy_roles', None)
-            if proxy_roles:
-                for r in proxy_roles:
-                    if r in roles: return 1
-
-                # Proxy roles actually limit access!
-                if accessedbase is containerbase:
-                    raise Unauthorized(name, value)
-
-                return 0
-
-
-        try:
-            if self._authenticated and context.user.allowed(value, roles):
-                return 1
-        except AttributeError: pass
-
-        # We don't want someone to acquire if they can't get an unacquired!
-        if accessedbase is containerbase:
-            raise Unauthorized(name, value)
-
-        return 0
-
-    def checkPermission(self, permission, object, context):
-        # XXX proxy roles and executable owner are not checked
-        roles=_rolesForPermissionOn(permission, object)
-        if type(roles) is StringType:
-            roles=[roles]
-        return context.user.allowed(object, roles)
+    #
+    #   ISecurityPolicy implementation.
+    #
+    def validate( self
+                , accessed
+                , container
+                , name
+                , value
+                , context
+                ):
+        """
+            Return silently if current user (from 'context') is allowed
+            to access 'value' from 'accessed'; else raise Unauthorized.
 
-    
-    def _rolesForPermissionOn(self,
-                              permission,
-                              object,
-                              defaultRoles=('Manager',)
-                             ):
+            accessed -- the object that was being accessed
+
+            container -- the object the value was found in
+
+            name -- The name used to access the value
+
+            value -- The value returned by the access
 
+            context -- must implement ISecurityContext; access to information
+                       such as the context stack and AUTHENTICATED_USER.
         """
-        Working from a permission name and an object, accumulate all roles
-        which have this permission assocated with them.
+        if not self.allowName( name ):
+            raise Unauthorized, "Name '%s' is not allowed" % name
+
+        permission = self._findPermission( value )
+
+        if permission is None:
+            raise Unauthorized, "Can't find permission for %s" % name
+
+        roles = self._aggregateRolesFor( permission, value )
+
+        if not roles:
+            raise Unauthorized, "No roles have permission %s" % permission
+
+        allowed = context.user.allowed( value, roles )
+
+        if not allowed:
+            raise Unauthorized, \
+                   ( "User '%s' does not have permission, %s"
+                     "(user would need one of the following roles: %s)"
+                   % ( permission
+                     , context.user.getUserName()
+                     , string.join( roles, ',' )
+                     )
+                   )
+
+    def checkPermission( self, permission, object, context ):
         """
+            Check whether the security context allows the given
+            permission on the given object.
 
-        permissionName = permission
-        accumlatedRoles=None
-        while 1:
-            
-            roles=getattr(object, permissionName, _notfound)
-            if roles is not _notfound:
+            Arguments:
+
+            permission -- A permission name
 
-                # If we cannot find any roles on this object, we return
-                # The anonymous role
-                if roles is None: return AnonymousRole
-
-                t=type(roles)
-
-                if t is TupleType:
-                    # If we get a tuple, then we don't acquire
-                    if accumulatedRoles is None: return roles
-                    return accumulatedRoles+list(roles)
-
-                if t is StringType:
-                    # We found roles set to a name.  Start over
-                    # with the new permission name.  If the permission
-                    # name is '', then treat as private!
-                    if roles:
-                        if roles != permissionName:
-                            permissionName=roles
-                        # If we find a name that is the same as the
-                        # current name, we just ignore it.
-                        roles=None
-                    else:
-                        return _what_not_even_god_should_do
-
-                elif roles:
-                    if accumulatedRoles is None:
-                        accumulatedRoles=list(roles)
-                    else:
-                        accumulatedRoles=accumulatedRoles+list(roles)
-
-            # Replace this with approprate ContextWrapper eqivalent
-            object=getattr(object, 'aq_inner', None)
-            if object is None: break
-            object=object.aq_parent
+            object -- The object being accessed according to the permission
 
-        if accumulatedRoles is None: accumulatedRoles=defaultRoles
+            context -- A SecurityContext, which provides access to information
+            shuch as the context stack and AUTHENTICATED_USER.
+        """
+        roles = self._aggregateRolesFor( permission, object )
+
+        return context.user.allowed( object, roles )
+
+    #
+    #   Helper methods
+    #
+    def _allowName( self, name, strip=string.strip ):
+        """
+            Is 'name' ever allowed to be retrieved under this policy?
+        """
+        return name and type( name ) is StringType and strip( name )
+
+    def _findPermission( self, value ):
+        """
+            Find the permission which guards the accessed object.
+        """
+        return getattr( value, '__permission__', None )
+    
+    def _listAggregatedRolesFor( self, permission, wrapped ):
+        """
+            Crawl the context of 'wrapped' and return an accumulated
+            list of all roles which have 'permission'.
+        """
+        role_set = {}
+
+        # Check placeful roles by walking the inner context chain.
+        while wrapped is not None:
 
-        return accumulatedRoles
+            for role in self._listRolesFor( permission, wrapped ):
+                role_set[ role ] = 1
+            
+            wrapped = getinnercontext( wrapped )
+        
+        # Add "global" roles for permission here
+        for role in rolesForPermission( permission ):
+            role_set[ role ] = 1
 
+        roles = role_set.keys()
+        roles.sort()
+
+        return tuple( roles )
+
+    def _listRolesFor( self, permission, unwrapped ):
+        """
+            Return a list of roles which have 'permission' on 'unwrapped'.
+        """
+        return ()   # TODO:  find placeful roles