[Checkins] SVN: Products.PluggableAuthService/trunk/ Add 'csrf_only' decorator for form post handlers.

Tres Seaver cvs-admin at zope.org
Fri Nov 16 00:54:58 UTC 2012


Log message for revision 128303:
  Add 'csrf_only' decorator for form post handlers.

Changed:
  _U  Products.PluggableAuthService/trunk/
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/DynamicGroupsPlugin.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBGroupManager.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBRoleManager.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBUserManager.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_DynamicGroupsPlugin.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBGroupManager.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBRoleManager.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBUserManager.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/tests/test_utils.py
  U   Products.PluggableAuthService/trunk/Products/PluggableAuthService/utils.py

-=-
Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/DynamicGroupsPlugin.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/DynamicGroupsPlugin.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/DynamicGroupsPlugin.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -25,7 +25,6 @@
 from OFS.Folder import Folder
 from OFS.Cache import Cacheable
 from App.class_init import InitializeClass
-from Persistence import PersistentMapping
 
 from zope.interface import Interface
 
@@ -40,6 +39,7 @@
 from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
 from Products.PluggableAuthService.utils import createViewName
 from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.utils import csrf_only
 
 
 class IDynamicGroupsPlugin(Interface):
@@ -341,7 +341,7 @@
         """
         return [ self.getGroupInfo( x ) for x in self.listGroupIds() ]
 
-    security.declareProtected( ManageGroups, 'addGroup' )
+    security.declarePrivate( 'addGroup' )
     def addGroup( self
                 , group_id
                 , predicate
@@ -371,7 +371,7 @@
         view_name = createViewName('enumerateGroups')
         self.ZCacheable_invalidate(view_name=view_name)
 
-    security.declareProtected( ManageGroups, 'updateGroup' )
+    security.declarePrivate( 'updateGroup' )
     def updateGroup( self
                    , group_id
                    , predicate
@@ -409,8 +409,8 @@
         view_name = createViewName('enumerateGroups', group_id)
         self.ZCacheable_invalidate(view_name=view_name)
 
-    security.declareProtected( ManageGroups, 'removeGroup' )
-    def removeGroup( self, group_id, REQUEST=None ):
+    security.declarePrivate( 'removeGroup' )
+    def removeGroup( self, group_id ):
 
         """ Remove a group definition.
 
@@ -427,7 +427,6 @@
         self.ZCacheable_invalidate(view_name=view_name)
         view_name = createViewName('enumerateGroups', group_id)
         self.ZCacheable_invalidate(view_name=view_name)
-    removeGroup = postonly(removeGroup)
 
     #
     #   ZMI
@@ -449,6 +448,8 @@
                                     )
 
     security.declareProtected( ManageGroups, 'manage_addGroup' )
+    @csrf_only
+    @postonly
     def manage_addGroup( self
                        , group_id
                        , title
@@ -456,6 +457,7 @@
                        , predicate
                        , active=True
                        , RESPONSE=None
+                       , REQUEST=None
                        ):
         """ Add a group via the ZMI.
         """
@@ -474,6 +476,8 @@
                             )
 
     security.declareProtected( ManageGroups, 'manage_updateGroup' )
+    @csrf_only
+    @postonly
     def manage_updateGroup( self
                           , group_id
                           , predicate
@@ -481,6 +485,7 @@
                           , description=None
                           , active=True
                           , RESPONSE=None
+                          , REQUEST=None
                           ):
         """ Update a group via the ZMI.
         """
@@ -500,6 +505,8 @@
                              )
 
     security.declareProtected( ManageGroups, 'manage_removeGroups' )
+    @csrf_only
+    @postonly
     def manage_removeGroups( self
                            , group_ids
                            , RESPONSE=None
@@ -523,7 +530,6 @@
             RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                              % ( self.absolute_url(), message )
                              )
-    manage_removeGroups = postonly(manage_removeGroups)
 
 classImplements( DynamicGroupsPlugin
                , IDynamicGroupsPlugin

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBGroupManager.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBGroupManager.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBGroupManager.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -33,6 +33,7 @@
 from Products.PluggableAuthService.permissions import ManageGroups
 from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
 from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.utils import csrf_only
 
 
 class IZODBGroupManager(Interface):
@@ -262,8 +263,8 @@
 
         return result
 
-    security.declareProtected( ManageGroups, 'addPrincipalToGroup' )
-    def addPrincipalToGroup( self, principal_id, group_id, REQUEST=None ):
+    security.declarePrivate( 'addPrincipalToGroup' )
+    def addPrincipalToGroup( self, principal_id, group_id ):
 
         """ Add a principal to a group.
 
@@ -282,10 +283,9 @@
             self._invalidatePrincipalCache( principal_id )
 
         return not already
-    addPrincipalToGroup = postonly(addPrincipalToGroup)
 
-    security.declareProtected( ManageGroups, 'removePrincipalFromGroup' )
-    def removePrincipalFromGroup( self, principal_id, group_id, REQUEST=None ):
+    security.declarePrivate( 'removePrincipalFromGroup' )
+    def removePrincipalFromGroup( self, principal_id, group_id ):
 
         """ Remove a prinicpal from from a group.
 
@@ -308,7 +308,6 @@
             self._invalidatePrincipalCache( principal_id )
 
         return already
-    removePrincipalFromGroup = postonly(removePrincipalFromGroup)
 
     #
     #   ZMI
@@ -375,6 +374,8 @@
                              )
 
     security.declareProtected( ManageGroups, 'manage_removeGroups' )
+    @csrf_only
+    @postonly
     def manage_removeGroups( self
                            , group_ids
                            , RESPONSE=None
@@ -398,9 +399,10 @@
             RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                              % ( self.absolute_url(), message )
                              )
-    manage_removeGroups = postonly(manage_removeGroups)
 
     security.declareProtected( ManageGroups, 'manage_addPrincipalsToGroup' )
+    @csrf_only
+    @postonly
     def manage_addPrincipalsToGroup( self
                                    , group_id
                                    , principal_ids
@@ -427,11 +429,12 @@
                                + '&manage_tabs_message=%s'
                                ) % ( self.absolute_url(), group_id, message )
                              )
-    manage_addPrincipalsToGroup = postonly(manage_addPrincipalsToGroup)
 
     security.declareProtected( ManageGroups
                              , 'manage_removePrincipalsFromGroup'
                              )
+    @csrf_only
+    @postonly
     def manage_removePrincipalsFromGroup( self
                                         , group_id
                                         , principal_ids
@@ -458,7 +461,6 @@
                                + '&manage_tabs_message=%s'
                                ) % ( self.absolute_url(), group_id, message )
                              )
-    manage_removePrincipalsFromGroup = postonly(manage_removePrincipalsFromGroup)
 
 classImplements( ZODBGroupManager
                , IZODBGroupManager

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBRoleManager.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBRoleManager.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBRoleManager.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -15,6 +15,8 @@
 
 $Id$
 """
+import logging
+
 from Acquisition import aq_parent, aq_inner
 from AccessControl import ClassSecurityInfo
 from AccessControl.requestmethod import postonly
@@ -31,12 +33,11 @@
     import IRoleEnumerationPlugin
 from Products.PluggableAuthService.interfaces.plugins \
     import IRoleAssignerPlugin
-
 from Products.PluggableAuthService.permissions import ManageUsers
 from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
 from Products.PluggableAuthService.utils import classImplements
+from Products.PluggableAuthService.utils import csrf_only
 
-import logging
 
 LOG = logging.getLogger('PluggableAuthService')
 
@@ -193,7 +194,7 @@
         """
         return self._roles[ role_id ]
 
-    security.declareProtected( ManageUsers, 'addRole' )
+    security.declarePrivate( 'addRole' )
     def addRole( self, role_id, title='', description='' ):
 
         """ Add 'role_id' to the list of roles managed by this object.
@@ -208,7 +209,7 @@
                                  , 'description' : description
                                  }
 
-    security.declareProtected( ManageUsers, 'updateRole' )
+    security.declarePrivate( 'updateRole' )
     def updateRole( self, role_id, title, description ):
 
         """ Update title and description for the role.
@@ -219,7 +220,7 @@
                                        , 'description' : description
                                        } )
 
-    security.declareProtected( ManageUsers, 'removeRole' )
+    security.declarePrivate( 'removeRole' )
     def removeRole( self, role_id, REQUEST=None ):
 
         """ Remove 'role_id' from the list of roles managed by this object.
@@ -234,7 +235,6 @@
             self.removeRoleFromPrincipal( role_id, principal_id )
 
         del self._roles[ role_id ]
-    removeRole = postonly(removeRole)
 
     #
     #   Role assignment API
@@ -297,8 +297,8 @@
 
         return result
 
-    security.declareProtected( ManageUsers, 'assignRoleToPrincipal' )
-    def assignRoleToPrincipal( self, role_id, principal_id, REQUEST=None ):
+    security.declarePrivate( 'assignRoleToPrincipal' )
+    def assignRoleToPrincipal( self, role_id, principal_id ):
 
         """ Assign a role to a principal (user or group).
 
@@ -317,10 +317,9 @@
             self._invalidatePrincipalCache( principal_id )
 
         return not already
-    assignRoleToPrincipal = postonly(assignRoleToPrincipal)
 
-    security.declareProtected( ManageUsers, 'removeRoleFromPrincipal' )
-    def removeRoleFromPrincipal( self, role_id, principal_id, REQUEST=None ):
+    security.declarePrivate( 'removeRoleFromPrincipal' )
+    def removeRoleFromPrincipal( self, role_id, principal_id ):
 
         """ Remove a role from a principal (user or group).
 
@@ -342,7 +341,6 @@
             self._invalidatePrincipalCache( principal_id )
 
         return already
-    removeRoleFromPrincipal = postonly(removeRoleFromPrincipal)
 
     #
     #   ZMI
@@ -367,11 +365,14 @@
                                       )
 
     security.declareProtected( ManageUsers, 'manage_addRole' )
+    @csrf_only
+    @postonly
     def manage_addRole( self
                       , role_id
                       , title
                       , description
-                      , RESPONSE
+                      , RESPONSE=None
+                      , REQUEST=None
                       ):
         """ Add a role via the ZMI.
         """
@@ -379,16 +380,20 @@
 
         message = 'Role+added'
 
-        RESPONSE.redirect( '%s/manage_roles?manage_tabs_message=%s'
-                         % ( self.absolute_url(), message )
-                         )
+        if RESPONSE is not None:
+            RESPONSE.redirect( '%s/manage_roles?manage_tabs_message=%s'
+                            % ( self.absolute_url(), message )
+                            )
 
     security.declareProtected( ManageUsers, 'manage_updateRole' )
+    @csrf_only
+    @postonly
     def manage_updateRole( self
                          , role_id
                          , title
                          , description
-                         , RESPONSE
+                         , RESPONSE=None
+                         , REQUEST=None
                          ):
         """ Update a role via the ZMI.
         """
@@ -396,14 +401,18 @@
 
         message = 'Role+updated'
 
-        RESPONSE.redirect( '%s/manage_roles?role_id=%s&manage_tabs_message=%s'
-                         % ( self.absolute_url(), role_id, message )
-                         )
+        if RESPONSE is not None:
+            RESPONSE.redirect( '%s/manage_roles?role_id=%s&'
+                               'manage_tabs_message=%s'
+                            % ( self.absolute_url(), role_id, message )
+                            )
 
     security.declareProtected( ManageUsers, 'manage_removeRoles' )
+    @csrf_only
+    @postonly
     def manage_removeRoles( self
                           , role_ids
-                          , RESPONSE
+                          , RESPONSE=None
                           , REQUEST=None
                           ):
         """ Remove one or more role assignments via the ZMI.
@@ -424,12 +433,14 @@
 
             message = 'Role+assignments+removed'
 
-        RESPONSE.redirect( '%s/manage_roles?manage_tabs_message=%s'
-                         % ( self.absolute_url(), message )
-                         )
-    manage_removeRoles = postonly(manage_removeRoles)
+        if RESPONSE is not None:
+            RESPONSE.redirect( '%s/manage_roles?manage_tabs_message=%s'
+                            % ( self.absolute_url(), message )
+                            )
 
     security.declareProtected( ManageUsers, 'manage_assignRoleToPrincipals' )
+    @csrf_only
+    @postonly
     def manage_assignRoleToPrincipals( self
                                      , role_id
                                      , principal_ids
@@ -451,17 +462,19 @@
                                                  , '+'.join( assigned )
                                                  )
 
-        RESPONSE.redirect( ( '%s/manage_roles?role_id=%s&assign=1'
-                           + '&manage_tabs_message=%s'
-                           ) % ( self.absolute_url(), role_id, message )
-                         )
-    manage_assignRoleToPrincipals = postonly(manage_assignRoleToPrincipals)
+        if RESPONSE is not None:
+            RESPONSE.redirect( ( '%s/manage_roles?role_id=%s&assign=1'
+                            + '&manage_tabs_message=%s'
+                            ) % ( self.absolute_url(), role_id, message )
+                            )
 
     security.declareProtected( ManageUsers, 'manage_removeRoleFromPrincipals' )
+    @csrf_only
+    @postonly
     def manage_removeRoleFromPrincipals( self
                                        , role_id
                                        , principal_ids
-                                       , RESPONSE
+                                       , RESPONSE=None
                                        , REQUEST=None
                                        ):
         """ Remove a role from one or more principals via the ZMI.
@@ -479,11 +492,11 @@
                                                   , '+'.join( removed )
                                                   )
 
-        RESPONSE.redirect( ( '%s/manage_roles?role_id=%s&assign=1'
-                           + '&manage_tabs_message=%s'
-                           ) % ( self.absolute_url(), role_id, message )
-                         )
-    manage_removeRoleFromPrincipals = postonly(manage_removeRoleFromPrincipals)
+        if RESPONSE is not None:
+            RESPONSE.redirect( ( '%s/manage_roles?role_id=%s&assign=1'
+                            + '&manage_tabs_message=%s'
+                            ) % ( self.absolute_url(), role_id, message )
+                            )
 
 classImplements( ZODBRoleManager
                , IZODBRoleManager

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBUserManager.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBUserManager.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/ZODBUserManager.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -44,6 +44,7 @@
 from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
 from Products.PluggableAuthService.utils import classImplements
 from Products.PluggableAuthService.utils import createViewName
+from Products.PluggableAuthService.utils import csrf_only
 
 
 class IZODBUserManager(Interface):
@@ -384,12 +385,15 @@
                                    )
 
     security.declareProtected( ManageUsers, 'manage_addUser' )
+    @csrf_only
+    @postonly
     def manage_addUser( self
                       , user_id
                       , login_name
                       , password
                       , confirm
                       , RESPONSE=None
+                      , REQUEST=None
                       ):
         """ Add a user via the ZMI.
         """
@@ -413,6 +417,8 @@
                              )
 
     security.declareProtected( ManageUsers, 'manage_updateUserPassword' )
+    @csrf_only
+    @postonly
     def manage_updateUserPassword( self
                                  , user_id
                                  , password
@@ -435,10 +441,16 @@
             RESPONSE.redirect( '%s/manage_users?manage_tabs_message=%s'
                              % ( self.absolute_url(), message )
                              )
-    manage_updateUserPassword = postonly(manage_updateUserPassword)
 
     security.declareProtected( ManageUsers, 'manage_updateUser' )
-    def manage_updateUser(self, user_id, login_name, RESPONSE=None):
+    @csrf_only
+    @postonly
+    def manage_updateUser(self
+                         , user_id
+                         , login_name
+                         , RESPONSE=None
+                         , REQUEST=None
+                         ):
         """ Update a user's login name via the ZMI.
         """
         if not login_name:
@@ -456,6 +468,8 @@
                              )
 
     security.declareProtected( ManageUsers, 'manage_removeUsers' )
+    @csrf_only
+    @postonly
     def manage_removeUsers( self
                           , user_ids
                           , RESPONSE=None
@@ -479,7 +493,6 @@
             RESPONSE.redirect( '%s/manage_users?manage_tabs_message=%s'
                              % ( self.absolute_url(), message )
                              )
-    manage_removeUsers = postonly(manage_removeUsers)
 
     #
     #   Allow users to change their own login name and password.
@@ -500,6 +513,8 @@
                                    )
 
     security.declareProtected( SetOwnPassword, 'manage_updatePassword' )
+    @csrf_only
+    @postonly
     def manage_updatePassword( self
                              , login_name
                              , password
@@ -529,7 +544,6 @@
                                '?manage_tabs_message=%s'
                              % ( self.absolute_url(), message )
                              )
-    manage_updatePassword = postonly(manage_updatePassword)
 
 classImplements( ZODBUserManager
                , IZODBUserManager

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_DynamicGroupsPlugin.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_DynamicGroupsPlugin.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_DynamicGroupsPlugin.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -494,30 +494,6 @@
         self.assertEqual( len( groups ), 1 )
         self.failUnless( 'ggp_effable' in groups )
 
-    def test_removeGroup_POST_permissions(self):
-        from zExceptions import Forbidden
-
-        GROUP_ID = 'testgroup'
-
-        dpg = self._makeOne( 'adding' )
-
-        dpg.addGroup( GROUP_ID, 'python:True', 'title', 'description', True )
-
-        req, res = makeRequestAndResponse()
-
-        req.set('REQUEST_METHOD', 'GET')
-
-        # Fails with a GET
-        # test removeGroup
-        req.set('REQUEST_METHOD', 'GET')
-        req.set('method', 'GET')
-        self.assertRaises(Forbidden, dpg.removeGroup,
-                          GROUP_ID, REQUEST=req)
-        # Works with a POST
-        req.set('REQUEST_METHOD', 'POST')
-        req.set('method', 'POST')
-        dpg.removeGroup(GROUP_ID, REQUEST=req)
-
 if __name__ == "__main__":
     unittest.main()
 

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBGroupManager.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBGroupManager.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBGroupManager.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -287,7 +287,7 @@
         for info in info_list:
             self.failUnless( info[ 'id' ] in PRE_LIST )
 
-    def test_addPrincipalToGroup( self ):
+    def test_addPrincipalToGroup_w_prefix( self ):
         zgm = self._makeOne()
         zgm.prefix = 'prefixed_'
 
@@ -299,45 +299,6 @@
         groups = zgm.getGroupsForPrincipal( user )
         self.assertEqual( groups, ( 'prefixed_group', ) )
 
-    def test_addPrincipalToGroup_POST_permissions(self):
-        USER_ID = 'testuser'
-        GROUP_ID = 'testgroup'
-
-        zgm = self._makeOne()
-        zgm.prefix = 'prefixed_'
-
-        zgm.addGroup(GROUP_ID)
-        req, res = makeRequestAndResponse()
-
-        # Fails with a GET
-        req.set('REQUEST_METHOD', 'GET')
-        req.set('method', 'GET')
-        self.assertRaises(Forbidden, zgm.addPrincipalToGroup,
-                          USER_ID, GROUP_ID, REQUEST=req)
-        # Works with a POST
-        req.set('REQUEST_METHOD', 'POST')
-        req.set('method', 'POST')
-        zgm.addPrincipalToGroup(USER_ID, GROUP_ID, REQUEST=req)
-
-    def test_removePrincipalFromGroup_POST_permissions(self):
-        USER_ID = 'testuser'
-        GROUP_ID = 'testgroup'
-
-        zgm = self._makeOne()
-        zgm.prefix = 'prefixed_'
-
-        zgm.addGroup(GROUP_ID)
-        req, res = makeRequestAndResponse()
-
-        req.set('REQUEST_METHOD', 'GET')
-        req.set('method', 'GET')
-        self.assertRaises(Forbidden, zgm.removePrincipalFromGroup,
-                          USER_ID, GROUP_ID, REQUEST=req)
-        # Works with a POST
-        req.set('REQUEST_METHOD', 'POST')
-        req.set('method', 'POST')
-        zgm.removePrincipalFromGroup(USER_ID, GROUP_ID, REQUEST=req)
-
     def test_manage_addPrincipalsToGroup_POST_permissions(self):
         USER_ID = 'testuser'
         GROUP_ID = 'testgroup'
@@ -350,12 +311,18 @@
 
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zgm.manage_addPrincipalsToGroup,
                           GROUP_ID, [USER_ID], REQUEST=req)
 
-        # Works with a POST
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zgm.manage_addPrincipalsToGroup,
+                          GROUP_ID, [USER_ID], REQUEST=req)
+
+        # Works with a POST + CSRF token
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zgm.manage_addPrincipalsToGroup(GROUP_ID, [USER_ID], REQUEST=req)
 
     def test_manage_removePrincipalsFromGroup_POST_permissions(self):
@@ -370,12 +337,19 @@
 
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zgm.manage_removePrincipalsFromGroup,
                           GROUP_ID, [USER_ID], REQUEST=req)
 
         # Works with a POST
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zgm.manage_removePrincipalsFromGroup,
+                          GROUP_ID, [USER_ID], REQUEST=req)
+
+        # Works with a POST + CSRF token
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zgm.manage_removePrincipalsFromGroup(GROUP_ID, [USER_ID], REQUEST=req)
 
     def test_manage_removeGroup_POST_permissions(self):
@@ -388,16 +362,20 @@
 
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zgm.manage_removeGroups,
                           [GROUP_ID], REQUEST=req)
 
-        # Works with a POST
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zgm.manage_removeGroups,
+                          [GROUP_ID], REQUEST=req)
+
+        # Works with a POST + CSRF token
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zgm.manage_removeGroups([GROUP_ID], REQUEST=req)
 
-if __name__ == "__main__":
-    unittest.main()
 
 def test_suite():
     return unittest.TestSuite((

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBRoleManager.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBRoleManager.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBRoleManager.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -492,66 +492,6 @@
 
         self.failIf( 'test' in zrm.getRolesForPrincipal( user ) )
 
-    def test_assignRoleToPrincipal_POST_permissions(self):
-        USER_ID = 'testuser'
-        ROLE_ID = 'myrole'
-
-        root = FauxPAS()
-        zrm = self._makeOne(id='remove_existing').__of__(root)
-        zrm = self._makeOne()
-        zrm.addRole(ROLE_ID)
-
-        req, res = makeRequestAndResponse()
-
-        # Fails with a GET
-        req.set('REQUEST_METHOD', 'GET')
-        req.set('method', 'GET')
-        self.assertRaises(Forbidden, zrm.assignRoleToPrincipal,
-                          ROLE_ID, USER_ID, REQUEST=req)
-        # Works with a POST
-        req.set('REQUEST_METHOD', 'POST')
-        req.set('method', 'POST')
-        zrm.assignRoleToPrincipal(ROLE_ID, USER_ID, REQUEST=req)
-
-    def test_removeRoleFromPricipal_POST_permission(self):
-        USER_ID = 'testuser'
-        ROLE_ID = 'myrole'
-
-        root = FauxPAS()
-        zrm = self._makeOne(id='remove_existing').__of__(root)
-        zrm = self._makeOne()
-        zrm.addRole(ROLE_ID)
-
-        req, res = makeRequestAndResponse()
-
-        req.set('REQUEST_METHOD', 'GET')
-        req.set('method', 'GET')
-        self.assertRaises(Forbidden, zrm.removeRoleFromPrincipal,
-                          ROLE_ID, USER_ID, REQUEST=req)
-        # Works with a POST
-        req.set('REQUEST_METHOD', 'POST')
-        req.set('method', 'POST')
-        zrm.removeRoleFromPrincipal(ROLE_ID, USER_ID, REQUEST=req)
-
-    def test_removeRole_POST_permissions(self):
-        ROLE_ID = 'myrole'
-
-        root = FauxPAS()
-        zrm = self._makeOne(id='remove_existing').__of__(root)
-        zrm = self._makeOne()
-        zrm.addRole(ROLE_ID)
-
-        req, res = makeRequestAndResponse()
-
-        req.set('REQUEST_METHOD', 'GET')
-        req.set('method', 'GET')
-        self.assertRaises(Forbidden, zrm.removeRole,
-                          ROLE_ID, REQUEST=req)
-        # Works with a POST
-        req.set('REQUEST_METHOD', 'POST')
-        req.set('method', 'POST')
-        zrm.removeRole(ROLE_ID, REQUEST=req)
-
     def test_manage_assignRoleToPrincipal_POST_permissions(self):
         USER_ID = 'testuser'
         ROLE_ID = 'myrole'
@@ -565,14 +505,21 @@
 
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zrm.manage_assignRoleToPrincipals,
                           ROLE_ID, [USER_ID], RESPONSE=res, REQUEST=req)
+
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zrm.manage_assignRoleToPrincipals,
+                          ROLE_ID, [USER_ID], RESPONSE=res, REQUEST=req)
+
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zrm.manage_assignRoleToPrincipals(ROLE_ID, [USER_ID], RESPONSE=res,
                                           REQUEST=req)
 
-    def test_manage_removeRoleFromPricipal_POS_permissionsT(self):
+    def test_manage_removeRoleFromPricipal_POST_permissionsT(self):
         USER_ID = 'testuser'
         ROLE_ID = 'myrole'
 
@@ -585,10 +532,17 @@
 
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zrm.manage_removeRoleFromPrincipals,
                           ROLE_ID, [USER_ID], RESPONSE=res, REQUEST=req)
+
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zrm.manage_removeRoleFromPrincipals,
+                          ROLE_ID, [USER_ID], RESPONSE=res, REQUEST=req)
+
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zrm.manage_removeRoleFromPrincipals(ROLE_ID, [USER_ID], RESPONSE=res,
                                             REQUEST=req)
 
@@ -603,16 +557,20 @@
         req, res = makeRequestAndResponse()
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zrm.manage_removeRoles,
                           [ROLE_ID], RESPONSE=res, REQUEST=req)
+
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zrm.manage_removeRoles,
+                          [ROLE_ID], RESPONSE=res, REQUEST=req)
+
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zrm.manage_removeRoles([ROLE_ID], RESPONSE=res, REQUEST=req)
 
 
-if __name__ == "__main__":
-    unittest.main()
-
 def test_suite():
     return unittest.TestSuite((
         unittest.makeSuite( ZODBRoleManagerTests ),

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBUserManager.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBUserManager.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/plugins/tests/test_ZODBUserManager.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -553,8 +553,6 @@
         self.assertEqual(uid_and_info, (USER_ID, USER_ID))
 
     def test_updateUserPassword_with_preencrypted_password(self):
-        from AccessControl.AuthEncoding import pw_encrypt
-
         USER_ID = 'already_encrypted'
         PASSWORD = 'password'
 
@@ -594,11 +592,17 @@
             def getId( self ):
                 return self._id
 
+        req, res = makeRequestAndResponse()
+        req.set('REQUEST_METHOD', 'POST')
+        req.set('method', 'POST')
+        req.SESSION = {'_csrft_': 'deadbeef'}
+        req.form['csrf_token'] = 'deadbeef'
         newSecurityManager(None, FauxUser('user1'))
         try:
             zum.manage_updatePassword('user2@example.com',
                                       'new_password',
                                       'new_password',
+                                      REQUEST=req,
                                      )
         finally:
             noSecurityManager()
@@ -623,14 +627,33 @@
         # Fails with a GET
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zum.manage_updateUserPassword,
                           USER_ID, PASSWORD, PASSWORD, REQUEST=req)
-        # Works with a POST
+
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zum.manage_updateUserPassword,
+                          USER_ID, PASSWORD, PASSWORD, REQUEST=req)
+
+        # Works with a POST + CSRF token
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zum.manage_updateUserPassword(USER_ID, PASSWORD, PASSWORD, REQUEST=req)
 
     def test_manage_updatePassword_POST_permissions(self):
+        from AccessControl.SecurityManagement import newSecurityManager
+        from AccessControl.SecurityManagement import noSecurityManager
+        from Acquisition import Implicit
+        # Give the user a new password; attempting to authenticate with the
+        # old password must fail
+        class FauxUser(Implicit):
+
+            def __init__(self, id):
+                self._id = id
+
+            def getId( self ):
+                return self._id
         USER_ID = 'testuser'
         PASSWORD = 'password'
         ENCRYPTED = pw_encrypt(PASSWORD)
@@ -641,10 +664,24 @@
         req, res = makeRequestAndResponse()
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
-        self.assertRaises(Forbidden, zum.manage_updatePassword,
-                          USER_ID, PASSWORD, PASSWORD, REQUEST=req)
-        # XXX: This method is broken
+        req.set('SESSION', {})
+        newSecurityManager(None, FauxUser(USER_ID))
+        try:
+            self.assertRaises(Forbidden, zum.manage_updatePassword,
+                            USER_ID, PASSWORD, PASSWORD, REQUEST=req)
 
+            req.set('REQUEST_METHOD', 'POST')
+            req.set('method', 'POST')
+            self.assertRaises(Forbidden, zum.manage_updatePassword,
+                            USER_ID, PASSWORD, PASSWORD, REQUEST=req)
+
+            # Works with a POST + CSRF token
+            req.form['csrf_token'] = 'deadbeef'
+            req.SESSION['_csrft_'] = 'deadbeef'
+            zum.manage_updatePassword(USER_ID, PASSWORD, PASSWORD, REQUEST=req)
+        finally:
+            noSecurityManager()
+
     def test_manage_removeUsers_POST_permissions(self):
         USER_ID = 'testuser'
         PASSWORD = 'password'
@@ -656,10 +693,17 @@
         req, res = makeRequestAndResponse()
         req.set('REQUEST_METHOD', 'GET')
         req.set('method', 'GET')
+        req.set('SESSION', {})
         self.assertRaises(Forbidden, zum.manage_removeUsers,
                           [USER_ID], REQUEST=req)
+
         req.set('REQUEST_METHOD', 'POST')
         req.set('method', 'POST')
+        self.assertRaises(Forbidden, zum.manage_removeUsers,
+                          [USER_ID], REQUEST=req)
+
+        req.form['csrf_token'] = 'deadbeef'
+        req.SESSION['_csrft_'] = 'deadbeef'
         zum.manage_removeUsers([USER_ID], REQUEST=req)
 
 

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/tests/test_utils.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/tests/test_utils.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/tests/test_utils.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -96,6 +96,9 @@
 
 
 def _makeRequestWSession(**session):
+    from zope.interface import implementer
+    from zope.publisher.interfaces.browser import IBrowserRequest
+    @implementer(IBrowserRequest)
     class _Request(dict):
         pass
     request = _Request()
@@ -130,19 +133,19 @@
         return checkCSRFToken(*args, **kw)
 
     def test_wo_token_in_session_or_form_w_raises(self):
-        from ZPublisher import BadRequest
+        from ZPublisher import Forbidden
         request = _makeRequestWSession()
-        self.assertRaises(BadRequest, self._callFUT, request)
+        self.assertRaises(Forbidden, self._callFUT, request)
 
     def test_wo_token_in_session_or_form_wo_raises(self):
         request = _makeRequestWSession()
         self.assertFalse(self._callFUT(request, raises=False))
 
     def test_wo_token_in_session_w_token_in_form_w_raises(self):
-        from ZPublisher import BadRequest
+        from ZPublisher import Forbidden
         request = _makeRequestWSession()
         request.form['csrf_token'] = 'deadbeef'
-        self.assertRaises(BadRequest, self._callFUT, request)
+        self.assertRaises(Forbidden, self._callFUT, request)
 
     def test_wo_token_in_session_w_token_in_form_wo_raises(self):
         request = _makeRequestWSession()
@@ -150,19 +153,19 @@
         self.assertFalse(self._callFUT(request, raises=False))
 
     def test_w_token_in_session_wo_token_in_form_w_raises(self):
-        from ZPublisher import BadRequest
+        from ZPublisher import Forbidden
         request = _makeRequestWSession(_csrft_='deadbeef')
-        self.assertRaises(BadRequest, self._callFUT, request)
+        self.assertRaises(Forbidden, self._callFUT, request)
 
     def test_w_token_in_session_wo_token_in_form_wo_raises(self):
         request = _makeRequestWSession(_csrft_='deadbeef')
         self.assertFalse(self._callFUT(request, raises=False))
 
     def test_w_token_in_session_w_token_in_form_miss_w_raises(self):
-        from ZPublisher import BadRequest
+        from ZPublisher import Forbidden
         request = _makeRequestWSession(_csrft_='deadbeef')
         request.form['csrf_token'] = 'bab3l0f'
-        self.assertRaises(BadRequest, self._callFUT, request)
+        self.assertRaises(Forbidden, self._callFUT, request)
 
     def test_w_token_in_session_w_token_in_form_miss_wo_raises(self):
         request = _makeRequestWSession(_csrft_='deadbeef')
@@ -214,7 +217,7 @@
         self.assertRaises(ValueError, self._callFUT, no_request)
 
     def test_w_function_w_positional_REQUEST(self):
-        from ZPublisher import BadRequest
+        from ZPublisher import Forbidden
         def w_positional_request(foo, bar, REQUEST):
             "I haz REQUEST as positional arg"
             return 42
@@ -222,14 +225,14 @@
         self.assertEqual(wrapped.__name__, w_positional_request.__name__)
         self.assertEqual(wrapped.__module__, w_positional_request.__module__)
         self.assertEqual(wrapped.__doc__, w_positional_request.__doc__)
-        self.assertRaises(BadRequest, wrapped, foo=None, bar=None,
+        self.assertRaises(Forbidden, wrapped, foo=None, bar=None,
                           REQUEST=_makeRequestWSession())
         req = _makeRequestWSession(_csrft_='deadbeef')
         req.form['csrf_token'] = 'deadbeef'
         self.assertEqual(wrapped(foo=None, bar=None, REQUEST=req), 42)
 
     def test_w_function_w_optional_REQUEST(self):
-        from ZPublisher import BadRequest
+        from ZPublisher import Forbidden
         def w_optional_request(foo, bar, REQUEST=None):
             "I haz REQUEST as kw arg"
             return 42
@@ -237,7 +240,7 @@
         self.assertEqual(wrapped.__name__, w_optional_request.__name__)
         self.assertEqual(wrapped.__module__, w_optional_request.__module__)
         self.assertEqual(wrapped.__doc__, w_optional_request.__doc__)
-        self.assertRaises(BadRequest,
+        self.assertRaises(Forbidden,
                          wrapped, foo=None, bar=None,
                                   REQUEST=_makeRequestWSession())
         req = _makeRequestWSession(_csrft_='deadbeef')

Modified: Products.PluggableAuthService/trunk/Products/PluggableAuthService/utils.py
===================================================================
--- Products.PluggableAuthService/trunk/Products/PluggableAuthService/utils.py	2012-11-16 00:54:57 UTC (rev 128302)
+++ Products.PluggableAuthService/trunk/Products/PluggableAuthService/utils.py	2012-11-16 00:54:58 UTC (rev 128303)
@@ -23,7 +23,8 @@
 
 from AccessControl import ClassSecurityInfo
 from App.Common import package_home
-from ZPublisher import BadRequest
+from ZPublisher import Forbidden
+from zope.publisher.interfaces.browser import IBrowserRequest
 
 
 from zope import interface
@@ -203,7 +204,7 @@
 def checkCSRFToken(request, token='csrf_token', raises=True):
     """ Check CSRF token in session against token formdata.
     
-    If the values don't match, and 'raises' is True, raise a BadRequest.
+    If the values don't match, and 'raises' is True, raise a Forbidden.
     
     If the values don't match, and 'raises' is False, return False.
     
@@ -211,7 +212,7 @@
     """
     if request.form.get(token) != getCSRFToken(request):
         if raises:
-            raise BadRequest('incorrect CSRF token')
+            raise Forbidden('incorrect CSRF token')
         return False
     return True
 
@@ -235,11 +236,26 @@
 
 def csrf_only(wrapped):
     args, varargs, kwargs, defaults = inspect.getargspec(wrapped)
-    if 'REQUEST' in args:
-        def wrapper(REQUEST, *a, **kw):
-            checkCSRFToken(REQUEST)
-            return wrapped(REQUEST=REQUEST, *a, **kw)
-    else:
+    if 'REQUEST' not in args:
         raise ValueError("Method doesn't name request")
-    functools.update_wrapper(wrapper, wrapped)
-    return wrapper
+    r_index = args.index('REQUEST')
+ 
+    arglen = len(args)
+    if defaults is not None:
+        defaults = zip(args[arglen - len(defaults):], defaults)
+        arglen -= len(defaults)
+
+    spec = (args, varargs, kwargs, defaults)
+    argspec = inspect.formatargspec(formatvalue=lambda v: '=None', *spec)
+    callargs = inspect.formatargspec(formatvalue=lambda v: '', *spec)
+    lines = ['def wrapper' + argspec + ':',
+             '    if IBrowserRequest.providedBy(REQUEST):',
+             '        checkCSRFToken(REQUEST)',
+             '    return wrapped(' + ','.join(args) + ')',
+            ]
+    g = globals().copy()
+    l = locals().copy()
+    g['wrapped'] = wrapped
+    exec '\n'.join(lines) in g, l
+
+    return functools.wraps(wrapped)(l['wrapper'])



More information about the checkins mailing list