[Checkins] SVN: zc.tokenpolicy/trunk/src/zc/tokenpolicy/ Initial
import. A small variation on the zc.sharing policy that makes
the security policy enforce zope.locking tokens. Useful on
its own and as an example of enforcing the pure-policy
zope.locking tokens.
Gary Poster
gary at zope.com
Tue Aug 15 17:25:55 EDT 2006
Log message for revision 69545:
Initial import. A small variation on the zc.sharing policy that makes the security policy enforce zope.locking tokens. Useful on its own and as an example of enforcing the pure-policy zope.locking tokens.
Changed:
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/README.txt
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/__init__.py
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/configure.zcml
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/meta.zcml
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/policy.py
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/subscribers.py
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/tests.py
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.py
A zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.txt
-=-
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/README.txt
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/README.txt 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/README.txt 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,257 @@
+The tokenpolicy package supplies a modified sharing policy that honors tokens
+from the zope.locking package.
+
+-----------
+Basic Setup
+-----------
+
+To test the tokenpolicy code, we need an object that may be locked and
+shared. This effectively means an object that can be adapted to
+zope.app.keyreference.interfaces.IKeyReference,
+that can be adapted to zope.app.annotation.interfaces.IAnnotations, and that
+implements zc.sharing.interfaces.ISharable and
+zope.app.annotation.interfaces.IAnnotatable. Our first step will be to create
+a factory for some demonstration objects that meet these requirements.
+
+ >>> from zope import interface, component
+ >>> import zc.sharing.interfaces
+ >>> import zope.annotation.interfaces
+ >>> import zope.annotation.attribute
+ >>> import zope.app.keyreference.interfaces
+ >>> class IDemo(interface.Interface):
+ ... """a demonstration interface for a demonstration class"""
+ ...
+ >>> class Demo(object):
+ ... interface.implements(
+ ... zc.sharing.interfaces.ISharable,
+ ... zope.annotation.interfaces.IAttributeAnnotatable,
+ ... IDemo)
+ ...
+ >>> component.provideAdapter(
+ ... zope.annotation.attribute.AttributeAnnotations)
+ >>> class DemoKeyReference(object):
+ ... component.adapts(IDemo)
+ ... _class_counter = 0
+ ... interface.implements(
+ ... zope.app.keyreference.interfaces.IKeyReference)
+ ... def __init__(self, context):
+ ... self.context = context
+ ... class_ = type(self)
+ ... self._id = getattr(context, '__demo_key_reference__', None)
+ ... if self._id is None:
+ ... self._id = class_._class_counter
+ ... context.__demo_key_reference__ = self._id
+ ... class_._class_counter += 1
+ ... key_type_id = 'zope.locking.README.DemoKeyReference'
+ ... def __call__(self):
+ ... return self.context
+ ... def __hash__(self):
+ ... return (self.key_type_id, self._id)
+ ... def __cmp__(self, other):
+ ... if self.key_type_id == other.key_type_id:
+ ... return cmp(self._id, other._id)
+ ... return cmp(self.key_type_id, other.key_type_id)
+ ...
+ >>> component.provideAdapter(DemoKeyReference)
+
+We also need to provide adapters to the ISharing interfaces.
+
+ >>> import zc.sharing.sharing
+ >>> component.provideAdapter(zc.sharing.sharing.BaseSharing)
+ >>> component.provideAdapter(zc.sharing.sharing.Sharing)
+
+Additional setup is that you need to define privileges and lock privileges.
+We'll assume this has been done already, defining Read at id 0, Write at id 2,
+and Share at id 4; and defining the lock privileges as being a set of a single
+privilege, Write/2. (For test purposes, these are defined in tests.py.)
+
+>>> demo = Demo()
+>>> sharing = zc.sharing.interfaces.ISharing(demo)
+
+-------------------------------------
+Security Policy and zope.locking Tokens
+-------------------------------------
+
+This package includes a simple change to the sharing policy that checks for
+the zope.locking token utility and disallows access to privileges registered
+in zc.tokenpolicy.policy if a token exists for the object and none of the
+security identifiers (user id and group id) for each principal in the
+interaction are members of the token. This enables the tokenpolicy package to
+be used as an enforcement mechanism for lock and freeze tokens in the
+zope.locking package.
+
+More interesting and flexible policies could be assembled. One might allow
+multiple token utilities, each managing a different set of privileges.
+Another might supply alternate tokens, so that a read lock token would have a
+different meaning to the security policy than a write lock token. These
+possibilities are unexplored at the moment because a simpler solution is
+sufficient for the current use cases.
+
+To use the provided policy, you must define privileges that are controlled by
+the tokens. This is usually done with ZCML. The ZCML directives
+(see zcml.txt) themselves rely on five functions in the policy module:
+definePrivilege, definePrivilegesByTitle, getPrivileges, getPrivilegeTitles,
+and removePrivilege.
+
+ >>> import zc.sharing.sharing
+ >>> from zc.tokenpolicy import policy
+ >>> sorted(policy.getPrivileges())
+ []
+ >>> policy.definePrivilege(2)
+ >>> sorted(policy.getPrivileges())
+ [2]
+ >>> policy.definePrivilege(0)
+ >>> sorted(policy.getPrivileges())
+ [0, 2]
+ >>> policy.removePrivilege(2)
+ >>> sorted(policy.getPrivileges())
+ [0]
+ >>> policy.definePrivilegesByTitle(
+ ... ('Read', 'Write', 'Share'))
+ >>> sorted(policy.getPrivileges())
+ [0, 2, 4]
+ >>> sorted(policy.getPrivilegeTitles())
+ [u'Read', u'Share', u'Write']
+ >>> policy.removePrivilege(2)
+ >>> policy.removePrivilege(4)
+ >>> policy.removePrivilege(0)
+ >>> sorted(policy.getPrivileges())
+ []
+
+We'll use just 'Write' as our token privilege. We also need to do some other
+setup: define some permissions mapped to our privileges, set up the policy,
+set up an interaction for us to use, and set up the token utility.
+
+ >>> policy.definePrivilege(2)
+ >>> import zc.sharing.policy
+ >>> zc.sharing.policy.permissionPrivilege('R1', 0)
+ ... # R1 permission -> Read privilege
+ >>> zc.sharing.policy.permissionPrivilege('W1', 2)
+ ... # W1 permission -> Write privilege
+ >>> zc.sharing.policy.permissionPrivilege('S1', 4)
+ ... # S1 permission -> Share privilege
+ >>> import zope.security.management
+ >>> oldPolicy = zope.security.management.setSecurityPolicy(
+ ... policy.SecurityPolicy)
+
+ >>> class Principal:
+ ... def __init__(self, id):
+ ... self.id = id
+ ... self.groups = []
+
+ >>> joe = Principal('joe')
+ >>> class Participation:
+ ... interaction = None
+ >>> participation = Participation()
+ >>> participation.principal = joe
+ >>> zope.security.management.endInteraction()
+ >>> zope.security.management.newInteraction(participation)
+ >>> interaction = zope.security.management.getInteraction()
+
+ >>> import zope.locking.utility
+ >>> import zope.locking.interfaces
+ >>> util = zope.locking.utility.TokenUtility()
+ >>> component.provideUtility(
+ ... util,
+ ... provides=zope.locking.interfaces.ITokenUtility)
+ >>> import zope.component.interfaces
+ >>> @interface.implementer(zope.component.interfaces.IComponentLookup)
+ ... @component.adapter(interface.Interface)
+ ... def siteManager(obj):
+ ... return component.getGlobalSiteManager()
+ ...
+ >>> component.provideAdapter(siteManager)
+
+Let's give Joe the 'Read' and 'Write' privileges on the demo object.
+
+ >>> sharing.setPrivileges('joe', ('Read', 'Write'))
+
+Now, if we currently ask if joe has the 'W1' permission--part of the Write
+privilege--on demo, he can:
+
+ >>> interaction.checkPermission('W1', demo)
+ True
+
+If we freeze the object, though, he shouldn't have the permission, though.
+
+ >>> from zope.locking.tokens import EndableFreeze
+ >>> token = util.register(EndableFreeze(demo))
+ >>> interaction.checkPermission('W1', demo)
+ True
+
+But he did! Why? Because of caching, of course.
+
+ >>> interaction.invalidateCache()
+ >>> interaction.checkPermission('W1', demo)
+ False
+
+The cache invalidation is fairly annoying, and easy to forget. This suggests
+that we should perhaps invalidate the interaction cache whenever we get a
+locking event. This package includes a subscriber to do just that.
+
+This won't catch tokens that expire in the middle of a transaction: we assume
+that our transactions won't be long enough for that to be a serious problem.
+If that were not sufficient, you would need trickier solutions. In any case,
+let's register the handler.
+
+ >>> from zc.tokenpolicy import subscribers
+ >>> component.provideHandler(subscribers.tokenSubscriber)
+
+That means we won't need to call invalidateCache for any of the other
+examples.
+
+So, Joe doesn't have the W1 permission because of the freeze token. He still
+has permissions not blocked by the token, though.
+
+ >>> interaction.checkPermission('R1', demo)
+ True
+
+Missing permissions still are missing, of course.
+
+ >>> interaction.checkPermission('S1', demo)
+ False
+
+When the token ends, joe regains access to W1.
+
+ >>> token.end()
+ >>> interaction.checkPermission('W1', demo)
+ True
+
+If there's an active token that joe is a part of, he can access the object
+after all.
+
+ >>> from zope.locking.tokens import SharedLock
+ >>> token = util.register(SharedLock(demo, ('mary', 'joe')))
+ >>> interaction.checkPermission('W1', demo)
+ True
+
+Importantly, participation in the token is simply a mask, and does not grant
+access when there is none. For instance, if we take the Write privilege away
+from joe, the lock token won't get him anything.
+
+ >>> sharing.removePrivileges('joe', ('Write',))
+ >>> interaction.checkPermission('W1', demo)
+ False
+
+Now we'll give him the privilege back, but take him out of the lock. No
+permission.
+
+ >>> sharing.addPrivileges('joe', ('Write',))
+ >>> interaction.checkPermission('W1', demo)
+ True
+ >>> token.remove(('joe',))
+ >>> interaction.checkPermission('W1', demo)
+ False
+
+If we add him back, he has it, and if we end the token he has it. It's pretty
+simple.
+
+ >>> token.add(('joe',))
+ >>> interaction.checkPermission('W1', demo)
+ True
+ >>> token.end()
+ >>> interaction.checkPermission('W1', demo)
+ True
+
+ >>> discard = zope.security.management.setSecurityPolicy(oldPolicy)
+ >>> zope.security.management.restoreInteraction()
Property changes on: zc.tokenpolicy/trunk/src/zc/tokenpolicy/README.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/__init__.py
===================================================================
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/configure.zcml
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/configure.zcml 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/configure.zcml 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,7 @@
+<configure
+ xmlns="http://namespaces.zope.org/zope"
+ i18n_domain="zc.tokenpolicy">
+
+ <securityPolicy component=".policy.SecurityPolicy" />
+
+</configure>
\ No newline at end of file
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/meta.zcml
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/meta.zcml 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/meta.zcml 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,9 @@
+<configure xmlns="http://namespaces.zope.org/zope"
+ xmlns:meta="http://namespaces.zope.org/meta">
+
+ <meta:directive namespace="http://namespaces.zope.com/zc"
+ name="tokenPrivileges"
+ schema=".zcml.IPrivileges"
+ handler=".zcml.Privileges" />
+
+</configure>
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/policy.py
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/policy.py 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/policy.py 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,79 @@
+from zope import component
+
+import zc.sharing.policy
+import zc.sharing.interfaces
+import zope.locking.interfaces
+import zc.sharing.sharing
+from zope.security.proxy import removeSecurityProxy
+
+class SecurityPolicy(zc.sharing.policy.SecurityPolicy):
+
+ def cachedDecision(self, parent, principal, groups, privilege):
+ # Return the decision for a principal and permission
+
+ cache = self.cache(parent)
+ try:
+ cache_decision = cache.decision
+ except AttributeError:
+ cache_decision = cache.decision = {}
+
+ cache_decision_prin = cache_decision.get(principal)
+ if not cache_decision_prin:
+ cache_decision_prin = cache_decision[principal] = {}
+
+ try:
+ return cache_decision_prin[privilege]
+ except KeyError:
+ pass
+
+ sharing = zc.sharing.interfaces.IBaseSharing(parent, None)
+ if sharing is not None:
+ decision = sharing.sharedTo(privilege, groups)
+ # insert new zope.locking code
+ if decision and privilege in getPrivileges():
+ utility = component.queryUtility(
+ zope.locking.interfaces.ITokenUtility, context=parent)
+ if utility is not None:
+ token = utility.get(parent)
+ if token is not None:
+ for p in token.principal_ids:
+ if p in groups:
+ break
+ else:
+ decision = False
+ # end new code
+ elif parent is None:
+ decision = False
+ else:
+ parent = removeSecurityProxy(getattr(parent, '__parent__', None))
+ decision = self.cachedDecision(
+ parent, principal, groups, privilege)
+
+ cache_decision_prin[privilege] = decision
+ return decision
+
+_privileges = frozenset()
+
+def definePrivilege(bit):
+ global _privileges
+ tmp = set(_privileges)
+ tmp.add(bit)
+ _privileges = frozenset(tmp)
+
+def definePrivilegesByTitle(titles):
+ assert not isinstance(titles, basestring) # try and avoid a common error
+ for title in titles:
+ bit = zc.sharing.sharing.getIdByTitle(title)
+ definePrivilege(bit)
+
+def getPrivileges():
+ return _privileges
+
+def getPrivilegeTitles():
+ return [zc.sharing.sharing.getPrivilege(i)['title'] for i in _privileges]
+
+def removePrivilege(bit):
+ global _privileges
+ tmp = set(_privileges)
+ tmp.remove(bit)
+ _privileges = frozenset(tmp)
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/subscribers.py
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/subscribers.py 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/subscribers.py 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,14 @@
+from zope import component
+import zope.locking.interfaces
+import zope.security.management
+
+ at component.adapter(zope.locking.interfaces.ITokenEvent)
+def tokenSubscriber(ev):
+ interaction = zope.security.management.queryInteraction()
+ if interaction is not None:
+ try:
+ invalidateCache = interaction.invalidateCache
+ except AttributeError:
+ pass
+ else:
+ invalidateCache()
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/tests.py
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/tests.py 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/tests.py 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,59 @@
+import unittest
+from zope.app.testing import placelesssetup
+from zope.configuration import xmlconfig
+from zope.testing import module
+
+import zc.sharing.sharing
+
+from zc.tokenpolicy import policy
+
+def zcml(s):
+ context = xmlconfig.file('meta.zcml', package=zc.tokenpolicy)
+ xmlconfig.string(s, context)
+
+def zcmlSetUp(test):
+ placelesssetup.setUp()
+ zc.sharing.sharing.definePrivilege(0, u'Read')
+ zc.sharing.sharing.definePrivilege(2, u'Write')
+ zc.sharing.sharing.definePrivilege(4, u'Share')
+ module.setUp(test, 'zc.tokenpolicy.zcml_text')
+
+def zcmlTearDown(test):
+ for pid in policy.getPrivileges():
+ policy.removePrivilege(pid)
+ zc.sharing.sharing.clearPrivileges()
+ module.tearDown(test, 'zc.tokenpolicy.zcml_text')
+ placelesssetup.tearDown()
+
+def setUp(test):
+ placelesssetup.setUp()
+ zc.sharing.sharing.definePrivilege(0, u'Read')
+ zc.sharing.sharing.definePrivilege(2, u'Write')
+ zc.sharing.sharing.definePrivilege(4, u'Share')
+
+def tearDown(test):
+ zc.sharing.sharing.clearPrivileges()
+ for pid in policy.getPrivileges():
+ policy.removePrivilege(pid)
+ placelesssetup.tearDown()
+
+def test_suite():
+ from zope.testing import doctest
+ return unittest.TestSuite((
+ doctest.DocFileSuite(
+ 'zcml.txt', globs={'zcml': zcml},
+ setUp=zcmlSetUp, tearDown=zcmlTearDown,
+ optionflags=doctest.NORMALIZE_WHITESPACE,
+ ),
+ doctest.DocFileSuite(
+ 'README.txt',
+ setUp=setUp, tearDown=tearDown,
+ ),
+# doctest.DocFileSuite(
+# 'index.txt',
+# setUp=placelesssetup.setUp, tearDown=tearDownIndex,
+# ),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.py
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.py 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.py 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,19 @@
+from zope import interface, schema
+import zope.configuration.fields
+from zc.tokenpolicy import policy
+
+class IPrivileges(interface.Interface):
+
+ titles = zope.configuration.fields.Tokens(
+ title=u"Privileges",
+ description=u"List of privilege titles",
+ value_type=schema.TextLine(),
+ )
+
+def Privileges(_context, titles):
+
+ _context.action(
+ discriminator='zc:tokenPrivileges',
+ callable=policy.definePrivilegesByTitle,
+ args=(titles,),
+ )
Added: zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.txt
===================================================================
--- zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.txt 2006-08-15 21:23:27 UTC (rev 69544)
+++ zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.txt 2006-08-15 21:25:55 UTC (rev 69545)
@@ -0,0 +1,35 @@
+Sharing Configuration directives
+================================
+
+This package provides a couple of ZCML configuration directives for setting up
+the sharing security model.
+
+Defining lock privileges
+------------------------
+
+The tokenpolicy package allows zope.locking tokens to be associated with
+the security policy. Defining the set of privileges affected by the
+tokens is done using the tokenPrivileges directive:
+
+ >>> zcml("""
+ ... <configure
+ ... xmlns="http://namespaces.zope.org/zope"
+ ... xmlns:zc="http://namespaces.zope.com/zc"
+ ... >
+ ... <zc:tokenPrivileges titles="Write" />
+ ... </configure>
+ ... """)
+
+To display the titles for the test we use a little helper:
+
+ >>> from zc.tokenpolicy import policy
+ >>> import zc.sharing.sharing
+ >>> def getPrivilegeTitles(ids):
+ ... return sorted(
+ ... zc.sharing.sharing.getPrivilege(pid)['title'] for pid in ids)
+
+So now we can see that the privileges have been set correctly:
+
+ >>> getPrivilegeTitles(policy.getPrivileges())
+ [u'Write']
+
Property changes on: zc.tokenpolicy/trunk/src/zc/tokenpolicy/zcml.txt
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the Checkins
mailing list