[Zope3-checkins] CVS: Zope3/src/zope/server/ftp/tests - demofs.py:1.1 fstests.py:1.1 test_demofs.py:1.1 test_publisher.py:1.1 test_ftpserver.py:1.4 test_osemulators.py:NONE

Jim Fulton jim@zope.com
Mon, 3 Feb 2003 10:09:37 -0500


Update of /cvs-repository/Zope3/src/zope/server/ftp/tests
In directory cvs.zope.org:/tmp/cvs-serv15846/src/zope/server/ftp/tests

Modified Files:
	test_ftpserver.py 
Added Files:
	demofs.py fstests.py test_demofs.py test_publisher.py 
Removed Files:
	test_osemulators.py 
Log Message:
Refactored the ftp framework to make it much simpler, less general,
and easier to maintain.  This included ripping out the vfs framework.


=== Added File Zope3/src/zope/server/ftp/tests/demofs.py ===
##############################################################################
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
##############################################################################
"""Demo file-system implementation, for testing

$Id: demofs.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""
__metaclass__ = type

import posixpath
from zope.exceptions import Unauthorized
from zope.server.interfaces.ftp import IFileSystem
from zope.server.interfaces.ftp import IFileSystemAccess

# Permission bits.  Combine them with ``|`` to build an access mask.
execute = 1
read = 2
write = 4

class File:
    """A node of the demo file system carrying per-user access masks.

    ``access`` maps a user name to a bit mask built from the module-level
    ``execute``/``read``/``write`` constants.  A fresh node is readable by
    'anonymous'.  File contents are not set up here; the code that writes
    a file assigns them to a ``data`` attribute.
    """
    type = 'f'
    modified = None

    def __init__(self):
        self.access = {'anonymous': read}

    def accessable(self, user, access=read):
        """Return a true value if `user` holds any bit of `access`.

        'root' is always allowed.  Otherwise the user's own mask is
        consulted first and the 'anonymous' mask second.  The result is
        a truth value (it may be an int), not necessarily a bool.
        """
        return (user == 'root'
                or (self.access.get(user, 0) & access)
                or (self.access.get('anonymous', 0) & access)
                )

    def grant(self, user, access):
        """Add the bits in `access` to the mask stored for `user`."""
        self.access[user] = self.access.get(user, 0) | access

    def revoke(self, user, access):
        """Remove the bits in `access` from the mask stored for `user`.

        The bits are cleared with ``& ~access`` rather than toggled, so
        revoking a permission the user does not hold leaves it unset
        instead of accidentally granting it.
        """
        self.access[user] = self.access.get(user, 0) & ~access

class Directory(File):
    """A directory node: a File of type 'd' that holds named children.

    The children are kept in the ``files`` dictionary, keyed by name, and
    the mapping-style methods below all operate on that dictionary.
    """

    type = 'd'

    def __init__(self):
        super(Directory, self).__init__()
        # name -> child node
        self.files = {}

    def get(self, name, default=None):
        """Return the child called `name`, or `default` if there is none."""
        try:
            return self.files[name]
        except KeyError:
            return default

    def __getitem__(self, name):
        children = self.files
        return children[name]

    def __setitem__(self, name, v):
        children = self.files
        children[name] = v

    def __delitem__(self, name):
        children = self.files
        del children[name]

    def __contains__(self, name):
        children = self.files
        return name in children

    def __iter__(self):
        # Iterating a directory yields the names of its children.
        children = self.files
        return iter(children)

class DemoFileSystem:
    __doc__ = IFileSystem.__doc__

    __implements__ =  IFileSystem

    # Node factories, overridable by subclasses.
    File = File
    Directory = Directory

    def __init__(self, files, user=''):
        # files: the root directory node of the tree to serve.
        # user:  the user name whose access rights are applied.
        self.files = files
        self.user = user

    def get(self, path, default=None):
        """Return the node at `path`, or `default` if there is none.

        Leading slashes are ignored, so '' and '/' both name the root.
        `default` is returned when a component is missing or when a
        non-final component is not a directory.  Unauthorized is raised
        when a directory that has to be traversed is not readable by the
        current user.
        """
        path = path.lstrip('/')

        d = self.files
        if path:
            for name in path.split('/'):
                # Compare by value; identity of string literals is an
                # implementation detail.
                if d.type != 'd':
                    return default
                if not d.accessable(self.user):
                    raise Unauthorized
                d = d.get(name)
                if d is None:
                    return default

        return d

    def getany(self, path):
        """Return the node at `path`; raise OSError if it does not exist."""
        d = self.get(path)
        if d is None:
            raise OSError("No such file or directory:", path)
        return d

    def getdir(self, path):
        """Return the directory at `path`; raise OSError for anything else."""
        d = self.getany(path)
        if d.type != 'd':
            raise OSError("Not a directory:", path)
        return d

    def getfile(self, path):
        """Return the file at `path`; raise OSError for anything else."""
        d = self.getany(path)
        if d.type != 'f':
            raise OSError("Not a file:", path)
        return d

    def getwdir(self, path):
        """Return the directory at `path` if the user may write to it.

        Raises OSError when the path is not a directory or the current
        user lacks write access to it.
        """
        d = self.getdir(path)
        if not d.accessable(self.user, write):
            raise OSError("Permission denied")
        return d

    def type(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        # None when nothing exists at path.
        f = self.get(path)
        return getattr(f, 'type', None)

    def names(self, path, filter=None):
        "See zope.server.interfaces.ftp.IFileSystem"
        f = list(self.getdir(path))
        if filter is not None:
            f = [name for name in f if filter(name)]

        return f

    def _lsinfo(self, name, file):
        """Build the listing-info dictionary for the node `file`.

        'size' is included only for files and 'mtime' only when the node
        has a modification time.
        """
        info = {
            'type': file.type,
            'name': name,
            'group_read': file.accessable(self.user, read),
            'group_write': file.accessable(self.user, write),
            }
        if file.type == 'f':
            info['size'] = len(file.data)
        if file.modified is not None:
            info['mtime'] = file.modified

        return info

    def ls(self, path, filter=None):
        "See zope.server.interfaces.ftp.IFileSystem"
        f = self.getdir(path)
        return [self._lsinfo(name, f.files[name])
                for name in f
                if filter is None or filter(name)]

    def readfile(self, path, outstream, start=0, end=None):
        "See zope.server.interfaces.ftp.IFileSystem"
        f = self.getfile(path)

        # Cut the tail first so that `start` still indexes from the
        # beginning of the file.
        data = f.data
        if end is not None:
            data = data[:end]
        if start:
            data = data[start:]

        outstream.write(data)

    def lsinfo(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        f = self.getany(path)
        return self._lsinfo(posixpath.split(path)[1], f)

    def mtime(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        f = self.getany(path)
        return f.modified

    def size(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        # Nodes without data (directories) report a size of 0.
        f = self.getany(path)
        return len(getattr(f, 'data', ''))

    def mkdir(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        path, name = posixpath.split(path)
        d = self.getwdir(path)
        if name in d.files:
            raise OSError("Already exists:", name)
        newdir = self.Directory()
        # The creator can read and write the new directory.
        newdir.grant(self.user, read | write)
        d.files[name] = newdir

    def remove(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        path, name = posixpath.split(path)
        d = self.getwdir(path)
        if name not in d.files:
            raise OSError("Not exists:", name)
        f = d.files[name]
        if f.type == 'd':
            raise OSError('Is a directory:', name)
        del d.files[name]

    def rmdir(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        path, name = posixpath.split(path)
        d = self.getwdir(path)
        if name not in d.files:
            raise OSError("Not exists:", name)
        f = d.files[name]
        if f.type != 'd':
            raise OSError('Is not a directory:', name)
        # Note: the directory is removed whether or not it is empty.
        del d.files[name]

    def rename(self, old, new):
        "See zope.server.interfaces.ftp.IFileSystem"
        oldpath, oldname = posixpath.split(old)
        newpath, newname = posixpath.split(new)

        # Both the source and the target directory must be writable.
        olddir = self.getwdir(oldpath)
        newdir = self.getwdir(newpath)

        if oldname not in olddir.files:
            raise OSError("Not exists:", oldname)
        if newname in newdir.files:
            raise OSError("Already exists:", newname)

        newdir.files[newname] = olddir.files[oldname]
        del olddir.files[oldname]

    def writefile(self, path, instream, start=None, end=None, append=False):
        "See zope.server.interfaces.ftp.IFileSystem"
        path, name = posixpath.split(path)
        d = self.getdir(path)
        f = d.files.get(name)
        if f is None:
            # Creating a file needs write access to the directory.
            d = self.getwdir(path)
            f = d.files[name] = self.File()
            f.grant(self.user, read | write)
            # Start from empty contents so that appending to, or writing
            # at an offset into, a brand-new file works instead of
            # failing on a missing attribute and leaving a broken entry.
            f.data = ''
        elif f.type != 'f':
            raise OSError("Can't overwrite a directory")

        if not f.accessable(self.user, write):
            raise OSError("Permission denied")

        if append:
            f.data += instream.read()
        else:

            if start:
                if start < 0:
                    raise ValueError("Negative starting file position")
                prefix = f.data[:start]
                # Pad with NUL characters when writing past the end.
                if len(prefix) < start:
                    prefix += '\0' * (start - len(prefix))
            else:
                prefix = ''
                start=0

            if end:
                if end < 0:
                    raise ValueError("Negative ending file position")
                # Replace at most end - start characters and keep
                # whatever followed the replaced region.
                l = end - start
                newdata = instream.read(l)

                f.data = prefix+newdata+f.data[start+len(newdata):]
            else:
                # Without an end position the file is truncated after
                # the newly written data.
                f.data = prefix + instream.read()

    def writable(self, path):
        "See zope.server.interfaces.ftp.IFileSystem"
        path, name = posixpath.split(path)
        d = self.getdir(path)
        if name not in d:
            # A new file could be created iff the directory is writable.
            return d.accessable(self.user, write)
        f = d[name]
        return f.type == 'f' and f.accessable(self.user, write)
            
class DemoFileSystemAccess:
    __doc__ = IFileSystemAccess.__doc__

    __implements__ =  IFileSystemAccess

    def __init__(self, files, users):
        # files: the root directory node shared by every opened file system.
        # users: mapping of user name -> password.
        self.files = files
        self.users = users

    def authenticate(self, credentials):
        "See zope.server.interfaces.ftp.IFileSystemAccess"
        user, password = credentials
        if user != 'anonymous':
            # Check membership explicitly: with a plain ``get`` an
            # unknown user paired with a ``None`` password would compare
            # equal to the missing entry and be let in.
            if user not in self.users or self.users[user] != password:
                raise Unauthorized
        return user

    def open(self, credentials):
        "See zope.server.interfaces.ftp.IFileSystemAccess"
        # Raises Unauthorized (from authenticate) for bad credentials.
        user = self.authenticate(credentials)
        return DemoFileSystem(self.files, user)


=== Added File Zope3/src/zope/server/ftp/tests/fstests.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Abstract file-system tests

$Id: fstests.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""

import stat
from StringIO import StringIO
from zope.interface.verify import verifyObject
from zope.server.interfaces.ftp import IFileSystem

class FileSystemTests:
    """Abstract tests of an IFileSystem implementation.

    This is a mix-in: combine it with unittest.TestCase and have setUp
    assign the object under test to ``self.filesystem``.  The file system
    is expected to contain the directory ``dir_name`` holding the entries
    in ``dir_contents``; ``file_name`` must contain ``file_contents`` and
    ``unwritable_filename`` must not be writable.
    """

    filesystem = None
    dir_name  = '/dir'
    file_name = '/dir/file.txt'
    unwritable_filename = '/dir/protected.txt'
    dir_contents = ['file.txt', 'protected.txt']
    file_contents = 'Lengthen your stride'

    def _read(self, path):
        # Return the complete contents of the file at path.
        s = StringIO()
        self.filesystem.readfile(path, s)
        return s.getvalue()

    def test_type(self):
        self.assertEqual(self.filesystem.type(self.dir_name), 'd')
        self.assertEqual(self.filesystem.type(self.file_name), 'f')

    def test_names(self):
        lst = self.filesystem.names(self.dir_name)
        lst.sort()
        self.assertEqual(lst, self.dir_contents)

    def test_readfile(self):
        self.assertEqual(self._read(self.file_name), self.file_contents)

    def testReadPartOfFile(self):
        # Only a start position: read through to the end of the file.
        s = StringIO()
        self.filesystem.readfile(self.file_name, s, 2)
        self.assertEqual(s.getvalue(), self.file_contents[2:])

    def testReadPartOfFile2(self):
        # Start and end position: read the slice in between.
        s = StringIO()
        self.filesystem.readfile(self.file_name, s, 1, 5)
        self.assertEqual(s.getvalue(), self.file_contents[1:5])

    def test_IFileSystemInterface(self):
        verifyObject(IFileSystem, self.filesystem)

    def testRemove(self):
        self.filesystem.remove(self.file_name)
        self.failIf(self.filesystem.type(self.file_name))

    def testMkdir(self):
        path = self.dir_name + '/x'
        self.filesystem.mkdir(path)
        self.assertEqual(self.filesystem.type(path), 'd')

    def testRmdir(self):
        # NOTE(review): only file_name is removed first, so the entry
        # for unwritable_filename is still in the directory when rmdir
        # is called -- confirm that removing a non-empty directory is
        # the intended contract.
        self.filesystem.remove(self.file_name)
        self.filesystem.rmdir(self.dir_name)
        self.failIf(self.filesystem.type(self.dir_name))

    def testRename(self):
        self.filesystem.rename(self.file_name, self.file_name + '.bak')
        self.assertEqual(self.filesystem.type(self.file_name), None)
        self.assertEqual(self.filesystem.type(self.file_name + '.bak'), 'f')

    def testWriteFile(self):
        self.assertEqual(self._read(self.file_name), self.file_contents)

        data = 'Always ' + self.file_contents
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, s)

        self.assertEqual(self._read(self.file_name), data)

    def testAppendToFile(self):
        data = ' again'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, s, append=True)

        self.assertEqual(self._read(self.file_name),
                         self.file_contents + data)

    def testWritePartOfFile(self):
        # Replacing characters 3 to 6 keeps the text on both sides.
        data = '123'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, s, 3, 6)

        expect = self.file_contents[:3] + data + self.file_contents[6:]

        self.assertEqual(self._read(self.file_name), expect)

    def testWritePartOfFile_and_truncate(self):
        # Without an end position the file ends after the new data.
        data = '123'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, s, 3)

        expect = self.file_contents[:3] + data

        self.assertEqual(self._read(self.file_name), expect)

    def testWriteBeyondEndOfFile(self):
        # The new data starts inside the file and extends past its end.
        partlen = len(self.file_contents) - 6
        data = 'daylight savings'
        s = StringIO(data)
        self.filesystem.writefile(self.file_name, s, partlen)

        expect = self.file_contents[:partlen] + data

        self.assertEqual(self._read(self.file_name), expect)

    def testWriteNewFile(self):
        new_name = self.file_name + '.new'
        s = StringIO(self.file_contents)
        self.filesystem.writefile(new_name, s)

        # Read back the file that was just created, not the fixture
        # file that already held the same contents.
        self.assertEqual(self._read(new_name), self.file_contents)

    def test_writable(self):
        self.failIf(self.filesystem.writable(self.dir_name))
        self.failIf(self.filesystem.writable(self.unwritable_filename))
        self.failUnless(self.filesystem.writable(self.file_name))
        # A file that does not exist yet, in a writable directory.
        self.failUnless(self.filesystem.writable(self.file_name+'1'))


=== Added File Zope3/src/zope/server/ftp/tests/test_demofs.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
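"""Run the abstract file-system tests against the demo file system.

The shared tests come from fstests.FileSystemTests; the file system under
test is demofs.DemoFileSystem.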

$Id: test_demofs.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""

import demofs
from unittest import TestCase, TestSuite, main, makeSuite
from fstests import FileSystemTests
from StringIO import StringIO

class Test(FileSystemTests, TestCase):
    """Apply the shared file-system tests to demofs.DemoFileSystem."""

    def setUp(self):
        # Build the fixture tree as user 'bob', who may write to the root.
        top = demofs.Directory()
        top.grant('bob', demofs.write)
        demo = demofs.DemoFileSystem(top, 'bob')
        self.filesystem = demo
        demo.mkdir(self.dir_name)
        demo.writefile(self.file_name, StringIO(self.file_contents))
        demo.writefile(self.unwritable_filename, StringIO("save this"))
        # Take bob's write permission away again to make the second
        # file the unwritable one.
        protected = demo.get(self.unwritable_filename)
        protected.revoke('bob', demofs.write)

def test_suite():
    """Return a suite holding all the tests of the Test class."""
    suite = TestSuite()
    suite.addTest(makeSuite(Test))
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Added File Zope3/src/zope/server/ftp/tests/test_publisher.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
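"""Run the abstract file-system tests against PublisherFileSystem.

A demo file system is published through minimal stand-in publication,
request and response objects defined in this module.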

$Id: test_publisher.py,v 1.1 2003/02/03 15:09:01 jim Exp $
"""

import demofs
from unittest import TestCase, TestSuite, main, makeSuite
from fstests import FileSystemTests
from StringIO import StringIO
from zope.publisher.publish import mapply
from zope.server.ftp.publisher import PublisherFileSystem

class DemoFileSystem(demofs.DemoFileSystem):
    """Demo file system whose rename takes a directory and two names."""

    def rename(self, path, old, new):
        # Turn (directory, old name, new name) into the two full paths
        # that the base class expects.
        source = "%s/%s" % (path, old)
        target = "%s/%s" % (path, new)
        return demofs.DemoFileSystem.rename(self, source, target)

class Publication:
    """Stand-in publication that publishes a single root object.

    Most hooks do nothing; callObject looks up the method named by the
    request's 'command' entry on the object and calls it with the request
    environment.
    """

    def __init__(self, root):
        self.root = root

    def beforeTraversal(self, request):
        pass

    def getApplication(self, request):
        # The same root is handed out for every request.
        return self.root

    def afterTraversal(self, request, ob):
        pass

    def callObject(self, request, ob):
        env = request.env
        command = getattr(ob, env['command'])
        # When a separate name is supplied, append it to the path
        # before the command is called.
        if 'name' in env:
            env['path'] += "/" + env['name']
        return mapply(command, request = env)

    def afterCall(self, request):
        pass

    def handleException(self, object, request, info, retry_allowed=True):
        # Keep the first two items of info on the response.
        response = request.response
        response._exc = info[:2]
        

class Request:
    """Stand-in request wrapping the environment of one command.

    The 'credentials' entry is taken out of the environment and kept as
    the ``user`` attribute; the remaining entries stay in ``env``.
    """

    def __init__(self, input, output, env):
        self.env = env
        self.response = Response()
        credentials = env['credentials']
        self.user = credentials
        del env['credentials']

    def processInputs(self):
        pass

    def traverse(self, root):
        # No real traversal: record the user on the root and return it.
        root.user = self.user
        return root

    def close(self):
        pass

class Response:
    """Stand-in response that records either a result or an exception."""

    # _exc is assigned by Publication.handleException (the first two
    # items of the exception info); _body is assigned by setBody.
    _exc = _body = None

    def setBody(self, result):
        """Remember `result` as the outcome of the call."""
        self._body = result

    def outputBody(self):
        """Do nothing; the result is fetched with getResult instead."""
        pass

    def getResult(self):
        """Return the recorded body, or re-raise a recorded exception."""
        if self._exc:
            # Python 2 two-expression raise: first item, then second item.
            raise self._exc[0], self._exc[1]
        return self._body

class RequestFactory:
    """Callable that builds Request objects bound to one publication."""

    def __init__(self, root):
        # One Publication, wrapping root, is shared by all requests.
        self.pub = Publication(root)

    def __call__(self, input, output, env):
        request = Request(input, output, env)
        request.publication = self.pub
        return request

class TestPublisherFileSystem(FileSystemTests, TestCase):
    """Apply the shared file-system tests to PublisherFileSystem."""

    def setUp(self):
        # Build the fixture tree as user 'bob', who may write to the root.
        top = demofs.Directory()
        top.grant('bob', demofs.write)
        demo = DemoFileSystem(top, 'bob')
        demo.mkdir(self.dir_name)
        demo.writefile(self.file_name, StringIO(self.file_contents))
        demo.writefile(self.unwritable_filename, StringIO("save this"))
        # Take bob's write permission away again to make the second
        # file the unwritable one.
        protected = demo.get(self.unwritable_filename)
        protected.revoke('bob', demofs.write)

        # The object under test reaches the demo file system through
        # the request factory.
        factory = RequestFactory(demo)
        self.filesystem = PublisherFileSystem('bob', factory)

def test_suite():
    """Return a suite holding all the tests of TestPublisherFileSystem."""
    suite = TestSuite()
    suite.addTest(makeSuite(TestPublisherFileSystem))
    return suite

if __name__=='__main__':
    main(defaultTest='test_suite')


=== Zope3/src/zope/server/ftp/tests/test_ftpserver.py 1.3 => 1.4 ===
--- Zope3/src/zope/server/ftp/tests/test_ftpserver.py:1.3	Thu Jan 30 11:01:11 2003
+++ Zope3/src/zope/server/ftp/tests/test_ftpserver.py	Mon Feb  3 10:09:01 2003
@@ -29,16 +29,15 @@
 
 from threading import Thread, Event
 from zope.server.taskthreads import ThreadedTaskDispatcher
-from zope.server.ftp.ftpserver import FTPServer
-from zope.server.ftp.ftpstatusmessages import status_msgs
+from zope.server.ftp.server import FTPServer, status_messages
 from zope.server.adjustments import Adjustments
 from zope.server.interfaces import ITask
 
-from zope.server.vfs.osfilesystem import OSFileSystem
-from zope.server.ftp.testfilesystemaccess import TestFilesystemAccess
+from zope.server.ftp.tests import demofs
 
 from zope.server.tests.asyncerror import AsyncoreErrorHook
 
+
 import ftplib
 
 from time import sleep, time
@@ -66,13 +65,22 @@
         self.orig_map_size = len(asyncore.socket_map)
         self.hook_asyncore_error()
 
-        self.root_dir = tempfile.mktemp()
-        os.mkdir(self.root_dir)
-        os.mkdir(os.path.join(self.root_dir, 'test'))
-
-        fs = OSFileSystem(self.root_dir)
-        fs_access = TestFilesystemAccess(fs)
+        root_dir = demofs.Directory()
+        root_dir['test'] = demofs.Directory()
+        root_dir['test'].access['foo'] = 7
+        root_dir['private'] = demofs.Directory()
+        root_dir['private'].access['foo'] = 7
+        root_dir['private'].access['anonymous'] = 0
+
+        fs = demofs.DemoFileSystem(root_dir, 'foo')
+        fs.writefile('/test/existing', StringIO('test initial data'))
+        fs.writefile('/private/existing', StringIO('private initial data'))
+        
+        self.__fs = fs = demofs.DemoFileSystem(root_dir, 'root')
+        fs.writefile('/existing', StringIO('root initial data'))
 
+        fs_access = demofs.DemoFileSystemAccess(root_dir, {'foo': 'bar'})
+        
         self.server = FTPServer(LOCALHOST, SERVER_PORT, fs_access,
                                 task_dispatcher=td, adj=my_adj)
         if CONNECT_TO_PORT == 0:
@@ -105,9 +113,6 @@
 
         self.unhook_asyncore_error()
 
-        if os.path.exists(self.root_dir):
-            shutil.rmtree(self.root_dir)
-
     def loop(self):
         self.thread_started.set()
         import select
@@ -131,10 +136,10 @@
         if login:
             ftp.send('USER foo\r\n')
             self.assertEqual(ftp.recv(1024),
-                             status_msgs['PASS_REQUIRED'] +'\r\n')
+                             status_messages['PASS_REQUIRED'] +'\r\n')
             ftp.send('PASS bar\r\n')
             self.assertEqual(ftp.recv(1024),
-                             status_msgs['LOGIN_SUCCESS'] +'\r\n')
+                             status_messages['LOGIN_SUCCESS'] +'\r\n')
 
         return ftp
 
@@ -158,40 +163,83 @@
 
     def testABOR(self):
         self.assertEqual(self.execute('ABOR', 1),
-                         status_msgs['TRANSFER_ABORTED'])
+                         status_messages['TRANSFER_ABORTED'])
+
+
+    def testAPPE(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+            fp = StringIO('Charity never faileth')
+            # Successful write
+            conn.storbinary('APPE /test/existing', fp)
+            self.assertEqual(self.__fs.files['test']['existing'].data,
+                             'test initial dataCharity never faileth')
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
+
+    def testAPPE_errors(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+
+            fp = StringIO('Speak softly')
 
+            # Can't overwrite directory
+            self.assertRaises(
+                ftplib.error_perm, conn.storbinary, 'APPE /test', fp)
+
+            # No such file
+            self.assertRaises(
+                ftplib.error_perm, conn.storbinary, 'APPE /nosush', fp)
+
+            # No such dir
+            self.assertRaises(
+                ftplib.error_perm, conn.storbinary, 'APPE /nosush/f', fp)
+
+            # Not allowed
+            self.assertRaises(
+                ftplib.error_perm, conn.storbinary, 'APPE /existing', fp)
+
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
 
     def testCDUP(self):
         self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
-                         status_msgs['SUCCESS_250'] %'CDUP')
+                         status_messages['SUCCESS_250'] %'CDUP')
         self.assertEqual(self.execute('CDUP', 1),
-                         status_msgs['SUCCESS_250'] %'CDUP')
+                         status_messages['SUCCESS_250'] %'CDUP')
 
 
     def testCWD(self):
         self.assertEqual(self.execute('CWD test', 1),
-                         status_msgs['SUCCESS_250'] %'CWD')
+                         status_messages['SUCCESS_250'] %'CWD')
         self.assertEqual(self.execute('CWD foo', 1),
-                         status_msgs['ERR_NO_DIR'] %'/foo')
+                         status_messages['ERR_NO_DIR'] %'/foo')
 
 
     def testDELE(self):
-        open(os.path.join(self.root_dir, 'foo'), 'w').write('blah')
-        self.assertEqual(self.execute('DELE foo', 1),
-                         status_msgs['SUCCESS_250'] %'DELE')
+        self.assertEqual(self.execute('DELE test/existing', 1),
+                         status_messages['SUCCESS_250'] %'DELE')
         res = self.execute('DELE bar', 1).split()[0]
         self.assertEqual(res, '550')
         self.assertEqual(self.execute('DELE', 1),
-                         status_msgs['ERR_ARGS'])
+                         status_messages['ERR_ARGS'])
 
 
     def XXXtestHELP(self):
         # XXX This test doesn't work.  I think it is because execute()
         #     doesn't read the whole reply.  The execeute() helper
         #     function should be fixed, but that's for another day.
-        result = status_msgs['HELP_START'] + '\r\n'
+        result = status_messages['HELP_START'] + '\r\n'
         result += 'Help goes here somewhen.\r\n'
-        result += status_msgs['HELP_END']
+        result += status_messages['HELP_END']
 
         self.assertEqual(self.execute('HELP', 1), result)
 
@@ -200,7 +248,7 @@
         conn = ftplib.FTP()
         try:
             conn.connect(LOCALHOST, self.port)
-            conn.login('foo', 'bar')
+            conn.login('anonymous', 'bar')
             self.assertRaises(ftplib.Error, retrlines, conn, 'LIST /foo')
             listing = retrlines(conn, 'LIST')
             self.assert_(len(listing) > 0)
@@ -212,16 +260,16 @@
         self.testNOOP()
 
     def testMKDLIST(self):
-        self.execute(['MKD f1', 'MKD f2'], 1)
+        self.execute(['MKD test/f1', 'MKD test/f2'], 1)
         conn = ftplib.FTP()
         try:
             conn.connect(LOCALHOST, self.port)
             conn.login('foo', 'bar')
             listing = []
-            conn.retrlines('LIST', listing.append)
+            conn.retrlines('LIST /test', listing.append)
             self.assert_(len(listing) > 2)
             listing = []
-            conn.retrlines('LIST -lad f1', listing.append)
+            conn.retrlines('LIST -lad test/f1', listing.append)
             self.assertEqual(len(listing), 1)
             self.assertEqual(listing[0][0], 'd')
         finally:
@@ -232,23 +280,23 @@
 
     def testNOOP(self):
         self.assertEqual(self.execute('NOOP', 0),
-                         status_msgs['SUCCESS_200'] %'NOOP')
+                         status_messages['SUCCESS_200'] %'NOOP')
         self.assertEqual(self.execute('NOOP', 1),
-                         status_msgs['SUCCESS_200'] %'NOOP')
+                         status_messages['SUCCESS_200'] %'NOOP')
 
 
     def testPASS(self):
         self.assertEqual(self.execute('PASS', 0),
-                         status_msgs['LOGIN_MISMATCH'])
+                         status_messages['LOGIN_MISMATCH'])
         self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
-                         status_msgs['LOGIN_MISMATCH'])
+                         status_messages['LOGIN_MISMATCH'])
 
 
     def testQUIT(self):
         self.assertEqual(self.execute('QUIT', 0),
-                         status_msgs['GOODBYE'])
+                         status_messages['GOODBYE'])
         self.assertEqual(self.execute('QUIT', 1),
-                         status_msgs['GOODBYE'])
+                         status_messages['GOODBYE'])
 
 
     def testSTOR(self):
@@ -262,7 +310,24 @@
                 ftplib.error_perm, conn.storbinary, 'STOR /test', fp)
             fp = StringIO('Charity never faileth')
             # Successful write
-            conn.storbinary('STOR /stuff', fp)
+            conn.storbinary('STOR /test/stuff', fp)
+            self.assertEqual(self.__fs.files['test']['stuff'].data,
+                             'Charity never faileth')
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
+
+
+    def testSTOR_over(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+            fp = StringIO('Charity never faileth')
+            conn.storbinary('STOR /test/existing', fp)
+            self.assertEqual(self.__fs.files['test']['existing'].data,
+                             'Charity never faileth')
         finally:
             conn.close()
         # Make sure no garbage was left behind.
@@ -271,9 +336,9 @@
 
     def testUSER(self):
         self.assertEqual(self.execute('USER foo', 0),
-                         status_msgs['PASS_REQUIRED'])
+                         status_messages['PASS_REQUIRED'])
         self.assertEqual(self.execute('USER', 0),
-                         status_msgs['ERR_ARGS'])
+                         status_messages['ERR_ARGS'])
 
 
 

=== Removed File Zope3/src/zope/server/ftp/tests/test_osemulators.py ===