1
# -*- test-case-name: twisted.mail.test.test_imap -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
Test case for twisted.mail.imap4
10
from __future__ import nested_scopes
13
from cStringIO import StringIO
15
from StringIO import StringIO
21
from zope.interface import implements
23
from twisted.mail.imap4 import MessageSet
24
from twisted.mail import imap4
25
from twisted.mail import smtp
26
from twisted.protocols import loopback
27
from twisted.internet import defer
28
from twisted.internet import error
29
from twisted.internet import reactor
30
from twisted.internet import interfaces
31
from twisted.trial import unittest
32
from twisted.python import util
33
from twisted.python.util import sibpath
34
from twisted.python import failure
36
from twisted import cred
37
import twisted.cred.error
38
import twisted.cred.checkers
39
import twisted.cred.credentials
40
import twisted.cred.portal
42
from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
45
from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
47
ClientTLSContext = ServerTLSContext = None
50
return lambda result, f=f: f()
55
for i in range(len(l)):
56
if isinstance(l[i], types.ListType):
58
elif isinstance(l[i], types.TupleType):
59
l[i] = tuple(sortNest(list(l[i])))
62
class IMAP4UTF7TestCase(unittest.TestCase):
64
[u'Hello world', 'Hello world'],
65
[u'Hello & world', 'Hello &- world'],
66
[u'Hello\xffworld', 'Hello&AP8-world'],
67
[u'\xff\xfe\xfd\xfc', '&AP8A,gD9APw-'],
68
[u'~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317',
69
'~peter/mail/&ZeVnLIqe-/&U,BTFw-'], # example from RFC 2060
73
for (input, output) in self.tests:
74
self.assertEquals(input.encode('imap4-utf-7'), output)
77
for (input, output) in self.tests:
78
# XXX - Piece of *crap* 2.1
79
self.assertEquals(input, imap4.decoder(output)[0])
81
def testPrintableSingletons(self):
    """
    Check the 'imap4-utf-7' codec on single printable characters.

    Every code point in 0x20-0x7e except 0x26 ('&') must encode and
    decode to itself; '&' must encode to '&-' and '&-' must decode
    back to '&'.
    """
    # All printables represent themselves
    for o in range(0x20, 0x26) + range(0x27, 0x7f):
        self.failUnlessEqual(chr(o), chr(o).encode('imap4-utf-7'))
        self.failUnlessEqual(chr(o), chr(o).decode('imap4-utf-7'))
    # 0x26 ('&') is skipped by the loop above and checked separately.
    self.failUnlessEqual('&'.encode('imap4-utf-7'), '&-')
    self.failUnlessEqual('&-'.decode('imap4-utf-7'), '&')
89
class BufferingConsumer:
93
def write(self, bytes):
94
self.buffer.append(bytes)
96
self.consumer.resumeProducing()
98
def registerProducer(self, consumer, streaming):
    """
    Remember the registered object and immediately ask it to produce.

    @param consumer: the object being registered; it is stored on
        C{self.consumer} and its C{resumeProducing} method is called
        right away.
    @param streaming: accepted but not used by this method.
    """
    self.consumer = consumer
    self.consumer.resumeProducing()
102
def unregisterProducer(self):
105
class MessageProducerTestCase(unittest.TestCase):
106
def testSinglePart(self):
107
body = 'This is body text. Rar.'
108
headers = util.OrderedDict()
109
headers['from'] = 'sender@host'
110
headers['to'] = 'recipient@domain'
111
headers['subject'] = 'booga booga boo'
112
headers['content-type'] = 'text/plain'
114
msg = FakeyMessage(headers, (), None, body, 123, None )
116
c = BufferingConsumer()
117
p = imap4.MessageProducer(msg)
118
d = p.beginProducing(c)
120
def cbProduced(result):
121
self.assertIdentical(result, p)
126
'From: sender@host\r\n'
127
'To: recipient@domain\r\n'
128
'Subject: booga booga boo\r\n'
129
'Content-Type: text/plain\r\n'
132
return d.addCallback(cbProduced)
135
def testSingleMultiPart(self):
137
innerBody = 'Contained body message text. Squarge.'
138
headers = util.OrderedDict()
139
headers['from'] = 'sender@host'
140
headers['to'] = 'recipient@domain'
141
headers['subject'] = 'booga booga boo'
142
headers['content-type'] = 'multipart/alternative; boundary="xyz"'
144
innerHeaders = util.OrderedDict()
145
innerHeaders['subject'] = 'this is subject text'
146
innerHeaders['content-type'] = 'text/plain'
147
msg = FakeyMessage(headers, (), None, outerBody, 123,
148
[FakeyMessage(innerHeaders, (), None, innerBody,
152
c = BufferingConsumer()
153
p = imap4.MessageProducer(msg)
154
d = p.beginProducing(c)
156
def cbProduced(result):
157
self.failUnlessIdentical(result, p)
163
'From: sender@host\r\n'
164
'To: recipient@domain\r\n'
165
'Subject: booga booga boo\r\n'
166
'Content-Type: multipart/alternative; boundary="xyz"\r\n'
170
'Subject: this is subject text\r\n'
171
'Content-Type: text/plain\r\n'
176
return d.addCallback(cbProduced)
179
def testMultipleMultiPart(self):
181
innerBody1 = 'Contained body message text. Squarge.'
182
innerBody2 = 'Secondary <i>message</i> text of squarge body.'
183
headers = util.OrderedDict()
184
headers['from'] = 'sender@host'
185
headers['to'] = 'recipient@domain'
186
headers['subject'] = 'booga booga boo'
187
headers['content-type'] = 'multipart/alternative; boundary="xyz"'
188
innerHeaders = util.OrderedDict()
189
innerHeaders['subject'] = 'this is subject text'
190
innerHeaders['content-type'] = 'text/plain'
191
innerHeaders2 = util.OrderedDict()
192
innerHeaders2['subject'] = '<b>this is subject</b>'
193
innerHeaders2['content-type'] = 'text/html'
194
msg = FakeyMessage(headers, (), None, outerBody, 123, [
195
FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
196
FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)
200
c = BufferingConsumer()
201
p = imap4.MessageProducer(msg)
202
d = p.beginProducing(c)
204
def cbProduced(result):
205
self.failUnlessIdentical(result, p)
211
'From: sender@host\r\n'
212
'To: recipient@domain\r\n'
213
'Subject: booga booga boo\r\n'
214
'Content-Type: multipart/alternative; boundary="xyz"\r\n'
218
'Subject: this is subject text\r\n'
219
'Content-Type: text/plain\r\n'
223
'Subject: <b>this is subject</b>\r\n'
224
'Content-Type: text/html\r\n'
228
return d.addCallback(cbProduced)
231
class IMAP4HelperTestCase(unittest.TestCase):
232
def testFileProducer(self):
233
b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10
234
c = BufferingConsumer()
236
p = imap4.FileProducer(f)
237
d = p.beginProducing(c)
239
def cbProduced(result):
240
self.failUnlessIdentical(result, p)
242
('{%d}\r\n' % len(b))+ b,
244
return d.addCallback(cbProduced)
246
def testWildcard(self):
249
['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
250
['foo/xgum/bar', 'foo/gum/bar'],
252
['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
253
['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
254
], ['foo/xyz*abc/bar',
255
['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
256
['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
260
for (wildcard, fail, succeed) in cases:
261
wildcard = imap4.wildcardToRegexp(wildcard, '/')
263
self.failIf(wildcard.match(x))
265
self.failUnless(wildcard.match(x))
267
def testWildcardNoDelim(self):
270
['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
271
['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
273
['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
274
['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'],
275
], ['foo/xyz*abc/bar',
276
['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
277
['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
281
for (wildcard, fail, succeed) in cases:
282
wildcard = imap4.wildcardToRegexp(wildcard, None)
284
self.failIf(wildcard.match(x), x)
286
self.failUnless(wildcard.match(x), x)
288
def testHeaderFormatter(self):
290
({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHeader1: Value1\r\n'),
293
for (input, output) in cases:
294
self.assertEquals(imap4._formatHeaders(input), output)
296
def testMessageSet(self):
300
self.assertEquals(m1, m2)
303
self.assertEquals(len(m1), 3)
304
self.assertEquals(list(m1), [1, 2, 3])
307
self.assertEquals(m1, m2)
308
self.assertEquals(list(m1 + m2), [1, 2, 3])
310
def testQuotedSplitter(self):
313
'''Hello "World!"''',
314
'''World "Hello" "How are you?"''',
315
'''"Hello world" How "are you?"''',
316
'''foo bar "baz buz" NIL''',
317
'''foo bar "baz buz" "NIL"''',
318
'''foo NIL "baz buz" bar''',
319
'''foo "NIL" "baz buz" bar''',
320
'''"NIL" bar "baz buz" foo''',
326
['World', 'Hello', 'How are you?'],
327
['Hello world', 'How', 'are you?'],
328
['foo', 'bar', 'baz buz', None],
329
['foo', 'bar', 'baz buz', 'NIL'],
330
['foo', None, 'baz buz', 'bar'],
331
['foo', 'NIL', 'baz buz', 'bar'],
332
['NIL', 'bar', 'baz buz', 'foo'],
339
'"oops here is" another"',
343
self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
345
for (case, expected) in zip(cases, answers):
346
self.assertEquals(imap4.splitQuoted(case), expected)
349
def testStringCollapser(self):
351
['a', 'b', 'c', 'd', 'e'],
352
['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'],
353
[['a', 'b', 'c'], 'd', 'e'],
354
['a', ['b', 'c', 'd'], 'e'],
355
['a', 'b', ['c', 'd', 'e']],
356
['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'],
357
['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'],
366
['a ', ['bcd'], ' e'],
367
['a', [' bc '], 'de'],
370
for (case, expected) in zip(cases, answers):
371
self.assertEquals(imap4.collapseStrings(case), expected)
373
def testParenParser(self):
374
s = '\r\n'.join(['xx'] * 4)
376
'(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len(s), s,),
378
# '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
379
# 'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
380
# '"IMAP4rev1 WG mtg summary and minutes" '
381
# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
382
# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
383
# '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
384
# '((NIL NIL "imap" "cac.washington.edu")) '
385
# '((NIL NIL "minutes" "CNRI.Reston.VA.US") '
386
# '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL '
387
# '"<B27397-0100000@cac.washington.edu>") '
388
# 'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92))',
390
'(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
391
'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
392
'"IMAP4rev1 WG mtg summary and minutes" '
393
'(("Terry Gray" NIL gray cac.washington.edu)) '
394
'(("Terry Gray" NIL gray cac.washington.edu)) '
395
'(("Terry Gray" NIL gray cac.washington.edu)) '
396
'((NIL NIL imap cac.washington.edu)) '
397
'((NIL NIL minutes CNRI.Reston.VA.US) '
398
'("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
399
'<B27397-0100000@cac.washington.edu>) '
400
'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))',
404
['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s],
406
['FLAGS', [r'\Seen'], 'INTERNALDATE',
407
'17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE',
408
['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)',
409
'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None,
410
"gray", "cac.washington.edu"]], [["Terry Gray", None,
411
"gray", "cac.washington.edu"]], [["Terry Gray", None,
412
"gray", "cac.washington.edu"]], [[None, None, "imap",
413
"cac.washington.edu"]], [[None, None, "minutes",
414
"CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN",
415
"INFOODS.MIT.EDU"]], None, None,
416
"<B27397-0100000@cac.washington.edu>"], "BODY", ["TEXT", "PLAIN",
417
["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]],
420
for (case, expected) in zip(cases, answers):
421
self.assertEquals(imap4.parseNestedParens(case), [expected])
423
# XXX This code used to work, but changes occurred within the
424
# imap4.py module which made it no longer necessary for *all* of it
425
# to work. In particular, only the part that makes
426
# 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
427
# no longer needs to work.  So, I am loath to delete the entire
428
# section of the test. --exarkun
431
# for (case, expected) in zip(answers, cases):
432
# self.assertEquals('(' + imap4.collapseNestedLists(case) + ')', expected)
434
def testFetchParserSimple(self):
436
['ENVELOPE', 'Envelope'],
438
['INTERNALDATE', 'InternalDate'],
439
['RFC822.HEADER', 'RFC822Header'],
440
['RFC822.SIZE', 'RFC822Size'],
441
['RFC822.TEXT', 'RFC822Text'],
442
['RFC822', 'RFC822'],
444
['BODYSTRUCTURE', 'BodyStructure'],
447
for (inp, outp) in cases:
448
p = imap4._FetchParser()
450
self.assertEquals(len(p.result), 1)
451
self.failUnless(isinstance(p.result[0], getattr(p, outp)))
453
def testFetchParserMacros(self):
455
['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])],
456
['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'body'])],
457
['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])],
460
for (inp, outp) in cases:
461
p = imap4._FetchParser()
463
self.assertEquals(len(p.result), outp[0])
464
p = [str(p).lower() for p in p.result]
467
self.assertEquals(p, outp[1])
469
def testFetchParserBody(self):
470
P = imap4._FetchParser
473
p.parseString('BODY')
474
self.assertEquals(len(p.result), 1)
475
self.failUnless(isinstance(p.result[0], p.Body))
476
self.assertEquals(p.result[0].peek, False)
477
self.assertEquals(p.result[0].header, None)
478
self.assertEquals(str(p.result[0]), 'BODY')
481
p.parseString('BODY.PEEK')
482
self.assertEquals(len(p.result), 1)
483
self.failUnless(isinstance(p.result[0], p.Body))
484
self.assertEquals(p.result[0].peek, True)
485
self.assertEquals(str(p.result[0]), 'BODY')
488
p.parseString('BODY[]')
489
self.assertEquals(len(p.result), 1)
490
self.failUnless(isinstance(p.result[0], p.Body))
491
self.assertEquals(p.result[0].empty, True)
492
self.assertEquals(str(p.result[0]), 'BODY[]')
495
p.parseString('BODY[HEADER]')
496
self.assertEquals(len(p.result), 1)
497
self.failUnless(isinstance(p.result[0], p.Body))
498
self.assertEquals(p.result[0].peek, False)
499
self.failUnless(isinstance(p.result[0].header, p.Header))
500
self.assertEquals(p.result[0].header.negate, True)
501
self.assertEquals(p.result[0].header.fields, ())
502
self.assertEquals(p.result[0].empty, False)
503
self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
506
p.parseString('BODY.PEEK[HEADER]')
507
self.assertEquals(len(p.result), 1)
508
self.failUnless(isinstance(p.result[0], p.Body))
509
self.assertEquals(p.result[0].peek, True)
510
self.failUnless(isinstance(p.result[0].header, p.Header))
511
self.assertEquals(p.result[0].header.negate, True)
512
self.assertEquals(p.result[0].header.fields, ())
513
self.assertEquals(p.result[0].empty, False)
514
self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
517
p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
518
self.assertEquals(len(p.result), 1)
519
self.failUnless(isinstance(p.result[0], p.Body))
520
self.assertEquals(p.result[0].peek, False)
521
self.failUnless(isinstance(p.result[0].header, p.Header))
522
self.assertEquals(p.result[0].header.negate, False)
523
self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
524
self.assertEquals(p.result[0].empty, False)
525
self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
528
p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]')
529
self.assertEquals(len(p.result), 1)
530
self.failUnless(isinstance(p.result[0], p.Body))
531
self.assertEquals(p.result[0].peek, True)
532
self.failUnless(isinstance(p.result[0].header, p.Header))
533
self.assertEquals(p.result[0].header.negate, False)
534
self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
535
self.assertEquals(p.result[0].empty, False)
536
self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
539
p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
540
self.assertEquals(len(p.result), 1)
541
self.failUnless(isinstance(p.result[0], p.Body))
542
self.assertEquals(p.result[0].peek, True)
543
self.failUnless(isinstance(p.result[0].header, p.Header))
544
self.assertEquals(p.result[0].header.negate, True)
545
self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
546
self.assertEquals(p.result[0].empty, False)
547
self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
550
p.parseString('BODY[1.MIME]<10.50>')
551
self.assertEquals(len(p.result), 1)
552
self.failUnless(isinstance(p.result[0], p.Body))
553
self.assertEquals(p.result[0].peek, False)
554
self.failUnless(isinstance(p.result[0].mime, p.MIME))
555
self.assertEquals(p.result[0].part, (0,))
556
self.assertEquals(p.result[0].partialBegin, 10)
557
self.assertEquals(p.result[0].partialLength, 50)
558
self.assertEquals(p.result[0].empty, False)
559
self.assertEquals(str(p.result[0]), 'BODY[1.MIME]<10.50>')
562
p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
563
self.assertEquals(len(p.result), 1)
564
self.failUnless(isinstance(p.result[0], p.Body))
565
self.assertEquals(p.result[0].peek, True)
566
self.failUnless(isinstance(p.result[0].header, p.Header))
567
self.assertEquals(p.result[0].part, (0, 2, 8, 10))
568
self.assertEquals(p.result[0].header.fields, ['MESSAGE-ID', 'DATE'])
569
self.assertEquals(p.result[0].partialBegin, 103)
570
self.assertEquals(p.result[0].partialLength, 69)
571
self.assertEquals(p.result[0].empty, False)
572
self.assertEquals(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
577
'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz'
580
output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"'
582
self.assertEquals(imap4.collapseNestedLists(inputStructure), output)
584
def testQuoteAvoider(self):
586
'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n'),
587
imap4.DontQuoteMe('buz'), ""
590
output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz ""'
592
self.assertEquals(imap4.collapseNestedLists(input), output)
594
def testLiterals(self):
596
('({10}\r\n0123456789)', [['0123456789']]),
599
for (case, expected) in cases:
600
self.assertEquals(imap4.parseNestedParens(case), expected)
602
def testQueryBuilder(self):
604
imap4.Query(flagged=1),
605
imap4.Query(sorted=1, unflagged=1, deleted=1),
606
imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
607
imap4.Query(before='today'),
609
imap4.Query(deleted=1),
610
imap4.Query(unseen=1),
616
imap4.Query(sorted=1, since='yesterday', smaller=1000),
617
imap4.Query(sorted=1, before='tuesday', larger=10000),
618
imap4.Query(sorted=1, unseen=1, deleted=1, before='today'),
620
imap4.Query(subject='spam')
625
imap4.Query(uid='1:5')
632
'(DELETED UNFLAGGED)',
633
'(OR FLAGGED DELETED)',
635
'(OR DELETED (OR UNSEEN NEW))',
636
'(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing
637
'(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more
638
'"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more
642
for (query, expected) in zip(inputs, outputs):
643
self.assertEquals(query, expected)
645
def testIdListParser(self):
664
MessageSet(5, None) + MessageSet(1, 2),
667
MessageSet(1) + MessageSet(3) + MessageSet(5),
670
MessageSet(1, 5) + MessageSet(10, 20),
671
MessageSet(1) + MessageSet(5, 10),
672
MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
673
MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
678
1, 2, 3, 10, 11, 16, 7, 13, 17,
681
for (input, expected) in zip(inputs, outputs):
682
self.assertEquals(imap4.parseIdList(input), expected)
684
for (input, expected) in zip(inputs, lengths):
686
L = len(imap4.parseIdList(input))
689
self.assertEquals(L, expected,
690
"len(%r) = %r != %r" % (input, L, expected))
693
implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
695
flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag')
703
self.addListener = self.listeners.append
704
self.removeListener = self.listeners.remove
709
def getUIDValidity(self):
712
def getUIDNext(self):
    """
    Return one more than the number of entries in C{self.messages}.
    """
    return len(self.messages) + 1
715
def getMessageCount(self):
718
def getRecentCount(self):
721
def getUnseenCount(self):
724
def isWriteable(self):
730
def getHierarchicalDelimiter(self):
733
def requestStatus(self, names):
735
if 'MESSAGES' in names:
736
r['MESSAGES'] = self.getMessageCount()
737
if 'RECENT' in names:
738
r['RECENT'] = self.getRecentCount()
739
if 'UIDNEXT' in names:
740
r['UIDNEXT'] = self.getMessageCount() + 1
741
if 'UIDVALIDITY' in names:
742
r['UIDVALIDITY'] = self.getUID()
743
if 'UNSEEN' in names:
744
r['UNSEEN'] = self.getUnseenCount()
745
return defer.succeed(r)
747
def addMessage(self, message, flags, date = None):
748
self.messages.append((message, flags, date, self.mUID))
750
return defer.succeed(None)
754
for i in self.messages:
755
if '\\Deleted' in i[1]:
758
self.messages.remove(i)
759
return [i[3] for i in delete]
764
class Account(imap4.MemoryAccount):
765
mailboxFactory = SimpleMailbox
766
def _emptyMailbox(self, name, id):
767
return self.mailboxFactory()
769
def select(self, name, rw=1):
770
mbox = imap4.MemoryAccount.select(self, name)
775
class SimpleServer(imap4.IMAP4Server):
776
def __init__(self, *args, **kw):
777
imap4.IMAP4Server.__init__(self, *args, **kw)
779
realm.theAccount = Account('testuser')
780
portal = cred.portal.Portal(realm)
781
c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
784
portal.registerChecker(c)
785
self.timeoutTest = False
787
def lineReceived(self, line):
789
# Do not send a response
792
imap4.IMAP4Server.lineReceived(self, line)
794
_username = 'testuser'
795
_password = 'password-test'
796
def authenticateLogin(self, username, password):
    """
    Check a username/password pair against this server's fixed credentials.

    @param username: compared for equality with C{self._username}.
    @param password: compared for equality with C{self._password}.
    @return: the 3-tuple C{(imap4.IAccount, self.theAccount, <no-op
        callable>)} when both values match.
        NOTE(review): presumably (interface, avatar, logout) -- confirm
        against the IMAP4Server contract.
    @raise cred.error.UnauthorizedLogin: when either value does not match.
    """
    if username == self._username and password == self._password:
        return imap4.IAccount, self.theAccount, lambda: None
    raise cred.error.UnauthorizedLogin()
802
class SimpleClient(imap4.IMAP4Client):
803
def __init__(self, deferred, contextFactory = None):
804
imap4.IMAP4Client.__init__(self, contextFactory)
805
self.deferred = deferred
808
def serverGreeting(self, caps):
    """
    Fire C{self.deferred} with C{None}.

    @param caps: accepted but not used by this method.
    """
    self.deferred.callback(None)
811
def modeChanged(self, writeable):
    """
    Record a mode change in C{self.events}, then drop the connection.

    @param writeable: stored as the second item of the
        C{['modeChanged', writeable]} entry appended to C{self.events}.
    """
    self.events.append(['modeChanged', writeable])
    self.transport.loseConnection()
815
def flagsChanged(self, newFlags):
    """
    Record a flags change in C{self.events}, then drop the connection.

    @param newFlags: stored as the second item of the
        C{['flagsChanged', newFlags]} entry appended to C{self.events}.
    """
    self.events.append(['flagsChanged', newFlags])
    self.transport.loseConnection()
819
def newMessages(self, exists, recent):
    """
    Record a new-messages notice in C{self.events}, then drop the connection.

    @param exists: stored as the second item of the appended entry.
    @param recent: stored as the third item of the appended entry.
    """
    self.events.append(['newMessages', exists, recent])
    self.transport.loseConnection()
823
def fetchBodyParts(self, message, parts):
824
"""Fetch some parts of the body.
826
@param message: message with parts to fetch
827
@type message: C{str}
828
@param parts: a list of int/str
831
cmd = "%s (BODY[%s]" % (message, parts[0])
833
cmd += " BODY[%s]" % p
835
d = self.sendCommand(imap4.Command("FETCH", cmd,
836
wantResponse=("FETCH",)))
837
d.addCallback(self.__cb_fetchBodyParts)
840
def __cb_fetchBodyParts(self, (lines, last)):
843
parts = line.split(None, 2)
845
if parts[1] == "FETCH":
847
mail_id = int(parts[0])
849
raise imap4.IllegalServerResponse, line
851
body_parts = imap4.parseNestedParens(parts[2])[0]
853
for i in range(len(body_parts)/3):
854
dict_parts[body_parts[3*i+1][0]] = body_parts[3*i+2]
855
info[mail_id] = dict_parts
858
class IMAP4HelperMixin:
864
self.server = SimpleServer(contextFactory=self.serverCTX)
865
self.client = SimpleClient(d, contextFactory=self.clientCTX)
868
SimpleMailbox.messages = []
869
theAccount = Account('testuser')
870
theAccount.mboxType = SimpleMailbox
871
SimpleServer.theAccount = theAccount
878
def _cbStopClient(self, ignore):
879
self.client.transport.loseConnection()
881
def _ebGeneral(self, failure):
882
self.client.transport.loseConnection()
883
self.server.transport.loseConnection()
884
failure.printTraceback(open('failure.log', 'w'))
885
failure.printTraceback()
889
return loopback.loopbackAsync(self.server, self.client)
891
class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
892
def testCapability(self):
897
self.server.transport.loseConnection()
898
return self.client.getCapabilities().addCallback(gotCaps)
899
d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
900
d = defer.gatherResults([self.loopback(), d1])
901
expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
902
return d.addCallback(lambda _: self.assertEquals(expected, caps))
904
def testCapabilityWithAuth(self):
906
self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
910
self.server.transport.loseConnection()
911
return self.client.getCapabilities().addCallback(gotCaps)
912
d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
913
d = defer.gatherResults([self.loopback(), d1])
915
expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
916
'IDLE': None, 'AUTH': ['CRAM-MD5']}
918
return d.addCallback(lambda _: self.assertEquals(expCap, caps))
920
def testLogout(self):
925
self.client.logout().addCallback(strip(setLoggedOut))
926
self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
928
return d.addCallback(lambda _: self.assertEquals(self.loggedOut, 1))
931
self.responses = None
933
def setResponses(responses):
934
self.responses = responses
935
self.server.transport.loseConnection()
936
self.client.noop().addCallback(setResponses)
937
self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
939
return d.addCallback(lambda _: self.assertEquals(self.responses, []))
943
d = self.client.login('testuser', 'password-test')
944
d.addCallback(self._cbStopClient)
945
d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
946
d = defer.gatherResults([d1, self.loopback()])
947
return d.addCallback(self._cbTestLogin)
949
def _cbTestLogin(self, ignored):
    """
    Assert that the server holds the shared test account and is in the
    C{'auth'} state.

    @param ignored: accepted but not used by this method.
    """
    self.assertEquals(self.server.account, SimpleServer.theAccount)
    self.assertEquals(self.server.state, 'auth')
953
def testFailedLogin(self):
955
d = self.client.login('testuser', 'wrong-password')
956
d.addBoth(self._cbStopClient)
958
d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
960
d = defer.gatherResults([d1, d2])
961
return d.addCallback(self._cbTestFailedLogin)
963
def _cbTestFailedLogin(self, ignored):
964
self.assertEquals(self.server.account, None)
965
self.assertEquals(self.server.state, 'unauth')
968
def testLoginRequiringQuoting(self):
969
self.server._username = '{test}user'
970
self.server._password = '{test}password'
973
d = self.client.login('{test}user', '{test}password')
974
d.addBoth(self._cbStopClient)
976
d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
977
d = defer.gatherResults([self.loopback(), d1])
978
return d.addCallback(self._cbTestLoginRequiringQuoting)
980
def _cbTestLoginRequiringQuoting(self, ignored):
    """
    Assert that the server holds the shared test account and is in the
    C{'auth'} state.

    @param ignored: accepted but not used by this method.
    """
    self.assertEquals(self.server.account, SimpleServer.theAccount)
    self.assertEquals(self.server.state, 'auth')
985
def testNamespace(self):
986
self.namespaceArgs = None
988
return self.client.login('testuser', 'password-test')
990
def gotNamespace(args):
991
self.namespaceArgs = args
992
self._cbStopClient(None)
993
return self.client.namespace().addCallback(gotNamespace)
995
d1 = self.connected.addCallback(strip(login))
996
d1.addCallback(strip(namespace))
997
d1.addErrback(self._ebGeneral)
999
d = defer.gatherResults([d1, d2])
1000
d.addCallback(lambda _: self.assertEquals(self.namespaceArgs,
1001
[[['', '/']], [], []]))
1004
def testSelect(self):
1005
SimpleServer.theAccount.addMailbox('test-mailbox')
1006
self.selectedArgs = None
1008
return self.client.login('testuser', 'password-test')
1011
self.selectedArgs = args
1012
self._cbStopClient(None)
1013
d = self.client.select('test-mailbox')
1014
d.addCallback(selected)
1017
d1 = self.connected.addCallback(strip(login))
1018
d1.addCallback(strip(select))
1019
d1.addErrback(self._ebGeneral)
1020
d2 = self.loopback()
1021
return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
1023
def _cbTestSelect(self, ignored):
1024
mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
1025
self.assertEquals(self.server.mbox, mbox)
1026
self.assertEquals(self.selectedArgs, {
1027
'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
1028
'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
1032
def testExamine(self):
1033
SimpleServer.theAccount.addMailbox('test-mailbox')
1034
self.examinedArgs = None
1036
return self.client.login('testuser', 'password-test')
1039
self.examinedArgs = args
1040
self._cbStopClient(None)
1041
d = self.client.examine('test-mailbox')
1042
d.addCallback(examined)
1045
d1 = self.connected.addCallback(strip(login))
1046
d1.addCallback(strip(examine))
1047
d1.addErrback(self._ebGeneral)
1048
d2 = self.loopback()
1049
d = defer.gatherResults([d1, d2])
1050
return d.addCallback(self._cbTestExamine)
1052
def _cbTestExamine(self, ignored):
1053
mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
1054
self.assertEquals(self.server.mbox, mbox)
1055
self.assertEquals(self.examinedArgs, {
1056
'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
1057
'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
1061
def testCreate(self):
1062
succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
1063
fail = ('testbox', 'test/box')
1065
def cb(): self.result.append(1)
1066
def eb(failure): self.result.append(0)
1069
return self.client.login('testuser', 'password-test')
1071
for name in succeed + fail:
1072
d = self.client.create(name)
1073
d.addCallback(strip(cb)).addErrback(eb)
1074
d.addCallbacks(self._cbStopClient, self._ebGeneral)
1077
d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
1078
d2 = self.loopback()
1079
d = defer.gatherResults([d1, d2])
1080
return d.addCallback(self._cbTestCreate, succeed, fail)
1082
def _cbTestCreate(self, ignored, succeed, fail):
1083
self.assertEquals(self.result, [1] * len(succeed) + [0] * len(fail))
1084
mbox = SimpleServer.theAccount.mailboxes.keys()
1085
answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
1088
self.assertEquals(mbox, [a.upper() for a in answers])
1090
def testDelete(self):
1091
SimpleServer.theAccount.addMailbox('delete/me')
1094
return self.client.login('testuser', 'password-test')
1096
return self.client.delete('delete/me')
1097
d1 = self.connected.addCallback(strip(login))
1098
d1.addCallbacks(strip(delete), self._ebGeneral)
1099
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1100
d2 = self.loopback()
1101
d = defer.gatherResults([d1, d2])
1102
d.addCallback(lambda _:
1103
self.assertEquals(SimpleServer.theAccount.mailboxes.keys(), []))
1106
def testIllegalInboxDelete(self):
1109
return self.client.login('testuser', 'password-test')
1111
return self.client.delete('inbox')
1113
self.stashed = result
1115
d1 = self.connected.addCallback(strip(login))
1116
d1.addCallbacks(strip(delete), self._ebGeneral)
1118
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1119
d2 = self.loopback()
1120
d = defer.gatherResults([d1, d2])
1121
d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
1126
def testNonExistentDelete(self):
1128
return self.client.login('testuser', 'password-test')
1130
return self.client.delete('delete/me')
1131
def deleteFailed(failure):
1132
self.failure = failure
1135
d1 = self.connected.addCallback(strip(login))
1136
d1.addCallback(strip(delete)).addErrback(deleteFailed)
1137
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1138
d2 = self.loopback()
1139
d = defer.gatherResults([d1, d2])
1140
d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
1145
def testIllegalDelete(self):
1147
m.flags = (r'\Noselect',)
1148
SimpleServer.theAccount.addMailbox('delete', m)
1149
SimpleServer.theAccount.addMailbox('delete/me')
1152
return self.client.login('testuser', 'password-test')
1154
return self.client.delete('delete')
1155
def deleteFailed(failure):
1156
self.failure = failure
1159
d1 = self.connected.addCallback(strip(login))
1160
d1.addCallback(strip(delete)).addErrback(deleteFailed)
1161
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1162
d2 = self.loopback()
1163
d = defer.gatherResults([d1, d2])
1164
expected = "Hierarchically inferior mailboxes exist and \\Noselect is set"
1165
d.addCallback(lambda _:
1166
self.assertEquals(str(self.failure.value), expected))
1169
def testRename(self):
1170
SimpleServer.theAccount.addMailbox('oldmbox')
1172
return self.client.login('testuser', 'password-test')
1174
return self.client.rename('oldmbox', 'newname')
1176
d1 = self.connected.addCallback(strip(login))
1177
d1.addCallbacks(strip(rename), self._ebGeneral)
1178
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1179
d2 = self.loopback()
1180
d = defer.gatherResults([d1, d2])
1181
d.addCallback(lambda _:
1182
self.assertEquals(SimpleServer.theAccount.mailboxes.keys(),
1186
def testIllegalInboxRename(self):
1189
return self.client.login('testuser', 'password-test')
1191
return self.client.rename('inbox', 'frotz')
1193
self.stashed = stuff
1195
d1 = self.connected.addCallback(strip(login))
1196
d1.addCallbacks(strip(rename), self._ebGeneral)
1198
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1199
d2 = self.loopback()
1200
d = defer.gatherResults([d1, d2])
1201
d.addCallback(lambda _:
1202
self.failUnless(isinstance(self.stashed, failure.Failure)))
1205
def testHierarchicalRename(self):
1206
SimpleServer.theAccount.create('oldmbox/m1')
1207
SimpleServer.theAccount.create('oldmbox/m2')
1209
return self.client.login('testuser', 'password-test')
1211
return self.client.rename('oldmbox', 'newname')
1213
d1 = self.connected.addCallback(strip(login))
1214
d1.addCallbacks(strip(rename), self._ebGeneral)
1215
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1216
d2 = self.loopback()
1217
d = defer.gatherResults([d1, d2])
1218
return d.addCallback(self._cbTestHierarchicalRename)
1220
def _cbTestHierarchicalRename(self, ignored):
1221
mboxes = SimpleServer.theAccount.mailboxes.keys()
1222
expected = ['newname', 'newname/m1', 'newname/m2']
1224
self.assertEquals(mboxes, [s.upper() for s in expected])
1226
def testSubscribe(self):
1228
return self.client.login('testuser', 'password-test')
1230
return self.client.subscribe('this/mbox')
1232
d1 = self.connected.addCallback(strip(login))
1233
d1.addCallbacks(strip(subscribe), self._ebGeneral)
1234
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1235
d2 = self.loopback()
1236
d = defer.gatherResults([d1, d2])
1237
d.addCallback(lambda _:
1238
self.assertEquals(SimpleServer.theAccount.subscriptions,
1242
def testUnsubscribe(self):
1243
SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
1245
return self.client.login('testuser', 'password-test')
1247
return self.client.unsubscribe('this/mbox')
1249
d1 = self.connected.addCallback(strip(login))
1250
d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
1251
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1252
d2 = self.loopback()
1253
d = defer.gatherResults([d1, d2])
1254
d.addCallback(lambda _:
1255
self.assertEquals(SimpleServer.theAccount.subscriptions,
1259
def _listSetup(self, f):
1260
SimpleServer.theAccount.addMailbox('root/subthing')
1261
SimpleServer.theAccount.addMailbox('root/another-thing')
1262
SimpleServer.theAccount.addMailbox('non-root/subthing')
1265
return self.client.login('testuser', 'password-test')
1266
def listed(answers):
1267
self.listed = answers
1270
d1 = self.connected.addCallback(strip(login))
1271
d1.addCallbacks(strip(f), self._ebGeneral)
1272
d1.addCallbacks(listed, self._ebGeneral)
1273
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1274
d2 = self.loopback()
1275
return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
1279
return self.client.list('root', '%')
1280
d = self._listSetup(list)
1281
d.addCallback(lambda listed: self.assertEquals(
1284
(SimpleMailbox.flags, "/", "ROOT/SUBTHING"),
1285
(SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING")
1291
SimpleServer.theAccount.subscribe('ROOT/SUBTHING')
1293
return self.client.lsub('root', '%')
1294
d = self._listSetup(lsub)
1295
d.addCallback(self.assertEquals,
1296
[(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
1299
def testStatus(self):
1300
SimpleServer.theAccount.addMailbox('root/subthing')
1302
return self.client.login('testuser', 'password-test')
1304
return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
1305
def statused(result):
1306
self.statused = result
1308
self.statused = None
1309
d1 = self.connected.addCallback(strip(login))
1310
d1.addCallbacks(strip(status), self._ebGeneral)
1311
d1.addCallbacks(statused, self._ebGeneral)
1312
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1313
d2 = self.loopback()
1314
d = defer.gatherResults([d1, d2])
1315
d.addCallback(lambda _: self.assertEquals(
1317
{'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
1321
def testFailedStatus(self):
1323
return self.client.login('testuser', 'password-test')
1325
return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
1326
def statused(result):
1327
self.statused = result
1328
def failed(failure):
1329
self.failure = failure
1331
self.statused = self.failure = None
1332
d1 = self.connected.addCallback(strip(login))
1333
d1.addCallbacks(strip(status), self._ebGeneral)
1334
d1.addCallbacks(statused, failed)
1335
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1336
d2 = self.loopback()
1337
return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus)
1339
def _cbTestFailedStatus(self, ignored):
1344
self.failure.value.args,
1345
('Could not open mailbox',)
1348
def testFullAppend(self):
1349
infile = util.sibpath(__file__, 'rfc822.message')
1350
message = open(infile)
1351
SimpleServer.theAccount.addMailbox('root/subthing')
1353
return self.client.login('testuser', 'password-test')
1355
return self.client.append(
1358
('\\SEEN', '\\DELETED'),
1359
'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
1362
d1 = self.connected.addCallback(strip(login))
1363
d1.addCallbacks(strip(append), self._ebGeneral)
1364
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1365
d2 = self.loopback()
1366
d = defer.gatherResults([d1, d2])
1367
return d.addCallback(self._cbTestFullAppend, infile)
1369
def _cbTestFullAppend(self, ignored, infile):
1370
mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING']
1371
self.assertEquals(1, len(mb.messages))
1373
(['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0),
1376
self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
1378
def testPartialAppend(self):
1379
infile = util.sibpath(__file__, 'rfc822.message')
1380
message = open(infile)
1381
SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING')
1383
return self.client.login('testuser', 'password-test')
1385
message = file(infile)
1386
return self.client.sendCommand(
1389
'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile),
1390
(), self.client._IMAP4Client__cbContinueAppend, message
1393
d1 = self.connected.addCallback(strip(login))
1394
d1.addCallbacks(strip(append), self._ebGeneral)
1395
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1396
d2 = self.loopback()
1397
d = defer.gatherResults([d1, d2])
1398
return d.addCallback(self._cbTestPartialAppend, infile)
1400
def _cbTestPartialAppend(self, ignored, infile):
1401
mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING']
1402
self.assertEquals(1, len(mb.messages))
1404
(['\\SEEN'], 'Right now', 0),
1407
self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
1409
def testCheck(self):
1410
SimpleServer.theAccount.addMailbox('root/subthing')
1412
return self.client.login('testuser', 'password-test')
1414
return self.client.select('root/subthing')
1416
return self.client.check()
1418
d = self.connected.addCallback(strip(login))
1419
d.addCallbacks(strip(select), self._ebGeneral)
1420
d.addCallbacks(strip(check), self._ebGeneral)
1421
d.addCallbacks(self._cbStopClient, self._ebGeneral)
1422
return self.loopback()
1424
# Okay, that was fun
1426
def testClose(self):
1429
('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
1430
('Message 2', ('AnotherFlag',), None, 1),
1431
('Message 3', ('\\Deleted',), None, 2),
1433
SimpleServer.theAccount.addMailbox('mailbox', m)
1435
return self.client.login('testuser', 'password-test')
1437
return self.client.select('mailbox')
1439
return self.client.close()
1441
d = self.connected.addCallback(strip(login))
1442
d.addCallbacks(strip(select), self._ebGeneral)
1443
d.addCallbacks(strip(close), self._ebGeneral)
1444
d.addCallbacks(self._cbStopClient, self._ebGeneral)
1445
d2 = self.loopback()
1446
return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
1448
def _cbTestClose(self, ignored, m):
    """
    Verify the state of mailbox C{m} once the CLOSE exchange has finished.

    @param ignored: The result of the preceding Deferred; unused.
    @param m: The mailbox object that was selected and then closed.
    """
    survivor = ('Message 2', ('AnotherFlag',), None, 1)
    # Exactly one message is expected to remain, and the mailbox must have
    # been told that it was closed.
    self.assertEquals(len(m.messages), 1)
    self.assertEquals(m.messages[0], survivor)
    self.failUnless(m.closed)
1453
def testExpunge(self):
1456
('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
1457
('Message 2', ('AnotherFlag',), None, 1),
1458
('Message 3', ('\\Deleted',), None, 2),
1460
SimpleServer.theAccount.addMailbox('mailbox', m)
1462
return self.client.login('testuser', 'password-test')
1464
return self.client.select('mailbox')
1466
return self.client.expunge()
1467
def expunged(results):
1468
self.failIf(self.server.mbox is None)
1469
self.results = results
1472
d1 = self.connected.addCallback(strip(login))
1473
d1.addCallbacks(strip(select), self._ebGeneral)
1474
d1.addCallbacks(strip(expunge), self._ebGeneral)
1475
d1.addCallbacks(expunged, self._ebGeneral)
1476
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1477
d2 = self.loopback()
1478
d = defer.gatherResults([d1, d2])
1479
return d.addCallback(self._cbTestExpunge, m)
1481
def _cbTestExpunge(self, ignored, m):
1482
self.assertEquals(len(m.messages), 1)
1483
self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
1485
self.assertEquals(self.results, [0, 2])
1490
def requestAvatar(self, avatarId, mind, *interfaces):
    """
    Hand back the account stored on this realm for any avatar request.

    The arguments are accepted but not inspected.

    @return: A three-tuple of C{imap4.IAccount}, C{self.theAccount} and a
        logout callable that does nothing.
    """
    def logout():
        return None

    return (imap4.IAccount, self.theAccount, logout)
1494
credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.credentials.IUsernamePassword)
1497
'testuser': 'secret'
1500
def requestAvatarId(self, credentials):
    """
    Check C{credentials} against the C{users} table of this checker.

    @param credentials: An object providing a C{username} attribute and a
        C{checkPassword} method.
    @return: A Deferred built by calling C{credentials.checkPassword} with
        the stored password and chaining C{self._cbCheck} (which also
        receives the username) onto the result.  For a username that is
        not present in C{self.users}, a Deferred that has already failed
        with C{cred.error.UnauthorizedLogin}.
    """
    username = credentials.username
    if username not in self.users:
        # This path used to fall off the end and return None, which could
        # be mistaken for an avatar ID rather than a rejected login.
        return defer.fail(cred.error.UnauthorizedLogin())
    checking = defer.maybeDeferred(
        credentials.checkPassword, self.users[username])
    return checking.addCallback(self._cbCheck, username)
1506
def _cbCheck(self, result, username):
1509
raise cred.error.UnauthorizedLogin()
1511
class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase):
1513
IMAP4HelperMixin.setUp(self)
1516
realm.theAccount = Account('testuser')
1517
portal = cred.portal.Portal(realm)
1518
portal.registerChecker(TestChecker())
1519
self.server.portal = portal
1521
self.authenticated = 0
1522
self.account = realm.theAccount
1524
def testCramMD5(self):
1525
self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
1526
cAuth = imap4.CramMD5ClientAuthenticator('testuser')
1527
self.client.registerAuthenticator(cAuth)
1530
return self.client.authenticate('secret')
1532
self.authenticated = 1
1534
d1 = self.connected.addCallback(strip(auth))
1535
d1.addCallbacks(strip(authed), self._ebGeneral)
1536
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1537
d2 = self.loopback()
1538
d = defer.gatherResults([d1, d2])
1539
return d.addCallback(self._cbTestCramMD5)
1541
def _cbTestCramMD5(self, ignored):
    """
    Check the outcome of the CRAM-MD5 authentication exchange.

    The success marker must be set and the server must hold the account
    that the test fixture installed.

    @param ignored: The result of the preceding Deferred; unused.
    """
    server = self.server
    self.assertEquals(self.authenticated, 1)
    self.assertEquals(server.account, self.account)
1545
def testFailedCramMD5(self):
1546
self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
1547
cAuth = imap4.CramMD5ClientAuthenticator('testuser')
1548
self.client.registerAuthenticator(cAuth)
1551
return self.client.authenticate('not the secret')
1553
self.authenticated = 1
1555
self.authenticated = -1
1557
d1 = self.connected.addCallback(strip(misauth))
1558
d1.addCallbacks(strip(authed), strip(misauthed))
1559
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1560
d = defer.gatherResults([self.loopback(), d1])
1561
return d.addCallback(self._cbTestFailedCramMD5)
1563
def _cbTestFailedCramMD5(self, ignored):
    """
    Check the outcome of a CRAM-MD5 exchange that used the wrong secret.

    The failure marker (-1) must be set and the server must not have
    been given an account.

    @param ignored: The result of the preceding Deferred; unused.
    """
    server = self.server
    self.assertEquals(self.authenticated, -1)
    self.assertEquals(server.account, None)
1567
def testLOGIN(self):
1568
self.server.challengers['LOGIN'] = imap4.LOGINCredentials
1569
cAuth = imap4.LOGINAuthenticator('testuser')
1570
self.client.registerAuthenticator(cAuth)
1573
return self.client.authenticate('secret')
1575
self.authenticated = 1
1577
d1 = self.connected.addCallback(strip(auth))
1578
d1.addCallbacks(strip(authed), self._ebGeneral)
1579
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1580
d = defer.gatherResults([self.loopback(), d1])
1581
return d.addCallback(self._cbTestLOGIN)
1583
def _cbTestLOGIN(self, ignored):
    """
    Check the outcome of the LOGIN authentication exchange.

    The success marker must be set and the server must hold the account
    that the test fixture installed.

    @param ignored: The result of the preceding Deferred; unused.
    """
    expectedAccount = self.account
    self.assertEquals(self.authenticated, 1)
    self.assertEquals(self.server.account, expectedAccount)
1587
def testFailedLOGIN(self):
1588
self.server.challengers['LOGIN'] = imap4.LOGINCredentials
1589
cAuth = imap4.LOGINAuthenticator('testuser')
1590
self.client.registerAuthenticator(cAuth)
1593
return self.client.authenticate('not the secret')
1595
self.authenticated = 1
1597
self.authenticated = -1
1599
d1 = self.connected.addCallback(strip(misauth))
1600
d1.addCallbacks(strip(authed), strip(misauthed))
1601
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1602
d = defer.gatherResults([self.loopback(), d1])
1603
return d.addCallback(self._cbTestFailedLOGIN)
1605
def _cbTestFailedLOGIN(self, ignored):
    """
    Check the outcome of a LOGIN exchange that used the wrong secret.

    The failure marker (-1) must be set and the server must not have
    been given an account.

    @param ignored: The result of the preceding Deferred; unused.
    """
    failureMarker = -1
    self.assertEquals(self.authenticated, failureMarker)
    self.assertEquals(self.server.account, None)
1609
def testPLAIN(self):
1610
self.server.challengers['PLAIN'] = imap4.PLAINCredentials
1611
cAuth = imap4.PLAINAuthenticator('testuser')
1612
self.client.registerAuthenticator(cAuth)
1615
return self.client.authenticate('secret')
1617
self.authenticated = 1
1619
d1 = self.connected.addCallback(strip(auth))
1620
d1.addCallbacks(strip(authed), self._ebGeneral)
1621
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1622
d = defer.gatherResults([self.loopback(), d1])
1623
return d.addCallback(self._cbTestPLAIN)
1625
def _cbTestPLAIN(self, ignored):
    """
    Check the outcome of the PLAIN authentication exchange.

    The success marker must be set and the server must hold the account
    that the test fixture installed.

    @param ignored: The result of the preceding Deferred; unused.
    """
    successMarker = 1
    self.assertEquals(self.authenticated, successMarker)
    self.assertEquals(self.server.account, self.account)
1629
def testFailedPLAIN(self):
1630
self.server.challengers['PLAIN'] = imap4.PLAINCredentials
1631
cAuth = imap4.PLAINAuthenticator('testuser')
1632
self.client.registerAuthenticator(cAuth)
1635
return self.client.authenticate('not the secret')
1637
self.authenticated = 1
1639
self.authenticated = -1
1641
d1 = self.connected.addCallback(strip(misauth))
1642
d1.addCallbacks(strip(authed), strip(misauthed))
1643
d1.addCallbacks(self._cbStopClient, self._ebGeneral)
1644
d = defer.gatherResults([self.loopback(), d1])
1645
return d.addCallback(self._cbTestFailedPLAIN)
1647
def _cbTestFailedPLAIN(self, ignored):
    """
    Check the outcome of a PLAIN exchange that used the wrong secret.

    The failure marker (-1) must be set and the server must not have
    been given an account.

    @param ignored: The result of the preceding Deferred; unused.
    """
    noAccount = None
    self.assertEquals(self.authenticated, -1)
    self.assertEquals(self.server.account, noAccount)
1652
class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase):
1653
def testReadWrite(self):
1655
return self.client.login('testuser', 'password-test')
1657
self.server.modeChanged(1)
1659
d1 = self.connected.addCallback(strip(login))
1660
d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1661
d = defer.gatherResults([self.loopback(), d1])
1662
return d.addCallback(self._cbTestReadWrite)
1664
def _cbTestReadWrite(self, ignored):
    """
    Check that the client recorded a single C{modeChanged} event carrying 1.

    @param ignored: The result of the preceding Deferred; unused.
    """
    expectedEvents = [['modeChanged', 1]]
    self.assertEquals(self.client.events, expectedEvents)
1668
def testReadOnly(self):
1670
return self.client.login('testuser', 'password-test')
1672
self.server.modeChanged(0)
1674
d1 = self.connected.addCallback(strip(login))
1675
d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1676
d = defer.gatherResults([self.loopback(), d1])
1677
return d.addCallback(self._cbTestReadOnly)
1679
def _cbTestReadOnly(self, ignored):
    """
    Check that the client recorded a single C{modeChanged} event carrying 0.

    @param ignored: The result of the preceding Deferred; unused.
    """
    expectedEvents = [['modeChanged', 0]]
    self.assertEquals(self.client.events, expectedEvents)
1683
def testFlagChange(self):
1685
1: ['\\Answered', '\\Deleted'],
1690
return self.client.login('testuser', 'password-test')
1692
self.server.flagsChanged(flags)
1694
d1 = self.connected.addCallback(strip(login))
1695
d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1696
d = defer.gatherResults([self.loopback(), d1])
1697
return d.addCallback(self._cbTestFlagChange, flags)
1699
def _cbTestFlagChange(self, ignored, flags):
1700
E = self.client.events
1701
expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()]
1704
self.assertEquals(E, expect)
1706
def testNewMessages(self):
1708
return self.client.login('testuser', 'password-test')
1710
self.server.newMessages(10, None)
1712
d1 = self.connected.addCallback(strip(login))
1713
d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1714
d = defer.gatherResults([self.loopback(), d1])
1715
return d.addCallback(self._cbTestNewMessages)
1717
def _cbTestNewMessages(self, ignored):
    """
    Check that the client recorded one C{newMessages} event with the
    arguments C{(10, None)}.

    @param ignored: The result of the preceding Deferred; unused.
    """
    expectedEvents = [['newMessages', 10, None]]
    self.assertEquals(self.client.events, expectedEvents)
1721
def testNewRecentMessages(self):
1723
return self.client.login('testuser', 'password-test')
1725
self.server.newMessages(None, 10)
1727
d1 = self.connected.addCallback(strip(login))
1728
d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1729
d = defer.gatherResults([self.loopback(), d1])
1730
return d.addCallback(self._cbTestNewRecentMessages)
1732
def _cbTestNewRecentMessages(self, ignored):
    """
    Check that the client recorded one C{newMessages} event with the
    arguments C{(None, 10)}.

    @param ignored: The result of the preceding Deferred; unused.
    """
    expectedEvents = [['newMessages', None, 10]]
    self.assertEquals(self.client.events, expectedEvents)
1736
def testNewMessagesAndRecent(self):
1738
return self.client.login('testuser', 'password-test')
1740
self.server.newMessages(20, 10)
1742
d1 = self.connected.addCallback(strip(login))
1743
d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
1744
d = defer.gatherResults([self.loopback(), d1])
1745
return d.addCallback(self._cbTestNewMessagesAndRecent)
1747
def _cbTestNewMessagesAndRecent(self, ignored):
    """
    Check that the client recorded two separate C{newMessages} events:
    first C{(20, None)}, then C{(None, 10)}.

    @param ignored: The result of the preceding Deferred; unused.
    """
    expectedEvents = [
        ['newMessages', 20, None],
        ['newMessages', None, 10],
    ]
    self.assertEquals(self.client.events, expectedEvents)
1751
class HandCraftedTestCase(IMAP4HelperMixin, unittest.TestCase):
1752
def testTrailingLiteral(self):
1753
transport = StringTransport()
1754
c = imap4.IMAP4Client()
1755
c.makeConnection(transport)
1756
c.lineReceived('* OK [IMAP4rev1]')
1758
def cbSelect(ignored):
1759
d = c.fetchMessage('1')
1760
c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n')
1761
c.dataReceived('0003 OK FETCH\r\n')
1764
def cbLogin(ignored):
1765
d = c.select('inbox')
1766
c.lineReceived('0002 OK SELECT')
1767
d.addCallback(cbSelect)
1770
d = c.login('blah', 'blah')
1771
c.dataReceived('0001 OK LOGIN\r\n')
1772
d.addCallback(cbLogin)
1775
def testPathelogicalScatteringOfLiterals(self):
1776
self.server.checker.addUser('testuser', 'password-test')
1777
transport = StringTransport()
1778
self.server.makeConnection(transport)
1781
self.server.dataReceived("01 LOGIN {8}\r\n")
1782
self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")
1785
self.server.dataReceived("testuser {13}\r\n")
1786
self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")
1789
self.server.dataReceived("password-test\r\n")
1790
self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
1791
self.assertEquals(self.server.state, 'auth')
1793
self.server.connectionLost(error.ConnectionDone("Connection done."))
1795
def testUnsolicitedResponseMixedWithSolicitedResponse(self):
1797
class StillSimplerClient(imap4.IMAP4Client):
1799
def flagsChanged(self, newFlags):
1800
self.events.append(['flagsChanged', newFlags])
1802
transport = StringTransport()
1803
c = StillSimplerClient()
1804
c.makeConnection(transport)
1805
c.lineReceived('* OK [IMAP4rev1]')
1808
d = c.login('blah', 'blah')
1809
c.dataReceived('0001 OK LOGIN\r\n')
1812
d = c.select('inbox')
1813
c.lineReceived('0002 OK SELECT')
1816
d = c.fetchSpecific('1:*',
1817
headerType='HEADER.FIELDS',
1818
headerArgs=['SUBJECT'])
1819
c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n')
1820
c.dataReceived('Subject: Suprise for your woman...\r\n')
1821
c.dataReceived('\r\n')
1822
c.dataReceived(')\r\n')
1823
c.dataReceived('* 1 FETCH (FLAGS (\Seen))\r\n')
1824
c.dataReceived('* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n')
1825
c.dataReceived('Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n')
1826
c.dataReceived('\r\n')
1827
c.dataReceived(')\r\n')
1828
c.dataReceived('0003 OK FETCH completed\r\n')
1831
self.assertEquals(res, {
1832
1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
1833
'Subject: Suprise for your woman...\r\n\r\n']],
1834
2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
1835
'Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n']]
1838
self.assertEquals(c.events, [['flagsChanged', {1: ['\\Seen']}]])
1841
).addCallback(strip(select)
1842
).addCallback(strip(fetch)
1845
class FakeyServer(imap4.IMAP4Server):
1849
def sendServerGreeting(self):
1853
implements(imap4.IMessage)
1855
def __init__(self, headers, flags, date, body, uid, subpart):
1856
self.headers = headers
1858
self.body = StringIO(body)
1859
self.size = len(body)
1862
self.subpart = subpart
1864
def getHeaders(self, negate, *names):
1865
self.got_headers = negate, names
1871
def getInternalDate(self):
1874
def getBodyFile(self):
1883
def isMultipart(self):
    """
    Report whether this message was constructed with a C{subpart} value.

    @return: C{False} when C{self.subpart} is C{None}, C{True} otherwise.
    """
    if self.subpart is None:
        return False
    return True
1886
def getSubPart(self, part):
    """
    Look up one entry of C{self.subpart}, remembering which was requested.

    @param part: The index used to subscript C{self.subpart}; it is also
        stored in C{self.got_subpart}.
    @return: C{self.subpart[part]}.
    """
    self.got_subpart = part
    parts = self.subpart
    return parts[part]
1890
class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
1895
self.received_messages = self.received_uid = None
1897
self.server = imap4.IMAP4Server()
1898
self.server.state = 'select'
1899
self.server.mbox = self
1900
self.connected = defer.Deferred()
1901
self.client = SimpleClient(self.connected)
1903
def addListener(self, x):
1905
def removeListener(self, x):
1908
def store(self, *args, **kw):
    """
    Record the arguments of a store request and answer with the canned
    response.

    The positional and keyword arguments are saved together as the tuple
    C{self.storeArgs}.

    @return: C{self.response}.
    """
    self.storeArgs = (args, kw)
    canned = self.response
    return canned
1912
def _storeWork(self):
1914
return self.function(self.messages, self.flags, self.silent, self.uid)
1918
self.connected.addCallback(strip(connected)
1919
).addCallback(result
1920
).addCallback(self._cbStopClient
1921
).addErrback(self._ebGeneral)
1924
self.assertEquals(self.result, self.expected)
1925
self.assertEquals(self.storeArgs, self.expectedArgs)
1926
d = loopback.loopbackTCP(self.server, self.client, noisy=False)
1927
d.addCallback(check)
1930
def testSetFlags(self, uid=0):
1931
self.function = self.client.setFlags
1932
self.messages = '1,5,9'
1933
self.flags = ['\\A', '\\B', 'C']
1937
1: ['\\A', '\\B', 'C'],
1938
5: ['\\A', '\\B', 'C'],
1939
9: ['\\A', '\\B', 'C'],
1942
1: {'FLAGS': ['\\A', '\\B', 'C']},
1943
5: {'FLAGS': ['\\A', '\\B', 'C']},
1944
9: {'FLAGS': ['\\A', '\\B', 'C']},
1946
msg = imap4.MessageSet()
1950
self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0})
1951
return self._storeWork()
1954
class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin):
1956
self.received_messages = self.received_uid = None
1959
self.server = imap4.IMAP4Server()
1960
self.server.state = 'select'
1961
self.server.mbox = self
1962
self.connected = defer.Deferred()
1963
self.client = SimpleClient(self.connected)
1965
def addListener(self, x):
1967
def removeListener(self, x):
1970
def fetch(self, messages, uid):
    """
    Record the arguments of a fetch request and serve the canned messages.

    @param messages: The requested message set; stored in
        C{self.received_messages}.
    @param uid: The UID flag of the request; stored in
        C{self.received_uid}.
    @return: An iterator of C{(index, message)} two-tuples over
        C{self.msgObjs}, with indexes counting from 0.
    """
    self.received_messages = messages
    self.received_uid = uid
    # enumerate() yields the same (index, message) pairs that the former
    # zip(range(len(...)), ...) construction built by hand.
    return enumerate(self.msgObjs)
1975
def _fetchWork(self, uid):
1977
for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
1978
self.expected[i]['UID'] = str(msg.getUID())
1983
self.connected.addCallback(lambda _: self.function(self.messages, uid)
1984
).addCallback(result
1985
).addCallback(self._cbStopClient
1986
).addErrback(self._ebGeneral)
1988
d = loopback.loopbackTCP(self.server, self.client, noisy=False)
1989
d.addCallback(lambda x : self.assertEquals(self.result, self.expected))
1992
def testFetchUID(self):
1993
self.function = lambda m, u: self.client.fetchUID(m)
1997
FakeyMessage({}, (), '', '', 12345, None),
1998
FakeyMessage({}, (), '', '', 999, None),
1999
FakeyMessage({}, (), '', '', 10101, None),
2002
0: {'UID': '12345'},
2004
2: {'UID': '10101'},
2006
return self._fetchWork(0)
2008
def testFetchFlags(self, uid=0):
2009
self.function = self.client.fetchFlags
2012
FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None),
2013
FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None),
2016
0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']},
2017
1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']},
2019
return self._fetchWork(uid)
2021
def testFetchFlagsUID(self):
2022
return self.testFetchFlags(1)
2024
def testFetchInternalDate(self, uid=0):
2025
self.function = self.client.fetchInternalDate
2026
self.messages = '13'
2028
FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, None),
2029
FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None),
2030
FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None),
2031
FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None),
2034
0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'},
2035
1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'},
2036
2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'},
2037
3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'},
2039
return self._fetchWork(uid)
2041
def testFetchInternalDateUID(self):
2042
return self.testFetchInternalDate(1)
2044
def testFetchEnvelope(self, uid=0):
2045
self.function = self.client.fetchEnvelope
2046
self.messages = '15'
2049
'from': 'user@domain', 'to': 'resu@domain',
2050
'date': 'thursday', 'subject': 'it is a message',
2051
'message-id': 'id-id-id-yayaya'}, (), '', '', 65656,
2056
['thursday', 'it is a message',
2057
[[None, None, 'user', 'domain']],
2058
[[None, None, 'user', 'domain']],
2059
[[None, None, 'user', 'domain']],
2060
[[None, None, 'resu', 'domain']],
2061
None, None, None, 'id-id-id-yayaya']
2064
return self._fetchWork(uid)
2066
def testFetchEnvelopeUID(self):
2067
return self.testFetchEnvelope(1)
2069
def testFetchBodyStructure(self, uid=0):
    """
    Fetch the BODYSTRUCTURE of a single text/plain message and compare it
    with the expected parsed structure.

    @param uid: Passed through unchanged to C{self._fetchWork}.
    @return: Whatever C{self._fetchWork} returns.
    """
    self.function = self.client.fetchBodyStructure
    self.messages = '3:9,10:*'

    headers = {
        'content-type': 'text/plain; name=thing; key="value"',
        'content-id': 'this-is-the-content-id',
        'content-description': 'describing-the-content-goes-here!',
        'content-transfer-encoding': '8BIT',
    }
    # The body below is 20 characters on 4 lines, matching the '20' and
    # '4' entries of the expected structure.
    bodyText = 'Body\nText\nGoes\nHere\n'
    message = FakeyMessage(headers, (), '', bodyText, 919293, None)
    self.msgObjs = [message]

    structure = [
        'text', 'plain', [['name', 'thing'], ['key', 'value']],
        'this-is-the-content-id', 'describing-the-content-goes-here!',
        '8BIT', '20', '4', None, None, None,
    ]
    self.expected = {0: {'BODYSTRUCTURE': structure}}
    return self._fetchWork(uid)
2084
def testFetchBodyStructureUID(self):
2085
return self.testFetchBodyStructure(1)
2087
def testFetchSimplifiedBody(self, uid=0):
2088
self.function = self.client.fetchSimplifiedBody
2089
self.messages = '21'
2090
self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825,
2091
[FakeyMessage({'content-type': 'image/jpg'}, (), '',
2092
'Body Body Body', None, None
2097
[None, None, [], None, None, None,
2103
return self._fetchWork(uid)
2105
def testFetchSimplifiedBodyUID(self):
2106
return self.testFetchSimplifiedBody(1)
2108
def testFetchSimplifiedBodyText(self, uid=0):
2109
self.function = self.client.fetchSimplifiedBody
2110
self.messages = '21'
2111
self.msgObjs = [FakeyMessage({'content-type': 'text/plain'},
2112
(), '', 'Yea whatever', 91825, None)]
2115
['text', 'plain', [], None, None, None,
2121
return self._fetchWork(uid)
2123
def testFetchSimplifiedBodyTextUID(self):
2124
return self.testFetchSimplifiedBodyText(1)
2126
def testFetchSimplifiedBodyRFC822(self, uid=0):
2127
self.function = self.client.fetchSimplifiedBody
2128
self.messages = '21'
2129
self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'},
2130
(), '', 'Yea whatever', 91825,
2131
[FakeyMessage({'content-type': 'image/jpg'}, (), '',
2132
'Body Body Body', None, None
2137
['message', 'rfc822', [], None, None, None,
2138
'12', [None, None, [[None, None, None]],
2139
[[None, None, None]], None, None, None,
2140
None, None, None], ['image', 'jpg', [],
2141
None, None, None, '14'], '1'
2146
return self._fetchWork(uid)
2148
def testFetchSimplifiedBodyRFC822UID(self):
2149
return self.testFetchSimplifiedBodyRFC822(1)
2151
def testFetchMessage(self, uid=0):
2152
self.function = self.client.fetchMessage
2153
self.messages = '1,3,7,10101'
2155
FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None),
2158
0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'}
2160
return self._fetchWork(uid)
2162
def testFetchMessageUID(self):
2163
return self.testFetchMessage(1)
2165
def testFetchHeaders(self, uid=0):
2166
self.function = self.client.fetchHeaders
2167
self.messages = '9,6,2'
2169
FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None),
2172
0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})},
2174
return self._fetchWork(uid)
2176
def testFetchHeadersUID(self):
2177
return self.testFetchHeaders(1)
2179
def testFetchBody(self, uid=0):
2180
self.function = self.client.fetchBody
2181
self.messages = '1,2,3,4,5,6,7'
2183
FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None),
2186
0: {'RFC822.TEXT': 'Body goes here\r\n'},
2188
return self._fetchWork(uid)
2190
def testFetchBodyUID(self):
2191
return self.testFetchBody(1)
2193
def testFetchBodyParts(self):
2194
self.function = self.client.fetchBodyParts
2198
innerBody1 = 'Contained body message text. Squarge.'
2199
innerBody2 = 'Secondary <i>message</i> text of squarge body.'
2200
headers = util.OrderedDict()
2201
headers['from'] = 'sender@host'
2202
headers['to'] = 'recipient@domain'
2203
headers['subject'] = 'booga booga boo'
2204
headers['content-type'] = 'multipart/alternative; boundary="xyz"'
2205
innerHeaders = util.OrderedDict()
2206
innerHeaders['subject'] = 'this is subject text'
2207
innerHeaders['content-type'] = 'text/plain'
2208
innerHeaders2 = util.OrderedDict()
2209
innerHeaders2['subject'] = '<b>this is subject</b>'
2210
innerHeaders2['content-type'] = 'text/html'
2211
self.msgObjs = [FakeyMessage(
2212
headers, (), None, outerBody, 123,
2213
[FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
2214
FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)])]
2216
0: {'1': innerBody1, '2': innerBody2},
2222
self.connected.addCallback(lambda _: self.function(self.messages, parts))
2223
self.connected.addCallback(result)
2224
self.connected.addCallback(self._cbStopClient)
2225
self.connected.addErrback(self._ebGeneral)
2227
d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2228
d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
2232
def test_fetchBodyPartOfNonMultipart(self):
2234
Single-part messages have an implicit first part which clients
2235
should be able to retrieve explicitly. Test that a client
2236
requesting part 1 of a text/plain message receives the body of the
2239
self.function = self.client.fetchBodyParts
2242
outerBody = 'DA body'
2243
headers = util.OrderedDict()
2244
headers['from'] = 'sender@host'
2245
headers['to'] = 'recipient@domain'
2246
headers['subject'] = 'booga booga boo'
2247
headers['content-type'] = 'text/plain'
2248
self.msgObjs = [FakeyMessage(
2249
headers, (), None, outerBody, 123, None)]
2252
0: {'1': outerBody},
2258
self.connected.addCallback(lambda _: self.function(self.messages, parts))
2259
self.connected.addCallback(result)
2260
self.connected.addCallback(self._cbStopClient)
2261
self.connected.addErrback(self._ebGeneral)
2263
d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2264
d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
2268
def testFetchSize(self, uid=0):
2269
self.function = self.client.fetchSize
2270
self.messages = '1:100,2:*'
2272
FakeyMessage({}, (), '', 'x' * 20, 123, None),
2275
0: {'RFC822.SIZE': '20'},
2277
return self._fetchWork(uid)
2279
def testFetchSizeUID(self):
2280
return self.testFetchSize(1)
2282
def testFetchFull(self, uid=0):
2283
self.function = self.client.fetchFull
2284
self.messages = '1,3'
2286
FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'),
2287
'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)',
2288
'xyz' * 2, 654, None),
2289
FakeyMessage({}, ('\\One', '\\Two', 'Three'),
2290
'Mon, 14 Apr 2003 19:43:44 -0400',
2291
'abc' * 4, 555, None),
2294
0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'],
2295
'INTERNALDATE': '25-Jul-2010 06:20:30 -0400',
2297
'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
2298
'BODY': [None, None, [], None, None, None, '6']},
2299
1: {'FLAGS': ['\\One', '\\Two', 'Three'],
2300
'INTERNALDATE': '14-Apr-2003 19:43:44 -0400',
2301
'RFC822.SIZE': '12',
2302
'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
2303
'BODY': [None, None, [], None, None, None, '12']},
2305
return self._fetchWork(uid)
2307
def testFetchFullUID(self):
2308
return self.testFetchFull(1)
2310
# Fetch-all test: selects the client's fetchAll, requests messages '1,2:3',
# and hands off to self._fetchWork(uid).  The visible fixture data is two
# FakeyMessage objects followed by expected ENVELOPE / INTERNALDATE values
# keyed 0 and 1.
# uid: forwarded unchanged to _fetchWork (defaults to 0).
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and several lines (literal openers/closers and some entries
# of the expected values) are absent from this copy -- restore from upstream.
def testFetchAll(self, uid=0):
2311
self.function = self.client.fetchAll
2312
self.messages = '1,2:3'
2314
FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400',
2315
'Lalala', 10101, None),
2316
FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200',
2317
'Alalal', 20202, None),
2320
0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
2322
'INTERNALDATE': '14-Apr-2003 19:43:44 +0400',
2324
1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
2326
'INTERNALDATE': '15-Apr-2003 19:43:44 +0200',
2329
return self._fetchWork(uid)
2331
def testFetchAllUID(self):
2332
return self.testFetchAll(1)
2334
# Fast-fetch test: selects the client's fetchFast and hands off to
# self._fetchWork(uid).  The visible fixture data is one FakeyMessage with
# the single flag '\\X' and an expected entry 0 holding FLAGS, INTERNALDATE
# and RFC822.SIZE.
# uid: forwarded unchanged to _fetchWork (defaults to 0).
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and several lines (including whatever sets self.messages
# and the literal openers/closers) are absent -- restore from upstream.
def testFetchFast(self, uid=0):
2335
self.function = self.client.fetchFast
2338
FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None),
2341
0: {'FLAGS': ['\\X'],
2342
'INTERNALDATE': '19-Mar-2003 19:22:21 -0500',
2343
'RFC822.SIZE': '0'},
2345
return self._fetchWork(uid)
2347
def testFetchFastUID(self):
2348
return self.testFetchFast(1)
2351
# Test case that doubles as the server's mailbox: it declares
# imap4.ISearchableMailbox and installs itself as self.server.mbox, so the
# search()/fetch() methods of this class receive what the server parsed.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the `def` line that should enclose the self.* fixture
# assignments below is absent (presumably setUp -- confirm against upstream).
class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
2352
implements(imap4.ISearchableMailbox)
2355
# Reset the canned answer and everything the fake mailbox records.
self.expected = self.result = None
2356
self.server_received_query = None
2357
self.server_received_uid = None
2358
self.server_received_parts = None
2359
self.server_received_messages = None
2361
# Server starts directly in the 'select' state with this object as mailbox.
self.server = imap4.IMAP4Server()
2362
self.server.state = 'select'
2363
self.server.mbox = self
2364
self.connected = defer.Deferred()
2365
self.client = SimpleClient(self.connected)
2367
def search(self, query, uid):
    """
    Record the arguments of a SEARCH request and answer with the canned
    result.

    @param query: the query as received; stored in
        C{self.server_received_query}.
    @param uid: the UID flag of the request; stored in
        C{self.server_received_uid}.
    @return: C{self.expected}, unchanged.
    """
    self.server_received_query = query
    self.server_received_uid = uid
    return self.expected
2372
# Listener hook accepting any positional/keyword arguments; removeListener
# is bound to the very same function.
# NOTE(review): the body of addListener is absent from this copy (the bare
# integer looks like an original line number) -- presumably a no-op; confirm.
def addListener(self, *a, **kw):
2374
removeListener = addListener
2376
# Shared driver for the search tests: issues self.client.search(self.query,
# uid=uid) once connected, stops the client, then runs the loopback and
# checks that the result equals (but is not the same object as)
# self.expected, that the uid flag reached the server, and that the server
# saw the query as parsed by imap4.parseNestedParens.
# uid: passed to the client's search() call as the `uid` keyword.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the nested `def` lines (search / result / check), the
# assertion call wrapping the parseNestedParens comparison and the final
# return are absent from this copy -- restore from upstream.
def _searchWork(self, uid):
2378
return self.client.search(self.query, uid=uid)
2382
self.connected.addCallback(strip(search)
2383
).addCallback(result
2384
).addCallback(self._cbStopClient
2385
).addErrback(self._ebGeneral)
2388
# Ensure no short-circuiting weirdness is going on
2389
self.failIf(self.result is self.expected)
2391
self.assertEquals(self.result, self.expected)
2392
self.assertEquals(self.uid, self.server_received_uid)
2394
imap4.parseNestedParens(self.query),
2395
self.server_received_query
2397
d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2398
d.addCallback(check)
2401
# Non-UID search: builds an imap4.Or of a subject-header query and a size
# range query, sets the canned result to [1, 4, 5, 7], and runs
# self._searchWork(0).
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the line closing imap4.Or( plus one further statement
# are absent (presumably where self.uid is set -- confirm against upstream).
def testSearch(self):
2402
self.query = imap4.Or(
2403
imap4.Query(header=('subject', 'substring')),
2404
imap4.Query(larger=1024, smaller=4096),
2406
self.expected = [1, 4, 5, 7]
2408
return self._searchWork(0)
2410
# UID search: same imap4.Or query as testSearch, canned result [1, 2, 3],
# run through self._searchWork(1).
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the line closing imap4.Or( plus one further statement
# are absent (presumably where self.uid is set -- confirm against upstream).
def testUIDSearch(self):
2411
self.query = imap4.Or(
2412
imap4.Query(header=('subject', 'substring')),
2413
imap4.Query(larger=1024, smaller=4096),
2416
self.expected = [1, 2, 3]
2417
return self._searchWork(1)
2419
# Map a message number to a UID using the canned self.expected value:
# first tries self.expected[msg]['UID']; on TypeError or IndexError falls
# back to self.expected[msg-1].
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the `try:` line (and anything after the fallback
# return) is absent from this copy -- restore from upstream.
def getUID(self, msg):
2421
return self.expected[msg]['UID']
2422
except (TypeError, IndexError):
2423
return self.expected[msg-1]
2427
def fetch(self, messages, uid):
    """
    Record the arguments of a FETCH request and answer with the canned
    result.

    @param messages: the requested message set; its C{str()} form is stored
        in C{self.server_received_messages}.
    @param uid: the UID flag of the request; stored in
        C{self.server_received_uid}.
    @return: C{self.expected}, unchanged.
    """
    self.server_received_uid = uid
    self.server_received_messages = str(messages)
    return self.expected
2432
# Shared driver for the fetch tests in this class: chains the callable
# `fetch` (wrapped by strip) onto self.connected, stops the client, then
# runs the loopback and checks the result, uid flag, parts and message set
# against what the fake mailbox recorded.
# fetch: zero-argument callable that issues the client request.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the nested `def` lines (result / check), the condition
# guarding the loop over self.expected, the loop body and the final return
# are absent from this copy -- restore from upstream.
def _fetchWork(self, fetch):
2436
self.connected.addCallback(strip(fetch)
2437
).addCallback(result
2438
).addCallback(self._cbStopClient
2439
).addErrback(self._ebGeneral)
2442
# Ensure no short-circuiting weirdness is going on
2443
self.failIf(self.result is self.expected)
2445
# Sort in place only when the value is truthy (None / empty is left alone).
self.parts and self.parts.sort()
2446
self.server_received_parts and self.server_received_parts.sort()
2449
for (k, v) in self.expected.items():
2452
self.assertEquals(self.result, self.expected)
2453
self.assertEquals(self.uid, self.server_received_uid)
2454
self.assertEquals(self.parts, self.server_received_parts)
2455
# Compare parsed id lists rather than the raw message-set strings.
self.assertEquals(imap4.parseIdList(self.messages),
2456
imap4.parseIdList(self.server_received_messages))
2458
d = loopback.loopbackTCP(self.server, self.client, noisy=False)
2459
d.addCallback(check)
2465
def addMessage(self, body, flags, date):
    """
    Remember the message that was added instead of storing it anywhere.

    @param body: the message body object handed to the mailbox.
    @param flags: the flags supplied with the message.
    @param date: the date supplied with the message.
    @return: an already-fired L{defer.Deferred} whose result is C{None};
        the C{(body, flags, date)} triple is appended to C{self.args}.
    """
    self.args.append((body, flags, date))
    return defer.succeed(None)
2469
# Message stub declaring imap4.IMessageFile.  Visible behaviour: its
# getInternalDate() returns the literal 'internaldate', and one further
# method returns a StringIO containing "open".
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and at least two `def` lines are absent (one before
# getInternalDate, and the one enclosing the StringIO return -- presumably
# getFlags and open; confirm against upstream).
class FeaturefulMessage:
2470
implements(imap4.IMessageFile)
2475
def getInternalDate(self):
2476
return 'internaldate'
2479
return StringIO("open")
2481
# Mailbox stub declaring imap4.IMessageCopier.  copy() appends the message
# to self.msgs and returns the new length of that list.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the code that creates self.msgs is absent from this
# copy (presumably an __init__ -- confirm against upstream).
class MessageCopierMailbox:
2482
implements(imap4.IMessageCopier)
2487
def copy(self, msg):
2488
self.msgs.append(msg)
2489
return len(self.msgs)
2491
# Tests for the server's private copy worker (IMAP4Server.__cbCopy, reached
# through its name-mangled attribute).
# testFeaturefulMessage feeds ten (sequence number, FeaturefulMessage)
# pairs to the worker and checks that each recorded entry reads "open",
# carries "flags" and "internaldate", and that every result is a successful
# (status, None) pair.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the lines that create `m` and open the loop binding `a`
# are absent from this copy -- restore from upstream.
class CopyWorkerTestCase(unittest.TestCase):
2492
def testFeaturefulMessage(self):
2493
s = imap4.IMAP4Server()
2495
# Yes. I am grabbing this uber-non-public method to test it.
2496
# It is complex. It needs to be tested directly!
2497
# Perhaps it should be refactored, simplified, or split up into
2498
# not-so-private components, but that is a task for another day.
2500
# Ha ha! Addendum! Soon it will be split up, and this test will
2501
# be re-written to just use the default adapter for IMailbox to
2502
# IMessageCopier and call .copy on that adapter.
2503
f = s._IMAP4Server__cbCopy
2506
d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m)
2508
def cbCopy(results):
2510
self.assertEquals(a[0].read(), "open")
2511
self.assertEquals(a[1], "flags")
2512
self.assertEquals(a[2], "internaldate")
2514
for (status, result) in results:
2515
self.failUnless(status)
2516
self.assertEquals(result, None)
2518
return d.addCallback(cbCopy)
2521
# Copy-worker test using plain FakeyMessage objects: ten messages with a
# 'Header-Counter' header and a 'Body %d' body are handed to the worker;
# the callback collects what was read back and compares it with the
# expected "Header-Counter: N\r\n\r\nBody N" strings, checks flags () and
# date "Date", and requires every result to be a successful (status, None).
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the lines that create `m` and `seen` and open the loop
# binding `a` are absent from this copy -- restore from upstream.
def testUnfeaturefulMessage(self):
2522
s = imap4.IMAP4Server()
2525
f = s._IMAP4Server__cbCopy
2528
msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)]
2529
d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
2531
def cbCopy(results):
2534
seen.append(a[0].read())
2535
self.assertEquals(a[1], ())
2536
self.assertEquals(a[2], "Date")
2539
exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)]
2541
self.assertEquals(seen, exp)
2543
for (status, result) in results:
2544
self.failUnless(status)
2545
self.assertEquals(result, None)
2547
return d.addCallback(cbCopy)
2549
def testMessageCopier(self):
    """
    Run the server's copy worker against a L{MessageCopierMailbox} and
    check that every message object reaches the mailbox unchanged.

    Ten opaque objects are paired with sequence numbers 1..10 and passed
    to the worker; the callback expects ten C{(1, n)} results and asserts
    that the mailbox holds the identical objects, in order.

    @return: the L{defer.Deferred} returned by the worker, with the
        checking callback attached.
    """
    s = imap4.IMAP4Server()

    # Name-mangled access to the private __cbCopy method, tested directly.
    f = s._IMAP4Server__cbCopy

    m = MessageCopierMailbox()
    msgs = [object() for i in range(1, 11)]
    d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)

    def cbCopy(results):
        self.assertEquals(results, zip([1] * 10, range(1, 11)))
        for (orig, new) in zip(msgs, m.msgs):
            self.assertIdentical(orig, new)

    return d.addCallback(cbCopy)
2567
# TLS-related client/server tests.  serverCTX / clientCTX are built from the
# TLS context helpers when those names are truthy; otherwise the
# short-circuiting `and` leaves the falsy value in place.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the `def` line enclosing the loopbackTCP return is
# absent (other tests call self.loopback(), so presumably `loopback` --
# confirm against upstream).
class TLSTestCase(IMAP4HelperMixin, unittest.TestCase):
2568
serverCTX = ServerTLSContext and ServerTLSContext()
2569
clientCTX = ClientTLSContext and ClientTLSContext()
2572
return loopback.loopbackTCP(self.server, self.client, noisy=False)
2574
# Runs login, list, status, examine and logout in sequence with
# requireTransportSecurity enabled on the client, then checks that both
# sides report startedTLS and that `called` has one entry per method.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the nested `def` lines for the five steps and for
# `check`, the creation of `called` and `d`, and the final return are
# absent from this copy -- restore from upstream.
def testAPileOfThings(self):
2575
SimpleServer.theAccount.addMailbox('inbox')
2579
return self.client.login('testuser', 'password-test')
2582
return self.client.list('inbox', '%')
2585
return self.client.status('inbox', 'UIDNEXT')
2588
return self.client.examine('inbox')
2591
return self.client.logout()
2593
self.client.requireTransportSecurity = True
2595
methods = [login, list, status, examine, logout]
2596
# map() is used for its side effect: each stripped step is chained onto
# self.connected.  (This relies on map() being eager, as in Python 2.)
map(self.connected.addCallback, map(strip, methods))
2597
self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
2599
self.assertEquals(self.server.startedTLS, True)
2600
self.assertEquals(self.client.startedTLS, True)
2601
self.assertEquals(len(called), len(methods))
2603
d.addCallback(check)
2606
# Registers a LOGIN authenticator for 'testuser', authenticates with
# 'password-test', logs out, and finally expects exactly one entry in
# `success`.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and several lines (creation of `success` and `d`, part of
# the callback chain between authenticate and logout, the final return)
# are absent from this copy -- restore from upstream.
def testLoginLogin(self):
2607
self.server.checker.addUser('testuser', 'password-test')
2609
self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser'))
2610
self.connected.addCallback(
2611
lambda _: self.client.authenticate('password-test')
2613
lambda _: self.client.logout()
2614
).addCallback(success.append
2615
).addCallback(self._cbStopClient
2616
).addErrback(self._ebGeneral)
2619
d.addCallback(lambda x : self.assertEquals(len(success), 1))
2622
# Starts TLS from the client and asserts that the class name of the
# client's transport contains 'TLS'; `success` must be non-empty at the end.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the lines creating `success` and `d` and the final
# return are absent from this copy -- restore from upstream.
def testStartTLS(self):
2624
self.connected.addCallback(lambda _: self.client.startTLS())
2625
self.connected.addCallback(lambda _: self.assertNotEquals(-1, self.client.transport.__class__.__name__.find('TLS')))
2626
self.connected.addCallback(self._cbStopClient)
2627
self.connected.addCallback(success.append)
2628
self.connected.addErrback(self._ebGeneral)
2631
d.addCallback(lambda x : self.failUnless(success))
2634
# Disables TLS on the server (canStartTLS = False) before the client calls
# startTLS(), traps the resulting imap4.IMAP4Exception, and checks that
# exactly that exception class was recorded.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the lines creating `failure` and defining `check` are
# absent.  `failure` is used like a list here, which would shadow the
# twisted.python.failure module imported at file level -- confirm upstream.
def testFailedStartTLS(self):
2636
def breakServerTLS(ign):
2637
self.server.canStartTLS = False
2639
self.connected.addCallback(breakServerTLS)
2640
self.connected.addCallback(lambda ign: self.client.startTLS())
2641
# err.trap() returns the matched exception class, which is what gets stored.
self.connected.addErrback(lambda err: failure.append(err.trap(imap4.IMAP4Exception)))
2642
self.connected.addCallback(self._cbStopClient)
2643
self.connected.addErrback(self._ebGeneral)
2646
self.failUnless(failure)
2647
self.assertIdentical(failure[0], imap4.IMAP4Exception)
2648
return self.loopback().addCallback(check)
2650
# SimpleMailbox variant whose fetch() answers late: it creates a Deferred
# and schedules it to fire with an empty tuple after self.howSlow seconds
# via reactor.callLater.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the definition of howSlow and the return statement of
# fetch() are absent from this copy -- restore from upstream.
class SlowMailbox(SimpleMailbox):
2653
# Not a very nice implementation of fetch(), but it'll
2654
# do for the purposes of testing.
2655
def fetch(self, messages, uid):
2656
d = defer.Deferred()
2657
reactor.callLater(self.howSlow, d.callback, ())
2660
# Timeout-related tests.
# testServerTimeout: sets server.timeoutTest, gives the client a 5 second
# timeout, attempts a login and expects the failure to be an
# error.TimeoutError (timedOut stops the client first, then traps).
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the nested `def login` line and its return are absent
# from this copy -- restore from upstream.
class Timeout(IMAP4HelperMixin, unittest.TestCase):
2661
def testServerTimeout(self):
2662
self.server.timeoutTest = True
2663
self.client.timeout = 5 #seconds
2664
self.selectedArgs = None
2667
d = self.client.login('testuser', 'password-test')
2668
d.addErrback(timedOut)
2671
def timedOut(failure):
2672
self._cbStopClient(None)
2673
failure.trap(error.TimeoutError)
2675
d = self.connected.addCallback(strip(login))
2676
d.addErrback(self._ebGeneral)
2677
# Wait for both the client-side chain and the loopback connection.
return defer.gatherResults([d, self.loopback()])
2679
# Uses SlowMailbox as the account's mailbox factory and a 0.1 second server
# timeout, then logs in, selects 'mailbox-test', issues fetchUID('1:*') and
# records whether the server transport is still not disconnecting; the
# test passes only if self.stillConnected ends up true.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the nested `def` lines for login / select / fetch are
# absent from this copy -- restore from upstream.
def testLongFetchDoesntTimeout(self):
2680
SimpleServer.theAccount.mailboxFactory = SlowMailbox
2681
SimpleServer.theAccount.addMailbox('mailbox-test')
2683
self.server.setTimeout(0.1)
2684
self.stillConnected = False
2687
return self.client.login('testuser', 'password-test')
2689
return self.client.select('mailbox-test')
2691
return self.client.fetchUID('1:*')
2692
def stillConnected():
2693
self.stillConnected = not self.server.transport.disconnecting
2695
d1 = self.connected.addCallback(strip(login))
2696
d1.addCallback(strip(select))
2697
d1.addCallback(strip(fetch))
2698
d1.addCallback(strip(stillConnected))
2699
d1.addCallback(self._cbStopClient)
2700
d1.addErrback(self._ebGeneral)
2701
d = defer.gatherResults([d1, self.loopback()])
2702
return d.addCallback(lambda _: self.failUnless(self.stillConnected))
2704
# Connects the server to a StringTransportWithDisconnection, wraps
# connectionLost so each call is noted in `lost`, and pumps a Clock: after
# two thirds of server.timeOut nothing may be lost, after a further half
# the connection must have been lost.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and the lines creating `c` and `lost` (and whatever else
# sat between the visible statements) are absent -- restore from upstream.
def testIdleClientDoesDisconnect(self):
2705
from twisted.test.time_helpers import Clock
2709
# Hook up our server protocol
2710
transport = StringTransportWithDisconnection()
2711
transport.protocol = self.server
2712
self.server.makeConnection(transport)
2714
# Make sure we can notice when the connection goes away
2716
connLost = self.server.connectionLost
2717
# The tuple evaluates both calls; [1] yields the original handler's result.
self.server.connectionLost = lambda reason: (lost.append(None), connLost(reason))[1]
2719
# 2/3rds of the idle timeout elapses...
2720
c.pump(reactor, [0.0] + [self.server.timeOut / 3.0] * 2)
2721
self.failIf(lost, lost)
2724
c.pump(reactor, [0.0, self.server.timeOut / 2.0])
2725
self.failUnless(lost)
2731
# Checks that a pending client login Deferred fails with
# error.ConnectionDone once connectionLost is delivered to the client.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and at least the line that attaches transport `t` to the
# client and the final return are absent -- restore from upstream.
class Disconnection(unittest.TestCase):
2732
def testClientDisconnectFailsDeferreds(self):
2733
c = imap4.IMAP4Client()
2734
t = StringTransportWithDisconnection()
2736
d = self.assertFailure(c.login('testuser', 'example.com'), error.ConnectionDone)
2737
c.connectionLost(error.ConnectionDone("Connection closed"))
2742
# In-memory mailbox built from a list of messages.  fetch() refuses UID
# requests via an assert and yields (msg, self.messages[msg - 1]) pairs,
# i.e. message numbers are 1-based indexes into the list.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, the docstring has lost its quotes and at least one line,
# and the loop header that binds `msg` is absent -- restore from upstream.
class SynchronousMailbox(object):
2744
Trivial, in-memory mailbox implementation which can produce a message
2747
def __init__(self, messages):
2748
self.messages = messages
2751
def fetch(self, msgset, uid):
2752
assert not uid, "Cannot handle uid requests."
2754
yield msg, self.messages[msg - 1]
2758
# StringTransport that also accepts a producer: registerProducer() simply
# stores the producer and the streaming flag on the instance.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, and a few lines between the class header and
# registerProducer are absent from this copy -- restore from upstream.
class StringTransportConsumer(StringTransport):
2762
def registerProducer(self, producer, streaming):
2763
self.producer = producer
2764
self.streaming = streaming
2768
# Pipelining tests.  Visible fixtures: three FakeyMessage objects with
# bodies '0', '1', '2'; a StringTransportConsumer connected to an
# IMAP4Server whose third constructor argument is self.iterateInReactor.
# iterateInReactor() queues (iterator, Deferred) pairs on self.iterators
# instead of running them, so the test can drive them by hand.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, the class docstring has lost its quotes, and several lines
# are absent: the assignment holding the messages, the `def` lines around
# the fixture set-up and the connectionLost call, the creation of
# self.iterators, and iterateInReactor's return -- restore from upstream.
class Pipelining(unittest.TestCase):
2770
Tests for various aspects of the IMAP4 server's pipelining support.
2773
FakeyMessage({}, [], '', '0', None, None),
2774
FakeyMessage({}, [], '', '1', None, None),
2775
FakeyMessage({}, [], '', '2', None, None),
2781
self.transport = StringTransportConsumer()
2782
self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
2783
self.server.makeConnection(self.transport)
2786
def iterateInReactor(self, iterator):
2787
d = defer.Deferred()
2788
self.iterators.append((iterator, d))
2793
self.server.connectionLost(failure.Failure(error.ConnectionDone()))
2796
# Sends three FETCH commands in a single dataReceived() call to a server
# already in the 'select' state with a SynchronousMailbox, drains the
# queued iterators by hand, and compares the transport output with three
# untagged FETCH responses each followed by its tagged OK.
# NOTE(review): indentation is gone, the bare integers look like original
# line numbers, the docstring has lost its quotes, and the body of the
# inner `for` loop plus the assertion call wrapping the final comparison
# are absent from this copy -- restore from upstream.
def test_synchronousFetch(self):
2798
Test that pipelined FETCH commands which can be responded to
2799
synchronously are responded to correctly.
2801
mailbox = SynchronousMailbox(self.messages)
2803
# Skip over authentication and folder selection
2804
self.server.state = 'select'
2805
self.server.mbox = mailbox
2807
# Get rid of any greeting junk
2808
self.transport.clear()
2810
# Here's some pipelined stuff
2811
self.server.dataReceived(
2812
'01 FETCH 1 BODY[]\r\n'
2813
'02 FETCH 2 BODY[]\r\n'
2814
'03 FETCH 3 BODY[]\r\n')
2816
# Flush anything the server has scheduled to run
2817
while self.iterators:
2818
for e in self.iterators[0][0]:
2821
# Exhausted iterator: drop it from the queue and fire its Deferred.
self.iterators.pop(0)[1].callback(None)
2823
# The bodies are empty because we aren't simulating a transport
2824
# exactly correctly (we have StringTransportConsumer but we never
2825
# call resumeProducing on its producer). It doesn't matter: just
2826
# make sure the surrounding structure is okay, and that no
2827
# exceptions occurred.
2829
self.transport.value(),
2830
'* 1 FETCH (BODY[] )\r\n'
2831
'01 OK FETCH completed\r\n'
2832
'* 2 FETCH (BODY[] )\r\n'
2833
'02 OK FETCH completed\r\n'
2834
'* 3 FETCH (BODY[] )\r\n'
2835
'03 OK FETCH completed\r\n')
2839
# Mark the TLS tests as skipped when they cannot possibly run: either the
# TLS context helpers were not importable (ClientTLSContext was left as
# None by the import fallback at the top of the file), or the installed
# reactor does not provide IReactorSSL.
if ClientTLSContext is None:
    for case in (TLSTestCase,):
        case.skip = "OpenSSL not present"
elif interfaces.IReactorSSL(reactor, None) is None:
    for case in (TLSTestCase,):
        case.skip = "Reactor doesn't support SSL"