~ubuntu-branches/ubuntu/lucid/python2.6/lucid

« back to all changes in this revision

Viewing changes to Lib/test/test_imaplib.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2010-03-11 13:30:19 UTC
  • mto: (10.1.13 sid)
  • mto: This revision was merged to the branch mainline in revision 44.
  • Revision ID: james.westby@ubuntu.com-20100311133019-sblbooa3uqrkoe70
Tags: upstream-2.6.5~rc2
Import upstream version 2.6.5~rc2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from test import test_support as support
 
2
# If we end up with a significant number of tests that don't require
 
3
# threading, this test module should be split.  Right now we skip
 
4
# them all if we don't have threading.
 
5
threading = support.import_module('threading')
 
6
 
 
7
from contextlib import contextmanager
1
8
import imaplib
 
9
import os.path
 
10
import select
 
11
import socket
 
12
import SocketServer
 
13
import sys
2
14
import time
3
15
 
4
 
from test import test_support
 
16
from test_support import verbose
5
17
import unittest
6
18
 
 
19
try:
 
20
    import ssl
 
21
except ImportError:
 
22
    ssl = None
 
23
 
 
24
CERTFILE = None
 
25
 
7
26
 
8
27
class TestImaplib(unittest.TestCase):
 
28
 
9
29
    def test_that_Time2Internaldate_returns_a_result(self):
10
30
        # We can check only that it successfully produces a result,
11
31
        # not the correctness of the result itself, since the result
17
37
            imaplib.Time2Internaldate(t)
18
38
 
19
39
 
 
40
if ssl:
 
41
 
 
42
    class SecureTCPServer(SocketServer.TCPServer):
 
43
 
 
44
        def get_request(self):
 
45
            newsocket, fromaddr = self.socket.accept()
 
46
            connstream = ssl.wrap_socket(newsocket,
 
47
                                         server_side=True,
 
48
                                         certfile=CERTFILE)
 
49
            return connstream, fromaddr
 
50
 
 
51
    IMAP4_SSL = imaplib.IMAP4_SSL
 
52
 
 
53
else:
 
54
 
 
55
    class SecureTCPServer:
 
56
        pass
 
57
 
 
58
    IMAP4_SSL = None
 
59
 
 
60
class TimeoutStreamRequestHandler(SocketServer.StreamRequestHandler):
 
61
 
 
62
    timeout = 1
 
63
 
 
64
    def setup(self):
 
65
        self.connection = self.request
 
66
        if self.timeout is not None:
 
67
            self.connection.settimeout(self.timeout)
 
68
        self.rfile = self.connection.makefile('rb', self.rbufsize)
 
69
        self.wfile = self.connection.makefile('wb', self.wbufsize)
 
70
 
 
71
 
 
72
class SimpleIMAPHandler(TimeoutStreamRequestHandler):
 
73
 
 
74
    def _send(self, message):
 
75
        if verbose: print "SENT:", message.strip()
 
76
        self.wfile.write(message)
 
77
 
 
78
    def handle(self):
 
79
        # Send a welcome message.
 
80
        self._send('* OK IMAP4rev1\r\n')
 
81
        while 1:
 
82
            # Gather up input until we receive a line terminator or we timeout.
 
83
            # Accumulate read(1) because it's simpler to handle the differences
 
84
            # between naked sockets and SSL sockets.
 
85
            line = ''
 
86
            while 1:
 
87
                try:
 
88
                    part = self.rfile.read(1)
 
89
                    if part == '':
 
90
                        # Naked sockets return empty strings..
 
91
                        return
 
92
                    line += part
 
93
                except IOError:
 
94
                    # ..but SSLSockets throw exceptions.
 
95
                    return
 
96
                if line.endswith('\r\n'):
 
97
                    break
 
98
 
 
99
            if verbose: print 'GOT:', line.strip()
 
100
            splitline = line.split()
 
101
            tag = splitline[0]
 
102
            cmd = splitline[1]
 
103
            args = splitline[2:]
 
104
 
 
105
            if hasattr(self, 'cmd_%s' % (cmd,)):
 
106
                getattr(self, 'cmd_%s' % (cmd,))(tag, args)
 
107
            else:
 
108
                self._send('%s BAD %s unknown\r\n' % (tag, cmd))
 
109
 
 
110
    def cmd_CAPABILITY(self, tag, args):
 
111
        self._send('* CAPABILITY IMAP4rev1\r\n')
 
112
        self._send('%s OK CAPABILITY completed\r\n' % (tag,))
 
113
 
 
114
class BaseThreadedNetworkedTests(unittest.TestCase):
 
115
 
 
116
    def make_server(self, addr, hdlr):
 
117
 
 
118
        class MyServer(self.server_class):
 
119
            def handle_error(self, request, client_address):
 
120
                self.close_request(request)
 
121
                self.server_close()
 
122
                raise
 
123
 
 
124
        if verbose: print "creating server"
 
125
        server = MyServer(addr, hdlr)
 
126
        self.assertEquals(server.server_address, server.socket.getsockname())
 
127
 
 
128
        if verbose:
 
129
            print "server created"
 
130
            print "ADDR =", addr
 
131
            print "CLASS =", self.server_class
 
132
            print "HDLR =", server.RequestHandlerClass
 
133
 
 
134
        t = threading.Thread(
 
135
            name='%s serving' % self.server_class,
 
136
            target=server.serve_forever,
 
137
            # Short poll interval to make the test finish quickly.
 
138
            # Time between requests is short enough that we won't wake
 
139
            # up spuriously too many times.
 
140
            kwargs={'poll_interval':0.01})
 
141
        t.daemon = True  # In case this function raises.
 
142
        t.start()
 
143
        if verbose: print "server running"
 
144
        return server, t
 
145
 
 
146
    def reap_server(self, server, thread):
 
147
        if verbose: print "waiting for server"
 
148
        server.shutdown()
 
149
        thread.join()
 
150
        if verbose: print "done"
 
151
 
 
152
    @contextmanager
 
153
    def reaped_server(self, hdlr):
 
154
        server, thread = self.make_server((support.HOST, 0), hdlr)
 
155
        try:
 
156
            yield server
 
157
        finally:
 
158
            self.reap_server(server, thread)
 
159
 
 
160
    def test_connect(self):
 
161
        with self.reaped_server(SimpleIMAPHandler) as server:
 
162
            client = self.imap_class(*server.server_address)
 
163
            client.shutdown()
 
164
 
 
165
    def test_issue5949(self):
 
166
 
 
167
        class EOFHandler(TimeoutStreamRequestHandler):
 
168
            def handle(self):
 
169
                # EOF without sending a complete welcome message.
 
170
                self.wfile.write('* OK')
 
171
                # explicitly shutdown.  socket.close() merely releases
 
172
                # the socket and waits for GC to perform the actual close.
 
173
                self.request.shutdown(socket.SHUT_WR)
 
174
 
 
175
        with self.reaped_server(EOFHandler) as server:
 
176
            self.assertRaises(imaplib.IMAP4.abort,
 
177
                              self.imap_class, *server.server_address)
 
178
 
 
179
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
 
180
 
 
181
    server_class = SocketServer.TCPServer
 
182
    imap_class = imaplib.IMAP4
 
183
 
 
184
 
 
185
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
 
186
 
 
187
    server_class = SecureTCPServer
 
188
    imap_class = IMAP4_SSL
 
189
 
 
190
 
20
191
def test_main():
21
 
    test_support.run_unittest(TestImaplib)
22
 
 
 
192
    tests = [TestImaplib]
 
193
 
 
194
    if support.is_resource_enabled('network'):
 
195
        if ssl:
 
196
            global CERTFILE
 
197
            CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
 
198
                                    "keycert.pem")
 
199
            if not os.path.exists(CERTFILE):
 
200
                raise support.TestFailed("Can't read certificate files!")
 
201
            tests.append(ThreadedNetworkedTestsSSL)
 
202
        tests.append(ThreadedNetworkedTests)
 
203
 
 
204
    threadinfo = support.threading_setup()
 
205
 
 
206
    support.run_unittest(*tests)
 
207
 
 
208
    support.threading_cleanup(*threadinfo)
23
209
 
24
210
if __name__ == "__main__":
25
 
    unittest.main()
 
211
    support.use_resources = ['network']
 
212
    test_main()