[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/OFS/Services/SmtpService/tests - testSmtpService.py:1.1

Barry Warsaw barry@wooz.org
Mon, 28 Oct 2002 16:05:01 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/App/OFS/Services/SmtpService/tests
In directory cvs.zope.org:/tmp/cvs-serv26569

Added Files:
	testSmtpService.py 
Log Message:
A minimal test suite for SmtpService.


=== Added File Zope3/lib/python/Zope/App/OFS/Services/SmtpService/tests/testSmtpService.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

"""This module tests the regular persistent SMTP Service.

$Id: testSmtpService.py,v 1.1 2002/10/28 21:05:00 bwarsaw Exp $
"""

import sys
import unittest
import smtpd
import asyncore
import threading

from Zope.App.OFS.Services.SmtpService.SmtpService import SmtpService

UNPRIV_PORT = 9225
#smtpd.DEBUGSTREAM = sys.stderr

class OneShotChannel(smtpd.SMTPChannel):
    def smtp_QUIT(self, arg):
        smtpd.SMTPChannel.smtp_QUIT(self, arg)
        raise asyncore.ExitNow


class SinkServer(smtpd.SMTPServer, threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        smtpd.SMTPServer.__init__(
            self ,
            ('localhost', UNPRIV_PORT), ('ignored', 25))
        self.msgtext = None

    def handle_accept(self):
        conn, addr = self.accept()
        channel = OneShotChannel(self, conn, addr)

    def process_message(self, peer, mailfrom, rcpttos, data):
        self.msgtext = data

    def run(self):
        try:
            # timeout is in milliseconds, see asyncore.py poll3()
            asyncore.loop(timeout=30.0)
            self.msgtext = None
        except asyncore.ExitNow:
            pass

    def close(self):
        smtpd.SMTPServer.close(self)


class TestSmtpService(unittest.TestCase):
    def setUp(self):
        self._service = SmtpService(smtpport=UNPRIV_PORT)
        self._reader = SinkServer()

    def tearDown(self):
        self._reader.close()

    def testSendMessage(self):
        text = """\
This is a test message.
"""
        mfrom = 'tests@example.com (Zope3 Test Suite)'
        mto = 'results@example.com (Zope3 Test Results)'
        subject = 'testSmtpService.testSendMessage'

        self._reader.start()
        self._service.sendMessage(text, mto, mfrom, subject)
        self._reader.join()
        response = self._reader.msgtext

        self.assertEqual(response, """\
subject: testSmtpService.testSendMessage

This is a test message.""")


def test_suite():
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(TestSmtpService)


if __name__ == '__main__':
    unittest.TextTestRunner().run(test_suite())