~mailman-coders/mailman/2.1

1779 by Mark Sapiro
Bump copyright dates.
1
# Copyright (C) 2001-2018 by the Free Software Foundation, Inc.
1 by
This commit was manufactured by cvs2svn to create branch
2
#
3
# This program is free software; you can redistribute it and/or
4
# modify it under the terms of the GNU General Public License
5
# as published by the Free Software Foundation; either version 2
6
# of the License, or (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
749 by tkikuchi
FSF office has moved to 51 Franklin Street.
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
1 by
This commit was manufactured by cvs2svn to create branch
16
17
"""Base class for tests that email things.
18
"""
19
20
import socket
21
import asyncore
22
import smtpd
23
24
from Mailman import mm_cfg
25
26
from TestBase import TestBase
27
28
29

30
# Payload of the message most recently handed to
# SinkServer.process_message(); EmailBase._readmsg() returns it to the test.
# None means no message has been captured.
MSGTEXT = None
31
32
class OneShotChannel(smtpd.SMTPChannel):
    """SMTP channel that breaks out of the asyncore loop after QUIT."""

    def smtp_QUIT(self, arg):
        """Handle QUIT as the base class does, then raise asyncore.ExitNow.

        The exception is how the code driving asyncore.loop() learns that
        the client has finished its SMTP session.
        """
        # Call the base class explicitly (not via super()) so this also works
        # where the asyncore class hierarchy is made of old-style classes.
        base_quit = smtpd.SMTPChannel.smtp_QUIT
        base_quit(self, arg)
        raise asyncore.ExitNow()
36
37
38
class SinkServer(smtpd.SMTPServer):
    """SMTP server that stores each received message body in MSGTEXT.

    Incoming connections are served by OneShotChannel rather than the stock
    smtpd.SMTPChannel.
    """

    def handle_accept(self):
        """Wrap a newly accepted connection in a OneShotChannel."""
        # asyncore's accept() returns None instead of a (conn, addr) pair
        # when the pending connection is gone by the time it is accepted;
        # unpacking that unconditionally would raise TypeError inside the
        # event loop.
        pair = self.accept()
        if pair is None:
            return
        conn, addr = pair
        # Constructing the channel is enough; no reference is kept here.
        OneShotChannel(self, conn, addr)

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Record the message payload in the module-level MSGTEXT.

        peer, mailfrom and rcpttos are accepted to satisfy the SMTPServer
        interface but are not used; only `data` is stored.
        """
        global MSGTEXT
        MSGTEXT = data
46
47
48

49
class EmailBase(TestBase):
    """Base class for tests that need to capture a message sent over SMTP.

    setUp() starts a SinkServer listening on localhost at mm_cfg.SMTPPORT;
    _readmsg() runs the asyncore loop until a client finishes a session and
    returns the captured message text.
    """

    def setUp(self):
        """Run the base-class setup, then start the sink SMTP server."""
        TestBase.setUp(self)
        # NOTE(review): presumably SMTPPORT == 0 means "use the default SMTP
        # port"; the sink needs an explicit port to bind to, so pin it to 25.
        # The changed value is left in mm_cfg after the test.
        if mm_cfg.SMTPPORT == 0:
            mm_cfg.SMTPPORT = 25
        try:
            # Second argument tuple is ignored.
            self._server = SinkServer(('localhost', mm_cfg.SMTPPORT),
                                      ('localhost', 25))
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so undo
            # the base-class setup here before propagating the error.
            TestBase.tearDown(self)
            raise

    def tearDown(self):
        """Close the sink server, then run the base-class teardown."""
        try:
            self._server.close()
        finally:
            # Always run the base-class teardown, even if close() fails.
            TestBase.tearDown(self)

    def _readmsg(self):
        """Run the asyncore loop and return the captured message text.

        Returns the value of MSGTEXT once a client session ends with QUIT
        (OneShotChannel raises asyncore.ExitNow), or None if the loop returns
        without that happening or the session delivered no message.  The
        list is unlocked while waiting and re-locked before returning.
        """
        global MSGTEXT
        # Forget any message captured by an earlier call so that a session
        # which reaches QUIT without delivering anything yields None rather
        # than stale text.
        MSGTEXT = None
        # Save and unlock the list so that the qrunner process can open it and
        # lock it if necessary.  We'll re-lock the list in our finally clause
        # since that is an invariant of the test harness.
        self._mlist.Unlock()
        try:
            try:
                # timeout is the per-poll wait in seconds; asyncore converts
                # it to milliseconds itself when it uses poll().
                asyncore.loop(timeout=30.0)
                # The loop returned without a QUIT having been seen, so no
                # complete session took place.
                MSGTEXT = None
            except asyncore.ExitNow:
                pass
            return MSGTEXT
        finally:
            self._mlist.Lock()