1
"""Test script for poplib module."""
3
# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
14
from unittest import TestCase, skipUnless
15
from test import support as test_support
16
threading = test_support.import_module('threading')
18
HOST = test_support.HOST
22
if hasattr(poplib, 'POP3_SSL'):
26
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
27
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
29
# the dummy data returned by server when LIST and RETR commands are issued
30
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
31
RETR_RESP = b"""From: postmaster@python.org\
32
\r\nContent-Type: text/plain\r\n\
33
MIME-Version: 1.0\r\n\
42
class DummyPOP3Handler(asynchat.async_chat):
44
CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
46
def __init__(self, conn):
47
asynchat.async_chat.__init__(self, conn)
48
self.set_terminator(b"\r\n")
50
self.push('+OK dummy pop3 server ready. <timestamp>')
51
self.tls_active = False
52
self.tls_starting = False
54
def collect_incoming_data(self, data):
    """Append the received chunk *data* to ``self.in_buffer``."""
    pending = self.in_buffer
    pending.append(data)
57
def found_terminator(self):
58
line = b''.join(self.in_buffer)
59
line = str(line, 'ISO-8859-1')
61
cmd = line.split(' ')[0].lower()
62
space = line.find(' ')
64
arg = line[space + 1:]
67
if hasattr(self, 'cmd_' + cmd):
68
method = getattr(self, 'cmd_' + cmd)
71
self.push('-ERR unrecognized POP3 command "%s".' %cmd)
73
def handle_error(self):
77
asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
79
def cmd_echo(self, arg):
80
# sends back the received string (used by the test suite)
83
def cmd_user(self, arg):
85
self.push("-ERR no such user")
86
self.push('+OK password required')
88
def cmd_pass(self, arg):
90
self.push("-ERR wrong password")
91
self.push('+OK 10 messages')
93
def cmd_stat(self, arg):
    """Answer STAT with the canned reply ``+OK 10 100``; *arg* is ignored."""
    reply = '+OK 10 100'
    self.push(reply)
96
def cmd_list(self, arg):
98
self.push('+OK %s %s' % (arg, arg))
101
asynchat.async_chat.push(self, LIST_RESP)
105
def cmd_retr(self, arg):
106
self.push('+OK %s bytes' %len(RETR_RESP))
107
asynchat.async_chat.push(self, RETR_RESP)
111
def cmd_dele(self, arg):
    """Answer DELE with a fixed positive reply; *arg* is ignored."""
    reply = '+OK message marked for deletion.'
    self.push(reply)
114
def cmd_noop(self, arg):
    """Answer NOOP with a fixed positive reply; *arg* is ignored."""
    reply = '+OK done nothing.'
    self.push(reply)
117
def cmd_rpop(self, arg):
    """Answer RPOP with a fixed positive reply; *arg* is ignored."""
    reply = '+OK done nothing.'
    self.push(reply)
120
def cmd_apop(self, arg):
    """Answer APOP with a fixed positive reply; *arg* is ignored."""
    reply = '+OK done nothing.'
    self.push(reply)
123
def cmd_quit(self, arg):
    """Answer QUIT, then ask the channel to close once output is flushed.

    The reply is pushed before ``close_when_done()`` is called; *arg* is
    ignored.
    """
    farewell = '+OK closing.'
    self.push(farewell)
    self.close_when_done()
127
def _get_capas(self):
128
_capas = dict(self.CAPAS)
129
if not self.tls_active and SUPPORTS_SSL:
133
def cmd_capa(self, arg):
134
self.push('+OK Capability list follows')
135
if self._get_capas():
136
for cap, params in self._get_capas().items():
140
self.push(' '.join(_ln))
145
def cmd_stls(self, arg):
146
if self.tls_active is False:
147
self.push('+OK Begin TLS negotiation')
148
tls_sock = ssl.wrap_socket(self.socket, certfile=CERTFILE,
150
do_handshake_on_connect=False,
151
suppress_ragged_eofs=False)
153
self.set_socket(tls_sock)
154
self.tls_active = True
155
self.tls_starting = True
157
self._do_tls_handshake()
159
self.push('-ERR Command not permitted when TLS active')
161
def _do_tls_handshake(self):
163
self.socket.do_handshake()
164
except ssl.SSLError as err:
165
if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
166
ssl.SSL_ERROR_WANT_WRITE):
168
elif err.args[0] == ssl.SSL_ERROR_EOF:
169
return self.handle_close()
171
except OSError as err:
172
if err.args[0] == errno.ECONNABORTED:
173
return self.handle_close()
175
self.tls_active = True
176
self.tls_starting = False
178
def handle_read(self):
179
if self.tls_starting:
180
self._do_tls_handshake()
183
asynchat.async_chat.handle_read(self)
184
except ssl.SSLEOFError:
187
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
189
handler = DummyPOP3Handler
191
def __init__(self, address, af=socket.AF_INET):
192
threading.Thread.__init__(self)
193
asyncore.dispatcher.__init__(self)
194
self.create_socket(af, socket.SOCK_STREAM)
198
self.active_lock = threading.Lock()
199
self.host, self.port = self.socket.getsockname()[:2]
200
self.handler_instance = None
203
assert not self.active
204
self.__flag = threading.Event()
205
threading.Thread.start(self)
211
while self.active and asyncore.socket_map:
212
self.active_lock.acquire()
213
asyncore.loop(timeout=0.1, count=1)
214
self.active_lock.release()
215
asyncore.close_all(ignore_all=True)
222
def handle_accepted(self, conn, addr):
    """Build a handler for the accepted connection and remember it.

    ``self.handler`` is called with *conn* and the result is stored in
    ``self.handler_instance``; *addr* is not used.
    """
    make_handler = self.handler
    self.handler_instance = make_handler(conn)
225
def handle_connect(self):
227
handle_read = handle_connect
232
def handle_error(self):
236
class TestPOP3Class(TestCase):
237
def assertOK(self, resp):
    """Assert that the bytes response *resp* begins with ``+OK``."""
    is_positive = resp.startswith(b"+OK")
    self.assertTrue(is_positive)
241
self.server = DummyPOP3Server((HOST, PORT))
243
self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
249
def test_getwelcome(self):
    """``getwelcome()`` must return the dummy server's greeting line."""
    greeting = b'+OK dummy pop3 server ready. <timestamp>'
    self.assertEqual(self.client.getwelcome(), greeting)
253
def test_exceptions(self):
    """Sending ``echo -err`` must make ``_shortcmd`` raise ``error_proto``."""
    with self.assertRaises(poplib.error_proto):
        self.client._shortcmd('echo -err')
257
self.assertOK(self.client.user('guido'))
258
self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
260
def test_pass_(self):
    """``pass_`` returns ``+OK`` for 'python' and raises for a bad password."""
    self.assertOK(self.client.pass_('python'))
    # The rejection must come from PASS itself, so call pass_ here rather
    # than user(); otherwise the wrong-password path is never exercised.
    # NOTE(review): assumes the dummy server rejects every password other
    # than 'python' -- confirm against cmd_pass.
    self.assertRaises(poplib.error_proto, self.client.pass_, 'invalid')
265
self.assertEqual(self.client.stat(), (10, 100))
268
self.assertEqual(self.client.list()[1:],
269
([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
271
self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
274
expected = (b'+OK 116 bytes',
275
[b'From: postmaster@python.org', b'Content-Type: text/plain',
276
b'MIME-Version: 1.0', b'Subject: Dummy',
277
b'', b'line1', b'line2', b'line3'],
279
foo = self.client.retr('foo')
280
self.assertEqual(foo, expected)
282
def test_too_long_lines(self):
    """An echo payload of ``_MAXLINE + 10`` characters raises ``error_proto``."""
    oversized = 'a' * (poplib._MAXLINE + 10)
    command = 'echo +%s' % oversized
    with self.assertRaises(poplib.error_proto):
        self.client._shortcmd(command)
287
self.assertOK(self.client.dele('foo'))
290
self.assertOK(self.client.noop())
293
self.assertOK(self.client.rpop('foo'))
296
self.assertOK(self.client.apop('foo', 'dummypassword'))
299
expected = (b'+OK 116 bytes',
300
[b'From: postmaster@python.org', b'Content-Type: text/plain',
301
b'MIME-Version: 1.0', b'Subject: Dummy', b'',
302
b'line1', b'line2', b'line3'],
304
self.assertEqual(self.client.top(1, 1), expected)
308
self.client.uidl('foo')
311
capa = self.client.capa()
312
self.assertTrue('IMPLEMENTATION' in capa.keys())
315
resp = self.client.quit()
316
self.assertTrue(resp)
317
self.assertIsNone(self.client.sock)
318
self.assertIsNone(self.client.file)
321
def test_stls_capa(self):
    """The capability mapping returned by ``capa()`` must contain ``STLS``."""
    capa = self.client.capa()
    # assertIn shows the advertised capabilities when the check fails,
    # unlike assertTrue on a pre-computed boolean.
    self.assertIn('STLS', capa)
327
expected = b'+OK Begin TLS negotiation'
328
resp = self.client.stls()
329
self.assertEqual(resp, expected)
332
def test_stls_context(self):
333
expected = b'+OK Begin TLS negotiation'
334
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
335
resp = self.client.stls(context=ctx)
336
self.assertEqual(resp, expected)
341
class DummyPOP3_SSLHandler(DummyPOP3Handler):
343
def __init__(self, conn):
344
asynchat.async_chat.__init__(self, conn)
345
ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
347
do_handshake_on_connect=False)
349
self.set_socket(ssl_socket)
350
# Must try handshake before calling push()
351
self.tls_active = True
352
self.tls_starting = True
353
self._do_tls_handshake()
354
self.set_terminator(b"\r\n")
356
self.push('+OK dummy pop3 server ready. <timestamp>')
360
class TestPOP3_SSLClass(TestPOP3Class):
361
# repeat previous tests by using poplib.POP3_SSL
364
self.server = DummyPOP3Server((HOST, PORT))
365
self.server.handler = DummyPOP3_SSLHandler
367
self.client = poplib.POP3_SSL(self.server.host, self.server.port)
369
def test__all__(self):
    """``POP3_SSL`` must be listed in ``poplib.__all__``."""
    exported = poplib.__all__
    self.assertIn('POP3_SSL', exported)
372
def test_context(self):
373
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
374
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
375
self.server.port, keyfile=CERTFILE, context=ctx)
376
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
377
self.server.port, certfile=CERTFILE, context=ctx)
378
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
379
self.server.port, keyfile=CERTFILE,
380
certfile=CERTFILE, context=ctx)
383
self.client = poplib.POP3_SSL(self.server.host, self.server.port,
385
self.assertIsInstance(self.client.sock, ssl.SSLSocket)
386
self.assertIs(self.client.sock.context, ctx)
387
self.assertTrue(self.client.noop().startswith(b'+OK'))
390
self.assertRaises(poplib.error_proto, self.client.stls)
392
test_stls_context = test_stls
394
def test_stls_capa(self):
    """``STLS`` must be absent from ``capa()`` on this connection."""
    capa = self.client.capa()
    # assertNotIn shows the advertised capabilities when the check fails,
    # unlike assertFalse on a pre-computed boolean.
    self.assertNotIn('STLS', capa)
400
class TestPOP3_TLSClass(TestPOP3Class):
401
# repeat previous tests by using poplib.POP3.stls()
404
self.server = DummyPOP3Server((HOST, PORT))
406
self.client = poplib.POP3(self.server.host, self.server.port, timeout=3)
410
if self.client.file is not None and self.client.sock is not None:
413
except poplib.error_proto:
414
# happens in the test_too_long_lines case; the overlong
415
# response will be treated as response to QUIT and raise
421
self.assertRaises(poplib.error_proto, self.client.stls)
423
test_stls_context = test_stls
425
def test_stls_capa(self):
    """``STLS`` must be absent from ``capa()`` on this connection."""
    capa = self.client.capa()
    # The capability names returned by capa() are str, so the lookup must
    # use a str key: a bytes key such as b'STLS' never matches and would
    # make this assertion pass unconditionally.
    self.assertNotIn('STLS', capa)
430
class TestTimeouts(TestCase):
433
self.evt = threading.Event()
434
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
435
self.sock.settimeout(60) # Safety net. Look issue 11812
436
self.port = test_support.bind_port(self.sock)
437
self.thread = threading.Thread(target=self.server, args=(self.evt,self.sock))
438
self.thread.setDaemon(True)
444
del self.thread # Clear out any dangling Thread objects.
446
def server(self, evt, serv):
450
conn, addr = serv.accept()
451
conn.send(b"+ Hola mundo\n")
453
except socket.timeout:
458
def testTimeoutDefault(self):
459
self.assertTrue(socket.getdefaulttimeout() is None)
460
socket.setdefaulttimeout(30)
462
pop = poplib.POP3(HOST, self.port)
464
socket.setdefaulttimeout(None)
465
self.assertEqual(pop.sock.gettimeout(), 30)
468
def testTimeoutNone(self):
469
self.assertTrue(socket.getdefaulttimeout() is None)
470
socket.setdefaulttimeout(30)
472
pop = poplib.POP3(HOST, self.port, timeout=None)
474
socket.setdefaulttimeout(None)
475
self.assertTrue(pop.sock.gettimeout() is None)
478
def testTimeoutValue(self):
479
pop = poplib.POP3(HOST, self.port, timeout=30)
480
self.assertEqual(pop.sock.gettimeout(), 30)
485
tests = [TestPOP3Class, TestTimeouts,
486
TestPOP3_SSLClass, TestPOP3_TLSClass]
487
thread_info = test_support.threading_setup()
489
test_support.run_unittest(*tests)
491
test_support.threading_cleanup(*thread_info)
494
if __name__ == '__main__':