~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/mail/test/test_pop3.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
"""
 
6
Test cases for twisted.mail.pop3 module.
 
7
"""
 
8
 
 
9
import StringIO
 
10
import string
 
11
import hmac
 
12
import base64
 
13
import itertools
 
14
 
 
15
from zope.interface import implements
 
16
 
 
17
from twisted.internet import defer
 
18
 
 
19
from twisted.trial import unittest, util
 
20
from twisted import mail
 
21
import twisted.mail.protocols
 
22
import twisted.mail.pop3
 
23
import twisted.internet.protocol
 
24
from twisted import internet
 
25
from twisted.mail import pop3
 
26
from twisted.protocols import loopback
 
27
from twisted.python import failure
 
28
 
 
29
from twisted import cred
 
30
import twisted.cred.portal
 
31
import twisted.cred.checkers
 
32
import twisted.cred.credentials
 
33
 
 
34
from twisted.test.proto_helpers import LineSendingProtocol
 
35
 
 
36
 
 
37
class UtilityTestCase(unittest.TestCase):
 
38
    """
 
39
    Test the various helper functions and classes used by the POP3 server
 
40
    protocol implementation.
 
41
    """
 
42
 
 
43
    def testLineBuffering(self):
 
44
        """
 
45
        Test creating a LineBuffer and feeding it some lines.  The lines should
 
46
        build up in its internal buffer for a while and then get spat out to
 
47
        the writer.
 
48
        """
 
49
        output = []
 
50
        input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
 
51
        c = pop3._IteratorBuffer(output.extend, input, 6)
 
52
        i = iter(c)
 
53
        self.assertEquals(output, []) # nothing is buffer
 
54
        i.next()
 
55
        self.assertEquals(output, []) # '012' is buffered
 
56
        i.next()
 
57
        self.assertEquals(output, []) # '012345' is buffered
 
58
        i.next()
 
59
        self.assertEquals(output, ['012', '345', '6']) # nothing is buffered
 
60
        for n in range(5):
 
61
            i.next()
 
62
        self.assertEquals(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
 
63
 
 
64
 
 
65
    def testFinishLineBuffering(self):
 
66
        """
 
67
        Test that a LineBuffer flushes everything when its iterator is
 
68
        exhausted, and itself raises StopIteration.
 
69
        """
 
70
        output = []
 
71
        input = iter(['a', 'b', 'c'])
 
72
        c = pop3._IteratorBuffer(output.extend, input, 5)
 
73
        for i in c:
 
74
            pass
 
75
        self.assertEquals(output, ['a', 'b', 'c'])
 
76
 
 
77
 
 
78
    def testSuccessResponseFormatter(self):
 
79
        """
 
80
        Test that the thing that spits out POP3 'success responses' works
 
81
        right.
 
82
        """
 
83
        self.assertEquals(
 
84
            pop3.successResponse('Great.'),
 
85
            '+OK Great.\r\n')
 
86
 
 
87
 
 
88
    def testStatLineFormatter(self):
 
89
        """
 
90
        Test that the function which formats stat lines does so appropriately.
 
91
        """
 
92
        statLine = list(pop3.formatStatResponse([]))[-1]
 
93
        self.assertEquals(statLine, '+OK 0 0\r\n')
 
94
 
 
95
        statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1]
 
96
        self.assertEquals(statLine, '+OK 4 10142\r\n')
 
97
 
 
98
 
 
99
    def testListLineFormatter(self):
 
100
        """
 
101
        Test that the function which formats the lines in response to a LIST
 
102
        command does so appropriately.
 
103
        """
 
104
        listLines = list(pop3.formatListResponse([]))
 
105
        self.assertEquals(
 
106
            listLines,
 
107
            ['+OK 0\r\n', '.\r\n'])
 
108
 
 
109
        listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
 
110
        self.assertEquals(
 
111
            listLines,
 
112
            ['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n'])
 
113
 
 
114
 
 
115
 
 
116
    def testUIDListLineFormatter(self):
 
117
        """
 
118
        Test that the function which formats lines in response to a UIDL
 
119
        command does so appropriately.
 
120
        """
 
121
        UIDs = ['abc', 'def', 'ghi']
 
122
        listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__))
 
123
        self.assertEquals(
 
124
            listLines,
 
125
            ['+OK \r\n', '.\r\n'])
 
126
 
 
127
        listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__getitem__))
 
128
        self.assertEquals(
 
129
            listLines,
 
130
            ['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n'])
 
131
 
 
132
        listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getitem__))
 
133
        self.assertEquals(
 
134
            listLines,
 
135
            ['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n'])
 
136
 
 
137
 
 
138
 
 
139
class MyVirtualPOP3(mail.protocols.VirtualPOP3):
 
140
 
 
141
    magic = '<moshez>'
 
142
 
 
143
    def authenticateUserAPOP(self, user, digest):
 
144
        user, domain = self.lookupDomain(user)
 
145
        return self.service.domains['baz.com'].authenticateUserAPOP(user, digest, self.magic, domain)
 
146
 
 
147
class DummyDomain:
 
148
 
 
149
   def __init__(self):
 
150
       self.users = {}
 
151
 
 
152
   def addUser(self, name):
 
153
       self.users[name] = []
 
154
 
 
155
   def addMessage(self, name, message):
 
156
       self.users[name].append(message)
 
157
 
 
158
   def authenticateUserAPOP(self, name, digest, magic, domain):
 
159
       return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None
 
160
 
 
161
 
 
162
class ListMailbox:
 
163
 
 
164
    def __init__(self, list):
 
165
        self.list = list
 
166
 
 
167
    def listMessages(self, i=None):
 
168
        if i is None:
 
169
            return map(len, self.list)
 
170
        return len(self.list[i])
 
171
 
 
172
    def getMessage(self, i):
 
173
        return StringIO.StringIO(self.list[i])
 
174
 
 
175
    def getUidl(self, i):
 
176
        return i
 
177
 
 
178
    def deleteMessage(self, i):
 
179
        self.list[i] = ''
 
180
 
 
181
    def sync(self):
 
182
        pass
 
183
 
 
184
class MyPOP3Downloader(pop3.POP3Client):
 
185
 
 
186
    def handle_WELCOME(self, line):
 
187
        pop3.POP3Client.handle_WELCOME(self, line)
 
188
        self.apop('hello@baz.com', 'world')
 
189
 
 
190
    def handle_APOP(self, line):
 
191
        parts = line.split()
 
192
        code = parts[0]
 
193
        data = (parts[1:] or ['NONE'])[0]
 
194
        if code != '+OK':
 
195
            print parts
 
196
            raise AssertionError, 'code is ' + code
 
197
        self.lines = []
 
198
        self.retr(1)
 
199
 
 
200
    def handle_RETR_continue(self, line):
 
201
        self.lines.append(line)
 
202
 
 
203
    def handle_RETR_end(self):
 
204
        self.message = '\n'.join(self.lines) + '\n'
 
205
        self.quit()
 
206
 
 
207
    def handle_QUIT(self, line):
 
208
        if line[:3] != '+OK':
 
209
            raise AssertionError, 'code is ' + line
 
210
 
 
211
 
 
212
class POP3TestCase(unittest.TestCase):
 
213
 
 
214
    message = '''\
 
215
Subject: urgent
 
216
 
 
217
Someone set up us the bomb!
 
218
'''
 
219
 
 
220
    expectedOutput = '''\
 
221
+OK <moshez>\015
 
222
+OK Authentication succeeded\015
 
223
+OK \015
 
224
1 0\015
 
225
.\015
 
226
+OK %d\015
 
227
Subject: urgent\015
 
228
\015
 
229
Someone set up us the bomb!\015
 
230
.\015
 
231
+OK \015
 
232
''' % len(message)
 
233
 
 
234
    def setUp(self):
 
235
        self.factory = internet.protocol.Factory()
 
236
        self.factory.domains = {}
 
237
        self.factory.domains['baz.com'] = DummyDomain()
 
238
        self.factory.domains['baz.com'].addUser('hello')
 
239
        self.factory.domains['baz.com'].addMessage('hello', self.message)
 
240
 
 
241
    def testMessages(self):
 
242
        client = LineSendingProtocol([
 
243
            'APOP hello@baz.com world',
 
244
            'UIDL',
 
245
            'RETR 1',
 
246
            'QUIT',
 
247
        ])
 
248
        server =  MyVirtualPOP3()
 
249
        server.service = self.factory
 
250
        def check(ignored):
 
251
            output = '\r\n'.join(client.response) + '\r\n'
 
252
            self.assertEquals(output, self.expectedOutput)
 
253
        return loopback.loopbackTCP(server, client).addCallback(check)
 
254
 
 
255
    def testLoopback(self):
 
256
        protocol =  MyVirtualPOP3()
 
257
        protocol.service = self.factory
 
258
        clientProtocol = MyPOP3Downloader()
 
259
        def check(ignored):
 
260
            self.failUnlessEqual(clientProtocol.message, self.message)
 
261
            protocol.connectionLost(
 
262
                failure.Failure(Exception("Test harness disconnect")))
 
263
        d = loopback.loopbackAsync(protocol, clientProtocol)
 
264
        return d.addCallback(check)
 
265
    testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")]
 
266
 
 
267
 
 
268
 
 
269
class DummyPOP3(pop3.POP3):
 
270
 
 
271
    magic = '<moshez>'
 
272
 
 
273
    def authenticateUserAPOP(self, user, password):
 
274
        return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
 
275
 
 
276
 
 
277
 
 
278
class DummyMailbox(pop3.Mailbox):
 
279
 
 
280
    messages = [
 
281
'''\
 
282
From: moshe
 
283
To: moshe
 
284
 
 
285
How are you, friend?
 
286
''']
 
287
 
 
288
    def __init__(self, exceptionType):
 
289
        self.messages = DummyMailbox.messages[:]
 
290
        self.exceptionType = exceptionType
 
291
 
 
292
    def listMessages(self, i=None):
 
293
        if i is None:
 
294
            return map(len, self.messages)
 
295
        if i >= len(self.messages):
 
296
            raise self.exceptionType()
 
297
        return len(self.messages[i])
 
298
 
 
299
    def getMessage(self, i):
 
300
        return StringIO.StringIO(self.messages[i])
 
301
 
 
302
    def getUidl(self, i):
 
303
        if i >= len(self.messages):
 
304
            raise self.exceptionType()
 
305
        return str(i)
 
306
 
 
307
    def deleteMessage(self, i):
 
308
        self.messages[i] = ''
 
309
 
 
310
 
 
311
class AnotherPOP3TestCase(unittest.TestCase):
 
312
 
 
313
    def runTest(self, lines):
 
314
        dummy = DummyPOP3()
 
315
        client = LineSendingProtocol([
 
316
            "APOP moshez dummy",
 
317
            "LIST",
 
318
            "UIDL",
 
319
            "RETR 1",
 
320
            "RETR 2",
 
321
            "DELE 1",
 
322
            "RETR 1",
 
323
            "QUIT",
 
324
        ])
 
325
        d = loopback.loopbackAsync(dummy, client)
 
326
        return d.addCallback(self._cbRunTest, client, dummy)
 
327
 
 
328
    def _cbRunTest(self, ignored, client, dummy):
 
329
        expected_output = [
 
330
            '+OK <moshez>',
 
331
            '+OK Authentication succeeded',
 
332
            '+OK 1',
 
333
            '1 44',
 
334
            '.',
 
335
            '+OK ',
 
336
            '1 0',
 
337
            '.',
 
338
            '+OK 44',
 
339
            'From: moshe',
 
340
            'To: moshe',
 
341
            '',
 
342
            'How are you, friend?',
 
343
            '',
 
344
            '.',
 
345
            '-ERR Bad message number argument',
 
346
            '+OK ',
 
347
            '-ERR message deleted',
 
348
            '+OK ',
 
349
            ''
 
350
            ]
 
351
        self.failUnlessEqual('\r\n'.join(expected_output),
 
352
                             '\r\n'.join(client.response) + '\r\n')
 
353
        dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
354
        return ignored
 
355
 
 
356
 
 
357
    def testBuffer(self):
 
358
        lines = string.split('''\
 
359
APOP moshez dummy
 
360
LIST
 
361
UIDL
 
362
RETR 1
 
363
RETR 2
 
364
DELE 1
 
365
RETR 1
 
366
QUIT''', '\n')
 
367
        return self.runTest(lines)
 
368
 
 
369
    def testNoop(self):
 
370
        lines = ['APOP spiv dummy', 'NOOP', 'QUIT']
 
371
        return self.runTest(lines)
 
372
 
 
373
    def testAuthListing(self):
 
374
        p = DummyPOP3()
 
375
        p.factory = internet.protocol.Factory()
 
376
        p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None}
 
377
        client = LineSendingProtocol([
 
378
            "AUTH",
 
379
            "QUIT",
 
380
        ])
 
381
 
 
382
        d = loopback.loopbackAsync(p, client)
 
383
        return d.addCallback(self._cbTestAuthListing, client)
 
384
 
 
385
    def _cbTestAuthListing(self, ignored, client):
 
386
        self.failUnless(client.response[1].startswith('+OK'))
 
387
        self.assertEquals(client.response[2:6],
 
388
                          ["AUTH1", "SECONDAUTH", "AUTHLAST", "."])
 
389
 
 
390
    def testIllegalPASS(self):
 
391
        dummy = DummyPOP3()
 
392
        client = LineSendingProtocol([
 
393
            "PASS fooz",
 
394
            "QUIT"
 
395
        ])
 
396
        d = loopback.loopbackAsync(dummy, client)
 
397
        return d.addCallback(self._cbTestIllegalPASS, client, dummy)
 
398
 
 
399
    def _cbTestIllegalPASS(self, ignored, client, dummy):
 
400
        expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
 
401
        self.failUnlessEqual(expected_output, '\r\n'.join(client.response) + '\r\n')
 
402
        dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
403
 
 
404
    def testEmptyPASS(self):
 
405
        dummy = DummyPOP3()
 
406
        client = LineSendingProtocol([
 
407
            "PASS ",
 
408
            "QUIT"
 
409
        ])
 
410
        d = loopback.loopbackAsync(dummy, client)
 
411
        return d.addCallback(self._cbTestEmptyPASS, client, dummy)
 
412
 
 
413
    def _cbTestEmptyPASS(self, ignored, client, dummy):
 
414
        expected_output = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
 
415
        self.failUnlessEqual(expected_output, '\r\n'.join(client.response) + '\r\n')
 
416
        dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
417
 
 
418
 
 
419
class TestServerFactory:
 
420
    implements(pop3.IServerFactory)
 
421
 
 
422
    def cap_IMPLEMENTATION(self):
 
423
        return "Test Implementation String"
 
424
 
 
425
    def cap_EXPIRE(self):
 
426
        return 60
 
427
 
 
428
    challengers = {"SCHEME_1": None, "SCHEME_2": None}
 
429
 
 
430
    def cap_LOGIN_DELAY(self):
 
431
        return 120
 
432
 
 
433
    pue = True
 
434
    def perUserExpiration(self):
 
435
        return self.pue
 
436
 
 
437
    puld = True
 
438
    def perUserLoginDelay(self):
 
439
        return self.puld
 
440
 
 
441
 
 
442
class TestMailbox:
 
443
    loginDelay = 100
 
444
    messageExpiration = 25
 
445
 
 
446
 
 
447
class CapabilityTestCase(unittest.TestCase):
 
448
    def setUp(self):
 
449
        s = StringIO.StringIO()
 
450
        p = pop3.POP3()
 
451
        p.factory = TestServerFactory()
 
452
        p.transport = internet.protocol.FileWrapper(s)
 
453
        p.connectionMade()
 
454
        p.do_CAPA()
 
455
 
 
456
        self.caps = p.listCapabilities()
 
457
        self.pcaps = s.getvalue().splitlines()
 
458
 
 
459
        s = StringIO.StringIO()
 
460
        p.mbox = TestMailbox()
 
461
        p.transport = internet.protocol.FileWrapper(s)
 
462
        p.do_CAPA()
 
463
 
 
464
        self.lpcaps = s.getvalue().splitlines()
 
465
        p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
466
 
 
467
    def contained(self, s, *caps):
 
468
        for c in caps:
 
469
            self.assertIn(s, c)
 
470
 
 
471
    def testUIDL(self):
 
472
        self.contained("UIDL", self.caps, self.pcaps, self.lpcaps)
 
473
 
 
474
    def testTOP(self):
 
475
        self.contained("TOP", self.caps, self.pcaps, self.lpcaps)
 
476
 
 
477
    def testUSER(self):
 
478
        self.contained("USER", self.caps, self.pcaps, self.lpcaps)
 
479
 
 
480
    def testEXPIRE(self):
 
481
        self.contained("EXPIRE 60 USER", self.caps, self.pcaps)
 
482
        self.contained("EXPIRE 25", self.lpcaps)
 
483
 
 
484
    def testIMPLEMENTATION(self):
 
485
        self.contained(
 
486
            "IMPLEMENTATION Test Implementation String",
 
487
            self.caps, self.pcaps, self.lpcaps
 
488
        )
 
489
 
 
490
    def testSASL(self):
 
491
        self.contained(
 
492
            "SASL SCHEME_1 SCHEME_2",
 
493
            self.caps, self.pcaps, self.lpcaps
 
494
        )
 
495
 
 
496
    def testLOGIN_DELAY(self):
 
497
        self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps)
 
498
        self.assertIn("LOGIN-DELAY 100", self.lpcaps)
 
499
 
 
500
 
 
501
 
 
502
class GlobalCapabilitiesTestCase(unittest.TestCase):
 
503
    def setUp(self):
 
504
        s = StringIO.StringIO()
 
505
        p = pop3.POP3()
 
506
        p.factory = TestServerFactory()
 
507
        p.factory.pue = p.factory.puld = False
 
508
        p.transport = internet.protocol.FileWrapper(s)
 
509
        p.connectionMade()
 
510
        p.do_CAPA()
 
511
 
 
512
        self.caps = p.listCapabilities()
 
513
        self.pcaps = s.getvalue().splitlines()
 
514
 
 
515
        s = StringIO.StringIO()
 
516
        p.mbox = TestMailbox()
 
517
        p.transport = internet.protocol.FileWrapper(s)
 
518
        p.do_CAPA()
 
519
 
 
520
        self.lpcaps = s.getvalue().splitlines()
 
521
        p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
522
 
 
523
    def contained(self, s, *caps):
 
524
        for c in caps:
 
525
            self.assertIn(s, c)
 
526
 
 
527
    def testEXPIRE(self):
 
528
        self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps)
 
529
 
 
530
    def testLOGIN_DELAY(self):
 
531
        self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)
 
532
 
 
533
 
 
534
 
 
535
class TestRealm:
 
536
    def requestAvatar(self, avatarId, mind, *interfaces):
 
537
        if avatarId == 'testuser':
 
538
            return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
 
539
        assert False
 
540
 
 
541
 
 
542
 
 
543
class SASLTestCase(unittest.TestCase):
 
544
    def testValidLogin(self):
 
545
        p = pop3.POP3()
 
546
        p.factory = TestServerFactory()
 
547
        p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
 
548
        p.portal = cred.portal.Portal(TestRealm())
 
549
        ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
 
550
        ch.addUser('testuser', 'testpassword')
 
551
        p.portal.registerChecker(ch)
 
552
 
 
553
        s = StringIO.StringIO()
 
554
        p.transport = internet.protocol.FileWrapper(s)
 
555
        p.connectionMade()
 
556
 
 
557
        p.lineReceived("CAPA")
 
558
        self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0)
 
559
 
 
560
        p.lineReceived("AUTH CRAM-MD5")
 
561
        chal = s.getvalue().splitlines()[-1][2:]
 
562
        chal = base64.decodestring(chal)
 
563
        response = hmac.HMAC('testpassword', chal).hexdigest()
 
564
 
 
565
        p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n'))
 
566
        self.failUnless(p.mbox)
 
567
        self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0)
 
568
        p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
569
 
 
570
 
 
571
 
 
572
class CommandMixin:
 
573
    """
 
574
    Tests for all the commands a POP3 server is allowed to receive.
 
575
    """
 
576
 
 
577
    extraMessage = '''\
 
578
From: guy
 
579
To: fellow
 
580
 
 
581
More message text for you.
 
582
'''
 
583
 
 
584
 
 
585
    def setUp(self):
 
586
        """
 
587
        Make a POP3 server protocol instance hooked up to a simple mailbox and
 
588
        a transport that buffers output to a StringIO.
 
589
        """
 
590
        p = pop3.POP3()
 
591
        p.mbox = self.mailboxType(self.exceptionType)
 
592
        p.schedule = list
 
593
        self.pop3Server = p
 
594
 
 
595
        s = StringIO.StringIO()
 
596
        p.transport = internet.protocol.FileWrapper(s)
 
597
        p.connectionMade()
 
598
        s.truncate(0)
 
599
        self.pop3Transport = s
 
600
 
 
601
 
 
602
    def tearDown(self):
 
603
        """
 
604
        Disconnect the server protocol so it can clean up anything it might
 
605
        need to clean up.
 
606
        """
 
607
        self.pop3Server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
 
608
 
 
609
 
 
610
    def _flush(self):
 
611
        """
 
612
        Do some of the things that the reactor would take care of, if the
 
613
        reactor were actually running.
 
614
        """
 
615
        # Oh man FileWrapper is pooh.
 
616
        self.pop3Server.transport._checkProducer()
 
617
 
 
618
 
 
619
    def testLIST(self):
 
620
        """
 
621
        Test the two forms of list: with a message index number, which should
 
622
        return a short-form response, and without a message index number, which
 
623
        should return a long-form response, one line per message.
 
624
        """
 
625
        p = self.pop3Server
 
626
        s = self.pop3Transport
 
627
 
 
628
        p.lineReceived("LIST 1")
 
629
        self._flush()
 
630
        self.assertEquals(s.getvalue(), "+OK 1 44\r\n")
 
631
        s.truncate(0)
 
632
 
 
633
        p.lineReceived("LIST")
 
634
        self._flush()
 
635
        self.assertEquals(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n")
 
636
 
 
637
 
 
638
    def testLISTWithBadArgument(self):
 
639
        """
 
640
        Test that non-integers and out-of-bound integers produce appropriate
 
641
        error responses.
 
642
        """
 
643
        p = self.pop3Server
 
644
        s = self.pop3Transport
 
645
 
 
646
        p.lineReceived("LIST a")
 
647
        self.assertEquals(
 
648
            s.getvalue(),
 
649
            "-ERR Invalid message-number: 'a'\r\n")
 
650
        s.truncate(0)
 
651
 
 
652
        p.lineReceived("LIST 0")
 
653
        self.assertEquals(
 
654
            s.getvalue(),
 
655
            "-ERR Invalid message-number: 0\r\n")
 
656
        s.truncate(0)
 
657
 
 
658
        p.lineReceived("LIST 2")
 
659
        self.assertEquals(
 
660
            s.getvalue(),
 
661
            "-ERR Invalid message-number: 2\r\n")
 
662
        s.truncate(0)
 
663
 
 
664
 
 
665
    def testUIDL(self):
 
666
        """
 
667
        Test the two forms of the UIDL command.  These are just like the two
 
668
        forms of the LIST command.
 
669
        """
 
670
        p = self.pop3Server
 
671
        s = self.pop3Transport
 
672
 
 
673
        p.lineReceived("UIDL 1")
 
674
        self.assertEquals(s.getvalue(), "+OK 0\r\n")
 
675
        s.truncate(0)
 
676
 
 
677
        p.lineReceived("UIDL")
 
678
        self._flush()
 
679
        self.assertEquals(s.getvalue(), "+OK \r\n1 0\r\n.\r\n")
 
680
 
 
681
 
 
682
    def testUIDLWithBadArgument(self):
 
683
        """
 
684
        Test that UIDL with a non-integer or an out-of-bounds integer produces
 
685
        the appropriate error response.
 
686
        """
 
687
        p = self.pop3Server
 
688
        s = self.pop3Transport
 
689
 
 
690
        p.lineReceived("UIDL a")
 
691
        self.assertEquals(
 
692
            s.getvalue(),
 
693
            "-ERR Bad message number argument\r\n")
 
694
        s.truncate(0)
 
695
 
 
696
        p.lineReceived("UIDL 0")
 
697
        self.assertEquals(
 
698
            s.getvalue(),
 
699
            "-ERR Bad message number argument\r\n")
 
700
        s.truncate(0)
 
701
 
 
702
        p.lineReceived("UIDL 2")
 
703
        self.assertEquals(
 
704
            s.getvalue(),
 
705
            "-ERR Bad message number argument\r\n")
 
706
        s.truncate(0)
 
707
 
 
708
 
 
709
    def testSTAT(self):
 
710
        """
 
711
        Test the single form of the STAT command, which returns a short-form
 
712
        response of the number of messages in the mailbox and their total size.
 
713
        """
 
714
        p = self.pop3Server
 
715
        s = self.pop3Transport
 
716
 
 
717
        p.lineReceived("STAT")
 
718
        self._flush()
 
719
        self.assertEquals(s.getvalue(), "+OK 1 44\r\n")
 
720
 
 
721
 
 
722
    def testRETR(self):
 
723
        """
 
724
        Test downloading a message.
 
725
        """
 
726
        p = self.pop3Server
 
727
        s = self.pop3Transport
 
728
 
 
729
        p.lineReceived("RETR 1")
 
730
        self._flush()
 
731
        self.assertEquals(
 
732
            s.getvalue(),
 
733
            "+OK 44\r\n"
 
734
            "From: moshe\r\n"
 
735
            "To: moshe\r\n"
 
736
            "\r\n"
 
737
            "How are you, friend?\r\n"
 
738
            ".\r\n")
 
739
        s.truncate(0)
 
740
 
 
741
 
 
742
    def testRETRWithBadArgument(self):
 
743
        """
 
744
        Test that trying to download a message with a bad argument, either not
 
745
        an integer or an out-of-bounds integer, fails with the appropriate
 
746
        error response.
 
747
        """
 
748
        p = self.pop3Server
 
749
        s = self.pop3Transport
 
750
 
 
751
        p.lineReceived("RETR a")
 
752
        self.assertEquals(
 
753
            s.getvalue(),
 
754
            "-ERR Bad message number argument\r\n")
 
755
        s.truncate(0)
 
756
 
 
757
        p.lineReceived("RETR 0")
 
758
        self.assertEquals(
 
759
            s.getvalue(),
 
760
            "-ERR Bad message number argument\r\n")
 
761
        s.truncate(0)
 
762
 
 
763
        p.lineReceived("RETR 2")
 
764
        self.assertEquals(
 
765
            s.getvalue(),
 
766
            "-ERR Bad message number argument\r\n")
 
767
        s.truncate(0)
 
768
 
 
769
 
 
770
    def testTOP(self):
 
771
        """
 
772
        Test downloading the headers and part of the body of a message.
 
773
        """
 
774
        p = self.pop3Server
 
775
        s = self.pop3Transport
 
776
        p.mbox.messages.append(self.extraMessage)
 
777
 
 
778
        p.lineReceived("TOP 1 0")
 
779
        self._flush()
 
780
        self.assertEquals(
 
781
            s.getvalue(),
 
782
            "+OK Top of message follows\r\n"
 
783
            "From: moshe\r\n"
 
784
            "To: moshe\r\n"
 
785
            "\r\n"
 
786
            ".\r\n")
 
787
 
 
788
 
 
789
    def testTOPWithBadArgument(self):
 
790
        """
 
791
        Test that trying to download a message with a bad argument, either a
 
792
        message number which isn't an integer or is an out-of-bounds integer or
 
793
        a number of lines which isn't an integer or is a negative integer,
 
794
        fails with the appropriate error response.
 
795
        """
 
796
        p = self.pop3Server
 
797
        s = self.pop3Transport
 
798
        p.mbox.messages.append(self.extraMessage)
 
799
 
 
800
        p.lineReceived("TOP 1 a")
 
801
        self.assertEquals(
 
802
            s.getvalue(),
 
803
            "-ERR Bad line count argument\r\n")
 
804
        s.truncate(0)
 
805
 
 
806
        p.lineReceived("TOP 1 -1")
 
807
        self.assertEquals(
 
808
            s.getvalue(),
 
809
            "-ERR Bad line count argument\r\n")
 
810
        s.truncate(0)
 
811
 
 
812
        p.lineReceived("TOP a 1")
 
813
        self.assertEquals(
 
814
            s.getvalue(),
 
815
            "-ERR Bad message number argument\r\n")
 
816
        s.truncate(0)
 
817
 
 
818
        p.lineReceived("TOP 0 1")
 
819
        self.assertEquals(
 
820
            s.getvalue(),
 
821
            "-ERR Bad message number argument\r\n")
 
822
        s.truncate(0)
 
823
 
 
824
        p.lineReceived("TOP 3 1")
 
825
        self.assertEquals(
 
826
            s.getvalue(),
 
827
            "-ERR Bad message number argument\r\n")
 
828
        s.truncate(0)
 
829
 
 
830
 
 
831
    def testLAST(self):
 
832
        """
 
833
        Test the exceedingly pointless LAST command, which tells you the
 
834
        highest message index which you have already downloaded.
 
835
        """
 
836
        p = self.pop3Server
 
837
        s = self.pop3Transport
 
838
        p.mbox.messages.append(self.extraMessage)
 
839
 
 
840
        p.lineReceived('LAST')
 
841
        self.assertEquals(
 
842
            s.getvalue(),
 
843
            "+OK 0\r\n")
 
844
        s.truncate(0)
 
845
 
 
846
 
 
847
    def testRetrieveUpdatesHighest(self):
 
848
        """
 
849
        Test that issuing a RETR command updates the LAST response.
 
850
        """
 
851
        p = self.pop3Server
 
852
        s = self.pop3Transport
 
853
        p.mbox.messages.append(self.extraMessage)
 
854
 
 
855
        p.lineReceived('RETR 2')
 
856
        self._flush()
 
857
        s.truncate(0)
 
858
        p.lineReceived('LAST')
 
859
        self.assertEquals(
 
860
            s.getvalue(),
 
861
            '+OK 2\r\n')
 
862
        s.truncate(0)
 
863
 
 
864
 
 
865
    def testTopUpdatesHighest(self):
 
866
        """
 
867
        Test that issuing a TOP command updates the LAST response.
 
868
        """
 
869
        p = self.pop3Server
 
870
        s = self.pop3Transport
 
871
        p.mbox.messages.append(self.extraMessage)
 
872
 
 
873
        p.lineReceived('TOP 2 10')
 
874
        self._flush()
 
875
        s.truncate(0)
 
876
        p.lineReceived('LAST')
 
877
        self.assertEquals(
 
878
            s.getvalue(),
 
879
            '+OK 2\r\n')
 
880
 
 
881
 
 
882
    def testHighestOnlyProgresses(self):
 
883
        """
 
884
        Test that downloading a message with a smaller index than the current
 
885
        LAST response doesn't change the LAST response.
 
886
        """
 
887
        p = self.pop3Server
 
888
        s = self.pop3Transport
 
889
        p.mbox.messages.append(self.extraMessage)
 
890
 
 
891
        p.lineReceived('RETR 2')
 
892
        self._flush()
 
893
        p.lineReceived('TOP 1 10')
 
894
        self._flush()
 
895
        s.truncate(0)
 
896
        p.lineReceived('LAST')
 
897
        self.assertEquals(
 
898
            s.getvalue(),
 
899
            '+OK 2\r\n')
 
900
 
 
901
 
 
902
    def testResetClearsHighest(self):
 
903
        """
 
904
        Test that issuing RSET changes the LAST response to 0.
 
905
        """
 
906
        p = self.pop3Server
 
907
        s = self.pop3Transport
 
908
        p.mbox.messages.append(self.extraMessage)
 
909
 
 
910
        p.lineReceived('RETR 2')
 
911
        self._flush()
 
912
        p.lineReceived('RSET')
 
913
        s.truncate(0)
 
914
        p.lineReceived('LAST')
 
915
        self.assertEquals(
 
916
            s.getvalue(),
 
917
            '+OK 0\r\n')
 
918
 
 
919
 
 
920
 
 
921
_listMessageDeprecation = (
 
922
    "twisted.mail.pop3.IMailbox.listMessages may not "
 
923
    "raise IndexError for out-of-bounds message numbers: "
 
924
    "raise ValueError instead.")
 
925
_listMessageSuppression = util.suppress(
 
926
    message=_listMessageDeprecation,
 
927
    category=PendingDeprecationWarning)
 
928
 
 
929
_getUidlDeprecation = (
 
930
    "twisted.mail.pop3.IMailbox.getUidl may not "
 
931
    "raise IndexError for out-of-bounds message numbers: "
 
932
    "raise ValueError instead.")
 
933
_getUidlSuppression = util.suppress(
 
934
    message=_getUidlDeprecation,
 
935
    category=PendingDeprecationWarning)
 
936
 
 
937
class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase):
 
938
    """
 
939
    Run all of the command tests against a mailbox which raises IndexError
 
940
    when an out of bounds request is made.  This behavior will be deprecated
 
941
    shortly and then removed.
 
942
    """
 
943
    exceptionType = IndexError
 
944
    mailboxType = DummyMailbox
 
945
 
 
946
    def testLISTWithBadArgument(self):
 
947
        return CommandMixin.testLISTWithBadArgument(self)
 
948
    testLISTWithBadArgument.suppress = [_listMessageSuppression]
 
949
 
 
950
 
 
951
    def testUIDLWithBadArgument(self):
 
952
        return CommandMixin.testUIDLWithBadArgument(self)
 
953
    testUIDLWithBadArgument.suppress = [_getUidlSuppression]
 
954
 
 
955
 
 
956
    def testTOPWithBadArgument(self):
 
957
        return CommandMixin.testTOPWithBadArgument(self)
 
958
    testTOPWithBadArgument.suppress = [_listMessageSuppression]
 
959
 
 
960
 
 
961
    def testRETRWithBadArgument(self):
 
962
        return CommandMixin.testRETRWithBadArgument(self)
 
963
    testRETRWithBadArgument.suppress = [_listMessageSuppression]
 
964
 
 
965
 
 
966
 
 
967
class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase):
 
968
    """
 
969
    Run all of the command tests against a mailbox which raises ValueError
 
970
    when an out of bounds request is made.  This is the correct behavior and
 
971
    after support for mailboxes which raise IndexError is removed, this will
 
972
    become just C{CommandTestCase}.
 
973
    """
 
974
    exceptionType = ValueError
 
975
    mailboxType = DummyMailbox
 
976
 
 
977
 
 
978
 
 
979
class SyncDeferredMailbox(DummyMailbox):
 
980
    """
 
981
    Mailbox which has a listMessages implementation which returns a Deferred
 
982
    which has already fired.
 
983
    """
 
984
    def listMessages(self, n=None):
 
985
        return defer.succeed(DummyMailbox.listMessages(self, n))
 
986
 
 
987
 
 
988
 
 
989
class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase):
 
990
    """
 
991
    Run all of the L{IndexErrorCommandTestCase} tests with a
 
992
    synchronous-Deferred returning IMailbox implementation.
 
993
    """
 
994
    mailboxType = SyncDeferredMailbox
 
995
 
 
996
 
 
997
 
 
998
class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase):
 
999
    """
 
1000
    Run all of the L{ValueErrorCommandTestCase} tests with a
 
1001
    synchronous-Deferred returning IMailbox implementation.
 
1002
    """
 
1003
    mailboxType = SyncDeferredMailbox
 
1004
 
 
1005
 
 
1006
 
 
1007
class AsyncDeferredMailbox(DummyMailbox):
 
1008
    """
 
1009
    Mailbox which has a listMessages implementation which returns a Deferred
 
1010
    which has not yet fired.
 
1011
    """
 
1012
    def __init__(self, *a, **kw):
 
1013
        self.waiting = []
 
1014
        DummyMailbox.__init__(self, *a, **kw)
 
1015
 
 
1016
 
 
1017
    def listMessages(self, n=None):
 
1018
        d = defer.Deferred()
 
1019
        # See AsyncDeferredMailbox._flush
 
1020
        self.waiting.append((d, DummyMailbox.listMessages(self, n)))
 
1021
        return d
 
1022
 
 
1023
 
 
1024
 
 
1025
class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase):
 
1026
    """
 
1027
    Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
 
1028
    returning IMailbox implementation.
 
1029
    """
 
1030
    mailboxType = AsyncDeferredMailbox
 
1031
 
 
1032
    def _flush(self):
 
1033
        """
 
1034
        Fire whatever Deferreds we've built up in our mailbox.
 
1035
        """
 
1036
        while self.pop3Server.mbox.waiting:
 
1037
            d, a = self.pop3Server.mbox.waiting.pop()
 
1038
            d.callback(a)
 
1039
        IndexErrorCommandTestCase._flush(self)
 
1040
 
 
1041
 
 
1042
 
 
1043
class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase):
 
1044
    """
 
1045
    Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
 
1046
    returning IMailbox implementation.
 
1047
    """
 
1048
    mailboxType = AsyncDeferredMailbox
 
1049
 
 
1050
    def _flush(self):
 
1051
        """
 
1052
        Fire whatever Deferreds we've built up in our mailbox.
 
1053
        """
 
1054
        while self.pop3Server.mbox.waiting:
 
1055
            d, a = self.pop3Server.mbox.waiting.pop()
 
1056
            d.callback(a)
 
1057
        ValueErrorCommandTestCase._flush(self)
 
1058
 
 
1059
class POP3MiscTestCase(unittest.TestCase):
 
1060
    """
 
1061
    Miscellaneous tests more to do with module/package structure than
 
1062
    anything to do with the Post Office Protocol.
 
1063
    """
 
1064
    def test_all(self):
 
1065
        """
 
1066
        This test checks that all names listed in
 
1067
        twisted.mail.pop3.__all__ are actually present in the module.
 
1068
        """
 
1069
        mod = twisted.mail.pop3
 
1070
        for attr in mod.__all__:
 
1071
            self.failUnless(hasattr(mod, attr))