[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/POP3/tests - foo_mb.py:1.1.2.1 testPOP3Message.py:1.1.2.1 testPOP3MessageList.py:1.1.2.1 testPOP3Server.py:1.1.2.1

Stephan Richter srichter@cbu.edu
Sun, 7 Apr 2002 17:39:17 -0400


Update of /cvs-repository/Zope3/lib/python/Zope/Server/POP3/tests
In directory cvs.zope.org:/tmp/cvs-serv25992/POP3/tests

Added Files:
      Tag: Zope3-Server-Branch
	foo_mb.py testPOP3Message.py testPOP3MessageList.py 
	testPOP3Server.py 
Log Message:
I just finished the POP3 server implementation. Right now it only works
with Unix-like mailboxes, but is not limited to run on Unix I believe. It
will not make much sense to write the Zope connection until we have the 
SMTP implemented and a MailMessageService has been written. 

Please try the code and let me know what could be improved. BTW, I tested
the server with Eudora and it worked great, which included the APOP authen-
tication mechanism!


=== Added File Zope3/lib/python/Zope/Server/POP3/tests/foo_mb.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: foo_mb.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""

# Unix mbox fixture holding three messages, each introduced by an envelope
# "From " separator line.  The POP3 tests in this directory slice this string
# by absolute offset ([1:663] is the first message, [626:663] its body) and
# compare against fixed sizes (662, 703 and 697 bytes, 2062 in total), so
# every character matters -- do not reflow, re-indent or strip whitespace.
# The separators must read "From " exactly: ">From " is the escaped form
# used for body lines and is not a message separator.
mailbox = """
From srichter@iuveno-net.de Sun Apr  7 11:14:50 2002
Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])
	by cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GEnM20286
	for <srichter@cbu.edu>; Sun, 7 Apr 2002 11:14:50 -0500 (CDT)
Message-Id: <5.1.0.14.2.20020407111422.00bc5fd0@imail.iuveno-net.de>
X-Sender: srichter@imail.iuveno-net.de
X-Mailer: QUALCOMM Windows Eudora Version 5.1
Date: Sun, 07 Apr 2002 11:14:41 -0500
To: srichter@cbu.edu
From: Stephan Richter <srichter@iuveno-net.de>
Subject: Test Message 1
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"; format=flowed
Content-Length: 114

Message 1 Body

--
Stephan Richter


From srichter@iuveno-net.de Sun Apr  7 11:15:29 2002
Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])
	by cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GFSM20303;
	Sun, 7 Apr 2002 11:15:28 -0500 (CDT)
Message-Id: <5.1.0.14.2.20020407111444.00bc5b20@imail.iuveno-net.de>
X-Sender: srichter@imail.iuveno-net.de
X-Mailer: QUALCOMM Windows Eudora Version 5.1
Date: Sun, 07 Apr 2002 11:15:20 -0500
To: Stephan Richter <srichter@cbu.edu>
From: Stephan Richter <srichter@iuveno-net.de>
Subject: Test Message 2
Cc: Stephan Richter <srichter@cbu.edu>
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"; format=flowed
Content-Length: 121

One more test message

--
Stephan Richter


From srichter@iuveno-net.de Sun Apr  7 11:16:00 2002
Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])
	by cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GFxM20326;
	Sun, 7 Apr 2002 11:16:00 -0500 (CDT)
Message-Id: <5.1.0.14.2.20020407111522.02331ac0@imail.iuveno-net.de>
X-Sender: srichter@imail.iuveno-net.de
X-Mailer: QUALCOMM Windows Eudora Version 5.1
Date: Sun, 07 Apr 2002 11:15:52 -0500
To: Stephan Richter <srichter@cbu.edu>, Stephan Richter <srichter@cbu.edu>
From: Stephan Richter <srichter@iuveno-net.de>
Subject: Test Message 3
Mime-Version: 1.0
Content-Type: text/plain; charset="us-ascii"; format=flowed
Content-Length: 119

The last one, okay?

--
Stephan Richter

"""


=== Added File Zope3/lib/python/Zope/Server/POP3/tests/testPOP3Message.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: testPOP3Message.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""

import unittest
import rfc822
import md5
import tempfile

from Interface.Verify import verifyClass 

from Zope.Server.POP3.IPOP3Message import IPOP3Message
from Zope.Server.POP3.POP3Message import POP3Message

import foo_mb


class Tests(unittest.TestCase):
    """Unit tests for POP3Message, built from the first foo_mb message."""

    def setUp(self):
        """Write the first fixture message to a temp file and wrap it.

        The read handle is kept open for the whole test because the
        rfc822.Message (and therefore the POP3Message) is built on top of
        it; tearDown closes the handle and removes the file.
        """
        self._msg_path = tempfile.mktemp()
        out = open(self._msg_path, 'w')
        try:
            # mailbox[0] is the newline that follows the opening quotes;
            # [1:663] is exactly the first message.
            out.write(foo_mb.mailbox[1:663])
        finally:
            out.close()
        self._msg_file = open(self._msg_path, 'r')
        self.msg = POP3Message(rfc822.Message(self._msg_file))

    def tearDown(self):
        """Close the message file and delete it from the temp directory."""
        # Imported locally: this module has no top-level ``import os``.
        import os
        self._msg_file.close()
        os.remove(self._msg_path)

    def testIsDeleted(self):
        # A freshly created message is not flagged as deleted.
        self.assertEqual(self.msg.isDeleted(), 0)

    def testSetDeleted(self):
        # The deleted flag flips from 0 to 1 after setDeleted(1).
        self.assertEqual(self.msg.isDeleted(), 0)
        self.msg.setDeleted(1)
        self.assertEqual(self.msg.isDeleted(), 1)

    def testGetEntireMessage(self):
        # The whole message is returned exactly as it was written in setUp.
        self.assertEqual(self.msg.getEntireMessage(),
                         foo_mb.mailbox[1:663])

    def testGetBody(self):
        # [626:663] is the part of the first message after the blank line
        # that terminates the headers.
        self.assertEqual(self.msg.getBody(),
                         foo_mb.mailbox[626:663])

    def testGetSize(self):
        # 662 == len(foo_mb.mailbox[1:663])
        self.assertEqual(self.msg.getSize(), 662)

    def testGetUID(self):
        # The UID is expected to be the hex MD5 digest of the message body.
        expected = md5.md5(foo_mb.mailbox[626:663]).hexdigest()
        self.assertEqual(self.msg.getUID(), expected)

    def testInterface(self):
        # POP3Message must declare and correctly implement IPOP3Message.
        self.failUnless(IPOP3Message.isImplementedByInstancesOf(POP3Message))
        self.failUnless(verifyClass(IPOP3Message, POP3Message))


def test_suite():
    """Return a suite holding every test method of ``Tests``."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)

if __name__ == '__main__':
    # Allow this test module to be run directly as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())


=== Added File Zope3/lib/python/Zope/Server/POP3/tests/testPOP3MessageList.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: testPOP3MessageList.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""

import unittest
import mailbox
import tempfile
import os

from Interface.Verify import verifyClass 

from Zope.Server.POP3.IPOP3MessageList import IPOP3MessageList
from Zope.Server.POP3.POP3MessageList import POP3MessageList

from Zope.Server.VFS.UnixFileSystem import UnixFileSystem
import foo_mb


class Tests(unittest.TestCase):
    """Unit tests for POP3MessageList reading the foo_mb fixture mailbox."""

    def setUp(self):
        """Create a scratch directory holding mailbox 'foo' and open it."""
        # Use a private directory rather than the shared temp directory so
        # an unrelated file that happens to be called 'foo' is never
        # overwritten.
        self._root = tempfile.mktemp()
        os.mkdir(self._root)
        out = open(os.path.join(self._root, 'foo'), 'w')
        try:
            out.write(foo_mb.mailbox)
        finally:
            out.close()
        self.msg_list = POP3MessageList(UnixFileSystem(self._root), 'foo')
        self.msg_list.open()

    def tearDown(self):
        """Remove the scratch directory and everything written into it."""
        for name in os.listdir(self._root):
            os.remove(os.path.join(self._root, name))
        os.rmdir(self._root)

    def testOpen(self):
        # Empty the list held by the message list object (not an attribute
        # of this test case) so the assertion really checks that open()
        # repopulates it.
        self.msg_list._messagelist = []
        self.msg_list.open()
        self.assertEqual(len(self.msg_list._messagelist), 3)

    def testExists(self):
        # Message numbers are 1-based; the fixture has three messages.
        self.assertEqual(self.msg_list.exists(1), 1)
        self.assertEqual(self.msg_list.exists(4), 0)

    def testGetMessages(self):
        self.assertEqual(len(self.msg_list.getMessages()), 3)

    def testGetMessage(self):
        # Message number 1 is the first entry of the internal list.
        first = self.msg_list._messagelist[0]
        self.assertEqual(self.msg_list.getMessage(1), first)

    def testGetTotal(self):
        # 2062 == 662 + 703 + 697, the sizes of the three fixture messages.
        self.assertEqual(self.msg_list.getTotalSize(), 2062)

    def testGetIndex(self):
        # getIndex() is the inverse of getMessage(): first entry -> 1.
        first = self.msg_list._messagelist[0]
        self.assertEqual(self.msg_list.getIndex(first), 1)

    def testInterface(self):
        # POP3MessageList must declare and correctly implement
        # IPOP3MessageList.
        self.failUnless(IPOP3MessageList.isImplementedByInstancesOf(
            POP3MessageList))
        self.failUnless(verifyClass(IPOP3MessageList, POP3MessageList))


def test_suite():
    """Return a suite holding every test method of ``Tests``."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)

if __name__ == '__main__':
    # Allow this test module to be run directly as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())


=== Added File Zope3/lib/python/Zope/Server/POP3/tests/testPOP3Server.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: testPOP3Server.py,v 1.1.2.1 2002/04/07 21:39:17 srichter Exp $
"""

import unittest
import tempfile
import os
from asyncore import socket_map, poll

from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.POP3.POP3Server import POP3Server
from Zope.Server.POP3.POP3ServerChannel import POP3ServerChannel
from Zope.Server.POP3.POP3StatusMessages import status_msgs
from Zope.Server.Adjustments import Adjustments
from Zope.Server.ITask import ITask

from Zope.Server.VFS.UnixFileSystem import UnixFileSystem
from Zope.Server.Authentication.DictionaryAuthentication import \
     DictionaryAuthentication

import poplib
import foo_mb

from time import sleep, time

# Network endpoints for the test server.  Leave both port numbers at 0 to
# auto-bind, or use specific numbers to inspect the traffic using TCPWatch.
LOCALHOST = '127.0.0.1'
SERVER_PORT = 0
CONNECT_TO_PORT = 0

# Task dispatcher shared by every test in this module; the test case sets
# its thread count in setUp and shuts it down in tearDown.
td = ThreadedTaskDispatcher()

# Reduce overflows to make testing easier.
my_adj = Adjustments()
my_adj.inbuf_overflow = 10000
my_adj.outbuf_overflow = 10000


class Tests(unittest.TestCase):
    """Functional tests that drive a live POP3Server through poplib.

    Each test starts a server on LOCALHOST whose mailbox 'foo' (password
    'bar') contains the foo_mb fixture, and runs the asyncore poll loop in
    a background thread while the poplib client talks to it.
    """

    def setUp(self):
        """Create the mailbox directory, start the server and the loop."""
        td.setThreadCount(1)
        self.orig_map_size = len(socket_map)

        # Private scratch directory holding the single mailbox 'foo'.  The
        # plain path is kept so tearDown can remove the directory again.
        self.root_path = tempfile.mktemp()
        os.mkdir(self.root_path)

        self.root_dir = UnixFileSystem(self.root_path)

        mbox = open(os.path.join(self.root_path, 'foo'), 'w')
        try:
            mbox.write(foo_mb.mailbox)
        finally:
            mbox.close()

        self.server = POP3Server(LOCALHOST, SERVER_PORT, self.root_path,
                                 DictionaryAuthentication({'foo': 'bar'}),
                                 task_dispatcher=td, adj=my_adj)
        if CONNECT_TO_PORT == 0:
            # Auto-bound: ask the listening socket which port it received.
            self.port = self.server.socket.getsockname()[1]
        else:
            self.port = CONNECT_TO_PORT
        self.run_loop = 1
        self.counter = 0
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Give the thread some time to start.


    def tearDown(self):
        """Stop the poll loop and the server, then remove the mailbox dir."""
        self.run_loop = 0
        self.thread.join()
        td.shutdown()
        self.server.close()
        # Make sure all sockets get closed by asyncore normally.
        # timeout = time() + 5
        # while 1:
        #     if len(socket_map) == self.orig_map_size:
        #         # Clean!
        #         break
        #     if time() >= timeout:
        #         print 'Leaked a socket: %s' % `socket_map`
        #         break
        #         #self.fail('Leaked a socket: %s' % `socket_map`)
        #     poll(0.1, socket_map)

        # Remove the scratch directory created by setUp so repeated runs do
        # not pile up directories in the temp location.
        for name in os.listdir(self.root_path):
            os.remove(os.path.join(self.root_path, name))
        os.rmdir(self.root_path)


    def loop(self):
        """Thread target: poll the asyncore sockets until run_loop is 0."""
        while self.run_loop:
            self.counter = self.counter + 1
            # print 'loop', self.counter
            poll(0.1, socket_map)


    def getPOP3Connection(self, login=1):
        """Return a poplib client connected to the test server.

        The mailbox is rewritten from the fixture first.  When 'login' is
        true the client is also authenticated with USER/PASS and both
        server replies are checked.
        """
        # Refresh the file every single time, since some operations might
        # modify it.
        mbox = self.root_dir.open('foo', 'w')
        try:
            mbox.write(foo_mb.mailbox)
        finally:
            mbox.close()

        pop = poplib.POP3(LOCALHOST, self.port)
        if login:
            self.assertEqual(pop.user('foo'),
                             status_msgs['OK_USER'] % 'foo')
            self.assertEqual(pop.pass_('bar'),
                             status_msgs['OK_LOGIN'])

        return pop


    def _assertProtoError(self, expected, command, *args):
        """Check that command(*args) raises poplib.error_proto.

        The string form of the raised error must equal 'expected'.  If the
        call raises nothing the test fails, instead of passing silently as
        a bare try/except would.
        """
        try:
            command(*args)
        except poplib.error_proto:
            # Fetch the exception through sys.exc_info() rather than
            # binding it in the except clause, so this parses regardless
            # of the Python version.
            import sys
            self.assertEqual(str(sys.exc_info()[1]), expected)
        else:
            self.fail('%s%r did not raise poplib.error_proto'
                      % (command.__name__, args))


    def testAPOP(self):
        pop = self.getPOP3Connection(0)
        self.assertEqual(pop.apop('foo', 'bar'),
                         status_msgs['OK_LOGIN'])
        pop.quit()


    def testDELE(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.dele(1),
                         status_msgs['OK_DELETE'])
        pop.quit()


    def testLIST(self):
        pop = self.getPOP3Connection(1)
        # Three messages, 2062 bytes in total (662 + 703 + 697).
        self.assertEqual(pop.list(),
                         (status_msgs['OK_MSG_LIST'] % (3, 2062),
                          ['1 662', '2 703', '3 697'], 21)
                         )
        pop.quit()


    def testNOOP(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.noop(),
                         status_msgs['OK_GENERAL'])
        pop.quit()


    def testPASS(self):
        # Correct password.
        pop = self.getPOP3Connection(0)
        pop.user('foo')
        self.assertEqual(pop.pass_('bar'),
                         status_msgs['OK_LOGIN'])
        pop.quit()

        # Wrong password must be rejected.
        pop = self.getPOP3Connection(0)
        pop.user('foo')
        self._assertProtoError(status_msgs['ERR_LOGIN_MISMATCH'],
                               pop.pass_, 'blah')
        # Close this connection too, like every other test does.
        pop.quit()


    def testQUIT(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.quit(),
                         status_msgs['OK_QUIT'])


    def testRETR(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.retr(1),
                         (status_msgs['OK_RETR'] % 662, [
 'From srichter@iuveno-net.de Sun Apr  7 11:14:50 2002',
 'Received: from Stephan.iuveno-net.de (bohr.cbu.edu [192.168.160.8])',
 '\tby cbu.edu (8.11.6+Sun/8.11.6) with ESMTP id g37GEnM20286',
 '\tfor <srichter@cbu.edu>; Sun, 7 Apr 2002 11:14:50 -0500 (CDT)',
 'Message-Id: <5.1.0.14.2.20020407111422.00bc5fd0@imail.iuveno-net.de>',
 'X-Sender: srichter@imail.iuveno-net.de',
 'X-Mailer: QUALCOMM Windows Eudora Version 5.1',
 'Date: Sun, 07 Apr 2002 11:14:41 -0500',
 'To: srichter@cbu.edu', 'From: Stephan Richter <srichter@iuveno-net.de>',
 'Subject: Test Message 1', 'Mime-Version: 1.0',
 'Content-Type: text/plain; charset="us-ascii"; format=flowed',
 'Content-Length: 114', '',
 'Message 1 Body', '', '--', 'Stephan Richter', '', '', ''], 664))
        # Only three messages exist, so number 4 is unknown.
        self._assertProtoError(status_msgs['ERR_MSG_UNKNOWN'],
                               pop.retr, 4)
        pop.quit()


    def testRSET(self):
        pop = self.getPOP3Connection(1)
        pop.dele(1)
        self.assertEqual(len(pop.list()[1]), 2)
        # RSET must bring the deleted message back into the listing.
        pop.rset()
        self.assertEqual(len(pop.list()[1]), 3)
        pop.quit()


    def testSTAT(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.stat(),
                         (2062, 3))
        pop.quit()


    def testTOP(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.top(1, 2),
                         (status_msgs['OK_TOP'] % 2,
                          ['Message 1 Body', ''], 18))
        self._assertProtoError(status_msgs['ERR_MSG_UNKNOWN'],
                               pop.top, 4, 1)
        pop.quit()


    def testUIDL(self):
        pop = self.getPOP3Connection(1)
        self.assertEqual(pop.uidl(),
                         (status_msgs['OK_MSG_UIDL'],
                          ['1 f10230af2271c19b56442b4422d30244',
                           '2 f35cffda322101a0aaf354c1ca4a7840',
                           '3 059fa436bc14db93c2094c11ba224227'], 108))
        # NOTE(review): d41d8cd98f00b204e9800998ecf8427e is the MD5 digest
        # of the empty string and differs from the UID listed for message 1
        # just above.  Presumably the server hashes an already consumed
        # body on the second request -- confirm against the server and fix
        # both together; this test currently pins that behaviour.
        self.assertEqual(pop.uidl(1),
                         status_msgs['OK_SINGLE_UIDL'] % (
                         1, 'd41d8cd98f00b204e9800998ecf8427e'))
        self._assertProtoError(status_msgs['ERR_MSG_UNKNOWN'],
                               pop.uidl, 4)
        pop.quit()


    def testUSER(self):
        pop = self.getPOP3Connection(0)
        self.assertEqual(pop.user('foo'),
                         status_msgs['OK_USER'] % 'foo')
        # An unknown mailbox name must be rejected.
        self._assertProtoError(status_msgs['ERR_NOT_USER'],
                               pop.user, 'blah')
        pop.quit()


def test_suite():
    """Return a suite holding every test method of ``Tests``."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)

if __name__ == '__main__':
    # Allow this test module to be run directly as a script.
    runner = unittest.TextTestRunner()
    runner.run(test_suite())