[Checkins] SVN: z3c.dav/trunk/src/z3c/dav/ Implement the IF header
as an extension to z3c.condtionalviews.
Michael Kerrin
michael.kerrin at openapp.ie
Thu May 31 15:18:43 EDT 2007
Log message for revision 76055:
Implement the IF header as an extension to z3c.condtionalviews.
Changed:
U z3c.dav/trunk/src/z3c/dav/configure.zcml
A z3c.dav/trunk/src/z3c/dav/ifvalidator.py
U z3c.dav/trunk/src/z3c/dav/tests/test_doctests.py
-=-
Modified: z3c.dav/trunk/src/z3c/dav/configure.zcml
===================================================================
--- z3c.dav/trunk/src/z3c/dav/configure.zcml 2007-05-31 19:16:33 UTC (rev 76054)
+++ z3c.dav/trunk/src/z3c/dav/configure.zcml 2007-05-31 19:18:43 UTC (rev 76055)
@@ -63,6 +63,23 @@
name="UNLOCK"
/>
+ <utility
+ factory=".ifvalidator.IFValidator"
+ name="webdav.ifheader"
+ />
+
+ <utility
+ component="z3c.conditionalviews.ConditionalHTTPRequest"
+ provides="zope.app.publication.interfaces.IHTTPRequestFactory"
+ />
+
+ <adapter
+ for="zope.interface.Interface
+ zope.publisher.interfaces.http.IHTTPRequest
+ zope.interface.Interface"
+ factory=".ifvalidator.StateTokens"
+ />
+
<!--
Collection of display widget definitions.
-->
Added: z3c.dav/trunk/src/z3c/dav/ifvalidator.py
===================================================================
--- z3c.dav/trunk/src/z3c/dav/ifvalidator.py (rev 0)
+++ z3c.dav/trunk/src/z3c/dav/ifvalidator.py 2007-05-31 19:18:43 UTC (rev 76055)
@@ -0,0 +1,632 @@
+##############################################################################
+# Copyright (c) 2007 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+##############################################################################
+"""
+The definition of the `If` header is [10.4.2]:
+
+ If = "If" ":" ( 1*No-tag-list | 1*Tagged-list )
+
+ No-tag-list = List
+ Tagged-list = Resource-Tag 1*List
+
+ List = "(" 1*Condition ")"
+ Condition = ["Not"] (State-token | "[" entity-tag "]")
+ ; entity-tag: see Section 3.11 of [RFC2616]
+ ; No LWS allowed between "[", entity-tag and "]"
+
+ State-token = Coded-URL
+
+ Resource-Tag = "<" Simple-ref ">"
+ ; Simple-ref: see Section 8.3
+ ; No LWS allowed in Resource-Tag
+
+In order to evaluate this header we must following these rules [10.4.3]:
+
+ Each List production describes a series of conditions. The whole
+ list evaluates to true if and only if each condition evaluates to
+ true (that is, the list represents a logical conjunction of
+ Conditions).
+
+ Each No-tag-list and Tagged-list production may contain one or more
+ Lists. They evaluate to true if and only if any of the contained
+ lists evaluates to true (that is, if there's more than one List, that
+ List sequence represents a logical disjunction of the Lists).
+
+ Finally, the whole If header evaluates to true if and only if at
+ least one of the No-tag-list or Tagged-list productions evaluates to
+ true. If the header evaluates to false, the server MUST reject the
+ request with a 412 (Precondition Failed) status. Otherwise,
+ execution of the request can proceed as if the header wasn't present.
+
+"""
+import re
+import urlparse
+from cStringIO import StringIO
+
+import zope.component
+import zope.interface
+import zope.schema
+import zope.traversing.api
+import zope.app.http.interfaces
+import z3c.dav.coreproperties
+import z3c.dav.interfaces
+import z3c.conditionalviews.interfaces
+
+# Resource-Tag = "<" Simple-ref ">"
+resource_tag = re.compile(r"<(?P<resource>.+?)>")
+# Condition = ["Not"] (State-token | "[" entity-tag "]")
+condition = re.compile(
+ r"(?P<notted>not)?\s*(<(?P<state_token>.*?)>|\[(?P<entity_tag>\S+?)\])+",
+ re.I)
+
+STATE_ANNOTS = "z3c.conditionalviews.stateresults"
+
+class IStateTokens(zope.interface.Interface):
+
+ tokens = zope.schema.List(
+ title = u"State tokens",
+ description = u"List of the current state tokens.")
+
+
+class ListCondition(object):
+
+ def __init__(self, notted = False, state_token = None, entity_tag = None):
+ self.notted = notted
+ self.state_token = state_token
+ self.entity_tag = entity_tag
+
+
+class IFValidator(object):
+ """
+
+ >>> import UserDict
+ >>> import zope.interface.verify
+ >>> import zope.publisher.interfaces
+ >>> import zope.publisher.interfaces.http
+ >>> from zope.publisher.interfaces import IPublishTraverse
+ >>> from zope.publisher.browser import TestRequest
+ >>> from zope.traversing.interfaces import IPhysicallyLocatable
+ >>> from zope.app.publication.zopepublication import ZopePublication
+
+ The validator is a utility that implements the IF header conditional
+ request as specified in the WebDAV specification.
+
+ >>> validator = IFValidator()
+ >>> zope.interface.verify.verifyObject(
+ ... z3c.conditionalviews.interfaces.IHTTPValidator, validator)
+ True
+
+ We can only evaluate this request if the request contains an `IF` header.
+
+ >>> request = TestRequest()
+ >>> validator.evaluate(None, request, None)
+ False
+ >>> request = TestRequest(environ = {'IF': '(<DAV:no-lock>)'})
+ >>> validator.evaluate(None, request, None)
+ True
+
+ We need to set up following adapters for the validator to work.
+
+ >>> class ReqAnnotation(UserDict.IterableUserDict):
+ ... zope.interface.implements(zope.annotation.interfaces.IAnnotations)
+ ... def __init__(self, request):
+ ... self.data = request._environ.setdefault('annotation', {})
+ >>> zope.component.getGlobalSiteManager().registerAdapter(
+ ... ReqAnnotation, (zope.publisher.interfaces.http.IHTTPRequest,))
+
+ >>> class ETag(object):
+ ... zope.interface.implements(z3c.conditionalviews.interfaces.IETag)
+ ... def __init__(self, context, request, view):
+ ... pass
+ ... etag = None
+ ... weak = False
+ >>> zope.component.getGlobalSiteManager().registerAdapter(
+ ... ETag, (None, TestRequest, None))
+
+ >>> class Statetokens(object):
+ ... zope.interface.implements(IStateTokens)
+ ... def __init__(self, context, request, view):
+ ... self.context = context
+ ... @property
+ ... def tokens(self):
+ ... if getattr(self.context, '_tokens', None) is not None:
+ ... return self.context._tokens
+ ... if self.context:
+ ... return [self.context.__name__]
+ ... return []
+ >>> zope.component.getGlobalSiteManager().registerAdapter(
+ ... Statetokens, (None, TestRequest, None))
+
+ >>> class Demo(object):
+ ... zope.interface.implements(IPublishTraverse)
+ ... def __init__(self, name):
+ ... self.__name__ = name
+ ... self.__parent__ = None
+ ... self.children = {}
+ ... def add(self, value):
+ ... self.children[value.__name__] = value
+ ... value.__parent__ = self
+ ... def publishTraverse(self, request, name):
+ ... child = self.children.get(name, None)
+ ... if child:
+ ... return child
+ ... raise zope.publisher.interfaces.NotFound(self, name, request)
+
+ >>> class PhysicallyLocatable(object):
+ ... zope.interface.implements(IPhysicallyLocatable)
+ ... def __init__(self, context):
+ ... self.context = context
+ ... def getRoot(self):
+ ... return root
+ ... def getPath(self):
+ ... return '/' + self.context.__name__
+ >>> zope.component.getGlobalSiteManager().registerAdapter(
+ ... PhysicallyLocatable, (zope.interface.Interface,))
+
+ Firstly nothing matches the <DAV:no-lock> state token.
+
+ >>> resource = Demo('test')
+ >>> validator.valid(resource, request, None)
+ False
+
+ The invalid status for this protocol is 412, for all methods.
+
+ >>> validator.invalidStatus(resource, request, None)
+ 412
+
+ Non-tagged lists
+ ================
+
+ Entity tag condition
+ --------------------
+
+ >>> ETag.etag = 'xx'
+ >>> request = TestRequest(environ = {'IF': '(["xx"])'})
+ >>> validator.valid(resource, request, None)
+ True
+ >>> request._environ['IF'] = '(["xx"] ["yy"])'
+ >>> validator.valid(resource, request, None)
+ False
+ >>> request._environ['IF'] = '(["xx"])(["yy"])'
+ >>> validator.valid(resource, request, None)
+ True
+ >>> request._environ['IF'] = '(["yy"])(["xx"])'
+ >>> validator.valid(resource, request, None)
+ True
+
+ The request object gets annotated with the results of the matching any
+ state tokens, so that I only need to parse this data once. But since only
+ entity tags have been passed so far the request object will not be
+ annotated with the results of the state tokens.
+
+ >>> getStateResults(request)
+ {}
+
+ Not - entity tag conditions
+ ---------------------------
+
+ >>> request = TestRequest(environ = {'IF': '(not ["xx"])'})
+ >>> validator.valid(resource, request, None)
+ False
+ >>> request._environ['IF'] = '(not ["xx"] ["yy"])'
+ >>> validator.valid(resource, request, None)
+ False
+ >>> request._environ['IF'] = '(not ["xx"])(["yy"])'
+ >>> validator.valid(resource, request, None)
+ False
+ >>> request._environ['IF'] = '(not ["yy"])'
+ >>> validator.valid(resource, request, None)
+ True
+
+ >>> getStateResults(request)
+ {}
+
+ State-tokens
+ ------------
+
+ If the request is False then we have no need for the state tokens but
+ they just default then to an empty dictionary.
+
+ >>> request = TestRequest(environ = {'IF': '(<locktoken>)'})
+ >>> validator.valid(resource, request, None)
+ False
+ >>> getStateResults(request)
+ {}
+
+ >>> resource._tokens = ['locktoken']
+ >>> validator.valid(resource, request, None)
+ True
+ >>> getStateResults(request)
+ {'/test': {'locktoken': True}}
+
+ If there are multiple locktokens associated with a resource, we only
+ are interested in the token that is represented in the IF header, so we
+ only have one entry in the state results variable.
+
+ >>> resource._tokens = ['locktoken', 'nolocktoken']
+ >>> validator.valid(resource, request, None)
+ True
+ >>> getStateResults(request)
+ {'/test': {'locktoken': True}}
+ >>> request._environ['IF'] = '(NOT <locktoken>)'
+ >>> validator.valid(resource, request, None)
+ False
+
+ >>> request._environ['IF'] = '(NOT <invalidlocktoken>)'
+ >>> validator.valid(resource, request, None)
+ True
+ >>> getStateResults(request)
+ {'/test': {'invalidlocktoken': False}}
+
+ Combined entity / state tokens
+ ------------------------------
+
+ >>> request = TestRequest(environ = {'IF': '(<locktoken> ["xx"])'})
+ >>> validator.valid(resource, request, None)
+ True
+ >>> resource._tokens = ['nolocktoken']
+ >>> validator.valid(resource, request, None)
+ False
+
+ >>> request._environ['IF'] = '(<nolocktoken> ["xx"]) (["yy"])'
+ >>> validator.valid(resource, request, None)
+ True
+
+ >>> request._environ['IF'] = '(<nolocktoken> ["yy"]) (["xx"])'
+ >>> validator.valid(resource, request, None)
+ True
+
+ Resources
+ =========
+
+ We now need to test the situation when a resource is specified in the
+ the `IF` header. We need to define a context here so that we can find
+ the specified resource from the header. Make the context implement
+ IPublishTraverse so that we know how to traverse to the next segment
+ in the path.
+
+ Setup up three content object. One is the context for validation, the
+ second is a locked resource, and the last is the root of the site.
+
+ >>> root = Demo('root')
+ >>> locked = Demo('locked')
+ >>> root.add(locked)
+ >>> demo = Demo('demo')
+ >>> root.add(demo)
+
+ The request needs more setup in order for the traversal to work. We
+ need and a publication object and since our TestRequets object extends
+ the BrowserRequest pretend that the method is `FROG` so that the
+ traversal doesn't try and find a default view for the resource. This
+ would require more setup.
+
+ >>> request.setPublication(ZopePublication(None))
+ >>> request._environ['REQUEST_METHOD'] = 'FROG'
+
+ Test that we can find the locked resource from the demo resource.
+
+ >>> resource = validator.get_resource(
+ ... demo, request, 'http://localhost/locked')
+ >>> resource.__name__
+ 'locked'
+ >>> resource = validator.get_resource(demo, request, '/locked')
+ >>> resource.__name__
+ 'locked'
+
+ The state tokens for all content objects is there `__name__` attribute.
+
+ >>> request._environ['IF'] = '<http://localhost/locked> (<demo>)'
+ >>> validator.valid(demo, request, None)
+ False
+ >>> request._environ['IF'] = '<http://localhost/locked> (<locked>)'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> request._environ['IF'] = '<http://localhost/demo> (<demo>)'
+ >>> validator.valid(demo, request, None)
+ True
+
+ If a specified resource does not exist then the only way for the IF header
+ to match is for the state tokens to be `(Not <locktoken>)`.
+
+ >>> request._environ['IF'] = '<http://localhost/missing> (<locked>)'
+ >>> validator.valid(demo, request, None)
+ False
+
+ In this case when we try to match against `(Not <locked>)` but we stored
+ state is still matched.
+
+ >>> request._environ['IF'] = '<http://localhost/missing> (Not <locked>)'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> getStateResults(request)
+ {'/missing': {'locked': False}}
+
+ Invalid data
+ ============
+
+ When the if header fails to parse then the request should default to
+ True, but if we matched any resources / state tokens before the parse
+ error then we should still store this.
+
+ >>> request._environ['IF'] = '</ddd> (hi)'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> getStateResults(request)
+ {}
+
+ The IF header parses until the list condition which is missing the angular
+ brackets.
+
+ >>> request._environ['IF'] = '</ddd> (<hi>) (there)'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> getStateResults(request)
+ {'/ddd': {'hi': False}}
+
+ Try what happens when there is no starting '(' for a list.
+
+ >>> request._environ['IF'] = '</ddd> <hi>'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> getStateResults(request)
+ {}
+
+ Expected a '(' in the IF header but the header was already parsed.
+
+ >>> request._environ['IF'] = '</ddd> (<hi>) <hi>'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> getStateResults(request)
+ {'/ddd': {'hi': False}}
+
+ Expected a '(' in the IF header.
+
+ >>> request._environ['IF'] = '</ddd> (<hi>) hi'
+ >>> validator.valid(demo, request, None)
+ True
+ >>> getStateResults(request)
+ {'/ddd': {'hi': False}}
+
+ Cleanup
+ =======
+
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... ReqAnnotation, (zope.publisher.interfaces.http.IHTTPRequest,))
+ True
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... ETag, (None, TestRequest, None))
+ True
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... Statetokens, (None, TestRequest, None))
+ True
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(
+ ... PhysicallyLocatable, (zope.interface.Interface,))
+ True
+
+ """
+ zope.interface.implements(z3c.conditionalviews.interfaces.IHTTPValidator)
+
+ def evaluate(self, context, request, view):
+ return request.getHeader("If", None) is not None
+
+ def get_next_list(self, request):
+ header = request.getHeader("If").lstrip()
+
+ resource = None
+
+ while header:
+ rmatch = resource_tag.match(header)
+ if rmatch:
+ resource = rmatch.group("resource")
+ header = header[rmatch.end():].lstrip()
+
+ conditions = []
+
+ if not header or header[0] != "(":
+ raise ValueError(
+ "IF Header contains invalid data - expected '('")
+ header = header[1:].lstrip()
+
+ while header:
+ listitem = condition.match(header)
+ if not listitem:
+ if header[0] != ")":
+ raise ValueError(
+ "IF Header contains invalid data - expected ')'")
+ header = header[1:].lstrip()
+ break
+
+ header = header[listitem.end():].lstrip()
+
+ notted = bool(listitem.group("notted"))
+ state_token = listitem.group("state_token")
+ entity_tag = listitem.group("entity_tag")
+ if entity_tag:
+ if entity_tag[2:] == "W/":
+ entity_tag = entity[2:]
+ entity_tag = entity_tag[1:-1]
+
+ conditions.append(
+ ListCondition(notted, state_token, entity_tag))
+
+ if not conditions:
+ break
+
+ yield resource, conditions
+
+ def get_resource(self, context, request, resource):
+ environ = dict(request.environment)
+ environ["PATH_INFO"] = urlparse.urlparse(resource)[2]
+
+ req = request.__class__(StringIO(""), environ)
+ req.setPublication(request.publication)
+
+ if zope.app.http.interfaces.INullResource.providedBy(context):
+ context = context.container
+
+ root = zope.traversing.api.getRoot(context)
+ return req.traverse(root)
+
+ def valid(self, context, request, view):
+ stateresults = {}
+ try:
+ for resource, conditions in self.get_next_list(request):
+ if resource:
+ try:
+ context = self.get_resource(context, request, resource)
+ except zope.publisher.interfaces.NotFound, error:
+ # resource is still set so can't match the conditions
+ # against the current context.
+ context = None
+
+ if context is not None:
+ path = zope.traversing.api.getPath(context)
+ else:
+ path = urlparse.urlparse(resource)[2]
+
+ etag = zope.component.queryMultiAdapter(
+ (context, request, view),
+ z3c.conditionalviews.interfaces.IETag,
+ default = None)
+ etag = etag and etag.etag
+
+ states = zope.component.queryMultiAdapter(
+ (context, request, view), IStateTokens, default = [])
+ states = states and states.tokens
+
+ listresult = True
+ for condition in conditions:
+ # Each List production describes a series of conditions.
+ # The whole list evaluates to true if and only if each
+ # condition evaluates to true.
+ if condition.entity_tag:
+ # XXX - should we store these entity tags for
+ # validation later on, like we do with the state
+ # tokens.
+ result = etag and etag == condition.entity_tag or False
+ elif condition.state_token:
+ result = condition.state_token in states or False
+ stateresults.setdefault(path, {})
+ stateresults[path][condition.state_token] = result
+ else:
+ # This should not happen :-)
+ raise TypeError(
+ "Either the entity_tag or the state_token needs "
+ "to be set on a condition")
+ if condition.notted:
+ result = not result
+
+ listresult &= result
+
+ if listresult:
+ # Each No-tag-list and Tagged-list production may contain
+ # one or more Lists. They evaluate to true if and only if
+ # any of the contained lists evaluates to true. That is if
+ # listresult is True then the tag-lists are True.
+ break
+ else:
+ return False
+ except ValueError:
+ # Error in parsing the IF header, so as with all conditional
+ # requests we default to True - that is a valid request.
+ pass
+
+ # We may have states and entity tags that failed, but we don't want
+ # to reparse the if header to figure this out.
+ reqannot = zope.annotation.interfaces.IAnnotations(request)
+ reqannot[STATE_ANNOTS] = stateresults
+
+ return True
+
+ def invalidStatus(self, context, request, view):
+ return 412
+
+ def updateResponse(self, context, request, view):
+ pass # do nothing
+
+
+def getStateResults(request):
+ reqannot = zope.annotation.interfaces.IAnnotations(request)
+ return reqannot.get(STATE_ANNOTS, {})
+
+
+def matchesIfHeader(context, request):
+ activelock = zope.component.queryMultiAdapter(
+ (context, request), z3c.dav.coreproperties.IActiveLock, default = None)
+ if activelock is not None:
+ reqannot = zope.annotation.interfaces.IAnnotations(request)
+ return reqannot.get(STATE_ANNOTS, {}).get(
+ zope.traversing.api.getPath(context), {}).get(
+ activelock.locktoken[0], False)
+ return True
+
+
+class StateTokens(object):
+ """
+ Default state tokens implementation.
+
+ >>> from zope.interface.verify import verifyObject
+
+ Simple resource content type.
+
+ >>> class IResource(zope.interface.Interface):
+ ... 'Simple resource.'
+ >>> class Resource(object):
+ ... zope.interface.implements(IResource)
+ ... def __init__(self, name):
+ ... self.__name__ = name
+ >>> resource = Resource('testresource')
+
+ No activelock so we have no state tokens.
+
+ >>> states = StateTokens(resource, None, None)
+ >>> verifyObject(IStateTokens, states)
+ True
+ >>> states.tokens
+ []
+
+ We will register a simple active lock adapter indicating that the resource
+ is locked.
+
+ >>> class Activelock(object):
+ ... zope.interface.implements(z3c.dav.coreproperties.IActiveLock)
+ ... zope.component.adapts(IResource,
+ ... zope.interface.Interface)
+ ... def __init__(self, context, request):
+ ... self.context = context
+ ... locktoken = ['testlocktoken']
+
+ >>> zope.component.getGlobalSiteManager().registerAdapter(Activelock)
+
+ >>> states.tokens
+ ['testlocktoken']
+
+ Cleanup.
+
+ >>> zope.component.getGlobalSiteManager().unregisterAdapter(Activelock)
+ True
+
+ """
+ zope.interface.implements(IStateTokens)
+
+ def __init__(self, context, request, view):
+ self.context = context
+ self.request = request
+
+ @property
+ def tokens(self):
+ activelock = zope.component.queryMultiAdapter(
+ (self.context, self.request), z3c.dav.coreproperties.IActiveLock)
+ if activelock is not None:
+ locktokens = activelock.locktoken
+ if locktokens:
+ return locktokens
+ return []
Modified: z3c.dav/trunk/src/z3c/dav/tests/test_doctests.py
===================================================================
--- z3c.dav/trunk/src/z3c/dav/tests/test_doctests.py 2007-05-31 19:16:33 UTC (rev 76054)
+++ z3c.dav/trunk/src/z3c/dav/tests/test_doctests.py 2007-05-31 19:18:43 UTC (rev 76055)
@@ -102,4 +102,5 @@
checker = z3c.etree.testing.xmlOutputChecker,
setUp = z3c.etree.testing.etreeSetup,
tearDown = z3c.etree.testing.etreeTearDown),
+ doctest.DocTestSuite("z3c.dav.ifvalidator"),
))
More information about the Checkins
mailing list