~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/test/test_ftplib.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Test script for ftplib module."""
 
2
 
 
3
# Modified by Giampaolo Rodola' to test FTP class and IPv6 environment
 
4
 
 
5
import ftplib
 
6
import threading
 
7
import asyncore
 
8
import asynchat
 
9
import socket
 
10
import StringIO
 
11
 
 
12
from unittest import TestCase
 
13
from test import test_support
 
14
from test.test_support import HOST
 
15
 
 
16
 
 
17
# the dummy data returned by server over the data channel when
 
18
# RETR, LIST and NLST commands are issued
 
19
RETR_DATA = 'abcde12345\r\n' * 1000
 
20
LIST_DATA = 'foo\r\nbar\r\n'
 
21
NLST_DATA = 'foo\r\nbar\r\n'
 
22
 
 
23
 
 
24
class DummyDTPHandler(asynchat.async_chat):
 
25
 
 
26
    def __init__(self, conn, baseclass):
 
27
        asynchat.async_chat.__init__(self, conn)
 
28
        self.baseclass = baseclass
 
29
        self.baseclass.last_received_data = ''
 
30
 
 
31
    def handle_read(self):
 
32
        self.baseclass.last_received_data += self.recv(1024)
 
33
 
 
34
    def handle_close(self):
 
35
        self.baseclass.push('226 transfer complete')
 
36
        self.close()
 
37
 
 
38
 
 
39
class DummyFTPHandler(asynchat.async_chat):
 
40
 
 
41
    def __init__(self, conn):
 
42
        asynchat.async_chat.__init__(self, conn)
 
43
        self.set_terminator("\r\n")
 
44
        self.in_buffer = []
 
45
        self.dtp = None
 
46
        self.last_received_cmd = None
 
47
        self.last_received_data = ''
 
48
        self.next_response = ''
 
49
        self.push('220 welcome')
 
50
 
 
51
    def collect_incoming_data(self, data):
 
52
        self.in_buffer.append(data)
 
53
 
 
54
    def found_terminator(self):
 
55
        line = ''.join(self.in_buffer)
 
56
        self.in_buffer = []
 
57
        if self.next_response:
 
58
            self.push(self.next_response)
 
59
            self.next_response = ''
 
60
        cmd = line.split(' ')[0].lower()
 
61
        self.last_received_cmd = cmd
 
62
        space = line.find(' ')
 
63
        if space != -1:
 
64
            arg = line[space + 1:]
 
65
        else:
 
66
            arg = ""
 
67
        if hasattr(self, 'cmd_' + cmd):
 
68
            method = getattr(self, 'cmd_' + cmd)
 
69
            method(arg)
 
70
        else:
 
71
            self.push('550 command "%s" not understood.' %cmd)
 
72
 
 
73
    def handle_error(self):
 
74
        raise
 
75
 
 
76
    def push(self, data):
 
77
        asynchat.async_chat.push(self, data + '\r\n')
 
78
 
 
79
    def cmd_port(self, arg):
 
80
        addr = map(int, arg.split(','))
 
81
        ip = '%d.%d.%d.%d' %tuple(addr[:4])
 
82
        port = (addr[4] * 256) + addr[5]
 
83
        s = socket.create_connection((ip, port), timeout=2)
 
84
        self.dtp = DummyDTPHandler(s, baseclass=self)
 
85
        self.push('200 active data connection established')
 
86
 
 
87
    def cmd_pasv(self, arg):
 
88
        sock = socket.socket()
 
89
        sock.bind((self.socket.getsockname()[0], 0))
 
90
        sock.listen(5)
 
91
        sock.settimeout(2)
 
92
        ip, port = sock.getsockname()[:2]
 
93
        ip = ip.replace('.', ','); p1 = port / 256; p2 = port % 256
 
94
        self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2))
 
95
        conn, addr = sock.accept()
 
96
        self.dtp = DummyDTPHandler(conn, baseclass=self)
 
97
 
 
98
    def cmd_eprt(self, arg):
 
99
        af, ip, port = arg.split(arg[0])[1:-1]
 
100
        port = int(port)
 
101
        s = socket.create_connection((ip, port), timeout=2)
 
102
        self.dtp = DummyDTPHandler(s, baseclass=self)
 
103
        self.push('200 active data connection established')
 
104
 
 
105
    def cmd_epsv(self, arg):
 
106
        sock = socket.socket(socket.AF_INET6)
 
107
        sock.bind((self.socket.getsockname()[0], 0))
 
108
        sock.listen(5)
 
109
        sock.settimeout(2)
 
110
        port = sock.getsockname()[1]
 
111
        self.push('229 entering extended passive mode (|||%d|)' %port)
 
112
        conn, addr = sock.accept()
 
113
        self.dtp = DummyDTPHandler(conn, baseclass=self)
 
114
 
 
115
    def cmd_echo(self, arg):
 
116
        # sends back the received string (used by the test suite)
 
117
        self.push(arg)
 
118
 
 
119
    def cmd_user(self, arg):
 
120
        self.push('331 username ok')
 
121
 
 
122
    def cmd_pass(self, arg):
 
123
        self.push('230 password ok')
 
124
 
 
125
    def cmd_acct(self, arg):
 
126
        self.push('230 acct ok')
 
127
 
 
128
    def cmd_rnfr(self, arg):
 
129
        self.push('350 rnfr ok')
 
130
 
 
131
    def cmd_rnto(self, arg):
 
132
        self.push('250 rnto ok')
 
133
 
 
134
    def cmd_dele(self, arg):
 
135
        self.push('250 dele ok')
 
136
 
 
137
    def cmd_cwd(self, arg):
 
138
        self.push('250 cwd ok')
 
139
 
 
140
    def cmd_size(self, arg):
 
141
        self.push('250 1000')
 
142
 
 
143
    def cmd_mkd(self, arg):
 
144
        self.push('257 "%s"' %arg)
 
145
 
 
146
    def cmd_rmd(self, arg):
 
147
        self.push('250 rmd ok')
 
148
 
 
149
    def cmd_pwd(self, arg):
 
150
        self.push('257 "pwd ok"')
 
151
 
 
152
    def cmd_type(self, arg):
 
153
        self.push('200 type ok')
 
154
 
 
155
    def cmd_quit(self, arg):
 
156
        self.push('221 quit ok')
 
157
        self.close()
 
158
 
 
159
    def cmd_stor(self, arg):
 
160
        self.push('125 stor ok')
 
161
 
 
162
    def cmd_retr(self, arg):
 
163
        self.push('125 retr ok')
 
164
        self.dtp.push(RETR_DATA)
 
165
        self.dtp.close_when_done()
 
166
 
 
167
    def cmd_list(self, arg):
 
168
        self.push('125 list ok')
 
169
        self.dtp.push(LIST_DATA)
 
170
        self.dtp.close_when_done()
 
171
 
 
172
    def cmd_nlst(self, arg):
 
173
        self.push('125 nlst ok')
 
174
        self.dtp.push(NLST_DATA)
 
175
        self.dtp.close_when_done()
 
176
 
 
177
 
 
178
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
 
179
 
 
180
    handler = DummyFTPHandler
 
181
 
 
182
    def __init__(self, address, af=socket.AF_INET):
 
183
        threading.Thread.__init__(self)
 
184
        asyncore.dispatcher.__init__(self)
 
185
        self.create_socket(af, socket.SOCK_STREAM)
 
186
        self.bind(address)
 
187
        self.listen(5)
 
188
        self.active = False
 
189
        self.active_lock = threading.Lock()
 
190
        self.host, self.port = self.socket.getsockname()[:2]
 
191
 
 
192
    def start(self):
 
193
        assert not self.active
 
194
        self.__flag = threading.Event()
 
195
        threading.Thread.start(self)
 
196
        self.__flag.wait()
 
197
 
 
198
    def run(self):
 
199
        self.active = True
 
200
        self.__flag.set()
 
201
        while self.active and asyncore.socket_map:
 
202
            self.active_lock.acquire()
 
203
            asyncore.loop(timeout=0.1, count=1)
 
204
            self.active_lock.release()
 
205
        asyncore.close_all(ignore_all=True)
 
206
 
 
207
    def stop(self):
 
208
        assert self.active
 
209
        self.active = False
 
210
        self.join()
 
211
 
 
212
    def handle_accept(self):
 
213
        conn, addr = self.accept()
 
214
        self.handler = self.handler(conn)
 
215
        self.close()
 
216
 
 
217
    def handle_connect(self):
 
218
        self.close()
 
219
    handle_read = handle_connect
 
220
 
 
221
    def writable(self):
 
222
        return 0
 
223
 
 
224
    def handle_error(self):
 
225
        raise
 
226
 
 
227
 
 
228
class TestFTPClass(TestCase):
 
229
 
 
230
    def setUp(self):
 
231
        self.server = DummyFTPServer((HOST, 0))
 
232
        self.server.start()
 
233
        self.client = ftplib.FTP(timeout=2)
 
234
        self.client.connect(self.server.host, self.server.port)
 
235
 
 
236
    def tearDown(self):
 
237
        self.client.close()
 
238
        self.server.stop()
 
239
 
 
240
    def test_getwelcome(self):
 
241
        self.assertEqual(self.client.getwelcome(), '220 welcome')
 
242
 
 
243
    def test_sanitize(self):
 
244
        self.assertEqual(self.client.sanitize('foo'), repr('foo'))
 
245
        self.assertEqual(self.client.sanitize('pass 12345'), repr('pass *****'))
 
246
        self.assertEqual(self.client.sanitize('PASS 12345'), repr('PASS *****'))
 
247
 
 
248
    def test_exceptions(self):
 
249
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 400')
 
250
        self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'echo 499')
 
251
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 500')
 
252
        self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'echo 599')
 
253
        self.assertRaises(ftplib.error_proto, self.client.sendcmd, 'echo 999')
 
254
 
 
255
    def test_all_errors(self):
 
256
        exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm,
 
257
                      ftplib.error_proto, ftplib.Error, IOError, EOFError)
 
258
        for x in exceptions:
 
259
            try:
 
260
                raise x('exception not included in all_errors set')
 
261
            except ftplib.all_errors:
 
262
                pass
 
263
 
 
264
    def test_set_pasv(self):
 
265
        # passive mode is supposed to be enabled by default
 
266
        self.assertTrue(self.client.passiveserver)
 
267
        self.client.set_pasv(True)
 
268
        self.assertTrue(self.client.passiveserver)
 
269
        self.client.set_pasv(False)
 
270
        self.assertFalse(self.client.passiveserver)
 
271
 
 
272
    def test_voidcmd(self):
 
273
        self.client.voidcmd('echo 200')
 
274
        self.client.voidcmd('echo 299')
 
275
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 199')
 
276
        self.assertRaises(ftplib.error_reply, self.client.voidcmd, 'echo 300')
 
277
 
 
278
    def test_login(self):
 
279
        self.client.login()
 
280
 
 
281
    def test_acct(self):
 
282
        self.client.acct('passwd')
 
283
 
 
284
    def test_rename(self):
 
285
        self.client.rename('a', 'b')
 
286
        self.server.handler.next_response = '200'
 
287
        self.assertRaises(ftplib.error_reply, self.client.rename, 'a', 'b')
 
288
 
 
289
    def test_delete(self):
 
290
        self.client.delete('foo')
 
291
        self.server.handler.next_response = '199'
 
292
        self.assertRaises(ftplib.error_reply, self.client.delete, 'foo')
 
293
 
 
294
    def test_size(self):
 
295
        self.client.size('foo')
 
296
 
 
297
    def test_mkd(self):
 
298
        dir = self.client.mkd('/foo')
 
299
        self.assertEqual(dir, '/foo')
 
300
 
 
301
    def test_rmd(self):
 
302
        self.client.rmd('foo')
 
303
 
 
304
    def test_pwd(self):
 
305
        dir = self.client.pwd()
 
306
        self.assertEqual(dir, 'pwd ok')
 
307
 
 
308
    def test_quit(self):
 
309
        self.assertEqual(self.client.quit(), '221 quit ok')
 
310
        # Ensure the connection gets closed; sock attribute should be None
 
311
        self.assertEqual(self.client.sock, None)
 
312
 
 
313
    def test_retrbinary(self):
 
314
        received = []
 
315
        self.client.retrbinary('retr', received.append)
 
316
        self.assertEqual(''.join(received), RETR_DATA)
 
317
 
 
318
    def test_retrlines(self):
 
319
        received = []
 
320
        self.client.retrlines('retr', received.append)
 
321
        self.assertEqual(''.join(received), RETR_DATA.replace('\r\n', ''))
 
322
 
 
323
    def test_storbinary(self):
 
324
        f = StringIO.StringIO(RETR_DATA)
 
325
        self.client.storbinary('stor', f)
 
326
        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
 
327
        # test new callback arg
 
328
        flag = []
 
329
        f.seek(0)
 
330
        self.client.storbinary('stor', f, callback=lambda x: flag.append(None))
 
331
        self.assertTrue(flag)
 
332
 
 
333
    def test_storlines(self):
 
334
        f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n'))
 
335
        self.client.storlines('stor', f)
 
336
        self.assertEqual(self.server.handler.last_received_data, RETR_DATA)
 
337
        # test new callback arg
 
338
        flag = []
 
339
        f.seek(0)
 
340
        self.client.storlines('stor foo', f, callback=lambda x: flag.append(None))
 
341
        self.assertTrue(flag)
 
342
 
 
343
    def test_nlst(self):
 
344
        self.client.nlst()
 
345
        self.assertEqual(self.client.nlst(), NLST_DATA.split('\r\n')[:-1])
 
346
 
 
347
    def test_dir(self):
 
348
        l = []
 
349
        self.client.dir(lambda x: l.append(x))
 
350
        self.assertEqual(''.join(l), LIST_DATA.replace('\r\n', ''))
 
351
 
 
352
    def test_makeport(self):
 
353
        self.client.makeport()
 
354
        # IPv4 is in use, just make sure send_eprt has not been used
 
355
        self.assertEqual(self.server.handler.last_received_cmd, 'port')
 
356
 
 
357
    def test_makepasv(self):
 
358
        host, port = self.client.makepasv()
 
359
        conn = socket.create_connection((host, port), 2)
 
360
        conn.close()
 
361
        # IPv4 is in use, just make sure send_epsv has not been used
 
362
        self.assertEqual(self.server.handler.last_received_cmd, 'pasv')
 
363
 
 
364
 
 
365
class TestIPv6Environment(TestCase):
 
366
 
 
367
    def setUp(self):
 
368
        self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
 
369
        self.server.start()
 
370
        self.client = ftplib.FTP()
 
371
        self.client.connect(self.server.host, self.server.port)
 
372
 
 
373
    def tearDown(self):
 
374
        self.client.close()
 
375
        self.server.stop()
 
376
 
 
377
    def test_af(self):
 
378
        self.assertEqual(self.client.af, socket.AF_INET6)
 
379
 
 
380
    def test_makeport(self):
 
381
        self.client.makeport()
 
382
        self.assertEqual(self.server.handler.last_received_cmd, 'eprt')
 
383
 
 
384
    def test_makepasv(self):
 
385
        host, port = self.client.makepasv()
 
386
        conn = socket.create_connection((host, port), 2)
 
387
        conn.close()
 
388
        self.assertEqual(self.server.handler.last_received_cmd, 'epsv')
 
389
 
 
390
    def test_transfer(self):
 
391
        def retr():
 
392
            received = []
 
393
            self.client.retrbinary('retr', received.append)
 
394
            self.assertEqual(''.join(received), RETR_DATA)
 
395
        self.client.set_pasv(True)
 
396
        retr()
 
397
        self.client.set_pasv(False)
 
398
        retr()
 
399
 
 
400
 
 
401
class TestTimeouts(TestCase):
 
402
 
 
403
    def setUp(self):
 
404
        self.evt = threading.Event()
 
405
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
406
        self.sock.settimeout(3)
 
407
        self.port = test_support.bind_port(self.sock)
 
408
        threading.Thread(target=self.server, args=(self.evt,self.sock)).start()
 
409
        # Wait for the server to be ready.
 
410
        self.evt.wait()
 
411
        self.evt.clear()
 
412
        ftplib.FTP.port = self.port
 
413
 
 
414
    def tearDown(self):
 
415
        self.evt.wait()
 
416
 
 
417
    def server(self, evt, serv):
 
418
        # This method sets the evt 3 times:
 
419
        #  1) when the connection is ready to be accepted.
 
420
        #  2) when it is safe for the caller to close the connection
 
421
        #  3) when we have closed the socket
 
422
        serv.listen(5)
 
423
        # (1) Signal the caller that we are ready to accept the connection.
 
424
        evt.set()
 
425
        try:
 
426
            conn, addr = serv.accept()
 
427
        except socket.timeout:
 
428
            pass
 
429
        else:
 
430
            conn.send("1 Hola mundo\n")
 
431
            # (2) Signal the caller that it is safe to close the socket.
 
432
            evt.set()
 
433
            conn.close()
 
434
        finally:
 
435
            serv.close()
 
436
            # (3) Signal the caller that we are done.
 
437
            evt.set()
 
438
 
 
439
    def testTimeoutDefault(self):
 
440
        # default -- use global socket timeout
 
441
        self.assert_(socket.getdefaulttimeout() is None)
 
442
        socket.setdefaulttimeout(30)
 
443
        try:
 
444
            ftp = ftplib.FTP("localhost")
 
445
        finally:
 
446
            socket.setdefaulttimeout(None)
 
447
        self.assertEqual(ftp.sock.gettimeout(), 30)
 
448
        self.evt.wait()
 
449
        ftp.close()
 
450
 
 
451
    def testTimeoutNone(self):
 
452
        # no timeout -- do not use global socket timeout
 
453
        self.assert_(socket.getdefaulttimeout() is None)
 
454
        socket.setdefaulttimeout(30)
 
455
        try:
 
456
            ftp = ftplib.FTP("localhost", timeout=None)
 
457
        finally:
 
458
            socket.setdefaulttimeout(None)
 
459
        self.assertTrue(ftp.sock.gettimeout() is None)
 
460
        self.evt.wait()
 
461
        ftp.close()
 
462
 
 
463
    def testTimeoutValue(self):
 
464
        # a value
 
465
        ftp = ftplib.FTP(HOST, timeout=30)
 
466
        self.assertEqual(ftp.sock.gettimeout(), 30)
 
467
        self.evt.wait()
 
468
        ftp.close()
 
469
 
 
470
    def testTimeoutConnect(self):
 
471
        ftp = ftplib.FTP()
 
472
        ftp.connect(HOST, timeout=30)
 
473
        self.assertEqual(ftp.sock.gettimeout(), 30)
 
474
        self.evt.wait()
 
475
        ftp.close()
 
476
 
 
477
    def testTimeoutDifferentOrder(self):
 
478
        ftp = ftplib.FTP(timeout=30)
 
479
        ftp.connect(HOST)
 
480
        self.assertEqual(ftp.sock.gettimeout(), 30)
 
481
        self.evt.wait()
 
482
        ftp.close()
 
483
 
 
484
    def testTimeoutDirectAccess(self):
 
485
        ftp = ftplib.FTP()
 
486
        ftp.timeout = 30
 
487
        ftp.connect(HOST)
 
488
        self.assertEqual(ftp.sock.gettimeout(), 30)
 
489
        self.evt.wait()
 
490
        ftp.close()
 
491
 
 
492
 
 
493
def test_main():
 
494
    tests = [TestFTPClass, TestTimeouts]
 
495
    if socket.has_ipv6:
 
496
        try:
 
497
            DummyFTPServer((HOST, 0), af=socket.AF_INET6)
 
498
        except socket.error:
 
499
            pass
 
500
        else:
 
501
            tests.append(TestIPv6Environment)
 
502
    thread_info = test_support.threading_setup()
 
503
    try:
 
504
        test_support.run_unittest(*tests)
 
505
    finally:
 
506
        test_support.threading_cleanup(*thread_info)
 
507
 
 
508
 
 
509
if __name__ == '__main__':
 
510
    test_main()