~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_poplib.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
ImportĀ upstreamĀ versionĀ 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Test script for poplib module."""
 
2
 
 
3
# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
 
4
# a real test suite
 
5
 
 
6
import poplib
 
7
import asyncore
 
8
import asynchat
 
9
import socket
 
10
import os
 
11
import time
 
12
import errno
 
13
 
 
14
from unittest import TestCase, skipUnless
 
15
from test import support as test_support
 
16
threading = test_support.import_module('threading')
 
17
 
 
18
HOST = test_support.HOST
 
19
PORT = 0
 
20
 
 
21
SUPPORTS_SSL = False
 
22
if hasattr(poplib, 'POP3_SSL'):
 
23
    import ssl
 
24
 
 
25
    SUPPORTS_SSL = True
 
26
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
 
27
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
 
28
 
 
29
# the dummy data returned by server when LIST and RETR commands are issued
 
30
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
 
31
RETR_RESP = b"""From: postmaster@python.org\
 
32
\r\nContent-Type: text/plain\r\n\
 
33
MIME-Version: 1.0\r\n\
 
34
Subject: Dummy\r\n\
 
35
\r\n\
 
36
line1\r\n\
 
37
line2\r\n\
 
38
line3\r\n\
 
39
.\r\n"""
 
40
 
 
41
 
 
42
class DummyPOP3Handler(asynchat.async_chat):
 
43
 
 
44
    CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
 
45
 
 
46
    def __init__(self, conn):
 
47
        asynchat.async_chat.__init__(self, conn)
 
48
        self.set_terminator(b"\r\n")
 
49
        self.in_buffer = []
 
50
        self.push('+OK dummy pop3 server ready. <timestamp>')
 
51
        self.tls_active = False
 
52
        self.tls_starting = False
 
53
 
 
54
    def collect_incoming_data(self, data):
 
55
        self.in_buffer.append(data)
 
56
 
 
57
    def found_terminator(self):
 
58
        line = b''.join(self.in_buffer)
 
59
        line = str(line, 'ISO-8859-1')
 
60
        self.in_buffer = []
 
61
        cmd = line.split(' ')[0].lower()
 
62
        space = line.find(' ')
 
63
        if space != -1:
 
64
            arg = line[space + 1:]
 
65
        else:
 
66
            arg = ""
 
67
        if hasattr(self, 'cmd_' + cmd):
 
68
            method = getattr(self, 'cmd_' + cmd)
 
69
            method(arg)
 
70
        else:
 
71
            self.push('-ERR unrecognized POP3 command "%s".' %cmd)
 
72
 
 
73
    def handle_error(self):
 
74
        raise
 
75
 
 
76
    def push(self, data):
 
77
        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
 
78
 
 
79
    def cmd_echo(self, arg):
 
80
        # sends back the received string (used by the test suite)
 
81
        self.push(arg)
 
82
 
 
83
    def cmd_user(self, arg):
 
84
        if arg != "guido":
 
85
            self.push("-ERR no such user")
 
86
        self.push('+OK password required')
 
87
 
 
88
    def cmd_pass(self, arg):
 
89
        if arg != "python":
 
90
            self.push("-ERR wrong password")
 
91
        self.push('+OK 10 messages')
 
92
 
 
93
    def cmd_stat(self, arg):
 
94
        self.push('+OK 10 100')
 
95
 
 
96
    def cmd_list(self, arg):
 
97
        if arg:
 
98
            self.push('+OK %s %s' % (arg, arg))
 
99
        else:
 
100
            self.push('+OK')
 
101
            asynchat.async_chat.push(self, LIST_RESP)
 
102
 
 
103
    cmd_uidl = cmd_list
 
104
 
 
105
    def cmd_retr(self, arg):
 
106
        self.push('+OK %s bytes' %len(RETR_RESP))
 
107
        asynchat.async_chat.push(self, RETR_RESP)
 
108
 
 
109
    cmd_top = cmd_retr
 
110
 
 
111
    def cmd_dele(self, arg):
 
112
        self.push('+OK message marked for deletion.')
 
113
 
 
114
    def cmd_noop(self, arg):
 
115
        self.push('+OK done nothing.')
 
116
 
 
117
    def cmd_rpop(self, arg):
 
118
        self.push('+OK done nothing.')
 
119
 
 
120
    def cmd_apop(self, arg):
 
121
        self.push('+OK done nothing.')
 
122
 
 
123
    def cmd_quit(self, arg):
 
124
        self.push('+OK closing.')
 
125
        self.close_when_done()
 
126
 
 
127
    def _get_capas(self):
 
128
        _capas = dict(self.CAPAS)
 
129
        if not self.tls_active and SUPPORTS_SSL:
 
130
            _capas['STLS'] = []
 
131
        return _capas
 
132
 
 
133
    def cmd_capa(self, arg):
 
134
        self.push('+OK Capability list follows')
 
135
        if self._get_capas():
 
136
            for cap, params in self._get_capas().items():
 
137
                _ln = [cap]
 
138
                if params:
 
139
                    _ln.extend(params)
 
140
                self.push(' '.join(_ln))
 
141
        self.push('.')
 
142
 
 
143
    if SUPPORTS_SSL:
 
144
 
 
145
        def cmd_stls(self, arg):
 
146
            if self.tls_active is False:
 
147
                self.push('+OK Begin TLS negotiation')
 
148
                tls_sock = ssl.wrap_socket(self.socket, certfile=CERTFILE,
 
149
                                           server_side=True,
 
150
                                           do_handshake_on_connect=False,
 
151
                                           suppress_ragged_eofs=False)
 
152
                self.del_channel()
 
153
                self.set_socket(tls_sock)
 
154
                self.tls_active = True
 
155
                self.tls_starting = True
 
156
                self.in_buffer = []
 
157
                self._do_tls_handshake()
 
158
            else:
 
159
                self.push('-ERR Command not permitted when TLS active')
 
160
 
 
161
        def _do_tls_handshake(self):
 
162
            try:
 
163
                self.socket.do_handshake()
 
164
            except ssl.SSLError as err:
 
165
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
 
166
                                   ssl.SSL_ERROR_WANT_WRITE):
 
167
                    return
 
168
                elif err.args[0] == ssl.SSL_ERROR_EOF:
 
169
                    return self.handle_close()
 
170
                raise
 
171
            except OSError as err:
 
172
                if err.args[0] == errno.ECONNABORTED:
 
173
                    return self.handle_close()
 
174
            else:
 
175
                self.tls_active = True
 
176
                self.tls_starting = False
 
177
 
 
178
        def handle_read(self):
 
179
            if self.tls_starting:
 
180
                self._do_tls_handshake()
 
181
            else:
 
182
                try:
 
183
                    asynchat.async_chat.handle_read(self)
 
184
                except ssl.SSLEOFError:
 
185
                    self.handle_close()
 
186
 
 
187
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
 
188
 
 
189
    handler = DummyPOP3Handler
 
190
 
 
191
    def __init__(self, address, af=socket.AF_INET):
 
192
        threading.Thread.__init__(self)
 
193
        asyncore.dispatcher.__init__(self)
 
194
        self.create_socket(af, socket.SOCK_STREAM)
 
195
        self.bind(address)
 
196
        self.listen(5)
 
197
        self.active = False
 
198
        self.active_lock = threading.Lock()
 
199
        self.host, self.port = self.socket.getsockname()[:2]
 
200
        self.handler_instance = None
 
201
 
 
202
    def start(self):
 
203
        assert not self.active
 
204
        self.__flag = threading.Event()
 
205
        threading.Thread.start(self)
 
206
        self.__flag.wait()
 
207
 
 
208
    def run(self):
 
209
        self.active = True
 
210
        self.__flag.set()
 
211
        while self.active and asyncore.socket_map:
 
212
            self.active_lock.acquire()
 
213
            asyncore.loop(timeout=0.1, count=1)
 
214
            self.active_lock.release()
 
215
        asyncore.close_all(ignore_all=True)
 
216
 
 
217
    def stop(self):
 
218
        assert self.active
 
219
        self.active = False
 
220
        self.join()
 
221
 
 
222
    def handle_accepted(self, conn, addr):
 
223
        self.handler_instance = self.handler(conn)
 
224
 
 
225
    def handle_connect(self):
 
226
        self.close()
 
227
    handle_read = handle_connect
 
228
 
 
229
    def writable(self):
 
230
        return 0
 
231
 
 
232
    def handle_error(self):
 
233
        raise
 
234
 
 
235
 
 
236
class TestPOP3Class(TestCase):
 
237
    def assertOK(self, resp):
 
238
        self.assertTrue(resp.startswith(b"+OK"))
 
239
 
 
240
    def setUp(self):
 
241
        self.server = DummyPOP3Server((HOST, PORT))
 
242
        self.server.start()
 
243
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
 
244
 
 
245
    def tearDown(self):
 
246
        self.client.close()
 
247
        self.server.stop()
 
248
 
 
249
    def test_getwelcome(self):
 
250
        self.assertEqual(self.client.getwelcome(),
 
251
                         b'+OK dummy pop3 server ready. <timestamp>')
 
252
 
 
253
    def test_exceptions(self):
 
254
        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
 
255
 
 
256
    def test_user(self):
 
257
        self.assertOK(self.client.user('guido'))
 
258
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
 
259
 
 
260
    def test_pass_(self):
 
261
        self.assertOK(self.client.pass_('python'))
 
262
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
 
263
 
 
264
    def test_stat(self):
 
265
        self.assertEqual(self.client.stat(), (10, 100))
 
266
 
 
267
    def test_list(self):
 
268
        self.assertEqual(self.client.list()[1:],
 
269
                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
 
270
                          25))
 
271
        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
 
272
 
 
273
    def test_retr(self):
 
274
        expected = (b'+OK 116 bytes',
 
275
                    [b'From: postmaster@python.org', b'Content-Type: text/plain',
 
276
                     b'MIME-Version: 1.0', b'Subject: Dummy',
 
277
                     b'', b'line1', b'line2', b'line3'],
 
278
                    113)
 
279
        foo = self.client.retr('foo')
 
280
        self.assertEqual(foo, expected)
 
281
 
 
282
    def test_too_long_lines(self):
 
283
        self.assertRaises(poplib.error_proto, self.client._shortcmd,
 
284
                          'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))
 
285
 
 
286
    def test_dele(self):
 
287
        self.assertOK(self.client.dele('foo'))
 
288
 
 
289
    def test_noop(self):
 
290
        self.assertOK(self.client.noop())
 
291
 
 
292
    def test_rpop(self):
 
293
        self.assertOK(self.client.rpop('foo'))
 
294
 
 
295
    def test_apop(self):
 
296
        self.assertOK(self.client.apop('foo', 'dummypassword'))
 
297
 
 
298
    def test_top(self):
 
299
        expected =  (b'+OK 116 bytes',
 
300
                     [b'From: postmaster@python.org', b'Content-Type: text/plain',
 
301
                      b'MIME-Version: 1.0', b'Subject: Dummy', b'',
 
302
                      b'line1', b'line2', b'line3'],
 
303
                     113)
 
304
        self.assertEqual(self.client.top(1, 1), expected)
 
305
 
 
306
    def test_uidl(self):
 
307
        self.client.uidl()
 
308
        self.client.uidl('foo')
 
309
 
 
310
    def test_capa(self):
 
311
        capa = self.client.capa()
 
312
        self.assertTrue('IMPLEMENTATION' in capa.keys())
 
313
 
 
314
    def test_quit(self):
 
315
        resp = self.client.quit()
 
316
        self.assertTrue(resp)
 
317
        self.assertIsNone(self.client.sock)
 
318
        self.assertIsNone(self.client.file)
 
319
 
 
320
    @requires_ssl
 
321
    def test_stls_capa(self):
 
322
        capa = self.client.capa()
 
323
        self.assertTrue('STLS' in capa.keys())
 
324
 
 
325
    @requires_ssl
 
326
    def test_stls(self):
 
327
        expected = b'+OK Begin TLS negotiation'
 
328
        resp = self.client.stls()
 
329
        self.assertEqual(resp, expected)
 
330
 
 
331
    @requires_ssl
 
332
    def test_stls_context(self):
 
333
        expected = b'+OK Begin TLS negotiation'
 
334
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
 
335
        resp = self.client.stls(context=ctx)
 
336
        self.assertEqual(resp, expected)
 
337
 
 
338
 
 
339
if SUPPORTS_SSL:
 
340
 
 
341
    class DummyPOP3_SSLHandler(DummyPOP3Handler):
 
342
 
 
343
        def __init__(self, conn):
 
344
            asynchat.async_chat.__init__(self, conn)
 
345
            ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
 
346
                                          server_side=True,
 
347
                                          do_handshake_on_connect=False)
 
348
            self.del_channel()
 
349
            self.set_socket(ssl_socket)
 
350
            # Must try handshake before calling push()
 
351
            self.tls_active = True
 
352
            self.tls_starting = True
 
353
            self._do_tls_handshake()
 
354
            self.set_terminator(b"\r\n")
 
355
            self.in_buffer = []
 
356
            self.push('+OK dummy pop3 server ready. <timestamp>')
 
357
 
 
358
 
 
359
@requires_ssl
 
360
class TestPOP3_SSLClass(TestPOP3Class):
 
361
    # repeat previous tests by using poplib.POP3_SSL
 
362
 
 
363
    def setUp(self):
 
364
        self.server = DummyPOP3Server((HOST, PORT))
 
365
        self.server.handler = DummyPOP3_SSLHandler
 
366
        self.server.start()
 
367
        self.client = poplib.POP3_SSL(self.server.host, self.server.port)
 
368
 
 
369
    def test__all__(self):
 
370
        self.assertIn('POP3_SSL', poplib.__all__)
 
371
 
 
372
    def test_context(self):
 
373
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
 
374
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
 
375
                            self.server.port, keyfile=CERTFILE, context=ctx)
 
376
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
 
377
                            self.server.port, certfile=CERTFILE, context=ctx)
 
378
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
 
379
                            self.server.port, keyfile=CERTFILE,
 
380
                            certfile=CERTFILE, context=ctx)
 
381
 
 
382
        self.client.quit()
 
383
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
 
384
                                        context=ctx)
 
385
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
 
386
        self.assertIs(self.client.sock.context, ctx)
 
387
        self.assertTrue(self.client.noop().startswith(b'+OK'))
 
388
 
 
389
    def test_stls(self):
 
390
        self.assertRaises(poplib.error_proto, self.client.stls)
 
391
 
 
392
    test_stls_context = test_stls
 
393
 
 
394
    def test_stls_capa(self):
 
395
        capa = self.client.capa()
 
396
        self.assertFalse('STLS' in capa.keys())
 
397
 
 
398
 
 
399
@requires_ssl
 
400
class TestPOP3_TLSClass(TestPOP3Class):
 
401
    # repeat previous tests by using poplib.POP3.stls()
 
402
 
 
403
    def setUp(self):
 
404
        self.server = DummyPOP3Server((HOST, PORT))
 
405
        self.server.start()
 
406
        self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
 
407
        self.client.stls()
 
408
 
 
409
    def tearDown(self):
 
410
        if self.client.file is not None and self.client.sock is not None:
 
411
            try:
 
412
                self.client.quit()
 
413
            except poplib.error_proto:
 
414
                # happens in the test_too_long_lines case; the overlong
 
415
                # response will be treated as response to QUIT and raise
 
416
                # this exception
 
417
                pass
 
418
        self.server.stop()
 
419
 
 
420
    def test_stls(self):
 
421
        self.assertRaises(poplib.error_proto, self.client.stls)
 
422
 
 
423
    test_stls_context = test_stls
 
424
 
 
425
    def test_stls_capa(self):
 
426
        capa = self.client.capa()
 
427
        self.assertFalse(b'STLS' in capa.keys())
 
428
 
 
429
 
 
430
class TestTimeouts(TestCase):
 
431
 
 
432
    def setUp(self):
 
433
        self.evt = threading.Event()
 
434
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
435
        self.sock.settimeout(60)  # Safety net. Look issue 11812
 
436
        self.port = test_support.bind_port(self.sock)
 
437
        self.thread = threading.Thread(target=self.server, args=(self.evt,self.sock))
 
438
        self.thread.setDaemon(True)
 
439
        self.thread.start()
 
440
        self.evt.wait()
 
441
 
 
442
    def tearDown(self):
 
443
        self.thread.join()
 
444
        del self.thread  # Clear out any dangling Thread objects.
 
445
 
 
446
    def server(self, evt, serv):
 
447
        serv.listen(5)
 
448
        evt.set()
 
449
        try:
 
450
            conn, addr = serv.accept()
 
451
            conn.send(b"+ Hola mundo\n")
 
452
            conn.close()
 
453
        except socket.timeout:
 
454
            pass
 
455
        finally:
 
456
            serv.close()
 
457
 
 
458
    def testTimeoutDefault(self):
 
459
        self.assertTrue(socket.getdefaulttimeout() is None)
 
460
        socket.setdefaulttimeout(30)
 
461
        try:
 
462
            pop = poplib.POP3(HOST, self.port)
 
463
        finally:
 
464
            socket.setdefaulttimeout(None)
 
465
        self.assertEqual(pop.sock.gettimeout(), 30)
 
466
        pop.sock.close()
 
467
 
 
468
    def testTimeoutNone(self):
 
469
        self.assertTrue(socket.getdefaulttimeout() is None)
 
470
        socket.setdefaulttimeout(30)
 
471
        try:
 
472
            pop = poplib.POP3(HOST, self.port, timeout=None)
 
473
        finally:
 
474
            socket.setdefaulttimeout(None)
 
475
        self.assertTrue(pop.sock.gettimeout() is None)
 
476
        pop.sock.close()
 
477
 
 
478
    def testTimeoutValue(self):
 
479
        pop = poplib.POP3(HOST, self.port, timeout=30)
 
480
        self.assertEqual(pop.sock.gettimeout(), 30)
 
481
        pop.sock.close()
 
482
 
 
483
 
 
484
def test_main():
 
485
    tests = [TestPOP3Class, TestTimeouts,
 
486
             TestPOP3_SSLClass, TestPOP3_TLSClass]
 
487
    thread_info = test_support.threading_setup()
 
488
    try:
 
489
        test_support.run_unittest(*tests)
 
490
    finally:
 
491
        test_support.threading_cleanup(*thread_info)
 
492
 
 
493
 
 
494
if __name__ == '__main__':
 
495
    test_main()