[Checkins] SVN: zope.webdav/trunk/src/zope/webdav/ Rewrite the
locking unit tests as a doctest as they were overly complicated.
Michael Kerrin
michael.kerrin at openapp.biz
Wed Apr 4 15:56:56 EDT 2007
Log message for revision 74014:
Rewrite the locking unit tests as a doctest as they were overly complicated.
Plus I made the locking.py module responsible for checking that a LOCK request
contains valid lock-token information.
Changed:
U zope.webdav/trunk/src/zope/webdav/configure.zcml
U zope.webdav/trunk/src/zope/webdav/interfaces.py
U zope.webdav/trunk/src/zope/webdav/locking.py
A zope.webdav/trunk/src/zope/webdav/locking.txt
U zope.webdav/trunk/src/zope/webdav/lockingutils.py
U zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py
U zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py
D zope.webdav/trunk/src/zope/webdav/tests/test_locking.py
-=-
Modified: zope.webdav/trunk/src/zope/webdav/configure.zcml
===================================================================
--- zope.webdav/trunk/src/zope/webdav/configure.zcml 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/configure.zcml 2007-04-04 19:56:56 UTC (rev 74014)
@@ -243,6 +243,13 @@
parent="zope.webdav"
/>
+ <bookchapter
+ id="zope.webdav.locking"
+ title="Locking"
+ doc_path="locking.txt"
+ parent="zope.webdav"
+ />
+
</configure>
</configure>
Modified: zope.webdav/trunk/src/zope/webdav/interfaces.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/interfaces.py 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/interfaces.py 2007-04-04 19:56:56 UTC (rev 74014)
@@ -559,14 +559,6 @@
Raise a AlreadyLockedError if some resource is already locked.
"""
- def getActivelock(request = None):
- """
- Return an implementation of zope.webdav.coreproperties.IActiveLock
-
- If you are testing the lock manager then you don't need to pass
- in the request - just don't access the lockroot attribute :-)
- """
-
def refreshlock(timeout):
"""
Refresh to lock token associated with this resource.
Modified: zope.webdav/trunk/src/zope/webdav/locking.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/locking.py 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/locking.py 2007-04-04 19:56:56 UTC (rev 74014)
@@ -46,14 +46,36 @@
import zope.webdav.interfaces
import zope.webdav.properties
-from zope.webdav.coreproperties import IActiveLock
+from zope.webdav.coreproperties import IActiveLock, IDAVSupportedlock
from zope.etree.interfaces import IEtree
import zope.webdav.utils
MAXTIMEOUT = (2L ** 32) - 1
DEFAULTTIMEOUT = 12 * 60L
+def getIfHeader(request):
+ """
+ Parse the `If` HTTP header in this request and return a list of lock tokens
+ and entity tags.
+ XXX - This implementation is overly simplistic.
+
+ >>> from zope.publisher.browser import TestRequest
+
+ >>> getIfHeader(TestRequest()) is None
+ True
+ >>> getIfHeader(TestRequest(environ = {'IF': 'xxx'})) is None
+ True
+ >>> getIfHeader(TestRequest(environ = {'IF': '<xxx>'}))
+ 'xxx'
+
+ """
+ headervalue = request.get("IF", "")
+ if headervalue and headervalue[0] == "<" and headervalue[-1] == ">":
+ return headervalue[1:-1]
+ return None
+
+
@component.adapter(interface.Interface, zope.webdav.interfaces.IWebDAVMethod)
@interface.implementer(zope.webdav.interfaces.IWebDAVMethod)
def LOCK(context, request):
@@ -81,10 +103,6 @@
self.context = context
self.request = request
- def getDepth(self):
- # default is infinity
- return self.request.getHeader("depth", "infinity")
-
def getTimeout(self):
"""
Return a datetime.timedelta object representing the duration of
@@ -99,6 +117,82 @@
Multiple TimeType entries are listed in order of performace so this
method will return the first valid TimeType converted to a
`datetime.timedelta' object or else it returns the default timeout.
+
+ >>> from zope.publisher.browser import TestRequest
+
+ No supplied value -> default value.
+
+ >>> LOCKMethod(None, TestRequest(environ = {})).getTimeout()
+ datetime.timedelta(0, 720)
+
+ Infinity lock timeout is too long so revert to the default timeout.
+
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'infinity'})).getTimeout()
+ datetime.timedelta(0, 720)
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'infinite'})).getTimeout()
+ datetime.timedelta(0, 720)
+
+ Specify a lock timeout of 500 seconds.
+
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'Second-500'})).getTimeout()
+ datetime.timedelta(0, 500)
+
+ Invalid and invalid second.
+
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'XXX500'})).getTimeout()
+ Traceback (most recent call last):
+ ...
+ BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u'Invalid TIMEOUT header'
+
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'XXX-500'})).getTimeout()
+ Traceback (most recent call last):
+ ...
+ BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u'Invalid TIMEOUT header'
+
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'Second-500x'})).getTimeout()
+ Traceback (most recent call last):
+ ...
+ BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u'Invalid TIMEOUT header'
+
+ Maximum timeout value.
+
+ >>> timeout = 'Second-%d' %(MAXTIMEOUT + 100)
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+ datetime.timedelta(0, 720)
+
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': 'Second-3600'})).getTimeout()
+ datetime.timedelta(0, 3600)
+
+ Specify multiple timeout values. The first applicable time type value
+ is chosen.
+
+ >>> LOCKMethod(None, TestRequest(
+ ... environ = {'TIMEOUT': 'Infinity, Second-3600'})).getTimeout()
+ datetime.timedelta(0, 3600)
+
+ >>> timeout = 'Infinity, Second-%d' %(MAXTIMEOUT + 10)
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+ datetime.timedelta(0, 720)
+
+ >>> timeout = 'Second-1200, Second-450, Second-500'
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+ datetime.timedelta(0, 1200)
+
+ >>> timeout = 'Second-%d, Second-450' %(MAXTIMEOUT + 10)
+ >>> LOCKMethod(None,
+ ... TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+ datetime.timedelta(0, 450)
+
"""
timeout = None
header = self.request.getHeader("timeout", "infinity")
@@ -133,6 +227,33 @@
return datetime.timedelta(seconds = timeout)
+ def getDepth(self):
+ """Default is infinity.
+
+ >>> from zope.publisher.browser import TestRequest
+
+ >>> LOCKMethod(None, TestRequest()).getDepth()
+ 'infinity'
+ >>> LOCKMethod(None, TestRequest(environ = {'DEPTH': '0'})).getDepth()
+ '0'
+ >>> LOCKMethod(None, TestRequest(
+ ... environ = {'DEPTH': 'infinity'})).getDepth()
+ 'infinity'
+ >>> LOCKMethod(None, TestRequest(
+ ... environ = {'DEPTH': '1'})).getDepth()
+ Traceback (most recent call last):
+ ...
+ BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u"Invalid depth header. Must be either '0' or 'infinity'"
+
+ """
+ depth = self.request.getHeader("depth", "infinity")
+ if depth not in ("0", "infinity"):
+ raise zope.webdav.interfaces.BadRequest(
+ self.request,
+ u"Invalid depth header. Must be either '0' or 'infinity'")
+
+ return depth
+
def LOCK(self):
# The Lock-Token header is not returned in the response for a
# successful refresh LOCK request.
@@ -174,7 +295,8 @@
raise zope.webdav.interfaces.PreconditionFailed(
self.context, message = u"Context is not locked.")
- locktoken = lockmanager.getActivelock().locktoken[0]
+ locktoken = component.getMultiAdapter((self.context, self.request),
+ IActiveLock).locktoken[0]
request_uri = self.request.getHeader("IF", "")
if not request_uri or \
request_uri[0] != "<" or request_uri[-1] != ">" or \
@@ -194,16 +316,12 @@
self.context,
message = u"LOCK request body must be a `lockinfo' XML element")
+ timeout = self.getTimeout()
+
depth = self.getDepth()
- if depth not in ("0", "infinity"):
- raise zope.webdav.interfaces.BadRequest(
- self.request,
- u"Invalid depth header. Must be either '0' or 'infinity'")
etree = component.getUtility(IEtree)
- timeout = self.getTimeout()
-
lockscope = xmlsource.find("{DAV:}lockscope")
if not lockscope:
raise zope.webdav.interfaces.UnprocessableError(
@@ -218,12 +336,25 @@
message = u"No `{DAV:}locktype' XML element in request")
locktype_str = zope.webdav.utils.parseEtreeTag(locktype[0].tag)[1]
+ supportedlocks = component.getMultiAdapter(
+ (self.context, self.request), IDAVSupportedlock)
+ for entry in supportedlocks.supportedlock:
+ if entry.locktype[0] == locktype_str and \
+ entry.lockscope[0] == lockscope_str:
+ break
+ else:
+ raise zope.webdav.interfaces.UnprocessableError(
+ self.context,
+ message = u"Unknown lock-token requested.")
+
owner = xmlsource.find("{DAV:}owner")
- owner_str = etree.tostring(owner)
+ if owner is not None: # The owner element is optional.
+ owner_str = etree.tostring(owner)
+ else:
+ owner_str = None
lockmanager = zope.webdav.interfaces.IDAVLockmanager(self.context)
- roottoken = None
try:
lockmanager.lock(scope = lockscope_str,
type = locktype_str,
@@ -278,11 +409,12 @@
self.request, message = u"No lock-token header supplied")
lockmanager = zope.webdav.interfaces.IDAVLockmanager(self.context)
- if not lockmanager.islocked() or \
- lockmanager.getActivelock(self.request).locktoken[0] != locktoken:
+ activelock = component.getMultiAdapter((self.context, self.request),
+ IActiveLock)
+ if not lockmanager.islocked() or activelock.locktoken[0] != locktoken:
raise zope.webdav.interfaces.ConflictError(
self.context, message = "object is locked or the lock isn't" \
- "in the scope the passed.")
+ " in the scope the passed.")
lockmanager.unlock()
Added: zope.webdav/trunk/src/zope/webdav/locking.txt
===================================================================
--- zope.webdav/trunk/src/zope/webdav/locking.txt 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/locking.txt 2007-04-04 19:56:56 UTC (rev 74014)
@@ -0,0 +1,700 @@
+==============
+WebDAV locking
+==============
+
+zope.webdav.locking doesn't fully implement the WebDAV LOCK and UNLOCK methods.
+Instead it tries to hide most of the protocol's details from developers and
+provide a way for developers to integrate the WebDAV LOCK and UNLOCK methods
+with different locking mechanisms. This is mainly done by an implementation
+of the zope.webdav.interfaces.IDAVLockmanager adapter. The
+zope.webdav.lockingutils module uses the zope.locking package to integrate
+zope.locking with the LOCK and UNLOCK methods.
+
+This test will define a very simple locking mechanism that will show what
+needs to be done in order to integrate this mechanism with the LOCK and
+UNLOCK methods. At the end of the tests we will have successfully locked
+and then unlocked an instance of the Resource content type defined below.
+
+Setup
+=====
+
+ >>> import UserDict
+ >>> from cStringIO import StringIO
+ >>> import zope.interface
+ >>> from zope.interface.verify import verifyObject
+ >>> import zope.component
+ >>> import zope.app.container.interfaces
+ >>> import zope.webdav.interfaces
+
+Define a test request object and a content type against which to run the
+tests.
+
+ >>> class TestWebDAVRequest(zope.webdav.publisher.WebDAVRequest):
+ ... def __init__(self, lockinfo = {}, body = "", environ = {}):
+ ... env = environ.copy()
+ ... if body:
+ ... env.setdefault("CONTENT_TYPE", "text/xml")
+ ... env.setdefault("CONTENT_LENGTH", len(body))
+ ... super(TestWebDAVRequest, self).__init__(StringIO(body), env)
+ ... self.processInputs()
+
+ >>> class IResource(zope.interface.Interface):
+ ... """ """
+
+ >>> class Resource(object):
+ ... zope.interface.implements(IResource)
+ ... _lockinfo = None
+
+ >>> gsm = zope.component.getGlobalSiteManager()
+
+LOCK Method
+===========
+
+ >>> from zope.webdav.locking import LOCK
+
+The LOCK method is only defined when the current resource is adaptable to
+`zope.webdav.interfaces.IDAVLockmanager`.
+
+ >>> LOCK(Resource(), TestWebDAVRequest()) is None
+ True
+
+Implement the zope.webdav.interfaces.IDAVLockmanager adapter. When the
+lock, refreshlock, and unlock methods are called by the LOCK / UNLOCK methods
+below then these methods will print out a simple unique message that we
+can test for.
+
+ >>> class DAVLockmanager(object):
+ ... zope.interface.implements(zope.webdav.interfaces.IDAVLockmanager)
+ ... zope.component.adapts(IResource)
+ ... _islockable = True
+ ... def __init__(self, context):
+ ... self.context = context
+ ... def islockable(self):
+ ... return self._islockable
+ ... def islocked(self):
+ ... return self.context._lockinfo is not None
+ ... def refreshlock(self, timeout):
+ ... self.context._lockinfo['duration'] = timeout
+ ... print "Refreshed lock token."
+ ... def lock(self, scope, type, owner, duration, depth):
+ ... if self.context._lockinfo is not None:
+ ... raise zope.webdav.interfaces.AlreadyLocked(self.context)
+ ... self.context._lockinfo = {'scope': scope,
+ ... 'type': type,
+ ... 'owner': owner,
+ ... 'duration': duration,
+ ... 'depth': depth}
+ ... print "Locked the resource."
+ ... def unlock(self):
+ ... self.context._lockinfo = None
+ ... print "Unlocked the resource."
+
+ >>> verifyObject(zope.webdav.interfaces.IDAVLockmanager,
+ ... DAVLockmanager(Resource()))
+ True
+ >>> gsm.registerAdapter(DAVLockmanager)
+
+ >>> LOCK(Resource(), TestWebDAVRequest()) #doctest:+ELLIPSIS
+ <zope.webdav.locking.LOCKMethod object at 0x...>
+
+In some locking implementations the lockmanager can say that a resource can
+not be locked - ever. This is when the islockable method returns False. In
+this case we don't create a LOCK view.
+
+ >>> DAVLockmanager._islockable = False
+ >>> DAVLockmanager(Resource()).islockable()
+ False
+
+ >>> LOCK(Resource(), TestWebDAVRequest()) is None
+ True
+ >>> DAVLockmanager._islockable = True
+
+handleLock
+----------
+
+The `handleLock` method is responsible for locking a resource. It parses the
+body of the request and calls the `IDAVLockmanager` lock method (assuming that
+everything is OK) with the information in the request body. Whether or not
+the `handleLock` method is called just depends on whether or not the request
+contains an xml body that the elementtree used was able to use.
+
+The request body must conform to the specification in order for the LOCK method
+to succeed in locking the resource.
+
+ >>> body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:notlockinfo xmlns:D="DAV:">
+ ... Not a lockinfo.
+ ... </D:notlockinfo>"""
+ >>> resource = Resource()
+ >>> LOCK(resource, TestWebDAVRequest(body = body)).handleLock()
+ Traceback (most recent call last):
+ ...
+ UnprocessableError: LOCK request body must be a `lockinfo' XML element
+
+ >>> body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... Not a lockinfo.
+ ... </D:lockinfo>"""
+ >>> LOCK(resource, TestWebDAVRequest(body = body,
+ ... environ = {'DEPTH': '1'})).handleLock()
+ Traceback (most recent call last):
+ ...
+ BadRequest: <__builtin__.TestWebDAVRequest instance URL=http:/>, u"Invalid depth header. Must be either '0' or 'infinity'"
+
+ >>> LOCK(resource, TestWebDAVRequest(body = body)).handleLock()
+ Traceback (most recent call last):
+ ...
+ UnprocessableError: No `{DAV:}lockscope' XML element in request
+
+ >>> body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exculsive/></D:lockscope>
+ ... </D:lockinfo>"""
+ >>> LOCK(resource, TestWebDAVRequest(body = body)).handleLock()
+ Traceback (most recent call last):
+ ...
+ UnprocessableError: No `{DAV:}locktype' XML element in request
+
+Now in order to validate the requested lock we need `ILockEntry` and
+`IDAVSupportedlock` implementations that describe what the system
+supports.
+
+ >>> class Exclusivelock(object):
+ ... zope.interface.implements(zope.webdav.coreproperties.ILockEntry)
+ ... lockscope = [u"exclusive"]
+ ... locktype = [u"write"]
+
+ >>> class Supportedlock(object):
+ ... zope.interface.implements(zope.webdav.coreproperties.IDAVSupportedlock)
+ ... zope.component.adapts(IResource, zope.webdav.interfaces.IWebDAVRequest)
+ ... def __init__(self, context, request):
+ ... pass
+ ... @property
+ ... def supportedlock(self):
+ ... return [Exclusivelock()]
+
+ >>> gsm.registerAdapter(Supportedlock)
+
+If the system doesn't know about the lockscope or the locktype then we get
+an `UnprocessableError`.
+
+ >>> errors = LOCK(resource, TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:none-exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... </D:lockinfo>""")).LOCK()
+ Traceback (most recent call last):
+ ...
+ UnprocessableError: Unknown lock-token requested.
+
+Now enough of the errors that can occur, we will now lock the resource.
+
+ >>> errors = LOCK(resource, TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... </D:lockinfo>""")).handleLock()
+ Locked the resource.
+ >>> errors
+ []
+
+ >>> manager = DAVLockmanager(resource)
+ >>> manager.islocked()
+ True
+
+The lock method sets the `_lockinfo` attribute on the resource object which
+just contains all the info needed to set up the lock.
+
+ >>> resource._lockinfo
+ {'owner': None, 'scope': 'exclusive', 'duration': datetime.timedelta(0, 720), 'type': 'write', 'depth': 'infinity'}
+ >>> resource._lockinfo = None # unlocks the resource.
+
+ >>> errors = LOCK(resource, TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")).handleLock()
+ Locked the resource.
+ >>> errors
+ []
+ >>> lockinfo = resource._lockinfo
+ >>> print lockinfo['owner'] #doctest:+XMLDATA
+ <owner xmlns="DAV:">
+ <href>http://example.org/~ejw/contact.html</href>
+ </owner>
+ >>> resource._lockinfo['duration']
+ datetime.timedelta(0, 720)
+ >>> resource._lockinfo['scope']
+ 'exclusive'
+ >>> resource._lockinfo['type']
+ 'write'
+ >>> resource._lockinfo['depth']
+ 'infinity'
+
+The depth parameter defaults to 'infinity' but could also specify a zero depth.
+
+ >>> resource._lockinfo = None
+
+ >>> errors = LOCK(resource, TestWebDAVRequest(
+ ... environ = {'DEPTH': '0'},
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")).handleLock()
+ Locked the resource.
+ >>> errors
+ []
+ >>> lockinfo = resource._lockinfo
+ >>> print lockinfo['owner'] #doctest:+XMLDATA
+ <owner xmlns="DAV:">
+ <href>http://example.org/~ejw/contact.html</href>
+ </owner>
+ >>> resource._lockinfo['duration']
+ datetime.timedelta(0, 720)
+ >>> resource._lockinfo['scope']
+ 'exclusive'
+ >>> resource._lockinfo['type']
+ 'write'
+ >>> resource._lockinfo['depth']
+ '0'
+
+Now if the resource is already locked then the `handleLock` returns a
+`zope.webdav.interfaces.AlreadyLocked` error.
+
+ >>> errors = LOCK(resource, TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")).handleLock()
+ >>> errors #doctest:+ELLIPSIS
+ [<zope.webdav.interfaces.AlreadyLocked instance at ...>]
+
+LOCK
+----
+
+The LOCK method will wrap any AlreadyLockedErrors in a
+`zope.webdav.interfaces.WebDAVErrors` containing exception. The logic behind
+this is to make it easier to write a view of the error.
+
+ >>> LOCK(resource, TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")).LOCK()
+ Traceback (most recent call last):
+ ...
+ WebDAVErrors
+
+Now unlock the resource for the other locking tests to work.
+
+ >>> resource._lockinfo = None
+
+When the LOCK method is called with a correct body it calls the `handleLock`
+method tested previously and then tries to render the `{DAV:}lockdiscovery`
+property to return to the requesting client.
+
+ >>> LOCK(resource, TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")).LOCK()
+ Traceback (most recent call last):
+ ...
+ PropertyNotFound: {DAV:}lockdiscovery
+
+Now we will define the `{DAV:}lockdiscovery` property and re-lock the resource.
+We keep the resource locked until after we make sure that the `Activelock`
+implementation implements the `IActiveLock` interface with the `verifyObject`
+method, as this method accesses all the properties on this adapter. Note that
+the previous test locked the resource since these tests aren't run within a
+transaction.
+
+ >>> manager.islocked()
+ True
+
+ >>> class Activelock(object):
+ ... zope.interface.implements(zope.webdav.coreproperties.IActiveLock)
+ ... zope.component.adapts(IResource, zope.webdav.interfaces.IWebDAVRequest)
+ ... def __init__(self, context, request):
+ ... self.context = context
+ ... self.data = context._lockinfo
+ ... @property
+ ... def lockscope(self):
+ ... return [self.data['scope']]
+ ... @property
+ ... def locktype(self):
+ ... return [self.data['type']]
+ ... @property
+ ... def depth(self):
+ ... return self.data['depth']
+ ... @property
+ ... def owner(self):
+ ... return self.data['owner'].replace('\n', '')
+ ... @property
+ ... def timeout(self):
+ ... return "Second-%d" % self.data['duration'].seconds
+ ... @property
+ ... def lockroot(self):
+ ... return "http://localhost/resource"
+ ... @property
+ ... def locktoken(self):
+ ... return ['urn:resourcelocktoken']
+
+ >>> verifyObject(zope.webdav.coreproperties.IActiveLock,
+ ... Activelock(resource, None))
+ True
+ >>> gsm.registerAdapter(Activelock)
+
+ >>> class Lockdiscovery(object):
+ ... zope.interface.implements(zope.webdav.coreproperties.IDAVLockdiscovery)
+ ... zope.component.adapts(IResource, zope.webdav.interfaces.IWebDAVRequest)
+ ... def __init__(self, context, request):
+ ... self.context, self.request = context, request
+ ... @property
+ ... def lockdiscovery(self):
+ ... return [Activelock(self.context, self.request)]
+
+We need the following setup in order for the LOCK method to render the
+`{DAV:}lockdiscovery` widget.
+
+ >>> gsm.registerAdapter(zope.webdav.widgets.ListDAVWidget,
+ ... (zope.schema.interfaces.IList,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ >>> gsm.registerAdapter(zope.webdav.widgets.ObjectDAVWidget,
+ ... (zope.schema.interfaces.IObject,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ >>> gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
+ ... (zope.schema.interfaces.IText,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ >>> gsm.registerAdapter(zope.webdav.properties.OpaqueWidget,
+ ... (zope.webdav.properties.DeadField,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ >>> gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
+ ... (zope.schema.interfaces.IURI,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+
+ >>> gsm.registerUtility(zope.webdav.coreproperties.lockdiscovery,
+ ... name = "{DAV:}lockdiscovery")
+ >>> gsm.registerAdapter(Lockdiscovery)
+
+Now unlock the resource since we have already tested the `Activelock`
+implementation.
+
+ >>> resource._lockinfo = None
+
+By calling the LOCK method directly on an unlocked resource we get a full
+response to a LOCK request. This is a response with a status of 200 (we get
+a 201 on unmapped URLs but this isn't supported yet), a correct content-type
+of application/xml and the body should be a rendering of the
+`{DAV:}lockdiscovery` property.
+
+ >>> request = TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")
+ >>> respbody = LOCK(resource, request).LOCK()
+ Locked the resource.
+ >>> respbody #doctest:+XMLDATA
+ <prop xmlns="DAV:">
+ <lockdiscovery>
+ <activelock>
+ <lockscope><exclusive /></lockscope>
+ <locktype><write /></locktype>
+ <depth>infinity</depth>
+ <owner>
+ <href>http://example.org/~ejw/contact.html</href>
+ </owner>
+ <timeout>Second-720</timeout>
+ <locktoken><href>urn:resourcelocktoken</href></locktoken>
+ <lockroot>http://localhost/resource</lockroot>
+ </activelock>
+ </lockdiscovery>
+ </prop>
+ >>> request.response.getStatus()
+ 200
+ >>> request.response.getHeader("Content-type")
+ 'application/xml'
+ >>> request.response.getHeader("Lock-token")
+ '<urn:resourcelocktoken>'
+
+handleLockRefresh
+-----------------
+
+ >>> manager.islocked()
+ True
+
+Lock token can be refreshed by submitting a request without a body. In this
+case the `handleLock` method is not called but the `handleLockRefresh` is
+called.
+
+ >>> LOCK(resource, TestWebDAVRequest()).handleLockRefresh()
+ Traceback (most recent call last):
+ ...
+ PreconditionFailed: Lock-Token doesn't match request uri
+
+ >>> LOCK(resource, TestWebDAVRequest(
+ ... environ = {'IF': '<urn:wrong-resourcelocktoken>'})).handleLockRefresh()
+ Traceback (most recent call last):
+ ...
+ PreconditionFailed: Lock-Token doesn't match request uri
+
+Now refresh the lock info but without a timeout so it doesn't change.
+
+ >>> LOCK(resource, TestWebDAVRequest(
+ ... environ = {'IF': '<urn:resourcelocktoken>'})).handleLockRefresh()
+ Refreshed lock token.
+ >>> resource._lockinfo['duration']
+ datetime.timedelta(0, 720)
+ >>> resource._lockinfo['scope']
+ 'exclusive'
+ >>> resource._lockinfo['type']
+ 'write'
+ >>> resource._lockinfo['depth']
+ 'infinity'
+
+ >>> LOCK(resource, TestWebDAVRequest(
+ ... environ = {'IF': '<urn:resourcelocktoken>',
+ ... 'TIMEOUT': 'Second-1440'})).handleLockRefresh()
+ Refreshed lock token.
+ >>> resource._lockinfo['duration']
+ datetime.timedelta(0, 1440)
+ >>> resource._lockinfo['scope']
+ 'exclusive'
+ >>> resource._lockinfo['type']
+ 'write'
+ >>> resource._lockinfo['depth']
+ 'infinity'
+
+Now when we call the LOCK method without a request body the handleLockRefresh
+method is called and assuming that everything else is OK then the lock token
+gets refreshed and the `{DAV:}lockdiscovery` property is rendered and
+returned.
+
+ >>> request = TestWebDAVRequest(environ = {'IF': '<urn:resourcelocktoken>'})
+ >>> respbody = LOCK(resource, request).LOCK()
+ Refreshed lock token.
+
+We didn't specify a timeout but in this case the default timeout is selected
+and the lock token timeout is updated with this value.
+
+ >>> resource._lockinfo['duration']
+ datetime.timedelta(0, 720)
+ >>> resource._lockinfo['scope']
+ 'exclusive'
+ >>> resource._lockinfo['type']
+ 'write'
+ >>> resource._lockinfo['depth']
+ 'infinity'
+ >>> print respbody #doctest:+XMLDATA
+ <prop xmlns="DAV:">
+ <lockdiscovery>
+ <activelock>
+ <lockscope><exclusive /></lockscope>
+ <locktype><write /></locktype>
+ <depth>infinity</depth>
+ <owner>
+ <href>http://example.org/~ejw/contact.html</href>
+ </owner>
+ <timeout>Second-720</timeout>
+ <locktoken>
+ <href>urn:resourcelocktoken</href>
+ </locktoken>
+ <lockroot>http://localhost/resource</lockroot>
+ </activelock>
+ </lockdiscovery>
+ </prop>
+ >>> request.response.getStatus()
+ 200
+ >>> request.response.getHeader('content-type')
+ 'application/xml'
+
+Similarly if we specify a timeout header.
+
+ >>> request = TestWebDAVRequest(environ = {'IF': '<urn:resourcelocktoken>',
+ ... 'TIMEOUT': 'Second-1440'})
+ >>> respbody = LOCK(resource, request).LOCK()
+ Refreshed lock token.
+ >>> resource._lockinfo['duration']
+ datetime.timedelta(0, 1440)
+ >>> resource._lockinfo['scope']
+ 'exclusive'
+ >>> resource._lockinfo['type']
+ 'write'
+ >>> resource._lockinfo['depth']
+ 'infinity'
+ >>> print respbody #doctest:+XMLDATA
+ <prop xmlns="DAV:">
+ <lockdiscovery>
+ <activelock>
+ <lockscope><exclusive /></lockscope>
+ <locktype><write /></locktype>
+ <depth>infinity</depth>
+ <owner>
+ <href>http://example.org/~ejw/contact.html</href>
+ </owner>
+ <timeout>Second-1440</timeout>
+ <locktoken>
+ <href>urn:resourcelocktoken</href>
+ </locktoken>
+ <lockroot>http://localhost/resource</lockroot>
+ </activelock>
+ </lockdiscovery>
+ </prop>
+ >>> request.response.getStatus()
+ 200
+ >>> request.response.getHeader('content-type')
+ 'application/xml'
+
+It doesn't make sense trying to refresh the lock on an unlocked resource.
+
+ >>> resource._lockinfo = None
+
+ >>> LOCK(resource, request).LOCK()
+ Traceback (most recent call last):
+ ...
+ PreconditionFailed: Context is not locked.
+
+UNLOCK Method
+=============
+
+ >>> from zope.webdav.locking import UNLOCK
+
+Re-lock the resource which just got unlocked.
+
+ >>> manager.islocked()
+ False
+ >>> request = TestWebDAVRequest(
+ ... body = """<?xml version="1.0" encoding="utf-8" ?>
+ ... <D:lockinfo xmlns:D="DAV:">
+ ... <D:lockscope><D:exclusive/></D:lockscope>
+ ... <D:locktype><D:write/></D:locktype>
+ ... <D:owner>
+ ... <D:href>http://example.org/~ejw/contact.html</D:href>
+ ... </D:owner>
+ ... </D:lockinfo>""")
+ >>> respbody = LOCK(resource, request).LOCK()
+ Locked the resource.
+ >>> request.response.getStatus()
+ 200
+
+ >>> manager.islocked()
+ True
+
+An UNLOCK request needs a lock-token header.
+
+ >>> UNLOCK(resource, TestWebDAVRequest()).UNLOCK()
+ Traceback (most recent call last):
+ ...
+ BadRequest: <__builtin__.TestWebDAVRequest instance URL=http:/>, u'No lock-token header supplied'
+
+We need to test this again later - as we get the same error when the resource
+is finally locked.
+
+ >>> UNLOCK(resource, TestWebDAVRequest(
+ ... environ = {'LOCK_TOKEN': '<urn:wrong-resourcelocktoken>'})).UNLOCK()
+ Traceback (most recent call last):
+ ...
+ ConflictError: object is locked or the lock isn't in the scope the passed.
+
+Now successfully unlock the object.
+
+ >>> request = TestWebDAVRequest(
+ ... environ = {'LOCK_TOKEN': '<urn:resourcelocktoken>'})
+ >>> respbody = UNLOCK(resource, request).UNLOCK()
+ Unlocked the resource.
+ >>> respbody
+ ''
+ >>> request.response.getStatus()
+ 204
+
+ >>> manager.islocked()
+ False
+
+If we try and unlock an unlocked resource we get the following error.
+
+ >>> UNLOCK(resource, request).UNLOCK()
+ Traceback (most recent call last):
+ ...
+ ConflictError: object is locked or the lock isn't in the scope the passed.
+
+In some locking implementations the lockmanager can say that a resource can
+not be locked - and hence unlocked.
+
+ >>> DAVLockmanager._islockable = False
+ >>> DAVLockmanager(Resource()).islockable()
+ False
+
+ >>> UNLOCK(Resource(), TestWebDAVRequest()) is None
+ True
+ >>> DAVLockmanager._islockable = True
+
+Finally the UNLOCK method is only defined when there is an `IDAVLockmanager`
+implementation registered with the system.
+
+ >>> gsm.unregisterAdapter(DAVLockmanager)
+ True
+
+ >>> UNLOCK(resource, TestWebDAVRequest()) is None
+ True
+
+Cleanup
+-------
+
+ >>> gsm.unregisterAdapter(Supportedlock)
+ True
+ >>> gsm.unregisterAdapter(Activelock)
+ True
+
+ >>> gsm.unregisterAdapter(zope.webdav.widgets.ListDAVWidget,
+ ... (zope.schema.interfaces.IList,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ True
+ >>> gsm.unregisterAdapter(zope.webdav.widgets.ObjectDAVWidget,
+ ... (zope.schema.interfaces.IObject,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ True
+ >>> gsm.unregisterAdapter(zope.webdav.widgets.TextDAVWidget,
+ ... (zope.schema.interfaces.IText,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ True
+ >>> gsm.unregisterAdapter(zope.webdav.properties.OpaqueWidget,
+ ... (zope.webdav.properties.DeadField,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ True
+ >>> gsm.unregisterAdapter(zope.webdav.widgets.TextDAVWidget,
+ ... (zope.schema.interfaces.IURI,
+ ... zope.webdav.interfaces.IWebDAVRequest))
+ True
+
+ >>> gsm.unregisterUtility(zope.webdav.coreproperties.lockdiscovery,
+ ... name = "{DAV:}lockdiscovery")
+ True
+ >>> gsm.unregisterAdapter(Lockdiscovery)
+ True
Property changes on: zope.webdav/trunk/src/zope/webdav/locking.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: zope.webdav/trunk/src/zope/webdav/lockingutils.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/lockingutils.py 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/lockingutils.py 2007-04-04 19:56:56 UTC (rev 74014)
@@ -495,15 +495,17 @@
@component.adapter(interface.Interface, zope.webdav.interfaces.IWebDAVRequest)
@interface.implementer(IDAVSupportedlock)
def DAVSupportedlock(context, request):
- utility = component.queryUtility(zope.locking.interfaces.ITokenUtility)
+ ## XXX - not tested.
+ utility = component.queryUtility(zope.locking.interfaces.ITokenUtility,
+ context = context, default = None)
if utility is None:
return None
- return DAVSupportedlockAdapter(context, request)
+ return DAVSupportedlockAdapter()
class DAVSupportedlockAdapter(object):
"""
- >>> slock = DAVSupportedlockAdapter(None, None)
+ >>> slock = DAVSupportedlockAdapter()
>>> exclusive, shared = slock.supportedlock
>>> exclusive.lockscope
@@ -521,9 +523,6 @@
component.adapts(interface.Interface,
zope.webdav.interfaces.IWebDAVRequest)
- def __init__(self, context, request):
- pass
-
@property
def supportedlock(self):
return [ExclusiveLockEntry(), SharedLockEntry()]
@@ -536,6 +535,8 @@
def DAVActiveLock(context, request):
"""
The activelock property only exists whenever the resource is locked.
+
+ XXX - not tested.
"""
try:
token = zope.locking.interfaces.ITokenBroker(context).get()
Modified: zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py 2007-04-04 19:56:56 UTC (rev 74014)
@@ -17,22 +17,61 @@
"""
import unittest
+import UserDict
from cStringIO import StringIO
from zope import interface
from zope import component
+from zope import schema
from zope.copypastemove.interfaces import IObjectCopier, IObjectMover
from zope.location.traversing import LocationPhysicallyLocatable
from zope.traversing.adapters import Traverser, DefaultTraversable
from zope.traversing.browser.interfaces import IAbsoluteURL
from zope.app.publication.http import MethodNotAllowed
+from zope.app.container.interfaces import IReadContainer
from zope.traversing.interfaces import IContainmentRoot
import zope.webdav.publisher
from zope.webdav.copymove import COPY, MOVE
-import test_locking
+class IResource(interface.Interface):
+ text = schema.TextLine(
+ title = u"Example Text Property")
+
+ intprop = schema.Int(
+ title = u"Example Int Property")
+
+
+class Resource(object):
+ interface.implements(IResource)
+
+ def __init__(self, text = u"", intprop = 0):
+ self.text = text
+ self.intprop = intprop
+
+
+class ICollectionResource(IReadContainer):
+
+ title = schema.TextLine(
+ title = u"Title",
+ description = u"Title of resource")
+
+
+class CollectionResource(UserDict.UserDict):
+ interface.implements(ICollectionResource)
+
+ title = None
+
+ def __setitem__(self, key, val):
+ val.__parent__ = self
+ val.__name__ = key
+
+ self.data[key] = val
+
+class RootCollectionResource(CollectionResource):
+ interface.implements(IContainmentRoot)
+
class TestRequest(zope.webdav.publisher.WebDAVRequest):
def __init__(self, environ = {}):
@@ -46,33 +85,33 @@
def baseSetUp():
gsm = component.getGlobalSiteManager()
gsm.registerAdapter(LocationPhysicallyLocatable,
- (test_locking.IResource,))
+ (IResource,))
gsm.registerAdapter(LocationPhysicallyLocatable,
- (test_locking.ICollectionResource,))
- gsm.registerAdapter(Traverser, (test_locking.IResource,))
- gsm.registerAdapter(Traverser, (test_locking.ICollectionResource,))
- gsm.registerAdapter(DefaultTraversable, (test_locking.IResource,))
+ (ICollectionResource,))
+ gsm.registerAdapter(Traverser, (IResource,))
+ gsm.registerAdapter(Traverser, (ICollectionResource,))
+ gsm.registerAdapter(DefaultTraversable, (IResource,))
gsm.registerAdapter(DefaultTraversable,
- (test_locking.ICollectionResource,))
+ (ICollectionResource,))
def baseTearDown():
gsm = component.getGlobalSiteManager()
gsm.unregisterAdapter(LocationPhysicallyLocatable,
- (test_locking.IResource,))
+ (IResource,))
gsm.unregisterAdapter(LocationPhysicallyLocatable,
- (test_locking.ICollectionResource,))
- gsm.unregisterAdapter(Traverser, (test_locking.IResource,))
- gsm.unregisterAdapter(Traverser, (test_locking.ICollectionResource,))
- gsm.unregisterAdapter(DefaultTraversable, (test_locking.IResource,))
+ (ICollectionResource,))
+ gsm.unregisterAdapter(Traverser, (IResource,))
+ gsm.unregisterAdapter(Traverser, (ICollectionResource,))
+ gsm.unregisterAdapter(DefaultTraversable, (IResource,))
gsm.unregisterAdapter(DefaultTraversable,
- (test_locking.ICollectionResource,))
+ (ICollectionResource,))
class COPYMOVEParseHeadersTestCase(unittest.TestCase):
def setUp(self):
- self.root = test_locking.RootCollectionResource()
+ self.root = RootCollectionResource()
baseSetUp()
@@ -147,7 +186,7 @@
copy.getDestinationPath)
def test_getDestinationPath_with_username(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://michael@localhost/testpath"})
copy = COPY(resource, request)
@@ -157,7 +196,7 @@
self.assertEqual(parent, self.root)
def test_getDestinationPath_with_username_and_password(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://michael:pw@localhost/testpath"})
copy = COPY(resource, request)
@@ -169,7 +208,7 @@
def test_getDestinationPath_with_port(self):
# this is correct since localhost:10080 is a different server to
# localhost.
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost:10080/testpath"})
copy = COPY(resource, request)
@@ -177,7 +216,7 @@
copy.getDestinationNameAndParentObject)
def test_getDestinationNameAndParentObject(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/testpath"})
@@ -188,8 +227,8 @@
self.assertEqual(parent, self.root)
def test_getDestinationNameAndParentObject_destob_overwrite(self):
- destresource = self.root["destresource"] = test_locking.Resource()
- resource = self.root["resource"] = test_locking.Resource()
+ destresource = self.root["destresource"] = Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/destresource",
"OVERWRITE": "T"})
@@ -202,8 +241,8 @@
self.assertEqual(parent, self.root)
def test_getDestinationNameAndParentObject_destob_overwrite_failed(self):
- destresource = self.root["destresource"] = test_locking.Resource()
- resource = self.root["resource"] = test_locking.Resource()
+ destresource = self.root["destresource"] = Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/destresource",
"OVERWRITE": "F"})
@@ -214,7 +253,7 @@
self.assert_("destresource" in self.root)
def test_getDestinationNameAndParentObject_noparent(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/noparent/testpath"})
@@ -223,7 +262,7 @@
copy.getDestinationNameAndParentObject)
def test_getDestinationNameAndParentObject_destob_sameob(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/resource",
"OVERWRITE": "T"})
@@ -233,7 +272,7 @@
copy.getDestinationNameAndParentObject)
def test_nocopier(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -241,7 +280,7 @@
self.assertRaises(MethodNotAllowed, copy.COPY)
def test_nomovier(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -265,9 +304,9 @@
if getattr(self.context, "__name__", None) is not None:
path += "/" + self.context.__name__
- elif test_locking.IResource.providedBy(self.context):
+ elif IResource.providedBy(self.context):
path += "/resource"
- elif test_locking.ICollectionResource.providedBy(self.context):
+ elif ICollectionResource.providedBy(self.context):
path += "/collection"
else:
raise ValueError("unknown context type")
@@ -301,16 +340,16 @@
class COPYObjectTestCase(unittest.TestCase):
def setUp(self):
- self.root = test_locking.RootCollectionResource()
+ self.root = RootCollectionResource()
baseSetUp()
Copier.iscopyable = True
Copier.canCopyableTo = True
gsm = component.getGlobalSiteManager()
- gsm.registerAdapter(Copier, (test_locking.IResource,))
+ gsm.registerAdapter(Copier, (IResource,))
gsm.registerAdapter(DummyResourceURL,
- (test_locking.IResource,
+ (IResource,
zope.webdav.interfaces.IWebDAVRequest))
def tearDown(self):
@@ -319,13 +358,13 @@
baseTearDown()
gsm = component.getGlobalSiteManager()
- gsm.unregisterAdapter(Copier, (test_locking.IResource,))
+ gsm.unregisterAdapter(Copier, (IResource,))
gsm.unregisterAdapter(DummyResourceURL,
- (test_locking.IResource,
+ (IResource,
zope.webdav.interfaces.IWebDAVRequest))
def test_copy(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -340,8 +379,8 @@
self.assertEqual(self.root["resource"] is resource, True)
def test_copy_overwrite(self):
- resource = self.root["resource"] = test_locking.Resource()
- resource2 = self.root["resource2"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
+ resource2 = self.root["resource2"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/resource2",
"OVERWRITE": "T"})
@@ -355,7 +394,7 @@
self.assertEqual(self.root["resource2"] is resource, True)
def test_copy_not_copyable(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -365,7 +404,7 @@
self.assertRaises(MethodNotAllowed, copy.COPY)
def test_copy_not_copyableto(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -400,16 +439,16 @@
class MOVEObjectTestCase(unittest.TestCase):
def setUp(self):
- self.root = test_locking.RootCollectionResource()
+ self.root = RootCollectionResource()
baseSetUp()
Movier.isMoveable = True
Movier.isMoveableTo = True
gsm = component.getGlobalSiteManager()
- gsm.registerAdapter(Movier, (test_locking.IResource,))
+ gsm.registerAdapter(Movier, (IResource,))
gsm.registerAdapter(DummyResourceURL,
- (test_locking.IResource,
+ (IResource,
zope.webdav.interfaces.IWebDAVRequest))
def tearDown(self):
@@ -418,13 +457,13 @@
baseTearDown()
gsm = component.getGlobalSiteManager()
- gsm.unregisterAdapter(Movier, (test_locking.IResource,))
+ gsm.unregisterAdapter(Movier, (IResource,))
gsm.unregisterAdapter(DummyResourceURL,
- (test_locking.IResource,
+ (IResource,
zope.webdav.interfaces.IWebDAVRequest))
def test_move(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -438,8 +477,8 @@
self.assertEqual(self.root["copy_of_resource"], resource)
def test_move_overwrite(self):
- resource = self.root["resource"] = test_locking.Resource()
- resource2 = self.root["resource2"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
+ resource2 = self.root["resource2"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/resource2",
"OVERWRITE": "T"})
@@ -452,7 +491,7 @@
self.assertEqual(self.root["resource2"] is resource, True)
def test_move_not_moveable(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
@@ -462,7 +501,7 @@
self.assertRaises(MethodNotAllowed, move.MOVE)
def test_move_not_moveableTo(self):
- resource = self.root["resource"] = test_locking.Resource()
+ resource = self.root["resource"] = Resource()
request = TestRequest(
environ = {"DESTINATION": "http://localhost/copy_of_resource"})
Modified: zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py 2007-04-04 19:56:56 UTC (rev 74014)
@@ -204,5 +204,13 @@
setUp = lockingSetUp, tearDown = lockingTearDown),
doctest.DocTestSuite("zope.webdav.deadproperties"),
doctest.DocTestSuite("zope.webdav.adapters"),
+ doctest.DocTestSuite("zope.webdav.locking",
+ checker = zope.etree.testing.xmlOutputChecker,
+ setUp = zope.etree.testing.etreeSetup,
+ tearDown = zope.etree.testing.etreeTearDown),
+ doctest.DocFileSuite("locking.txt", package = "zope.webdav",
+ checker = zope.etree.testing.xmlOutputChecker,
+ setUp = zope.etree.testing.etreeSetup,
+ tearDown = zope.etree.testing.etreeTearDown),
doctest.DocTestSuite("zope.webdav.mkcol"),
))
Deleted: zope.webdav/trunk/src/zope/webdav/tests/test_locking.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/tests/test_locking.py 2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/tests/test_locking.py 2007-04-04 19:56:56 UTC (rev 74014)
@@ -1,707 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2006 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test WebDAV propfind method.
-
-$Id$
-"""
-
-import unittest
-from cStringIO import StringIO
-import UserDict
-import datetime
-import random
-import time
-
-from zope import interface
-from zope import schema
-import zope.schema.interfaces
-from zope import component
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import getPath, traverse, getRoot
-from zope.traversing.interfaces import IContainmentRoot
-from zope.location.traversing import LocationPhysicallyLocatable
-from zope.app.container.interfaces import IReadContainer
-from zope.webdav.locking import DEFAULTTIMEOUT, MAXTIMEOUT
-from zope.webdav.locking import UNLOCKMethod, LOCKMethod
-import zope.webdav.publisher
-import zope.webdav.interfaces
-from zope.etree.testing import etreeSetup, etreeTearDown, assertXMLEqual
-
-_randGen = random.Random(time.time())
-
-class TestWebDAVRequest(zope.webdav.publisher.WebDAVRequest):
-
- def __init__(self, lockinfo = {}, body = "", environ = {}):
- env = environ.copy()
-
- if body:
- env.setdefault("CONTENT_TYPE", "text/xml")
- env.setdefault("CONTENT_LENGTH", len(body))
-
- super(TestWebDAVRequest, self).__init__(StringIO(body), env)
-
- self.processInputs()
-
-
-class LOCKINGHeaders(unittest.TestCase):
-
- def test_depth_empty(self):
- lock = LOCKMethod(None, TestWebDAVRequest())
- self.assertEqual(lock.getDepth(), "infinity")
-
- def test_depth_zero(self):
- lock = LOCKMethod(None, TestWebDAVRequest(environ = {"DEPTH": "0"}))
- self.assertEqual(lock.getDepth(), "0")
-
- def test_timeout_default(self):
- lock = LOCKMethod(None, TestWebDAVRequest(environ = {}))
- self.assertEqual(lock.getTimeout(),
- datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
- def test_timeout_infinity(self):
- request = TestWebDAVRequest(environ = {"TIMEOUT": "infinity"})
- lock = LOCKMethod(None, request)
- self.assertEqual(lock.getTimeout(),
- datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
- def test_timeout_infinite(self):
- request = TestWebDAVRequest(environ = {"TIMEOUT": "infinite"})
- lock = LOCKMethod(None, request)
- self.assertEqual(lock.getTimeout(),
- datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
- def test_timeout_second_500(self):
- request = TestWebDAVRequest(environ = {"TIMEOUT": "Second-500"})
- lock = LOCKMethod(None, request)
- self.assertEqual(lock.getTimeout(),
- datetime.timedelta(seconds = 500))
-
- def test_invalid_second(self):
- request = TestWebDAVRequest(environ = {"TIMEOUT": "XXX-500"})
- lock = LOCKMethod(None, request)
- self.assertRaises(zope.webdav.interfaces.BadRequest, lock.getTimeout)
-
- def test_invalid_second_value(self):
- request = TestWebDAVRequest(environ = {"TIMEOUT": "Second-500x"})
- lock = LOCKMethod(None, request)
- self.assertRaises(zope.webdav.interfaces.BadRequest, lock.getTimeout)
-
- def test_big_second(self):
- request = TestWebDAVRequest(
- environ = {"TIMEOUT": "Second-%d" %(MAXTIMEOUT + 100)})
- lock = LOCKMethod(None, request)
- self.assertEqual(lock.getTimeout(),
- datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
- def test_timeout_infinity_and_3600(self):
- request = TestWebDAVRequest(
- environ = {"TIMEOUT": "Infinite, Second-3600"})
- lock = LOCKMethod(None, request)
- self.assertEqual(lock.getTimeout(), datetime.timedelta(seconds = 3600))
-
- def test_timeout_infinity_and_big_int(self):
- request = TestWebDAVRequest(
- environ = {"TIMEOUT": "Infinite, Second-%d" %(MAXTIMEOUT + 100)})
- lock = LOCKMethod(None, request)
- self.assertEqual(lock.getTimeout(),
- datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
-
-class DAVLockmanager(object):
- interface.implements(zope.webdav.interfaces.IDAVLockmanager)
-
- def __init__(self, context):
- self.context = context
-
- def generateLocktoken(self):
- return "opaquelocktoken:%s-%s-00105A989226:%.03f" % \
- (_randGen.random(), _randGen.random(), time.time())
-
- def lock(self, scope, type, owner, duration, depth,
- context = None):
- if context is None:
- if self.islockedObject(self.context):
- raise zope.webdav.interfaces.AlreadyLocked(self.context)
-
- ob = removeSecurityProxy(self.context)
- setattr(ob, "_is_locked", {"scope": scope,
- "type": type,
- "owner": owner,
- "duration": duration,
- "depth": depth,
- "token": self.generateLocktoken(),
- "indirectlylocked": [],
- "lockroot": getPath(self.context)})
- else:
- if self.islockedObject(context):
- raise zope.webdav.interfaces.AlreadyLocked(context)
-
- ob = removeSecurityProxy(context)
- setattr(ob, "_is_indirectly_locked",
- {"lockroot": getPath(self.context),
- "rootinfo": removeSecurityProxy(self.context)._is_locked,
- })
- root = removeSecurityProxy(self.context)
- root._is_locked["indirectlylocked"].append(getPath(ob))
-
- if context is None:
- context = self.context
-
- if depth == "infinity" and IReadContainer.providedBy(context):
- for subob in context.values():
- self.lock(scope, type, owner, duration, depth, subob)
-
- def getActivelock(self, request = None):
- return getActiveLock(self.context, request)
-
- def refreshlock(self, timeout):
- if not self.islockedObject(self.context):
- raise zope.webdav.interfaces.ConflictError(
- self.context, u"The context is not locked")
-
- ob = removeSecurityProxy(self.context)
- root = getRoot(self.context)
-
- if getattr(ob, "_is_indirectly_locked", None) is not None:
- lockroot = traverse(root, ob._is_indirectly_locked["lockroot"])
- else:
- lockroot = ob
-
- removeSecurityProxy(lockroot)._is_locked["duration"] = timeout
-
- def unlock(self):
- if not self.islockedObject(self.context):
- raise Exception("object is not locked")
-
- ob = removeSecurityProxy(self.context)
- root = getRoot(self.context)
-
- if getattr(ob, "_is_indirectly_locked", None) is not None:
- lockroot = traverse(root, ob._is_indirectly_locked["lockroot"])
- else:
- lockroot = ob
-
- lockroot = removeSecurityProxy(lockroot)
- for path in lockroot._is_locked["indirectlylocked"]:
- ob = removeSecurityProxy(traverse(root, path))
- delattr(ob, "_is_indirectly_locked")
- delattr(lockroot, "_is_locked")
-
- def islocked(self):
- return self.islockedObject(self.context)
-
- def islockedObject(self, context):
- ob = removeSecurityProxy(context)
- return getattr(ob, "_is_locked", None) is not None or \
- getattr(ob, "_is_indirectly_locked", None) is not None
-
-
-@interface.implementer(zope.webdav.coreproperties.IActiveLock)
-def getActiveLock(context, request):
- ob = removeSecurityProxy(context)
-
- data = getattr(ob, "_is_indirectly_locked", None)
- if data is not None:
- root = traverse(getRoot(context), data["lockroot"])
- root = removeSecurityProxy(root)
- return ActiveLock(root._is_locked)
-
- data = getattr(ob, "_is_locked", None)
- if data is not None:
- return ActiveLock(data)
-
- return None
-
-
-class ActiveLock(object):
- interface.implements(zope.webdav.coreproperties.IActiveLock)
-
- def __init__(self, data):
- self.data = data
-
- @property
- def lockscope(self):
- return [u"exclusive"]
-
- @property
- def locktype(self):
- return [u"write"]
-
- @property
- def depth(self):
- return self.data["depth"]
-
- @property
- def owner(self):
- return self.data["owner"]
-
- @property
- def timeout(self):
- seconds = self.data["duration"]
- if not isinstance(seconds, int):
- seconds = seconds.seconds
- return u"Second-%d" % seconds
-
- @property
- def locktoken(self):
- return [self.data["token"]]
-
- @property
- def lockroot(self):
- return "http://localhost%s" % self.data["lockroot"]
-
-
-class Lockdiscovery(object):
- interface.implements(zope.webdav.coreproperties.IDAVLockdiscovery)
-
- def __init__(self, context, request):
- self.context = context
- self.request = request
-
- @property
- def lockdiscovery(self):
- activelock = getActiveLock(self.context, self.request)
- if activelock is not None:
- return [activelock]
- return None
-
-
-class IResource(interface.Interface):
-
- text = schema.TextLine(
- title = u"Example Text Property")
-
- intprop = schema.Int(
- title = u"Example Int Property")
-
-
-class Resource(object):
- interface.implements(IResource)
-
- def __init__(self, text = u"", intprop = 0):
- self.text = text
- self.intprop = intprop
-
-
-class ICollectionResource(IReadContainer):
-
- title = schema.TextLine(
- title = u"Title",
- description = u"Title of resource")
-
-
-class CollectionResource(UserDict.UserDict):
- interface.implements(ICollectionResource)
-
- title = None
-
- def __setitem__(self, key, val):
- val.__parent__ = self
- val.__name__ = key
-
- self.data[key] = val
-
-class RootCollectionResource(CollectionResource):
- interface.implements(IContainmentRoot)
-
-
-class LOCKTestCase(unittest.TestCase):
-
- def setUp(self):
- etreeSetup()
-
- self.root = RootCollectionResource()
-
- gsm = component.getGlobalSiteManager()
- gsm.registerAdapter(DAVLockmanager, (IResource,))
- gsm.registerAdapter(DAVLockmanager, (ICollectionResource,))
- gsm.registerAdapter(LocationPhysicallyLocatable, (IResource,))
- gsm.registerAdapter(LocationPhysicallyLocatable, (ICollectionResource,))
-
- gsm.registerAdapter(Lockdiscovery,
- (IResource, zope.webdav.interfaces.IWebDAVRequest))
- gsm.registerUtility(zope.webdav.coreproperties.lockdiscovery,
- name = "{DAV:}lockdiscovery")
- gsm.registerAdapter(getActiveLock,
- (IResource, zope.webdav.interfaces.IWebDAVRequest))
-
- gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
- (zope.schema.interfaces.IText,
- zope.webdav.interfaces.IWebDAVRequest))
- gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
- (zope.schema.interfaces.IURI,
- zope.webdav.interfaces.IWebDAVRequest))
- gsm.registerAdapter(zope.webdav.properties.OpaqueWidget,
- (zope.webdav.properties.DeadField,
- zope.webdav.interfaces.IWebDAVRequest))
- gsm.registerAdapter(zope.webdav.widgets.ListDAVWidget,
- (zope.schema.interfaces.IList,
- zope.webdav.interfaces.IWebDAVRequest))
- gsm.registerAdapter(zope.webdav.widgets.ObjectDAVWidget,
- (zope.schema.interfaces.IObject,
- zope.webdav.interfaces.IWebDAVRequest))
-
- def tearDown(self):
- etreeTearDown()
-
- gsm = component.getGlobalSiteManager()
- gsm.unregisterAdapter(DAVLockmanager, (IResource,))
- gsm.unregisterAdapter(DAVLockmanager, (ICollectionResource,))
- gsm.unregisterAdapter(LocationPhysicallyLocatable, (IResource,))
- gsm.unregisterAdapter(
- LocationPhysicallyLocatable, (ICollectionResource,))
-
- del self.root
-
- def test_handleLock_notlockinfo(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:notlockinfo xmlns:D="DAV:">
- Not a lockinfo.
-</D:notlockinfo>"""
- request = TestWebDAVRequest(body = body)
-
- lock = LOCKMethod(None, request)
- self.assertRaises(
- zope.webdav.interfaces.UnprocessableError, lock.handleLock)
-
- def test_handleLock_invalidDepth(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- Not a lockinfo.
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body, environ = {"DEPTH": "1"})
-
- lock = LOCKMethod(None, request)
- self.assertRaises(
- zope.webdav.interfaces.BadRequest, lock.handleLock)
-
- def test_handleLock_nolockscope(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- Not a lockinfo.
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body)
-
- lock = LOCKMethod(None, request)
- self.assertRaises(
- zope.webdav.interfaces.UnprocessableError, lock.handleLock)
-
- def test_handleLock_nolocktype(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- <D:lockscope><D:exculsive/></D:lockscope>
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body)
-
- lock = LOCKMethod(None, request)
- self.assertRaises(
- zope.webdav.interfaces.UnprocessableError, lock.handleLock)
-
- def test_handleLock(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- <D:lockscope><D:exclusive/></D:lockscope>
- <D:locktype><D:write/></D:locktype>
- <D:owner>
- <D:href>http://example.org/~ejw/contact.html</D:href>
- </D:owner>
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body)
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- errors = lock.handleLock()
-
- lockmanager = DAVLockmanager(resource)
- self.assertEqual(lockmanager.islocked(), True)
-
- def test_handleLock_alreadLocked(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- <D:lockscope><D:exclusive/></D:lockscope>
- <D:locktype><D:write/></D:locktype>
- <D:owner>
- <D:href>http://example.org/~ejw/contact.html</D:href>
- </D:owner>
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body)
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- errors = lock.handleLock()
- self.assertEqual(errors, [])
-
- lockmanager = DAVLockmanager(resource)
- self.assertEqual(lockmanager.islocked(), True)
-
- lock = LOCKMethod(resource, request)
- errors = lock.handleLock()
- self.assertEqual(len(errors), 1)
- self.assert_(zope.webdav.interfaces.IAlreadyLocked.providedBy(errors[0]))
-
- def test_handleLockRefresh_notlocked(self):
- request = TestWebDAVRequest()
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
-
- self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
- lock.handleLockRefresh)
-
- def test_handleLockRefresh_noifheader(self):
- request = TestWebDAVRequest()
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
- lock.handleLockRefresh)
-
- def test_handleLockRefresh_ifbadheader(self):
- request = TestWebDAVRequest(environ = {"IF": "<xxx>"})
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
- lock.handleLockRefresh)
-
- def test_handleLockRefresh_ifbadheader2(self):
- request = TestWebDAVRequest(environ = {"IF": "xxx"})
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
- lock.handleLockRefresh)
-
- def test_handleLockRefresh_defaulttimeout(self):
- request = TestWebDAVRequest(environ = {"IF": "xxx"})
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- self.assertEqual(lockmanager.getActivelock().timeout,
- u"Second-100")
-
- locktoken = lockmanager.getActivelock().locktoken[0]
- request = TestWebDAVRequest(environ = {"IF": "<%s>" % locktoken})
- lock = LOCKMethod(resource, request)
-
- lock.handleLockRefresh()
-
- self.assertEqual(lockmanager.getActivelock().timeout,
- u"Second-720")
-
- def test_handleLockRefresh_timeout(self):
- request = TestWebDAVRequest(environ = {"IF": "xxx"})
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- self.assertEqual(lockmanager.getActivelock().timeout,
- u"Second-100")
-
- locktoken = lockmanager.getActivelock().locktoken[0]
- request = TestWebDAVRequest(environ = {"IF": "<%s>" % locktoken,
- "TIMEOUT": "Second-500"})
- lock = LOCKMethod(resource, request)
-
- lock.handleLockRefresh()
-
- self.assertEqual(lockmanager.getActivelock().timeout,
- u"Second-500")
-
- def test_lock_alreadLocked(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- <D:lockscope><D:exclusive/></D:lockscope>
- <D:locktype><D:write/></D:locktype>
- <D:owner>
- <D:href>http://example.org/~ejw/contact.html</D:href>
- </D:owner>
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body)
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- lock = LOCKMethod(resource, request)
- self.assertRaises(zope.webdav.interfaces.WebDAVErrors, lock.LOCK)
-
- def test_lock(self):
- body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
- <D:lockscope><D:exclusive/></D:lockscope>
- <D:locktype><D:write/></D:locktype>
- <D:owner>
- <D:href>http://example.org/~ejw/contact.html</D:href>
- </D:owner>
-</D:lockinfo>"""
- request = TestWebDAVRequest(body = body)
- resource = self.root["resource"] = Resource()
-
- lock = LOCKMethod(resource, request)
- result = lock.LOCK()
-
- locktoken = request.response.getHeader("lock-token")
- lockmanager = DAVLockmanager(resource)
- currentlocktoken = lockmanager.getActivelock().locktoken[0]
- self.assert_(locktoken[0], "<")
- self.assert_(locktoken[-1], ">")
- self.assertEqual(currentlocktoken, locktoken[1:-1])
-
- assertXMLEqual(result, """<ns0:prop xmlns:ns0="DAV:">
-<ns0:lockdiscovery xmlns:ns0="DAV:">
- <ns0:activelock xmlns:ns0="DAV:">
- <ns0:lockscope xmlns:ns0="DAV:"><ns0:exclusive xmlns:ns0="DAV:"/></ns0:lockscope>
- <ns0:locktype xmlns:ns0="DAV:"><ns0:write xmlns:ns0="DAV:"/></ns0:locktype>
- <ns0:depth xmlns:ns0="DAV:">infinity</ns0:depth>
- <ns0:owner xmlns:D="DAV:">
- <ns0:href>http://example.org/~ejw/contact.html</ns0:href>
- </ns0:owner>
- <ns0:timeout xmlns:ns0="DAV:">Second-720</ns0:timeout>
- <ns0:locktoken xmlns:ns0="DAV:">
- <ns0:href xmlns:ns0="DAV:">%s</ns0:href>
- </ns0:locktoken>
- <ns0:lockroot xmlns:ns0="DAV:">http://localhost/resource</ns0:lockroot>
- </ns0:activelock>
-</ns0:lockdiscovery></ns0:prop>""" % locktoken[1:-1])
-
- def test_refreshlock_alreadLocked(self):
- request = TestWebDAVRequest()
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write", u"Michael",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- lock = LOCKMethod(resource, request)
- self.assertRaises(zope.webdav.interfaces.PreconditionFailed, lock.LOCK)
-
- def test_refreshlock(self):
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write",
- u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
- locktoken = lockmanager.getActivelock().locktoken[0]
-
- request = TestWebDAVRequest(environ = {"IF": "<%s>" % locktoken})
- lock = LOCKMethod(resource, request)
- result = lock.LOCK()
-
- assertXMLEqual(result, """<ns0:prop xmlns:ns0="DAV:">
-<ns0:lockdiscovery xmlns:ns0="DAV:">
- <ns0:activelock xmlns:ns0="DAV:">
- <ns0:lockscope xmlns:ns0="DAV:"><ns0:exclusive xmlns:ns0="DAV:"/></ns0:lockscope>
- <ns0:locktype xmlns:ns0="DAV:"><ns0:write xmlns:ns0="DAV:"/></ns0:locktype>
- <ns0:depth xmlns:ns0="DAV:">0</ns0:depth>
- <ns0:owner xmlns:D="DAV:">Michael</ns0:owner>
- <ns0:timeout xmlns:ns0="DAV:">Second-720</ns0:timeout>
- <ns0:locktoken xmlns:ns0="DAV:">
- <ns0:href xmlns:ns0="DAV:">%s</ns0:href>
- </ns0:locktoken>
- <ns0:lockroot xmlns:ns0="DAV:">http://localhost/resource</ns0:lockroot>
- </ns0:activelock>
-</ns0:lockdiscovery></ns0:prop>""" % locktoken)
-
- def test_unlock_nolocktoken(self):
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write",
- u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- unlock = UNLOCKMethod(resource, TestWebDAVRequest())
- self.assertRaises(zope.webdav.interfaces.BadRequest, unlock.UNLOCK)
-
- def test_unlock_badlocktoken(self):
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write",
- u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- request = TestWebDAVRequest(environ = {"LOCK_TOKEN": "XXX"})
- unlock = UNLOCKMethod(resource, request)
- self.assertRaises(zope.webdav.interfaces.ConflictError, unlock.UNLOCK)
-
- def test_unlock_badlocktoken2(self):
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write",
- u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
-
- request = TestWebDAVRequest(environ = {"LOCK_TOKEN": "<XXX>"})
- unlock = UNLOCKMethod(resource, request)
- self.assertRaises(zope.webdav.interfaces.ConflictError, unlock.UNLOCK)
-
- def test_unlock(self):
- resource = self.root["resource"] = Resource()
-
- lockmanager = DAVLockmanager(resource)
- lockmanager.lock(u"exclusive", u"write",
- u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
- duration = datetime.timedelta(seconds = 100),
- depth = "0")
- locktoken = lockmanager.getActivelock().locktoken[0]
-
- request = TestWebDAVRequest(
- environ = {"LOCK_TOKEN": "<%s>" % locktoken})
- unlock = UNLOCKMethod(resource, request)
- result = unlock.UNLOCK()
-
- self.assertEqual(result, "")
- self.assertEqual(request.response.getStatus(), 204)
-
-
-def test_suite():
- return unittest.TestSuite((
- unittest.makeSuite(LOCKINGHeaders),
- unittest.makeSuite(LOCKTestCase),
- ))
More information about the Checkins mailing list