1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
6
Test cases for twisted.mail.pop3 module.
15
from zope.interface import implements
17
from twisted.internet import defer
19
from twisted.trial import unittest, util
20
from twisted import mail
21
import twisted.mail.protocols
22
import twisted.mail.pop3
23
import twisted.internet.protocol
24
from twisted import internet
25
from twisted.mail import pop3
26
from twisted.protocols import loopback
27
from twisted.python import failure
29
from twisted import cred
30
import twisted.cred.portal
31
import twisted.cred.checkers
32
import twisted.cred.credentials
34
from twisted.test.proto_helpers import LineSendingProtocol
37
class UtilityTestCase(unittest.TestCase):
39
Test the various helper functions and classes used by the POP3 server
40
protocol implementation.
43
def testLineBuffering(self):
45
Test creating a LineBuffer and feeding it some lines. The lines should
46
build up in its internal buffer for a while and then get spat out to
50
input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9']))
51
c = pop3._IteratorBuffer(output.extend, input, 6)
53
self.assertEquals(output, []) # nothing is buffer
55
self.assertEquals(output, []) # '012' is buffered
57
self.assertEquals(output, []) # '012345' is buffered
59
self.assertEquals(output, ['012', '345', '6']) # nothing is buffered
62
self.assertEquals(output, ['012', '345', '6', '7', '8', '9', '012', '345'])
65
def testFinishLineBuffering(self):
67
Test that a LineBuffer flushes everything when its iterator is
68
exhausted, and itself raises StopIteration.
71
input = iter(['a', 'b', 'c'])
72
c = pop3._IteratorBuffer(output.extend, input, 5)
75
self.assertEquals(output, ['a', 'b', 'c'])
78
def testSuccessResponseFormatter(self):
80
Test that the thing that spits out POP3 'success responses' works
84
pop3.successResponse('Great.'),
88
def testStatLineFormatter(self):
    """
    Check the last line generated by L{pop3.formatStatResponse}.

    Two inputs are exercised: an empty list, and a list of four integers.
    In both cases the expected line carries the number of entries followed
    by their sum.
    """
    cases = [
        ([], '+OK 0 0\r\n'),
        ([10, 31, 0, 10101], '+OK 4 10142\r\n'),
    ]
    for sizes, expected in cases:
        # Only the final generated line is of interest here.
        produced = list(pop3.formatStatResponse(sizes))
        self.assertEquals(produced[-1], expected)
99
def testListLineFormatter(self):
101
Test that the function which formats the lines in response to a LIST
102
command does so appropriately.
104
listLines = list(pop3.formatListResponse([]))
107
['+OK 0\r\n', '.\r\n'])
109
listLines = list(pop3.formatListResponse([1, 2, 3, 100]))
112
['+OK 4\r\n', '1 1\r\n', '2 2\r\n', '3 3\r\n', '4 100\r\n', '.\r\n'])
116
def testUIDListLineFormatter(self):
118
Test that the function which formats lines in response to a UIDL
119
command does so appropriately.
121
UIDs = ['abc', 'def', 'ghi']
122
listLines = list(pop3.formatUIDListResponse([], UIDs.__getitem__))
125
['+OK \r\n', '.\r\n'])
127
listLines = list(pop3.formatUIDListResponse([123, 431, 591], UIDs.__getitem__))
130
['+OK \r\n', '1 abc\r\n', '2 def\r\n', '3 ghi\r\n', '.\r\n'])
132
listLines = list(pop3.formatUIDListResponse([0, None, 591], UIDs.__getitem__))
135
['+OK \r\n', '1 abc\r\n', '3 ghi\r\n', '.\r\n'])
139
class MyVirtualPOP3(mail.protocols.VirtualPOP3):
143
def authenticateUserAPOP(self, user, digest):
    """
    Hand an APOP login over to the domain object stored under C{'baz.com'}.

    @param user: the user string, split into a local name and a domain by
        C{self.lookupDomain}.
    @param digest: passed through unchanged to the domain object.

    @return: whatever the domain object's C{authenticateUserAPOP} returns.
    """
    localName, domainName = self.lookupDomain(user)
    # NOTE(review): the handling domain is always 'baz.com'; the domain
    # returned by lookupDomain is only forwarded as an argument.
    handler = self.service.domains['baz.com']
    return handler.authenticateUserAPOP(
        localName, digest, self.magic, domainName)
152
def addUser(self, name):
    """
    Store a fresh, empty message list under C{name} in C{self.users}.

    Any list previously stored under the same name is replaced.
    """
    emptyBox = []
    self.users[name] = emptyBox
155
def addMessage(self, name, message):
    """
    Append C{message} to the message list stored under C{name}.

    The name must already be a key of C{self.users}; otherwise the lookup
    raises C{KeyError}.
    """
    mailbox = self.users[name]
    mailbox.append(message)
158
def authenticateUserAPOP(self, name, digest, magic, domain):
    """
    Produce a mailbox for C{name} without checking any credentials.

    C{digest}, C{magic} and C{domain} are accepted but never consulted.

    @return: a three-tuple of L{pop3.IMailbox}, a L{ListMailbox} built
        from the messages stored for C{name}, and a callable that takes no
        arguments and returns C{None}.
    """
    interface = pop3.IMailbox
    mailbox = ListMailbox(self.users[name])
    return interface, mailbox, (lambda: None)
164
def __init__(self, list):
167
def listMessages(self, i=None):
169
return map(len, self.list)
170
return len(self.list[i])
172
def getMessage(self, i):
    """
    Return a C{StringIO.StringIO} built from the entry at index C{i} of
    C{self.list}.
    """
    makeFile = StringIO.StringIO
    return makeFile(self.list[i])
175
def getUidl(self, i):
178
def deleteMessage(self, i):
184
class MyPOP3Downloader(pop3.POP3Client):
186
def handle_WELCOME(self, line):
    """
    Run the base L{pop3.POP3Client} welcome handling, then immediately
    issue an APOP login with fixed test credentials.
    """
    pop3.POP3Client.handle_WELCOME(self, line)
    username, secret = 'hello@baz.com', 'world'
    self.apop(username, secret)
190
def handle_APOP(self, line):
193
data = (parts[1:] or ['NONE'])[0]
196
raise AssertionError, 'code is ' + code
200
def handle_RETR_continue(self, line):
    """
    Collect one more line of the message being retrieved by appending it
    to C{self.lines}.
    """
    collected = self.lines
    collected.append(line)
203
def handle_RETR_end(self):
204
self.message = '\n'.join(self.lines) + '\n'
207
def handle_QUIT(self, line):
    """
    Verify that the server's reply to QUIT is a success response.

    @param line: the response line received from the server.

    @raise AssertionError: if C{line} does not begin with C{'+OK'}; the
        message is C{'code is '} followed by the offending line.
    """
    if line[:3] != '+OK':
        # Call form of raise: same behaviour as the old
        # "raise AssertionError, msg" statement, but not Python-2-only.
        raise AssertionError('code is ' + line)
212
class POP3TestCase(unittest.TestCase):
217
Someone set up us the bomb!
220
expectedOutput = '''\
222
+OK Authentication succeeded\015
229
Someone set up us the bomb!\015
235
self.factory = internet.protocol.Factory()
236
self.factory.domains = {}
237
self.factory.domains['baz.com'] = DummyDomain()
238
self.factory.domains['baz.com'].addUser('hello')
239
self.factory.domains['baz.com'].addMessage('hello', self.message)
241
def testMessages(self):
242
client = LineSendingProtocol([
243
'APOP hello@baz.com world',
248
server = MyVirtualPOP3()
249
server.service = self.factory
251
output = '\r\n'.join(client.response) + '\r\n'
252
self.assertEquals(output, self.expectedOutput)
253
return loopback.loopbackTCP(server, client).addCallback(check)
255
def testLoopback(self):
256
protocol = MyVirtualPOP3()
257
protocol.service = self.factory
258
clientProtocol = MyPOP3Downloader()
260
self.failUnlessEqual(clientProtocol.message, self.message)
261
protocol.connectionLost(
262
failure.Failure(Exception("Test harness disconnect")))
263
d = loopback.loopbackAsync(protocol, clientProtocol)
264
return d.addCallback(check)
265
testLoopback.suppress = [util.suppress(message="twisted.mail.pop3.POP3Client is deprecated")]
269
class DummyPOP3(pop3.POP3):
273
def authenticateUserAPOP(self, user, password):
    """
    Accept any APOP login; C{user} and C{password} are not examined.

    @return: a three-tuple of L{pop3.IMailbox}, a new L{DummyMailbox}
        constructed with C{ValueError} as its exception type, and a
        callable that takes no arguments and returns C{None}.
    """
    interface = pop3.IMailbox
    mailbox = DummyMailbox(ValueError)
    return interface, mailbox, (lambda: None)
278
class DummyMailbox(pop3.Mailbox):
288
def __init__(self, exceptionType):
    """
    @param exceptionType: the exception class this mailbox instantiates
        and raises from the methods that use C{self.exceptionType}.
    """
    self.exceptionType = exceptionType
    # Work on a per-instance copy so that changes made through this
    # mailbox do not touch the class-level C{DummyMailbox.messages}.
    template = DummyMailbox.messages
    self.messages = template[:]
292
def listMessages(self, i=None):
294
return map(len, self.messages)
295
if i >= len(self.messages):
296
raise self.exceptionType()
297
return len(self.messages[i])
299
def getMessage(self, i):
    """
    Return a C{StringIO.StringIO} built from the entry at index C{i} of
    C{self.messages}.
    """
    makeFile = StringIO.StringIO
    return makeFile(self.messages[i])
302
def getUidl(self, i):
303
if i >= len(self.messages):
304
raise self.exceptionType()
307
def deleteMessage(self, i):
    """
    Mark the message at index C{i} as deleted by overwriting it with an
    empty string.

    The entry is blanked rather than removed, so the length of
    C{self.messages} and the indices of the other entries do not change.
    """
    blank = ''
    self.messages[i] = blank
311
class AnotherPOP3TestCase(unittest.TestCase):
313
def runTest(self, lines):
315
client = LineSendingProtocol([
325
d = loopback.loopbackAsync(dummy, client)
326
return d.addCallback(self._cbRunTest, client, dummy)
328
def _cbRunTest(self, ignored, client, dummy):
331
'+OK Authentication succeeded',
342
'How are you, friend?',
345
'-ERR Bad message number argument',
347
'-ERR message deleted',
351
self.failUnlessEqual('\r\n'.join(expected_output),
352
'\r\n'.join(client.response) + '\r\n')
353
dummy.connectionLost(failure.Failure(Exception("Test harness disconnect")))
357
def testBuffer(self):
358
lines = string.split('''\
367
return self.runTest(lines)
370
lines = ['APOP spiv dummy', 'NOOP', 'QUIT']
371
return self.runTest(lines)
373
def testAuthListing(self):
375
p.factory = internet.protocol.Factory()
376
p.factory.challengers = {'Auth1': None, 'secondAuth': None, 'authLast': None}
377
client = LineSendingProtocol([
382
d = loopback.loopbackAsync(p, client)
383
return d.addCallback(self._cbTestAuthListing, client)
385
def _cbTestAuthListing(self, ignored, client):
386
self.failUnless(client.response[1].startswith('+OK'))
387
self.assertEquals(client.response[2:6],
388
["AUTH1", "SECONDAUTH", "AUTHLAST", "."])
390
def testIllegalPASS(self):
392
client = LineSendingProtocol([
396
d = loopback.loopbackAsync(dummy, client)
397
return d.addCallback(self._cbTestIllegalPASS, client, dummy)
399
def _cbTestIllegalPASS(self, ignored, client, dummy):
    """
    Callback that compares everything the client received against the
    expected transcript, then disconnects the server protocol.

    @param ignored: the callback result; not used.
    @param client: object whose C{response} attribute holds the received
        lines.
    @param dummy: the server protocol, told afterwards that its connection
        was lost.
    """
    expected = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
    received = '\r\n'.join(client.response) + '\r\n'
    self.failUnlessEqual(expected, received)
    reason = failure.Failure(Exception("Test harness disconnect"))
    dummy.connectionLost(reason)
404
def testEmptyPASS(self):
406
client = LineSendingProtocol([
410
d = loopback.loopbackAsync(dummy, client)
411
return d.addCallback(self._cbTestEmptyPASS, client, dummy)
413
def _cbTestEmptyPASS(self, ignored, client, dummy):
    """
    Callback that compares everything the client received against the
    expected transcript, then disconnects the server protocol.

    @param ignored: the callback result; not used.
    @param client: object whose C{response} attribute holds the received
        lines.
    @param dummy: the server protocol, told afterwards that its connection
        was lost.
    """
    expected = '+OK <moshez>\r\n-ERR USER required before PASS\r\n+OK \r\n'
    received = '\r\n'.join(client.response) + '\r\n'
    self.failUnlessEqual(expected, received)
    reason = failure.Failure(Exception("Test harness disconnect"))
    dummy.connectionLost(reason)
419
class TestServerFactory:
420
implements(pop3.IServerFactory)
422
def cap_IMPLEMENTATION(self):
    """
    Return the fixed implementation string this test factory reports.
    """
    implementation = "Test Implementation String"
    return implementation
425
def cap_EXPIRE(self):
428
challengers = {"SCHEME_1": None, "SCHEME_2": None}
430
def cap_LOGIN_DELAY(self):
434
def perUserExpiration(self):
438
def perUserLoginDelay(self):
444
messageExpiration = 25
447
class CapabilityTestCase(unittest.TestCase):
449
s = StringIO.StringIO()
451
p.factory = TestServerFactory()
452
p.transport = internet.protocol.FileWrapper(s)
456
self.caps = p.listCapabilities()
457
self.pcaps = s.getvalue().splitlines()
459
s = StringIO.StringIO()
460
p.mbox = TestMailbox()
461
p.transport = internet.protocol.FileWrapper(s)
464
self.lpcaps = s.getvalue().splitlines()
465
p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
467
def contained(self, s, *caps):
472
self.contained("UIDL", self.caps, self.pcaps, self.lpcaps)
475
self.contained("TOP", self.caps, self.pcaps, self.lpcaps)
478
self.contained("USER", self.caps, self.pcaps, self.lpcaps)
480
def testEXPIRE(self):
481
self.contained("EXPIRE 60 USER", self.caps, self.pcaps)
482
self.contained("EXPIRE 25", self.lpcaps)
484
def testIMPLEMENTATION(self):
486
"IMPLEMENTATION Test Implementation String",
487
self.caps, self.pcaps, self.lpcaps
492
"SASL SCHEME_1 SCHEME_2",
493
self.caps, self.pcaps, self.lpcaps
496
def testLOGIN_DELAY(self):
497
self.contained("LOGIN-DELAY 120 USER", self.caps, self.pcaps)
498
self.assertIn("LOGIN-DELAY 100", self.lpcaps)
502
class GlobalCapabilitiesTestCase(unittest.TestCase):
504
s = StringIO.StringIO()
506
p.factory = TestServerFactory()
507
p.factory.pue = p.factory.puld = False
508
p.transport = internet.protocol.FileWrapper(s)
512
self.caps = p.listCapabilities()
513
self.pcaps = s.getvalue().splitlines()
515
s = StringIO.StringIO()
516
p.mbox = TestMailbox()
517
p.transport = internet.protocol.FileWrapper(s)
520
self.lpcaps = s.getvalue().splitlines()
521
p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
523
def contained(self, s, *caps):
527
def testEXPIRE(self):
528
self.contained("EXPIRE 60", self.caps, self.pcaps, self.lpcaps)
530
def testLOGIN_DELAY(self):
531
self.contained("LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps)
536
def requestAvatar(self, avatarId, mind, *interfaces):
537
if avatarId == 'testuser':
538
return pop3.IMailbox, DummyMailbox(ValueError), lambda: None
543
class SASLTestCase(unittest.TestCase):
544
def testValidLogin(self):
546
p.factory = TestServerFactory()
547
p.factory.challengers = {'CRAM-MD5': cred.credentials.CramMD5Credentials}
548
p.portal = cred.portal.Portal(TestRealm())
549
ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
550
ch.addUser('testuser', 'testpassword')
551
p.portal.registerChecker(ch)
553
s = StringIO.StringIO()
554
p.transport = internet.protocol.FileWrapper(s)
557
p.lineReceived("CAPA")
558
self.failUnless(s.getvalue().find("SASL CRAM-MD5") >= 0)
560
p.lineReceived("AUTH CRAM-MD5")
561
chal = s.getvalue().splitlines()[-1][2:]
562
chal = base64.decodestring(chal)
563
response = hmac.HMAC('testpassword', chal).hexdigest()
565
p.lineReceived(base64.encodestring('testuser ' + response).rstrip('\n'))
566
self.failUnless(p.mbox)
567
self.failUnless(s.getvalue().splitlines()[-1].find("+OK") >= 0)
568
p.connectionLost(failure.Failure(Exception("Test harness disconnect")))
574
Tests for all the commands a POP3 server is allowed to receive.
581
More message text for you.
587
Make a POP3 server protocol instance hooked up to a simple mailbox and
588
a transport that buffers output to a StringIO.
591
p.mbox = self.mailboxType(self.exceptionType)
595
s = StringIO.StringIO()
596
p.transport = internet.protocol.FileWrapper(s)
599
self.pop3Transport = s
604
Disconnect the server protocol so it can clean up anything it might
607
self.pop3Server.connectionLost(failure.Failure(Exception("Test harness disconnect")))
612
Do some of the things that the reactor would take care of, if the
613
reactor were actually running.
615
# Oh man FileWrapper is pooh.
616
self.pop3Server.transport._checkProducer()
621
Test the two forms of list: with a message index number, which should
622
return a short-form response, and without a message index number, which
623
should return a long-form response, one line per message.
626
s = self.pop3Transport
628
p.lineReceived("LIST 1")
630
self.assertEquals(s.getvalue(), "+OK 1 44\r\n")
633
p.lineReceived("LIST")
635
self.assertEquals(s.getvalue(), "+OK 1\r\n1 44\r\n.\r\n")
638
def testLISTWithBadArgument(self):
640
Test that non-integers and out-of-bound integers produce appropriate
644
s = self.pop3Transport
646
p.lineReceived("LIST a")
649
"-ERR Invalid message-number: 'a'\r\n")
652
p.lineReceived("LIST 0")
655
"-ERR Invalid message-number: 0\r\n")
658
p.lineReceived("LIST 2")
661
"-ERR Invalid message-number: 2\r\n")
667
Test the two forms of the UIDL command. These are just like the two
668
forms of the LIST command.
671
s = self.pop3Transport
673
p.lineReceived("UIDL 1")
674
self.assertEquals(s.getvalue(), "+OK 0\r\n")
677
p.lineReceived("UIDL")
679
self.assertEquals(s.getvalue(), "+OK \r\n1 0\r\n.\r\n")
682
def testUIDLWithBadArgument(self):
684
Test that UIDL with a non-integer or an out-of-bounds integer produces
685
the appropriate error response.
688
s = self.pop3Transport
690
p.lineReceived("UIDL a")
693
"-ERR Bad message number argument\r\n")
696
p.lineReceived("UIDL 0")
699
"-ERR Bad message number argument\r\n")
702
p.lineReceived("UIDL 2")
705
"-ERR Bad message number argument\r\n")
711
Test the single form of the STAT command, which returns a short-form
712
response of the number of messages in the mailbox and their total size.
715
s = self.pop3Transport
717
p.lineReceived("STAT")
719
self.assertEquals(s.getvalue(), "+OK 1 44\r\n")
724
Test downloading a message.
727
s = self.pop3Transport
729
p.lineReceived("RETR 1")
737
"How are you, friend?\r\n"
742
def testRETRWithBadArgument(self):
744
Test that trying to download a message with a bad argument, either not
745
an integer or an out-of-bounds integer, fails with the appropriate
749
s = self.pop3Transport
751
p.lineReceived("RETR a")
754
"-ERR Bad message number argument\r\n")
757
p.lineReceived("RETR 0")
760
"-ERR Bad message number argument\r\n")
763
p.lineReceived("RETR 2")
766
"-ERR Bad message number argument\r\n")
772
Test downloading the headers and part of the body of a message.
775
s = self.pop3Transport
776
p.mbox.messages.append(self.extraMessage)
778
p.lineReceived("TOP 1 0")
782
"+OK Top of message follows\r\n"
789
def testTOPWithBadArgument(self):
791
Test that trying to download a message with a bad argument, either a
792
message number which isn't an integer or is an out-of-bounds integer or
793
a number of lines which isn't an integer or is a negative integer,
794
fails with the appropriate error response.
797
s = self.pop3Transport
798
p.mbox.messages.append(self.extraMessage)
800
p.lineReceived("TOP 1 a")
803
"-ERR Bad line count argument\r\n")
806
p.lineReceived("TOP 1 -1")
809
"-ERR Bad line count argument\r\n")
812
p.lineReceived("TOP a 1")
815
"-ERR Bad message number argument\r\n")
818
p.lineReceived("TOP 0 1")
821
"-ERR Bad message number argument\r\n")
824
p.lineReceived("TOP 3 1")
827
"-ERR Bad message number argument\r\n")
833
Test the exceedingly pointless LAST command, which tells you the
834
highest message index which you have already downloaded.
837
s = self.pop3Transport
838
p.mbox.messages.append(self.extraMessage)
840
p.lineReceived('LAST')
847
def testRetrieveUpdatesHighest(self):
849
Test that issuing a RETR command updates the LAST response.
852
s = self.pop3Transport
853
p.mbox.messages.append(self.extraMessage)
855
p.lineReceived('RETR 2')
858
p.lineReceived('LAST')
865
def testTopUpdatesHighest(self):
867
Test that issuing a TOP command updates the LAST response.
870
s = self.pop3Transport
871
p.mbox.messages.append(self.extraMessage)
873
p.lineReceived('TOP 2 10')
876
p.lineReceived('LAST')
882
def testHighestOnlyProgresses(self):
884
Test that downloading a message with a smaller index than the current
885
LAST response doesn't change the LAST response.
888
s = self.pop3Transport
889
p.mbox.messages.append(self.extraMessage)
891
p.lineReceived('RETR 2')
893
p.lineReceived('TOP 1 10')
896
p.lineReceived('LAST')
902
def testResetClearsHighest(self):
904
Test that issuing RSET changes the LAST response to 0.
907
s = self.pop3Transport
908
p.mbox.messages.append(self.extraMessage)
910
p.lineReceived('RETR 2')
912
p.lineReceived('RSET')
914
p.lineReceived('LAST')
921
# Warning text about IMailbox.listMessages raising IndexError, and a
# suppression for PendingDeprecationWarning carrying that text.
_listMessageDeprecation = (
    "twisted.mail.pop3.IMailbox.listMessages may not "
    "raise IndexError for out-of-bounds message numbers: "
    "raise ValueError instead.")
_listMessageSuppression = util.suppress(
    category=PendingDeprecationWarning,
    message=_listMessageDeprecation)

# The same pair for IMailbox.getUidl.
_getUidlDeprecation = (
    "twisted.mail.pop3.IMailbox.getUidl may not "
    "raise IndexError for out-of-bounds message numbers: "
    "raise ValueError instead.")
_getUidlSuppression = util.suppress(
    category=PendingDeprecationWarning,
    message=_getUidlDeprecation)
937
class IndexErrorCommandTestCase(CommandMixin, unittest.TestCase):
    """
    Apply every L{CommandMixin} test to a L{DummyMailbox} configured to
    raise C{IndexError} for out-of-bounds requests.  That mailbox
    behavior is slated for deprecation and later removal.

    The four bad-argument tests are overridden only so that a warning
    suppression can be attached to each of them.
    """
    mailboxType = DummyMailbox
    exceptionType = IndexError

    def testLISTWithBadArgument(self):
        return CommandMixin.testLISTWithBadArgument(self)

    def testUIDLWithBadArgument(self):
        return CommandMixin.testUIDLWithBadArgument(self)

    def testTOPWithBadArgument(self):
        return CommandMixin.testTOPWithBadArgument(self)

    def testRETRWithBadArgument(self):
        return CommandMixin.testRETRWithBadArgument(self)

    # UIDL gets the getUidl suppression; LIST, TOP and RETR get the
    # listMessages one.
    testLISTWithBadArgument.suppress = [_listMessageSuppression]
    testUIDLWithBadArgument.suppress = [_getUidlSuppression]
    testTOPWithBadArgument.suppress = [_listMessageSuppression]
    testRETRWithBadArgument.suppress = [_listMessageSuppression]
967
class ValueErrorCommandTestCase(CommandMixin, unittest.TestCase):
    """
    Apply every L{CommandMixin} test to a L{DummyMailbox} configured to
    raise C{ValueError} for out-of-bounds requests.  This is the correct
    mailbox behavior; once mailboxes raising C{IndexError} are no longer
    supported, this class is meant to become plain C{CommandTestCase}.
    """
    mailboxType = DummyMailbox
    exceptionType = ValueError
979
class SyncDeferredMailbox(DummyMailbox):
    """
    A L{DummyMailbox} whose C{listMessages} hands back its result wrapped
    in a Deferred that has already fired.
    """
    def listMessages(self, n=None):
        """
        Compute the listing with L{DummyMailbox.listMessages} and return
        it via C{defer.succeed}.

        An exception raised by the base implementation propagates
        directly; it is not wrapped in the Deferred.
        """
        listing = DummyMailbox.listMessages(self, n)
        return defer.succeed(listing)
989
class IndexErrorSyncDeferredCommandTestCase(IndexErrorCommandTestCase):
    """
    Repeat the L{IndexErrorCommandTestCase} tests against
    L{SyncDeferredMailbox}, an IMailbox implementation whose
    C{listMessages} returns an already-fired Deferred.
    """
    mailboxType = SyncDeferredMailbox
998
class ValueErrorSyncDeferredCommandTestCase(ValueErrorCommandTestCase):
    """
    Repeat the L{ValueErrorCommandTestCase} tests against
    L{SyncDeferredMailbox}, an IMailbox implementation whose
    C{listMessages} returns an already-fired Deferred.
    """
    mailboxType = SyncDeferredMailbox
1007
class AsyncDeferredMailbox(DummyMailbox):
1009
Mailbox which has a listMessages implementation which returns a Deferred
1010
which has not yet fired.
1012
def __init__(self, *a, **kw):
1014
DummyMailbox.__init__(self, *a, **kw)
1017
def listMessages(self, n=None):
1018
d = defer.Deferred()
1019
# See AsyncDeferredMailbox._flush
1020
self.waiting.append((d, DummyMailbox.listMessages(self, n)))
1025
class IndexErrorAsyncDeferredCommandTestCase(IndexErrorCommandTestCase):
1027
Run all of the L{IndexErrorCommandTestCase} tests with an asynchronous-Deferred
1028
returning IMailbox implementation.
1030
mailboxType = AsyncDeferredMailbox
1034
Fire whatever Deferreds we've built up in our mailbox.
1036
while self.pop3Server.mbox.waiting:
1037
d, a = self.pop3Server.mbox.waiting.pop()
1039
IndexErrorCommandTestCase._flush(self)
1043
class ValueErrorAsyncDeferredCommandTestCase(ValueErrorCommandTestCase):
1045
Run all of the L{ValueErrorCommandTestCase} tests with an asynchronous-Deferred
1046
returning IMailbox implementation.
1048
mailboxType = AsyncDeferredMailbox
1052
Fire whatever Deferreds we've built up in our mailbox.
1054
while self.pop3Server.mbox.waiting:
1055
d, a = self.pop3Server.mbox.waiting.pop()
1057
ValueErrorCommandTestCase._flush(self)
1059
class POP3MiscTestCase(unittest.TestCase):
1061
Miscellaneous tests more to do with module/package structure than
1062
anything to do with the Post Office Protocol.
1066
This test checks that all names listed in
1067
twisted.mail.pop3.__all__ are actually present in the module.
1069
mod = twisted.mail.pop3
1070
for attr in mod.__all__:
1071
self.failUnless(hasattr(mod, attr))