[Checkins] SVN: zope.webdav/trunk/src/zope/webdav/ Rewrite the locking unit tests as a doctest as they were overly complicated.

Michael Kerrin michael.kerrin at openapp.biz
Wed Apr 4 15:56:56 EDT 2007


Log message for revision 74014:
  Rewrite the locking unit tests as a doctest as they were overly complicated.
  
  Plus I made the locking.py module responsible for checking that a LOCK request
  contains valid lock-token information.
  

Changed:
  U   zope.webdav/trunk/src/zope/webdav/configure.zcml
  U   zope.webdav/trunk/src/zope/webdav/interfaces.py
  U   zope.webdav/trunk/src/zope/webdav/locking.py
  A   zope.webdav/trunk/src/zope/webdav/locking.txt
  U   zope.webdav/trunk/src/zope/webdav/lockingutils.py
  U   zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py
  U   zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py
  D   zope.webdav/trunk/src/zope/webdav/tests/test_locking.py

-=-
Modified: zope.webdav/trunk/src/zope/webdav/configure.zcml
===================================================================
--- zope.webdav/trunk/src/zope/webdav/configure.zcml	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/configure.zcml	2007-04-04 19:56:56 UTC (rev 74014)
@@ -243,6 +243,13 @@
        parent="zope.webdav"
        />
 
+    <bookchapter
+       id="zope.webdav.locking"
+       title="Locking"
+       doc_path="locking.txt"
+       parent="zope.webdav"
+       />
+
   </configure>
 
 </configure>

Modified: zope.webdav/trunk/src/zope/webdav/interfaces.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/interfaces.py	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/interfaces.py	2007-04-04 19:56:56 UTC (rev 74014)
@@ -559,14 +559,6 @@
         Raise a AlreadyLockedError if some resource is already locked.
         """
 
-    def getActivelock(request = None):
-        """
-        Return an implementation of zope.webdav.coreproperties.IActiveLock
-
-        If you are testing the lock manager then you don't need to pass
-        in the request - just don't access the lockroot attribute :-)
-        """
-
     def refreshlock(timeout):
         """
         Refresh to lock token associated with this resource.

Modified: zope.webdav/trunk/src/zope/webdav/locking.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/locking.py	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/locking.py	2007-04-04 19:56:56 UTC (rev 74014)
@@ -46,14 +46,36 @@
 
 import zope.webdav.interfaces
 import zope.webdav.properties
-from zope.webdav.coreproperties import IActiveLock
+from zope.webdav.coreproperties import IActiveLock, IDAVSupportedlock
 from zope.etree.interfaces import IEtree
 import zope.webdav.utils
 
 MAXTIMEOUT = (2L ** 32) - 1
 DEFAULTTIMEOUT = 12 * 60L
 
+def getIfHeader(request):
+    """
+    Parse the `If` HTTP header in this request and return a list of lock tokens
+    and entity tags.
 
+    XXX - This implementation is overly simplicitic.
+
+      >>> from zope.publisher.browser import TestRequest
+
+      >>> getIfHeader(TestRequest()) is None
+      True
+      >>> getIfHeader(TestRequest(environ = {'IF': 'xxx'})) is None
+      True
+      >>> getIfHeader(TestRequest(environ = {'IF': '<xxx>'}))
+      'xxx'
+
+    """
+    headervalue = request.get("IF", "")
+    if headervalue and headervalue[0] == "<" and headervalue[-1] == ">":
+        return headervalue[1:-1]
+    return None
+
+
 @component.adapter(interface.Interface, zope.webdav.interfaces.IWebDAVMethod)
 @interface.implementer(zope.webdav.interfaces.IWebDAVMethod)
 def LOCK(context, request):
@@ -81,10 +103,6 @@
         self.context = context
         self.request = request
 
-    def getDepth(self):
-        # default is infinity
-        return self.request.getHeader("depth", "infinity")
-
     def getTimeout(self):
         """
         Return a datetime.timedelta object representing the duration of
@@ -99,6 +117,82 @@
         Multiple TimeType entries are listed in order of performace so this
         method will return the first valid TimeType converted to a
         `datetime.timedelta' object or else it returns the default timeout.
+
+          >>> from zope.publisher.browser import TestRequest
+
+        No supplied value -> default value.
+
+          >>> LOCKMethod(None, TestRequest(environ = {})).getTimeout()
+          datetime.timedelta(0, 720)
+
+        Infinity lock timeout is too long so revert to the default timeout.
+
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'infinity'})).getTimeout()
+          datetime.timedelta(0, 720)
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'infinite'})).getTimeout()
+          datetime.timedelta(0, 720)
+
+        Specify a lock timeout of 500 seconds.
+
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'Second-500'})).getTimeout()
+          datetime.timedelta(0, 500)
+
+        Invalid and invalid second.
+
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'XXX500'})).getTimeout()
+          Traceback (most recent call last):
+          ...
+          BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u'Invalid TIMEOUT header'
+
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'XXX-500'})).getTimeout()
+          Traceback (most recent call last):
+          ...
+          BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u'Invalid TIMEOUT header'
+
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'Second-500x'})).getTimeout()
+          Traceback (most recent call last):
+          ...
+          BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u'Invalid TIMEOUT header'
+
+        Maximum timeout value.
+
+          >>> timeout = 'Second-%d' %(MAXTIMEOUT + 100)
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+          datetime.timedelta(0, 720)
+
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': 'Second-3600'})).getTimeout()
+          datetime.timedelta(0, 3600)
+
+        Specify multiple timeout values. The first applicable time type value
+        is choosen.
+
+          >>> LOCKMethod(None, TestRequest(
+          ...    environ = {'TIMEOUT': 'Infinity, Second-3600'})).getTimeout()
+          datetime.timedelta(0, 3600)
+
+          >>> timeout = 'Infinity, Second-%d' %(MAXTIMEOUT + 10)
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+          datetime.timedelta(0, 720)
+
+          >>> timeout = 'Second-1200, Second-450, Second-500'
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+          datetime.timedelta(0, 1200)
+
+          >>> timeout = 'Second-%d, Second-450' %(MAXTIMEOUT + 10)
+          >>> LOCKMethod(None,
+          ...    TestRequest(environ = {'TIMEOUT': timeout})).getTimeout()
+          datetime.timedelta(0, 450)
+
         """
         timeout = None
         header = self.request.getHeader("timeout", "infinity")
@@ -133,6 +227,33 @@
 
         return datetime.timedelta(seconds = timeout)
 
+    def getDepth(self):
+        """Default is infinity.
+
+          >>> from zope.publisher.browser import TestRequest
+
+          >>> LOCKMethod(None, TestRequest()).getDepth()
+          'infinity'
+          >>> LOCKMethod(None, TestRequest(environ = {'DEPTH': '0'})).getDepth()
+          '0'
+          >>> LOCKMethod(None, TestRequest(
+          ...    environ = {'DEPTH': 'infinity'})).getDepth()
+          'infinity'
+          >>> LOCKMethod(None, TestRequest(
+          ...    environ = {'DEPTH': '1'})).getDepth()
+          Traceback (most recent call last):
+          ...
+          BadRequest: <zope.publisher.browser.TestRequest instance URL=http://127.0.0.1>, u"Invalid depth header. Must be either '0' or 'infinity'"
+
+        """
+        depth = self.request.getHeader("depth", "infinity")
+        if depth not in ("0", "infinity"):
+            raise zope.webdav.interfaces.BadRequest(
+                self.request,
+                u"Invalid depth header. Must be either '0' or 'infinity'")
+
+        return depth
+
     def LOCK(self):
         # The Lock-Token header is not returned in the response for a
         # successful refresh LOCK request.
@@ -174,7 +295,8 @@
             raise zope.webdav.interfaces.PreconditionFailed(
                 self.context, message = u"Context is not locked.")
 
-        locktoken = lockmanager.getActivelock().locktoken[0]
+        locktoken = component.getMultiAdapter((self.context, self.request),
+                                              IActiveLock).locktoken[0]
         request_uri = self.request.getHeader("IF", "")
         if not request_uri or \
                request_uri[0] != "<" or request_uri[-1] != ">" or \
@@ -194,16 +316,12 @@
                 self.context,
                 message = u"LOCK request body must be a `lockinfo' XML element")
 
+        timeout = self.getTimeout()
+
         depth = self.getDepth()
-        if depth not in ("0", "infinity"):
-            raise zope.webdav.interfaces.BadRequest(
-                self.request,
-                u"Invalid depth header. Must be either '0' or 'infinity'")
 
         etree = component.getUtility(IEtree)
 
-        timeout = self.getTimeout()
-
         lockscope = xmlsource.find("{DAV:}lockscope")
         if not lockscope:
             raise zope.webdav.interfaces.UnprocessableError(
@@ -218,12 +336,25 @@
                 message = u"No `{DAV:}locktype' XML element in request")
         locktype_str = zope.webdav.utils.parseEtreeTag(locktype[0].tag)[1]
 
+        supportedlocks = component.getMultiAdapter(
+            (self.context, self.request), IDAVSupportedlock)
+        for entry in supportedlocks.supportedlock:
+            if entry.locktype[0] == locktype_str and \
+               entry.lockscope[0] == lockscope_str:
+                break
+        else:
+            raise zope.webdav.interfaces.UnprocessableError(
+                self.context,
+                message = u"Unknown lock-token requested.")
+
         owner = xmlsource.find("{DAV:}owner")
-        owner_str = etree.tostring(owner)
+        if owner is not None: # The owner element is optional.
+            owner_str = etree.tostring(owner)
+        else:
+            owner_str = None
 
         lockmanager = zope.webdav.interfaces.IDAVLockmanager(self.context)
 
-        roottoken = None
         try:
             lockmanager.lock(scope = lockscope_str,
                              type = locktype_str,
@@ -278,11 +409,12 @@
                 self.request, message = u"No lock-token header supplied")
 
         lockmanager = zope.webdav.interfaces.IDAVLockmanager(self.context)
-        if not lockmanager.islocked() or \
-             lockmanager.getActivelock(self.request).locktoken[0] != locktoken:
+        activelock = component.getMultiAdapter((self.context, self.request),
+                                               IActiveLock)
+        if not lockmanager.islocked() or activelock.locktoken[0] != locktoken:
             raise zope.webdav.interfaces.ConflictError(
                 self.context, message = "object is locked or the lock isn't" \
-                                        "in the scope the passed.")
+                                        " in the scope the passed.")
 
         lockmanager.unlock()
 

Added: zope.webdav/trunk/src/zope/webdav/locking.txt
===================================================================
--- zope.webdav/trunk/src/zope/webdav/locking.txt	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/locking.txt	2007-04-04 19:56:56 UTC (rev 74014)
@@ -0,0 +1,700 @@
+==============
+WebDAV locking
+==============
+
+zope.webdav.locking doesn't fully implement the WebDAV LOCK and UNLOCK methods.
+Instead it tries to hide most of the protocols details from developers and
+provide a way for developers to integrate the WebDAV LOCK and UNLOCK methods
+with different locking mechanisms. This is mainly done by an implementation
+of the zope.webdav.interfaces.IDAVLockmanager adapter. The
+zope.webdav.lockingutils module uses the zope.locking package to integrate
+the zope.locking with the LOCK and UNLOCK methods.
+
+This test will define a very simple locking mechanism that will show what
+needs to be done in order to integrate this mechanism with the LOCK and
+UNLOCK methods. At the end of the tests we will have successfully locked
+and then unlocked an instance of the Resource content type defined below.
+
+Setup
+=====
+
+  >>> import UserDict
+  >>> from cStringIO import StringIO
+  >>> import zope.interface
+  >>> from zope.interface.verify import verifyObject
+  >>> import zope.component
+  >>> import zope.app.container.interfaces
+  >>> import zope.webdav.interfaces
+
+Define a test request object and a content type against which to run the
+tests.
+
+  >>> class TestWebDAVRequest(zope.webdav.publisher.WebDAVRequest):
+  ...    def __init__(self, lockinfo = {}, body = "", environ = {}):
+  ...        env = environ.copy()
+  ...        if body:
+  ...            env.setdefault("CONTENT_TYPE", "text/xml")
+  ...        env.setdefault("CONTENT_LENGTH", len(body))
+  ...        super(TestWebDAVRequest, self).__init__(StringIO(body), env)
+  ...        self.processInputs()
+
+  >>> class IResource(zope.interface.Interface):
+  ...    """ """
+
+  >>> class Resource(object):
+  ...    zope.interface.implements(IResource)
+  ...    _lockinfo = None
+
+  >>> gsm = zope.component.getGlobalSiteManager()
+
+LOCK Method
+===========
+
+  >>> from zope.webdav.locking import LOCK
+
+The LOCK method is only defined when the current resource is adaptable to
+`zope.webdav.interfaces.IDAVLockManager`.
+
+  >>> LOCK(Resource(), TestWebDAVRequest()) is None
+  True
+
+Implement the zope.webdav.interfaces.IDAVLockmanager adapter. When the
+lock, refreshlock, and unlock methods are called by the LOCK / UNLOCK methods
+below then these methods will print out a simple unique message that we
+can test for.
+
+  >>> class DAVLockmanager(object):
+  ...    zope.interface.implements(zope.webdav.interfaces.IDAVLockmanager)
+  ...    zope.component.adapts(IResource)
+  ...    _islockable = True
+  ...    def __init__(self, context):
+  ...        self.context = context
+  ...    def islockable(self):
+  ...        return self._islockable
+  ...    def islocked(self):
+  ...        return self.context._lockinfo is not None
+  ...    def refreshlock(self, timeout):
+  ...        self.context._lockinfo['duration'] = timeout
+  ...        print "Refreshed lock token."
+  ...    def lock(self, scope, type, owner, duration, depth):
+  ...        if self.context._lockinfo is not None:
+  ...            raise zope.webdav.interfaces.AlreadyLocked(self.context)
+  ...        self.context._lockinfo = {'scope': scope,
+  ...            'type': type,
+  ...            'owner': owner,
+  ...            'duration': duration,
+  ...            'depth': depth}
+  ...        print "Locked the resource."
+  ...    def unlock(self):
+  ...        self.context._lockinfo = None
+  ...        print "Unlocked the resource."
+
+  >>> verifyObject(zope.webdav.interfaces.IDAVLockmanager,
+  ...    DAVLockmanager(Resource()))
+  True
+  >>> gsm.registerAdapter(DAVLockmanager)
+
+  >>> LOCK(Resource(), TestWebDAVRequest()) #doctest:+ELLIPSIS
+  <zope.webdav.locking.LOCKMethod object at 0x...>
+
+In some locking implementations the lockmanager can say that a resource can
+not be locked - ever. This is when the islockable method returns False. In
+this case we don't create a LOCK view.
+
+  >>> DAVLockmanager._islockable = False
+  >>> DAVLockmanager(Resource()).islockable()
+  False
+
+  >>> LOCK(Resource(), TestWebDAVRequest()) is None
+  True
+  >>> DAVLockmanager._islockable = True
+
+handleLock
+----------
+
+The `handleLock` method is responsible for locking a resource. It parses the
+body of the request and calls the `IDAVLockmanager` lock method (assuming that
+everything is OK) with the information in the request body. Whether or not
+the `handleLock` method is called just depends on whether or not the request
+contains an xml body that the elementtree used was able to use.
+
+The request body must conform to the specification in order to the LOCK method
+to succeed in locking the resource.
+
+  >>> body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:notlockinfo xmlns:D="DAV:">
+  ...  Not a lockinfo.
+  ... </D:notlockinfo>"""
+  >>> resource = Resource()
+  >>> LOCK(resource, TestWebDAVRequest(body = body)).handleLock()
+  Traceback (most recent call last):
+  ...
+  UnprocessableError: LOCK request body must be a `lockinfo' XML element
+
+  >>> body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...  Not a lockinfo.
+  ... </D:lockinfo>"""
+  >>> LOCK(resource, TestWebDAVRequest(body = body,
+  ...    environ = {'DEPTH': '1'})).handleLock()
+  Traceback (most recent call last):
+  ...
+  BadRequest: <__builtin__.TestWebDAVRequest instance URL=http:/>, u"Invalid depth header. Must be either '0' or 'infinity'"
+
+  >>> LOCK(resource, TestWebDAVRequest(body = body)).handleLock()
+  Traceback (most recent call last):
+  ...
+  UnprocessableError: No `{DAV:}lockscope' XML element in request
+
+  >>> body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exculsive/></D:lockscope>
+  ... </D:lockinfo>"""
+  >>> LOCK(resource, TestWebDAVRequest(body = body)).handleLock()
+  Traceback (most recent call last):
+  ...
+  UnprocessableError: No `{DAV:}locktype' XML element in request
+
+Now in-order to valid the request lock we need a `ILockEntry` and
+`IDAVSupportedlock` implementation in order to integrate what the system
+supports.
+
+  >>> class Exclusivelock(object):
+  ...    zope.interface.implements(zope.webdav.coreproperties.ILockEntry)
+  ...    lockscope = [u"exclusive"]
+  ...    locktype = [u"write"]
+
+  >>> class Supportedlock(object):
+  ...    zope.interface.implements(zope.webdav.coreproperties.IDAVSupportedlock)
+  ...    zope.component.adapts(IResource, zope.webdav.interfaces.IWebDAVRequest)
+  ...    def __init__(self, context, request):
+  ...        pass
+  ...    @property
+  ...    def supportedlock(self):
+  ...        return [Exclusivelock()]
+
+  >>> gsm.registerAdapter(Supportedlock)
+
+If the system doesn't know about the lockscope or the locktype then we get
+an `UnprocessableError`.
+
+  >>> errors = LOCK(resource, TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:none-exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ... </D:lockinfo>""")).LOCK()
+  Traceback (most recent call last):
+  ...
+  UnprocessableError: Unknown lock-token requested.
+
+Now enough of the errors that can occur, we will now lock the resource.
+
+  >>> errors = LOCK(resource, TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ... </D:lockinfo>""")).handleLock()
+  Locked the resource.
+  >>> errors
+  []
+
+  >>> manager = DAVLockmanager(resource)
+  >>> manager.islocked()
+  True
+
+The lock method sets the `_lockinfo` attribute on the resource object which
+just contains all the info need to set up the lock.
+
+  >>> resource._lockinfo
+  {'owner': None, 'scope': 'exclusive', 'duration': datetime.timedelta(0, 720), 'type': 'write', 'depth': 'infinity'}
+  >>> resource._lockinfo = None # unlocks the resource.
+
+  >>> errors = LOCK(resource, TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")).handleLock()
+  Locked the resource.
+  >>> errors
+  []
+  >>> lockinfo = resource._lockinfo
+  >>> print lockinfo['owner'] #doctest:+XMLDATA
+  <owner xmlns="DAV:">
+    <href>http://example.org/~ejw/contact.html</href>
+  </owner>
+  >>> resource._lockinfo['duration']
+  datetime.timedelta(0, 720)
+  >>> resource._lockinfo['scope']
+  'exclusive'
+  >>> resource._lockinfo['type']
+  'write'
+  >>> resource._lockinfo['depth']
+  'infinity'
+
+The depth parameter defaults to 'infinity' but could also specify a zero depth.
+
+  >>> resource._lockinfo = None
+
+  >>> errors = LOCK(resource, TestWebDAVRequest(
+  ...    environ = {'DEPTH': '0'},
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")).handleLock()
+  Locked the resource.
+  >>> errors
+  []
+  >>> lockinfo = resource._lockinfo
+  >>> print lockinfo['owner'] #doctest:+XMLDATA
+  <owner xmlns="DAV:">
+    <href>http://example.org/~ejw/contact.html</href>
+  </owner>
+  >>> resource._lockinfo['duration']
+  datetime.timedelta(0, 720)
+  >>> resource._lockinfo['scope']
+  'exclusive'
+  >>> resource._lockinfo['type']
+  'write'
+  >>> resource._lockinfo['depth']
+  '0'
+
+Now if the resource is already locked then the `handleLock` returns an
+`zope.webdav.interfaces.AlreadyLocked` error.
+
+  >>> errors = LOCK(resource, TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")).handleLock()
+  >>> errors #doctest:+ELLIPSIS
+  [<zope.webdav.interfaces.AlreadyLocked instance at ...>]
+
+LOCK
+----
+
+The LOCK method will wrap any AlreadyLockedErrors in a
+`zope.webdav.interfaces.WebDAVErrors` containing exception. The logic behind
+this is to make it easier write a view of the error.
+
+  >>> LOCK(resource, TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")).LOCK()
+  Traceback (most recent call last):
+  ...
+  WebDAVErrors
+
+Now unlock the resource for the other locking tests to work.
+
+  >>> resource._lockinfo = None
+
+When the LOCK method is called with a correct body it calls the `handleLock`
+method tested previously tries to render the `{DAV:}lockdiscovery` property to
+return to the requesting client.
+
+  >>> LOCK(resource, TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")).LOCK()
+  Traceback (most recent call last):
+  ...
+  PropertyNotFound: {DAV:}lockdiscovery
+
+Now we will define the `{DAV:}lockdiscovery` property and re-lock the resource.
+We keep the resource locked until after we make sure that the `Activelock`
+implementation implements the `IActiveLock` interface with the `verifyObject`
+method. As this method calls all the properties on this adapter. Note that the
+previous test locked the resource since these tests aren't run within a
+transaction.
+
+  >>> manager.islocked()
+  True
+
+  >>> class Activelock(object):
+  ...    zope.interface.implements(zope.webdav.coreproperties.IActiveLock)
+  ...    zope.component.adapts(IResource, zope.webdav.interfaces.IWebDAVRequest)
+  ...    def __init__(self, context, request):
+  ...        self.context = context
+  ...        self.data = context._lockinfo
+  ...    @property
+  ...    def lockscope(self):
+  ...        return [self.data['scope']]
+  ...    @property
+  ...    def locktype(self):
+  ...        return [self.data['type']]
+  ...    @property
+  ...    def depth(self):
+  ...        return self.data['depth']
+  ...    @property
+  ...    def owner(self):
+  ...        return self.data['owner'].replace('\n', '')
+  ...    @property
+  ...    def timeout(self):
+  ...        return "Second-%d" % self.data['duration'].seconds
+  ...    @property
+  ...    def lockroot(self):
+  ...        return "http://localhost/resource"
+  ...    @property
+  ...    def locktoken(self):
+  ...        return ['urn:resourcelocktoken']
+
+  >>> verifyObject(zope.webdav.coreproperties.IActiveLock,
+  ...    Activelock(resource, None))
+  True
+  >>> gsm.registerAdapter(Activelock)
+
+  >>> class Lockdiscovery(object):
+  ...    zope.interface.implements(zope.webdav.coreproperties.IDAVLockdiscovery)
+  ...    zope.component.adapts(IResource, zope.webdav.interfaces.IWebDAVRequest)
+  ...    def __init__(self, context, request):
+  ...        self.context, self.request = context, request
+  ...    @property
+  ...    def lockdiscovery(self):
+  ...        return [Activelock(self.context, self.request)]
+
+We need the following setup in-order for the LOCK method to render the
+`{DAV:}lockdiscovery` widget.
+
+  >>> gsm.registerAdapter(zope.webdav.widgets.ListDAVWidget,
+  ...                     (zope.schema.interfaces.IList,
+  ...                      zope.webdav.interfaces.IWebDAVRequest))
+  >>> gsm.registerAdapter(zope.webdav.widgets.ObjectDAVWidget,
+  ...                     (zope.schema.interfaces.IObject,
+  ...                      zope.webdav.interfaces.IWebDAVRequest))
+  >>> gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
+  ...                     (zope.schema.interfaces.IText,
+  ...                      zope.webdav.interfaces.IWebDAVRequest))
+  >>> gsm.registerAdapter(zope.webdav.properties.OpaqueWidget,
+  ...                     (zope.webdav.properties.DeadField,
+  ...                      zope.webdav.interfaces.IWebDAVRequest))
+  >>> gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
+  ...                     (zope.schema.interfaces.IURI,
+  ...                      zope.webdav.interfaces.IWebDAVRequest))
+
+  >>> gsm.registerUtility(zope.webdav.coreproperties.lockdiscovery,
+  ...                     name = "{DAV:}lockdiscovery")
+  >>> gsm.registerAdapter(Lockdiscovery)
+
+Now unlock the resource since we have already tested the `Activelock`
+implementation.
+
+  >>> resource._lockinfo = None
+
+By calling the LOCK method directly on a unlocked resource we get a full
+response to a LOCK request. This is a response with a status of 200 (we get
+a 201 on unmapped URL's but this isn't supported yet), a correct content-type
+of application/xml and the body should be a rendering of the
+`{DAV:}lockdiscovery` property.
+
+  >>> request = TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")
+  >>> respbody = LOCK(resource, request).LOCK()
+  Locked the resource.
+  >>> respbody #doctest:+XMLDATA
+  <prop xmlns="DAV:">
+    <lockdiscovery>
+      <activelock>
+        <lockscope><exclusive /></lockscope>
+        <locktype><write /></locktype>
+        <depth>infinity</depth>
+        <owner>
+          <href>http://example.org/~ejw/contact.html</href>
+        </owner>
+        <timeout>Second-720</timeout>
+        <locktoken><href>urn:resourcelocktoken</href></locktoken>
+        <lockroot>http://localhost/resource</lockroot>
+      </activelock>
+    </lockdiscovery>
+  </prop>
+  >>> request.response.getStatus()
+  200
+  >>> request.response.getHeader("Content-type")
+  'application/xml'
+  >>> request.response.getHeader("Lock-token")
+  '<urn:resourcelocktoken>'
+
+handleLockRefresh
+-----------------
+
+  >>> manager.islocked()
+  True
+
+Lock token can be refreshed by submitting a request without a body. In this
+case the `handleLock` method is not called but the `handleLockRefresh` is
+called.
+
+  >>> LOCK(resource, TestWebDAVRequest()).handleLockRefresh()
+  Traceback (most recent call last):
+  ...
+  PreconditionFailed: Lock-Token doesn't match request uri
+
+  >>> LOCK(resource, TestWebDAVRequest(
+  ...    environ = {'IF': '<urn:wrong-resourcelocktoken>'})).handleLockRefresh()
+  Traceback (most recent call last):
+  ...
+  PreconditionFailed: Lock-Token doesn't match request uri
+
+Now refresh the lock info but without a timeout so it doesn't change.
+
+  >>> LOCK(resource, TestWebDAVRequest(
+  ...    environ = {'IF': '<urn:resourcelocktoken>'})).handleLockRefresh()
+  Refreshed lock token.
+  >>> resource._lockinfo['duration']
+  datetime.timedelta(0, 720)
+  >>> resource._lockinfo['scope']
+  'exclusive'
+  >>> resource._lockinfo['type']
+  'write'
+  >>> resource._lockinfo['depth']
+  'infinity'
+
+  >>> LOCK(resource, TestWebDAVRequest(
+  ...    environ = {'IF': '<urn:resourcelocktoken>',
+  ...               'TIMEOUT': 'Second-1440'})).handleLockRefresh()
+  Refreshed lock token.
+  >>> resource._lockinfo['duration']
+  datetime.timedelta(0, 1440)
+  >>> resource._lockinfo['scope']
+  'exclusive'
+  >>> resource._lockinfo['type']
+  'write'
+  >>> resource._lockinfo['depth']
+  'infinity'
+
+Now when we call the LOCK method without a request body the handleLockRefresh
+method is called and assuming that everything else is OK then the lock token
+gets refreshed and the `{DAV:}lockdiscovery` property is rendered and
+returned.
+
+  >>> request = TestWebDAVRequest(environ = {'IF': '<urn:resourcelocktoken>'})
+  >>> respbody = LOCK(resource, request).LOCK()
+  Refreshed lock token.
+
+We didn't specify a timeout but in this cause the default timeout is selected
+and the lock token timeout is updated with this value.
+
+  >>> resource._lockinfo['duration']
+  datetime.timedelta(0, 720)
+  >>> resource._lockinfo['scope']
+  'exclusive'
+  >>> resource._lockinfo['type']
+  'write'
+  >>> resource._lockinfo['depth']
+  'infinity'
+  >>> print respbody #doctest:+XMLDATA
+  <prop xmlns="DAV:">
+    <lockdiscovery>
+      <activelock>
+        <lockscope><exclusive /></lockscope>
+        <locktype><write /></locktype>
+        <depth>infinity</depth>
+        <owner>
+          <href>http://example.org/~ejw/contact.html</href>
+        </owner>
+        <timeout>Second-720</timeout>
+        <locktoken>
+          <href>urn:resourcelocktoken</href>
+        </locktoken>
+        <lockroot>http://localhost/resource</lockroot>
+      </activelock>
+    </lockdiscovery>
+  </prop>
+  >>> request.response.getStatus()
+  200
+  >>> request.response.getHeader('content-type')
+  'application/xml'
+
+Similar if we specify a timeout header.
+
+  >>> request = TestWebDAVRequest(environ = {'IF': '<urn:resourcelocktoken>',
+  ...                                  'TIMEOUT': 'Second-1440'})
+  >>> respbody = LOCK(resource, request).LOCK()
+  Refreshed lock token.
+  >>> resource._lockinfo['duration']
+  datetime.timedelta(0, 1440)
+  >>> resource._lockinfo['scope']
+  'exclusive'
+  >>> resource._lockinfo['type']
+  'write'
+  >>> resource._lockinfo['depth']
+  'infinity'
+  >>> print respbody #doctest:+XMLDATA
+  <prop xmlns="DAV:">
+    <lockdiscovery>
+      <activelock>
+        <lockscope><exclusive /></lockscope>
+        <locktype><write /></locktype>
+        <depth>infinity</depth>
+        <owner>
+          <href>http://example.org/~ejw/contact.html</href>
+        </owner>
+        <timeout>Second-1440</timeout>
+        <locktoken>
+          <href>urn:resourcelocktoken</href>
+        </locktoken>
+        <lockroot>http://localhost/resource</lockroot>
+      </activelock>
+    </lockdiscovery>
+  </prop>
+  >>> request.response.getStatus()
+  200
+  >>> request.response.getHeader('content-type')
+  'application/xml'
+
+It doesn't make sense trying to refresh the lock on a unlock resource.
+
+  >>> resource._lockinfo = None
+
+  >>> LOCK(resource, request).LOCK()
+  Traceback (most recent call last):
+  ...
+  PreconditionFailed: Context is not locked.
+
+UNLOCK Method
+=============
+
+  >>> from zope.webdav.locking import UNLOCK
+
+Re-lock the resource which just got unlocked.
+
+  >>> manager.islocked()
+  False
+  >>> request = TestWebDAVRequest(
+  ...    body = """<?xml version="1.0" encoding="utf-8" ?>
+  ... <D:lockinfo xmlns:D="DAV:">
+  ...   <D:lockscope><D:exclusive/></D:lockscope>
+  ...   <D:locktype><D:write/></D:locktype>
+  ...   <D:owner>
+  ...     <D:href>http://example.org/~ejw/contact.html</D:href>
+  ...   </D:owner>
+  ... </D:lockinfo>""")
+  >>> respbody = LOCK(resource, request).LOCK()
+  Locked the resource.
+  >>> request.response.getStatus()
+  200
+
+  >>> manager.islocked()
+  True
+
+A UNLOCK request needs a lock-token header.
+
+  >>> UNLOCK(resource, TestWebDAVRequest()).UNLOCK()
+  Traceback (most recent call last):
+  ...
+  BadRequest: <__builtin__.TestWebDAVRequest instance URL=http:/>, u'No lock-token header supplied'
+
+We need to be test this again later - as we get the same error when resource
+is finally locked.
+
+  >>> UNLOCK(resource, TestWebDAVRequest(
+  ...    environ = {'LOCK_TOKEN': '<urn:wrong-resourcelocktoken>'})).UNLOCK()
+  Traceback (most recent call last):
+  ...
+  ConflictError: object is locked or the lock isn't in the scope the passed.
+
+Now successfully unlock the object.
+
+  >>> request = TestWebDAVRequest(
+  ...    environ = {'LOCK_TOKEN': '<urn:resourcelocktoken>'})
+  >>> respbody = UNLOCK(resource, request).UNLOCK()
+  Unlocked the resource.
+  >>> respbody
+  ''
+  >>> request.response.getStatus()
+  204
+
+  >>> manager.islocked()
+  False
+
+If we try and unlock an unlocked resource we get the following error.
+
+  >>> UNLOCK(resource, request).UNLOCK()
+  Traceback (most recent call last):
+  ...
+  ConflictError: object is locked or the lock isn't in the scope the passed.
+
+In some locking implementations the lockmanager can say that a resource can
+not be locked - and hence unlocked.
+
+  >>> DAVLockmanager._islockable = False
+  >>> DAVLockmanager(Resource()).islockable()
+  False
+
+  >>> UNLOCK(Resource(), TestWebDAVRequest()) is None
+  True
+  >>> DAVLockmanager._islockable = True
+
+Finally the UNLOCK method is only defined when their is a `IDAVLockmanager`
+implementation registered with the system.
+
+  >>> gsm.unregisterAdapter(DAVLockmanager)
+  True
+
+  >>> UNLOCK(resource, TestWebDAVRequest()) is None
+  True
+
+Cleanup
+-------
+
+  >>> gsm.unregisterAdapter(Supportedlock)
+  True
+  >>> gsm.unregisterAdapter(Activelock)
+  True
+
+  >>> gsm.unregisterAdapter(zope.webdav.widgets.ListDAVWidget,
+  ...                       (zope.schema.interfaces.IList,
+  ...                        zope.webdav.interfaces.IWebDAVRequest))
+  True
+  >>> gsm.unregisterAdapter(zope.webdav.widgets.ObjectDAVWidget,
+  ...                       (zope.schema.interfaces.IObject,
+  ...                        zope.webdav.interfaces.IWebDAVRequest))
+  True
+  >>> gsm.unregisterAdapter(zope.webdav.widgets.TextDAVWidget,
+  ...                       (zope.schema.interfaces.IText,
+  ...                        zope.webdav.interfaces.IWebDAVRequest))
+  True
+  >>> gsm.unregisterAdapter(zope.webdav.properties.OpaqueWidget,
+  ...                       (zope.webdav.properties.DeadField,
+  ...                        zope.webdav.interfaces.IWebDAVRequest))
+  True
+  >>> gsm.unregisterAdapter(zope.webdav.widgets.TextDAVWidget,
+  ...                       (zope.schema.interfaces.IURI,
+  ...                        zope.webdav.interfaces.IWebDAVRequest))
+  True
+
+  >>> gsm.unregisterUtility(zope.webdav.coreproperties.lockdiscovery,
+  ...                       name = "{DAV:}lockdiscovery")
+  True
+  >>> gsm.unregisterAdapter(Lockdiscovery)
+  True


Property changes on: zope.webdav/trunk/src/zope/webdav/locking.txt
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: zope.webdav/trunk/src/zope/webdav/lockingutils.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/lockingutils.py	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/lockingutils.py	2007-04-04 19:56:56 UTC (rev 74014)
@@ -495,15 +495,17 @@
 @component.adapter(interface.Interface, zope.webdav.interfaces.IWebDAVRequest)
 @interface.implementer(IDAVSupportedlock)
 def DAVSupportedlock(context, request):
-    utility = component.queryUtility(zope.locking.interfaces.ITokenUtility)
+    ## XXX - not tested.
+    utility = component.queryUtility(zope.locking.interfaces.ITokenUtility,
+                                     context = context, default = None)
     if utility is None:
         return None
-    return DAVSupportedlockAdapter(context, request)
+    return DAVSupportedlockAdapter()
 
 
 class DAVSupportedlockAdapter(object):
     """
-      >>> slock = DAVSupportedlockAdapter(None, None)
+      >>> slock = DAVSupportedlockAdapter()
       >>> exclusive, shared = slock.supportedlock
 
       >>> exclusive.lockscope
@@ -521,9 +523,6 @@
     component.adapts(interface.Interface,
                      zope.webdav.interfaces.IWebDAVRequest)
 
-    def __init__(self, context, request):
-        pass
-
     @property
     def supportedlock(self):
         return [ExclusiveLockEntry(), SharedLockEntry()]
@@ -536,6 +535,8 @@
 def DAVActiveLock(context, request):
     """
     The activelock property only exists whenever the resource is locked.
+
+    XXX - not tested.
     """
     try:
         token = zope.locking.interfaces.ITokenBroker(context).get()

Modified: zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/tests/test_copymove.py	2007-04-04 19:56:56 UTC (rev 74014)
@@ -17,22 +17,61 @@
 """
 
 import unittest
+import UserDict
 from cStringIO import StringIO
 
 from zope import interface
 from zope import component
+from zope import schema
 from zope.copypastemove.interfaces import IObjectCopier, IObjectMover
 from zope.location.traversing import LocationPhysicallyLocatable
 from zope.traversing.adapters import Traverser, DefaultTraversable
 from zope.traversing.browser.interfaces import IAbsoluteURL
 from zope.app.publication.http import MethodNotAllowed
+from zope.app.container.interfaces import IReadContainer
 from zope.traversing.interfaces import IContainmentRoot
 
 import zope.webdav.publisher
 from zope.webdav.copymove import COPY, MOVE
 
-import test_locking
+class IResource(interface.Interface):
 
+    text = schema.TextLine(
+        title = u"Example Text Property")
+
+    intprop = schema.Int(
+        title = u"Example Int Property")
+
+
+class Resource(object):
+    interface.implements(IResource)
+
+    def __init__(self, text = u"", intprop = 0):
+        self.text = text
+        self.intprop = intprop
+
+
+class ICollectionResource(IReadContainer):
+
+    title = schema.TextLine(
+        title = u"Title",
+        description = u"Title of resource")
+
+
+class CollectionResource(UserDict.UserDict):
+    interface.implements(ICollectionResource)
+
+    title = None
+
+    def __setitem__(self, key, val):
+        val.__parent__ = self
+        val.__name__ = key
+
+        self.data[key] = val
+
+class RootCollectionResource(CollectionResource):
+    interface.implements(IContainmentRoot)
+
 class TestRequest(zope.webdav.publisher.WebDAVRequest):
 
     def __init__(self, environ = {}):
@@ -46,33 +85,33 @@
 def baseSetUp():
     gsm = component.getGlobalSiteManager()
     gsm.registerAdapter(LocationPhysicallyLocatable,
-                        (test_locking.IResource,))
+                        (IResource,))
     gsm.registerAdapter(LocationPhysicallyLocatable,
-                        (test_locking.ICollectionResource,))
-    gsm.registerAdapter(Traverser, (test_locking.IResource,))
-    gsm.registerAdapter(Traverser, (test_locking.ICollectionResource,))
-    gsm.registerAdapter(DefaultTraversable, (test_locking.IResource,))
+                        (ICollectionResource,))
+    gsm.registerAdapter(Traverser, (IResource,))
+    gsm.registerAdapter(Traverser, (ICollectionResource,))
+    gsm.registerAdapter(DefaultTraversable, (IResource,))
     gsm.registerAdapter(DefaultTraversable,
-                        (test_locking.ICollectionResource,))
+                        (ICollectionResource,))
 
 
 def baseTearDown():
     gsm = component.getGlobalSiteManager()
     gsm.unregisterAdapter(LocationPhysicallyLocatable,
-                          (test_locking.IResource,))
+                          (IResource,))
     gsm.unregisterAdapter(LocationPhysicallyLocatable,
-                          (test_locking.ICollectionResource,))
-    gsm.unregisterAdapter(Traverser, (test_locking.IResource,))
-    gsm.unregisterAdapter(Traverser, (test_locking.ICollectionResource,))
-    gsm.unregisterAdapter(DefaultTraversable, (test_locking.IResource,))
+                          (ICollectionResource,))
+    gsm.unregisterAdapter(Traverser, (IResource,))
+    gsm.unregisterAdapter(Traverser, (ICollectionResource,))
+    gsm.unregisterAdapter(DefaultTraversable, (IResource,))
     gsm.unregisterAdapter(DefaultTraversable,
-                          (test_locking.ICollectionResource,))
+                          (ICollectionResource,))
 
 
 class COPYMOVEParseHeadersTestCase(unittest.TestCase):
 
     def setUp(self):
-        self.root = test_locking.RootCollectionResource()
+        self.root = RootCollectionResource()
 
         baseSetUp()
 
@@ -147,7 +186,7 @@
                           copy.getDestinationPath)
 
     def test_getDestinationPath_with_username(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://michael@localhost/testpath"})
         copy = COPY(resource, request)
@@ -157,7 +196,7 @@
         self.assertEqual(parent, self.root)
 
     def test_getDestinationPath_with_username_and_password(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://michael:pw@localhost/testpath"})
         copy = COPY(resource, request)
@@ -169,7 +208,7 @@
     def test_getDestinationPath_with_port(self):
         # this is correct since localhost:10080 is a different server to
         # localhost.
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost:10080/testpath"})
         copy = COPY(resource, request)
@@ -177,7 +216,7 @@
                           copy.getDestinationNameAndParentObject)
 
     def test_getDestinationNameAndParentObject(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/testpath"})
 
@@ -188,8 +227,8 @@
         self.assertEqual(parent, self.root)
 
     def test_getDestinationNameAndParentObject_destob_overwrite(self):
-        destresource = self.root["destresource"] = test_locking.Resource()
-        resource = self.root["resource"] = test_locking.Resource()
+        destresource = self.root["destresource"] = Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/destresource",
                        "OVERWRITE": "T"})
@@ -202,8 +241,8 @@
         self.assertEqual(parent, self.root)
 
     def test_getDestinationNameAndParentObject_destob_overwrite_failed(self):
-        destresource = self.root["destresource"] = test_locking.Resource()
-        resource = self.root["resource"] = test_locking.Resource()
+        destresource = self.root["destresource"] = Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/destresource",
                        "OVERWRITE": "F"})
@@ -214,7 +253,7 @@
         self.assert_("destresource" in self.root)
 
     def test_getDestinationNameAndParentObject_noparent(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/noparent/testpath"})
 
@@ -223,7 +262,7 @@
                           copy.getDestinationNameAndParentObject)
 
     def test_getDestinationNameAndParentObject_destob_sameob(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/resource",
                        "OVERWRITE": "T"})
@@ -233,7 +272,7 @@
                           copy.getDestinationNameAndParentObject)
 
     def test_nocopier(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -241,7 +280,7 @@
         self.assertRaises(MethodNotAllowed, copy.COPY)
 
     def test_nomovier(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -265,9 +304,9 @@
 
         if getattr(self.context, "__name__", None) is not None:
             path += "/" + self.context.__name__
-        elif test_locking.IResource.providedBy(self.context):
+        elif IResource.providedBy(self.context):
             path += "/resource"
-        elif test_locking.ICollectionResource.providedBy(self.context):
+        elif ICollectionResource.providedBy(self.context):
             path += "/collection"
         else:
             raise ValueError("unknown context type")
@@ -301,16 +340,16 @@
 class COPYObjectTestCase(unittest.TestCase):
 
     def setUp(self):
-        self.root = test_locking.RootCollectionResource()
+        self.root = RootCollectionResource()
 
         baseSetUp()
 
         Copier.iscopyable = True
         Copier.canCopyableTo = True
         gsm = component.getGlobalSiteManager()
-        gsm.registerAdapter(Copier, (test_locking.IResource,))
+        gsm.registerAdapter(Copier, (IResource,))
         gsm.registerAdapter(DummyResourceURL,
-                            (test_locking.IResource,
+                            (IResource,
                              zope.webdav.interfaces.IWebDAVRequest))
 
     def tearDown(self):
@@ -319,13 +358,13 @@
         baseTearDown()
 
         gsm = component.getGlobalSiteManager()
-        gsm.unregisterAdapter(Copier, (test_locking.IResource,))
+        gsm.unregisterAdapter(Copier, (IResource,))
         gsm.unregisterAdapter(DummyResourceURL,
-                              (test_locking.IResource,
+                              (IResource,
                                zope.webdav.interfaces.IWebDAVRequest))
 
     def test_copy(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -340,8 +379,8 @@
         self.assertEqual(self.root["resource"] is resource, True)
 
     def test_copy_overwrite(self):
-        resource = self.root["resource"] = test_locking.Resource()
-        resource2 = self.root["resource2"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
+        resource2 = self.root["resource2"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/resource2",
                        "OVERWRITE": "T"})
@@ -355,7 +394,7 @@
         self.assertEqual(self.root["resource2"] is resource, True)
 
     def test_copy_not_copyable(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -365,7 +404,7 @@
         self.assertRaises(MethodNotAllowed, copy.COPY)
 
     def test_copy_not_copyableto(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -400,16 +439,16 @@
 class MOVEObjectTestCase(unittest.TestCase):
 
     def setUp(self):
-        self.root = test_locking.RootCollectionResource()
+        self.root = RootCollectionResource()
 
         baseSetUp()
 
         Movier.isMoveable = True
         Movier.isMoveableTo = True
         gsm = component.getGlobalSiteManager()
-        gsm.registerAdapter(Movier, (test_locking.IResource,))
+        gsm.registerAdapter(Movier, (IResource,))
         gsm.registerAdapter(DummyResourceURL,
-                            (test_locking.IResource,
+                            (IResource,
                              zope.webdav.interfaces.IWebDAVRequest))
 
     def tearDown(self):
@@ -418,13 +457,13 @@
         baseTearDown()
 
         gsm = component.getGlobalSiteManager()
-        gsm.unregisterAdapter(Movier, (test_locking.IResource,))
+        gsm.unregisterAdapter(Movier, (IResource,))
         gsm.unregisterAdapter(DummyResourceURL,
-                              (test_locking.IResource,
+                              (IResource,
                                zope.webdav.interfaces.IWebDAVRequest))
 
     def test_move(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -438,8 +477,8 @@
         self.assertEqual(self.root["copy_of_resource"], resource)
 
     def test_move_overwrite(self):
-        resource = self.root["resource"] = test_locking.Resource()
-        resource2 = self.root["resource2"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
+        resource2 = self.root["resource2"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/resource2",
                        "OVERWRITE": "T"})
@@ -452,7 +491,7 @@
         self.assertEqual(self.root["resource2"] is resource, True)
 
     def test_move_not_moveable(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 
@@ -462,7 +501,7 @@
         self.assertRaises(MethodNotAllowed, move.MOVE)
 
     def test_move_not_moveableTo(self):
-        resource = self.root["resource"] = test_locking.Resource()
+        resource = self.root["resource"] = Resource()
         request = TestRequest(
             environ = {"DESTINATION": "http://localhost/copy_of_resource"})
 

Modified: zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/tests/test_doctests.py	2007-04-04 19:56:56 UTC (rev 74014)
@@ -204,5 +204,13 @@
                              setUp = lockingSetUp, tearDown = lockingTearDown),
         doctest.DocTestSuite("zope.webdav.deadproperties"),
         doctest.DocTestSuite("zope.webdav.adapters"),
+        doctest.DocTestSuite("zope.webdav.locking",
+                             checker = zope.etree.testing.xmlOutputChecker,
+                             setUp = zope.etree.testing.etreeSetup,
+                             tearDown = zope.etree.testing.etreeTearDown),
+        doctest.DocFileSuite("locking.txt", package = "zope.webdav",
+                             checker = zope.etree.testing.xmlOutputChecker,
+                             setUp = zope.etree.testing.etreeSetup,
+                             tearDown = zope.etree.testing.etreeTearDown),
         doctest.DocTestSuite("zope.webdav.mkcol"),
         ))

Deleted: zope.webdav/trunk/src/zope/webdav/tests/test_locking.py
===================================================================
--- zope.webdav/trunk/src/zope/webdav/tests/test_locking.py	2007-04-04 18:35:41 UTC (rev 74013)
+++ zope.webdav/trunk/src/zope/webdav/tests/test_locking.py	2007-04-04 19:56:56 UTC (rev 74014)
@@ -1,707 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2006 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Test WebDAV propfind method.
-
-$Id$
-"""
-
-import unittest
-from cStringIO import StringIO
-import UserDict
-import datetime
-import random
-import time
-
-from zope import interface
-from zope import schema
-import zope.schema.interfaces
-from zope import component
-from zope.security.proxy import removeSecurityProxy
-from zope.traversing.api import getPath, traverse, getRoot
-from zope.traversing.interfaces import IContainmentRoot
-from zope.location.traversing import LocationPhysicallyLocatable
-from zope.app.container.interfaces import IReadContainer
-from zope.webdav.locking import DEFAULTTIMEOUT, MAXTIMEOUT
-from zope.webdav.locking import UNLOCKMethod, LOCKMethod
-import zope.webdav.publisher
-import zope.webdav.interfaces
-from zope.etree.testing import etreeSetup, etreeTearDown, assertXMLEqual
-
-_randGen = random.Random(time.time())
-
-class TestWebDAVRequest(zope.webdav.publisher.WebDAVRequest):
-
-    def __init__(self, lockinfo = {}, body = "", environ = {}):
-        env = environ.copy()
-
-        if body:
-            env.setdefault("CONTENT_TYPE", "text/xml")
-        env.setdefault("CONTENT_LENGTH", len(body))
-
-        super(TestWebDAVRequest, self).__init__(StringIO(body), env)
-
-        self.processInputs()
-
-
-class LOCKINGHeaders(unittest.TestCase):
-
-    def test_depth_empty(self):
-        lock = LOCKMethod(None, TestWebDAVRequest())
-        self.assertEqual(lock.getDepth(), "infinity")
-
-    def test_depth_zero(self):
-        lock = LOCKMethod(None, TestWebDAVRequest(environ = {"DEPTH": "0"}))
-        self.assertEqual(lock.getDepth(), "0")
-
-    def test_timeout_default(self):
-        lock = LOCKMethod(None, TestWebDAVRequest(environ = {}))
-        self.assertEqual(lock.getTimeout(),
-                         datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
-    def test_timeout_infinity(self):
-        request = TestWebDAVRequest(environ = {"TIMEOUT": "infinity"})
-        lock = LOCKMethod(None, request)
-        self.assertEqual(lock.getTimeout(),
-                         datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
-    def test_timeout_infinite(self):
-        request = TestWebDAVRequest(environ = {"TIMEOUT": "infinite"})
-        lock = LOCKMethod(None, request)
-        self.assertEqual(lock.getTimeout(),
-                         datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
-    def test_timeout_second_500(self):
-        request = TestWebDAVRequest(environ = {"TIMEOUT": "Second-500"})
-        lock = LOCKMethod(None, request)
-        self.assertEqual(lock.getTimeout(),
-                         datetime.timedelta(seconds = 500))
-
-    def test_invalid_second(self):
-        request = TestWebDAVRequest(environ = {"TIMEOUT": "XXX-500"})
-        lock = LOCKMethod(None, request)
-        self.assertRaises(zope.webdav.interfaces.BadRequest, lock.getTimeout)
-
-    def test_invalid_second_value(self):
-        request = TestWebDAVRequest(environ = {"TIMEOUT": "Second-500x"})
-        lock = LOCKMethod(None, request)
-        self.assertRaises(zope.webdav.interfaces.BadRequest, lock.getTimeout)
-
-    def test_big_second(self):
-        request = TestWebDAVRequest(
-            environ = {"TIMEOUT": "Second-%d" %(MAXTIMEOUT + 100)})
-        lock = LOCKMethod(None, request)
-        self.assertEqual(lock.getTimeout(),
-                         datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
-    def test_timeout_infinity_and_3600(self):
-        request = TestWebDAVRequest(
-            environ = {"TIMEOUT": "Infinite, Second-3600"})
-        lock = LOCKMethod(None, request)
-        self.assertEqual(lock.getTimeout(), datetime.timedelta(seconds = 3600))
-
-    def test_timeout_infinity_and_big_int(self):
-        request = TestWebDAVRequest(
-            environ = {"TIMEOUT": "Infinite, Second-%d" %(MAXTIMEOUT + 100)})
-        lock = LOCKMethod(None, request)
-        self.assertEqual(lock.getTimeout(),
-                         datetime.timedelta(seconds = DEFAULTTIMEOUT))
-
-
-class DAVLockmanager(object):
-    interface.implements(zope.webdav.interfaces.IDAVLockmanager)
-
-    def __init__(self, context):
-        self.context = context
-
-    def generateLocktoken(self):
-        return "opaquelocktoken:%s-%s-00105A989226:%.03f" % \
-               (_randGen.random(), _randGen.random(), time.time())
-
-    def lock(self, scope, type, owner, duration, depth,
-             context = None):
-        if context is None:
-            if self.islockedObject(self.context):
-                raise zope.webdav.interfaces.AlreadyLocked(self.context)
-
-            ob = removeSecurityProxy(self.context)
-            setattr(ob, "_is_locked", {"scope": scope,
-                                       "type": type,
-                                       "owner": owner,
-                                       "duration": duration,
-                                       "depth": depth,
-                                       "token": self.generateLocktoken(),
-                                       "indirectlylocked": [],
-                                       "lockroot": getPath(self.context)})
-        else:
-            if self.islockedObject(context):
-                raise zope.webdav.interfaces.AlreadyLocked(context)
-
-            ob = removeSecurityProxy(context)
-            setattr(ob, "_is_indirectly_locked",
-                    {"lockroot": getPath(self.context),
-                     "rootinfo": removeSecurityProxy(self.context)._is_locked,
-                     })
-            root = removeSecurityProxy(self.context)
-            root._is_locked["indirectlylocked"].append(getPath(ob))
-
-        if context is None:
-            context = self.context
-
-        if depth == "infinity" and IReadContainer.providedBy(context):
-            for subob in context.values():
-                self.lock(scope, type, owner, duration, depth, subob)
-
-    def getActivelock(self, request = None):
-        return getActiveLock(self.context, request)
-
-    def refreshlock(self, timeout):
-        if not self.islockedObject(self.context):
-            raise zope.webdav.interfaces.ConflictError(
-                self.context, u"The context is not locked")
-
-        ob = removeSecurityProxy(self.context)
-        root = getRoot(self.context)
-
-        if getattr(ob, "_is_indirectly_locked", None) is not None:
-            lockroot = traverse(root, ob._is_indirectly_locked["lockroot"])
-        else:
-            lockroot = ob
-
-        removeSecurityProxy(lockroot)._is_locked["duration"] = timeout
-
-    def unlock(self):
-        if not self.islockedObject(self.context):
-            raise Exception("object is not locked")
-
-        ob = removeSecurityProxy(self.context)
-        root = getRoot(self.context)
-
-        if getattr(ob, "_is_indirectly_locked", None) is not None:
-            lockroot = traverse(root, ob._is_indirectly_locked["lockroot"])
-        else:
-            lockroot = ob
-
-        lockroot = removeSecurityProxy(lockroot)
-        for path in lockroot._is_locked["indirectlylocked"]:
-            ob = removeSecurityProxy(traverse(root, path))
-            delattr(ob, "_is_indirectly_locked")
-        delattr(lockroot, "_is_locked")
-
-    def islocked(self):
-        return self.islockedObject(self.context)
-
-    def islockedObject(self, context):
-        ob = removeSecurityProxy(context)
-        return getattr(ob, "_is_locked", None) is not None or \
-               getattr(ob, "_is_indirectly_locked", None) is not None
-
-
- at interface.implementer(zope.webdav.coreproperties.IActiveLock)
-def getActiveLock(context, request):
-    ob = removeSecurityProxy(context)
-
-    data = getattr(ob, "_is_indirectly_locked", None)
-    if data is not None:
-        root = traverse(getRoot(context), data["lockroot"])
-        root = removeSecurityProxy(root)
-        return ActiveLock(root._is_locked)
-
-    data = getattr(ob, "_is_locked", None)
-    if data is not None:
-        return ActiveLock(data)
-
-    return None
-
-
-class ActiveLock(object):
-    interface.implements(zope.webdav.coreproperties.IActiveLock)
-
-    def __init__(self, data):
-        self.data = data
-
-    @property
-    def lockscope(self):
-        return [u"exclusive"]
-
-    @property
-    def locktype(self):
-        return [u"write"]
-
-    @property
-    def depth(self):
-        return self.data["depth"]
-
-    @property
-    def owner(self):
-        return self.data["owner"]
-
-    @property
-    def timeout(self):
-        seconds = self.data["duration"]
-        if not isinstance(seconds, int):
-            seconds = seconds.seconds
-        return u"Second-%d" % seconds
-
-    @property
-    def locktoken(self):
-        return [self.data["token"]]
-
-    @property
-    def lockroot(self):
-        return "http://localhost%s" % self.data["lockroot"]
-
-
-class Lockdiscovery(object):
-    interface.implements(zope.webdav.coreproperties.IDAVLockdiscovery)
-
-    def __init__(self, context, request):
-        self.context = context
-        self.request = request
-
-    @property
-    def lockdiscovery(self):
-        activelock = getActiveLock(self.context, self.request)
-        if activelock is not None:
-            return [activelock]
-        return None
-
-
-class IResource(interface.Interface):
-
-    text = schema.TextLine(
-        title = u"Example Text Property")
-
-    intprop = schema.Int(
-        title = u"Example Int Property")
-
-
-class Resource(object):
-    interface.implements(IResource)
-
-    def __init__(self, text = u"", intprop = 0):
-        self.text = text
-        self.intprop = intprop
-
-
-class ICollectionResource(IReadContainer):
-
-    title = schema.TextLine(
-        title = u"Title",
-        description = u"Title of resource")
-
-
-class CollectionResource(UserDict.UserDict):
-    interface.implements(ICollectionResource)
-
-    title = None
-
-    def __setitem__(self, key, val):
-        val.__parent__ = self
-        val.__name__ = key
-
-        self.data[key] = val
-
-class RootCollectionResource(CollectionResource):
-    interface.implements(IContainmentRoot)
-
-
-class LOCKTestCase(unittest.TestCase):
-
-    def setUp(self):
-        etreeSetup()
-
-        self.root = RootCollectionResource()
-
-        gsm = component.getGlobalSiteManager()
-        gsm.registerAdapter(DAVLockmanager, (IResource,))
-        gsm.registerAdapter(DAVLockmanager, (ICollectionResource,))
-        gsm.registerAdapter(LocationPhysicallyLocatable, (IResource,))
-        gsm.registerAdapter(LocationPhysicallyLocatable, (ICollectionResource,))
-
-        gsm.registerAdapter(Lockdiscovery,
-                            (IResource, zope.webdav.interfaces.IWebDAVRequest))
-        gsm.registerUtility(zope.webdav.coreproperties.lockdiscovery,
-                            name = "{DAV:}lockdiscovery")
-        gsm.registerAdapter(getActiveLock,
-                            (IResource, zope.webdav.interfaces.IWebDAVRequest))
-
-        gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
-                            (zope.schema.interfaces.IText,
-                             zope.webdav.interfaces.IWebDAVRequest))
-        gsm.registerAdapter(zope.webdav.widgets.TextDAVWidget,
-                            (zope.schema.interfaces.IURI,
-                             zope.webdav.interfaces.IWebDAVRequest))
-        gsm.registerAdapter(zope.webdav.properties.OpaqueWidget,
-                            (zope.webdav.properties.DeadField,
-                             zope.webdav.interfaces.IWebDAVRequest))
-        gsm.registerAdapter(zope.webdav.widgets.ListDAVWidget,
-                            (zope.schema.interfaces.IList,
-                             zope.webdav.interfaces.IWebDAVRequest))
-        gsm.registerAdapter(zope.webdav.widgets.ObjectDAVWidget,
-                            (zope.schema.interfaces.IObject,
-                             zope.webdav.interfaces.IWebDAVRequest))
-
-    def tearDown(self):
-        etreeTearDown()
-
-        gsm = component.getGlobalSiteManager()
-        gsm.unregisterAdapter(DAVLockmanager, (IResource,))
-        gsm.unregisterAdapter(DAVLockmanager, (ICollectionResource,))
-        gsm.unregisterAdapter(LocationPhysicallyLocatable, (IResource,))
-        gsm.unregisterAdapter(
-            LocationPhysicallyLocatable, (ICollectionResource,))
-
-        del self.root
-
-    def test_handleLock_notlockinfo(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:notlockinfo xmlns:D="DAV:">
-  Not a lockinfo.
-</D:notlockinfo>"""
-        request = TestWebDAVRequest(body = body)
-
-        lock = LOCKMethod(None, request)
-        self.assertRaises(
-            zope.webdav.interfaces.UnprocessableError, lock.handleLock)
-
-    def test_handleLock_invalidDepth(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  Not a lockinfo.
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body, environ = {"DEPTH": "1"})
-
-        lock = LOCKMethod(None, request)
-        self.assertRaises(
-            zope.webdav.interfaces.BadRequest, lock.handleLock)
-
-    def test_handleLock_nolockscope(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  Not a lockinfo.
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body)
-
-        lock = LOCKMethod(None, request)
-        self.assertRaises(
-            zope.webdav.interfaces.UnprocessableError, lock.handleLock)
-
-    def test_handleLock_nolocktype(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  <D:lockscope><D:exculsive/></D:lockscope>
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body)
-
-        lock = LOCKMethod(None, request)
-        self.assertRaises(
-            zope.webdav.interfaces.UnprocessableError, lock.handleLock)
-
-    def test_handleLock(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  <D:lockscope><D:exclusive/></D:lockscope>
-  <D:locktype><D:write/></D:locktype>
-  <D:owner>
-    <D:href>http://example.org/~ejw/contact.html</D:href>
-  </D:owner>
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body)
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        errors = lock.handleLock()
-
-        lockmanager = DAVLockmanager(resource)
-        self.assertEqual(lockmanager.islocked(), True)
-
-    def test_handleLock_alreadLocked(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  <D:lockscope><D:exclusive/></D:lockscope>
-  <D:locktype><D:write/></D:locktype>
-  <D:owner>
-    <D:href>http://example.org/~ejw/contact.html</D:href>
-  </D:owner>
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body)
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        errors = lock.handleLock()
-        self.assertEqual(errors, [])
-
-        lockmanager = DAVLockmanager(resource)
-        self.assertEqual(lockmanager.islocked(), True)
-
-        lock = LOCKMethod(resource, request)
-        errors = lock.handleLock()
-        self.assertEqual(len(errors), 1)
-        self.assert_(zope.webdav.interfaces.IAlreadyLocked.providedBy(errors[0]))
-
-    def test_handleLockRefresh_notlocked(self):
-        request = TestWebDAVRequest()
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-
-        self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
-                          lock.handleLockRefresh)
-
-    def test_handleLockRefresh_noifheader(self):
-        request = TestWebDAVRequest()
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
-                          lock.handleLockRefresh)
-
-    def test_handleLockRefresh_ifbadheader(self):
-        request = TestWebDAVRequest(environ = {"IF": "<xxx>"})
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
-                          lock.handleLockRefresh)
-
-    def test_handleLockRefresh_ifbadheader2(self):
-        request = TestWebDAVRequest(environ = {"IF": "xxx"})
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        self.assertRaises(zope.webdav.interfaces.PreconditionFailed,
-                          lock.handleLockRefresh)
-
-    def test_handleLockRefresh_defaulttimeout(self):
-        request = TestWebDAVRequest(environ = {"IF": "xxx"})
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        self.assertEqual(lockmanager.getActivelock().timeout,
-                         u"Second-100")
-
-        locktoken = lockmanager.getActivelock().locktoken[0]
-        request = TestWebDAVRequest(environ = {"IF": "<%s>" % locktoken})
-        lock = LOCKMethod(resource, request)
-
-        lock.handleLockRefresh()
-
-        self.assertEqual(lockmanager.getActivelock().timeout,
-                         u"Second-720")
-
-    def test_handleLockRefresh_timeout(self):
-        request = TestWebDAVRequest(environ = {"IF": "xxx"})
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        self.assertEqual(lockmanager.getActivelock().timeout,
-                         u"Second-100")
-
-        locktoken = lockmanager.getActivelock().locktoken[0]
-        request = TestWebDAVRequest(environ = {"IF": "<%s>" % locktoken,
-                                               "TIMEOUT": "Second-500"})
-        lock = LOCKMethod(resource, request)
-
-        lock.handleLockRefresh()
-
-        self.assertEqual(lockmanager.getActivelock().timeout,
-                         u"Second-500")
-
-    def test_lock_alreadLocked(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  <D:lockscope><D:exclusive/></D:lockscope>
-  <D:locktype><D:write/></D:locktype>
-  <D:owner>
-    <D:href>http://example.org/~ejw/contact.html</D:href>
-  </D:owner>
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body)
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        lock = LOCKMethod(resource, request)
-        self.assertRaises(zope.webdav.interfaces.WebDAVErrors, lock.LOCK)
-
-    def test_lock(self):
-        body = """<?xml version="1.0" encoding="utf-8" ?>
-<D:lockinfo xmlns:D="DAV:">
-  <D:lockscope><D:exclusive/></D:lockscope>
-  <D:locktype><D:write/></D:locktype>
-  <D:owner>
-    <D:href>http://example.org/~ejw/contact.html</D:href>
-  </D:owner>
-</D:lockinfo>"""
-        request = TestWebDAVRequest(body = body)
-        resource = self.root["resource"] = Resource()
-
-        lock = LOCKMethod(resource, request)
-        result = lock.LOCK()
-
-        locktoken = request.response.getHeader("lock-token")
-        lockmanager = DAVLockmanager(resource)
-        currentlocktoken = lockmanager.getActivelock().locktoken[0]
-        self.assert_(locktoken[0], "<")
-        self.assert_(locktoken[-1], ">")
-        self.assertEqual(currentlocktoken, locktoken[1:-1])
-
-        assertXMLEqual(result, """<ns0:prop xmlns:ns0="DAV:">
-<ns0:lockdiscovery xmlns:ns0="DAV:">
-  <ns0:activelock xmlns:ns0="DAV:">
-    <ns0:lockscope xmlns:ns0="DAV:"><ns0:exclusive xmlns:ns0="DAV:"/></ns0:lockscope>
-    <ns0:locktype xmlns:ns0="DAV:"><ns0:write xmlns:ns0="DAV:"/></ns0:locktype>
-    <ns0:depth xmlns:ns0="DAV:">infinity</ns0:depth>
-    <ns0:owner xmlns:D="DAV:">
-      <ns0:href>http://example.org/~ejw/contact.html</ns0:href>
-    </ns0:owner>
-    <ns0:timeout xmlns:ns0="DAV:">Second-720</ns0:timeout>
-    <ns0:locktoken xmlns:ns0="DAV:">
-      <ns0:href xmlns:ns0="DAV:">%s</ns0:href>
-    </ns0:locktoken>
-    <ns0:lockroot xmlns:ns0="DAV:">http://localhost/resource</ns0:lockroot>
-  </ns0:activelock>
-</ns0:lockdiscovery></ns0:prop>""" % locktoken[1:-1])
-
-    def test_refreshlock_alreadLocked(self):
-        request = TestWebDAVRequest()
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write", u"Michael",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        lock = LOCKMethod(resource, request)
-        self.assertRaises(zope.webdav.interfaces.PreconditionFailed, lock.LOCK)
-
-    def test_refreshlock(self):
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write",
-                         u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-        locktoken = lockmanager.getActivelock().locktoken[0]
-
-        request = TestWebDAVRequest(environ = {"IF": "<%s>" % locktoken})
-        lock = LOCKMethod(resource, request)
-        result = lock.LOCK()
-
-        assertXMLEqual(result, """<ns0:prop xmlns:ns0="DAV:">
-<ns0:lockdiscovery xmlns:ns0="DAV:">
-  <ns0:activelock xmlns:ns0="DAV:">
-    <ns0:lockscope xmlns:ns0="DAV:"><ns0:exclusive xmlns:ns0="DAV:"/></ns0:lockscope>
-    <ns0:locktype xmlns:ns0="DAV:"><ns0:write xmlns:ns0="DAV:"/></ns0:locktype>
-    <ns0:depth xmlns:ns0="DAV:">0</ns0:depth>
-    <ns0:owner xmlns:D="DAV:">Michael</ns0:owner>
-    <ns0:timeout xmlns:ns0="DAV:">Second-720</ns0:timeout>
-    <ns0:locktoken xmlns:ns0="DAV:">
-      <ns0:href xmlns:ns0="DAV:">%s</ns0:href>
-    </ns0:locktoken>
-    <ns0:lockroot xmlns:ns0="DAV:">http://localhost/resource</ns0:lockroot>
-  </ns0:activelock>
-</ns0:lockdiscovery></ns0:prop>""" % locktoken)
-
-    def test_unlock_nolocktoken(self):
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write",
-                         u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        unlock = UNLOCKMethod(resource, TestWebDAVRequest())
-        self.assertRaises(zope.webdav.interfaces.BadRequest, unlock.UNLOCK)
-
-    def test_unlock_badlocktoken(self):
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write",
-                         u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        request = TestWebDAVRequest(environ = {"LOCK_TOKEN": "XXX"})
-        unlock = UNLOCKMethod(resource, request)
-        self.assertRaises(zope.webdav.interfaces.ConflictError, unlock.UNLOCK)
-
-    def test_unlock_badlocktoken2(self):
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write",
-                         u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-
-        request = TestWebDAVRequest(environ = {"LOCK_TOKEN": "<XXX>"})
-        unlock = UNLOCKMethod(resource, request)
-        self.assertRaises(zope.webdav.interfaces.ConflictError, unlock.UNLOCK)
-
-    def test_unlock(self):
-        resource = self.root["resource"] = Resource()
-
-        lockmanager = DAVLockmanager(resource)
-        lockmanager.lock(u"exclusive", u"write",
-                         u"""<D:owner xmlns:D="DAV:">Michael</D:owner>""",
-                         duration = datetime.timedelta(seconds = 100),
-                         depth = "0")
-        locktoken = lockmanager.getActivelock().locktoken[0]
-
-        request = TestWebDAVRequest(
-            environ = {"LOCK_TOKEN": "<%s>" % locktoken})
-        unlock = UNLOCKMethod(resource, request)
-        result = unlock.UNLOCK()
-
-        self.assertEqual(result, "")
-        self.assertEqual(request.response.getStatus(), 204)
-
-
-def test_suite():
-    return unittest.TestSuite((
-        unittest.makeSuite(LOCKINGHeaders),
-        unittest.makeSuite(LOCKTestCase),
-        ))



More information about the Checkins mailing list