~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/mail/test/test_imap.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.mail.test.test_imap -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
"""
 
7
Test case for twisted.mail.imap4
 
8
"""
 
9
 
 
10
from __future__ import nested_scopes
 
11
 
 
12
try:
 
13
    from cStringIO import StringIO
 
14
except ImportError:
 
15
    from StringIO import StringIO
 
16
 
 
17
import os
 
18
import sys
 
19
import types
 
20
import time
 
21
from zope.interface import implements
 
22
 
 
23
from twisted.mail.imap4 import MessageSet
 
24
from twisted.mail import imap4
 
25
from twisted.mail import smtp
 
26
from twisted.protocols import loopback
 
27
from twisted.internet import defer
 
28
from twisted.internet import error
 
29
from twisted.internet import reactor
 
30
from twisted.internet import interfaces
 
31
from twisted.trial import unittest
 
32
from twisted.python import util
 
33
from twisted.python.util import sibpath
 
34
from twisted.python import failure
 
35
 
 
36
from twisted import cred
 
37
import twisted.cred.error
 
38
import twisted.cred.checkers
 
39
import twisted.cred.credentials
 
40
import twisted.cred.portal
 
41
 
 
42
from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
 
43
 
 
44
try:
 
45
    from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
 
46
except ImportError:
 
47
    ClientTLSContext = ServerTLSContext = None
 
48
 
 
49
def strip(f):
 
50
    return lambda result, f=f: f()
 
51
 
 
52
def sortNest(l):
 
53
    l = l[:]
 
54
    l.sort()
 
55
    for i in range(len(l)):
 
56
        if isinstance(l[i], types.ListType):
 
57
            l[i] = sortNest(l[i])
 
58
        elif isinstance(l[i], types.TupleType):
 
59
            l[i] = tuple(sortNest(list(l[i])))
 
60
    return l
 
61
 
 
62
class IMAP4UTF7TestCase(unittest.TestCase):
 
63
    tests = [
 
64
        [u'Hello world', 'Hello world'],
 
65
        [u'Hello & world', 'Hello &- world'],
 
66
        [u'Hello\xffworld', 'Hello&AP8-world'],
 
67
        [u'\xff\xfe\xfd\xfc', '&AP8A,gD9APw-'],
 
68
        [u'~peter/mail/\u65e5\u672c\u8a9e/\u53f0\u5317', 
 
69
         '~peter/mail/&ZeVnLIqe-/&U,BTFw-'], # example from RFC 2060
 
70
    ]
 
71
 
 
72
    def testEncode(self):
 
73
        for (input, output) in self.tests:
 
74
            self.assertEquals(input.encode('imap4-utf-7'), output)
 
75
 
 
76
    def testDecode(self):
 
77
        for (input, output) in self.tests:
 
78
            # XXX - Piece of *crap* 2.1
 
79
            self.assertEquals(input, imap4.decoder(output)[0])
 
80
 
 
81
    def testPrintableSingletons(self):
 
82
        # All printables represent themselves
 
83
        for o in range(0x20, 0x26) + range(0x27, 0x7f):
 
84
            self.failUnlessEqual(chr(o), chr(o).encode('imap4-utf-7'))
 
85
            self.failUnlessEqual(chr(o), chr(o).decode('imap4-utf-7'))
 
86
        self.failUnlessEqual('&'.encode('imap4-utf-7'), '&-')
 
87
        self.failUnlessEqual('&-'.decode('imap4-utf-7'), '&')
 
88
 
 
89
class BufferingConsumer:
 
90
    def __init__(self):
 
91
        self.buffer = []
 
92
 
 
93
    def write(self, bytes):
 
94
        self.buffer.append(bytes)
 
95
        if self.consumer:
 
96
            self.consumer.resumeProducing()
 
97
 
 
98
    def registerProducer(self, consumer, streaming):
 
99
        self.consumer = consumer
 
100
        self.consumer.resumeProducing()
 
101
 
 
102
    def unregisterProducer(self):
 
103
        self.consumer = None
 
104
 
 
105
class MessageProducerTestCase(unittest.TestCase):
 
106
    def testSinglePart(self):
 
107
        body = 'This is body text.  Rar.'
 
108
        headers = util.OrderedDict()
 
109
        headers['from'] = 'sender@host'
 
110
        headers['to'] = 'recipient@domain'
 
111
        headers['subject'] = 'booga booga boo'
 
112
        headers['content-type'] = 'text/plain'
 
113
 
 
114
        msg = FakeyMessage(headers, (), None, body, 123, None )
 
115
 
 
116
        c = BufferingConsumer()
 
117
        p = imap4.MessageProducer(msg)
 
118
        d = p.beginProducing(c)
 
119
 
 
120
        def cbProduced(result):
 
121
            self.assertIdentical(result, p)
 
122
            self.assertEquals(
 
123
                ''.join(c.buffer),
 
124
 
 
125
                '{119}\r\n'
 
126
                'From: sender@host\r\n'
 
127
                'To: recipient@domain\r\n'
 
128
                'Subject: booga booga boo\r\n'
 
129
                'Content-Type: text/plain\r\n'
 
130
                '\r\n'
 
131
                + body)
 
132
        return d.addCallback(cbProduced)
 
133
 
 
134
 
 
135
    def testSingleMultiPart(self):
 
136
        outerBody = ''
 
137
        innerBody = 'Contained body message text.  Squarge.'
 
138
        headers = util.OrderedDict()
 
139
        headers['from'] = 'sender@host'
 
140
        headers['to'] = 'recipient@domain'
 
141
        headers['subject'] = 'booga booga boo'
 
142
        headers['content-type'] = 'multipart/alternative; boundary="xyz"'
 
143
 
 
144
        innerHeaders = util.OrderedDict()
 
145
        innerHeaders['subject'] = 'this is subject text'
 
146
        innerHeaders['content-type'] = 'text/plain'
 
147
        msg = FakeyMessage(headers, (), None, outerBody, 123,
 
148
                           [FakeyMessage(innerHeaders, (), None, innerBody,
 
149
                                         None, None)],
 
150
                           )
 
151
 
 
152
        c = BufferingConsumer()
 
153
        p = imap4.MessageProducer(msg)
 
154
        d = p.beginProducing(c)
 
155
 
 
156
        def cbProduced(result):
 
157
            self.failUnlessIdentical(result, p)
 
158
 
 
159
            self.assertEquals(
 
160
                ''.join(c.buffer),
 
161
 
 
162
                '{239}\r\n'
 
163
                'From: sender@host\r\n'
 
164
                'To: recipient@domain\r\n'
 
165
                'Subject: booga booga boo\r\n'
 
166
                'Content-Type: multipart/alternative; boundary="xyz"\r\n'
 
167
                '\r\n'
 
168
                '\r\n'
 
169
                '--xyz\r\n'
 
170
                'Subject: this is subject text\r\n'
 
171
                'Content-Type: text/plain\r\n'
 
172
                '\r\n'
 
173
                + innerBody
 
174
                + '\r\n--xyz--\r\n')
 
175
 
 
176
        return d.addCallback(cbProduced)
 
177
 
 
178
 
 
179
    def testMultipleMultiPart(self):
 
180
        outerBody = ''
 
181
        innerBody1 = 'Contained body message text.  Squarge.'
 
182
        innerBody2 = 'Secondary <i>message</i> text of squarge body.'
 
183
        headers = util.OrderedDict()
 
184
        headers['from'] = 'sender@host'
 
185
        headers['to'] = 'recipient@domain'
 
186
        headers['subject'] = 'booga booga boo'
 
187
        headers['content-type'] = 'multipart/alternative; boundary="xyz"'
 
188
        innerHeaders = util.OrderedDict()
 
189
        innerHeaders['subject'] = 'this is subject text'
 
190
        innerHeaders['content-type'] = 'text/plain'
 
191
        innerHeaders2 = util.OrderedDict()
 
192
        innerHeaders2['subject'] = '<b>this is subject</b>'
 
193
        innerHeaders2['content-type'] = 'text/html'
 
194
        msg = FakeyMessage(headers, (), None, outerBody, 123, [
 
195
            FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
 
196
            FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)
 
197
            ],
 
198
        )
 
199
 
 
200
        c = BufferingConsumer()
 
201
        p = imap4.MessageProducer(msg)
 
202
        d = p.beginProducing(c)
 
203
 
 
204
        def cbProduced(result):
 
205
            self.failUnlessIdentical(result, p)
 
206
 
 
207
            self.assertEquals(
 
208
                ''.join(c.buffer),
 
209
 
 
210
                '{354}\r\n'
 
211
                'From: sender@host\r\n'
 
212
                'To: recipient@domain\r\n'
 
213
                'Subject: booga booga boo\r\n'
 
214
                'Content-Type: multipart/alternative; boundary="xyz"\r\n'
 
215
                '\r\n'
 
216
                '\r\n'
 
217
                '--xyz\r\n'
 
218
                'Subject: this is subject text\r\n'
 
219
                'Content-Type: text/plain\r\n'
 
220
                '\r\n'
 
221
                + innerBody1
 
222
                + '\r\n--xyz\r\n'
 
223
                'Subject: <b>this is subject</b>\r\n'
 
224
                'Content-Type: text/html\r\n'
 
225
                '\r\n'
 
226
                + innerBody2
 
227
                + '\r\n--xyz--\r\n')
 
228
        return d.addCallback(cbProduced)
 
229
 
 
230
 
 
231
class IMAP4HelperTestCase(unittest.TestCase):
 
232
    def testFileProducer(self):
 
233
        b = (('x' * 1) + ('y' * 1) + ('z' * 1)) * 10
 
234
        c = BufferingConsumer()
 
235
        f = StringIO(b)
 
236
        p = imap4.FileProducer(f)
 
237
        d = p.beginProducing(c)
 
238
 
 
239
        def cbProduced(result):
 
240
            self.failUnlessIdentical(result, p)
 
241
            self.assertEquals(
 
242
                ('{%d}\r\n' % len(b))+ b,
 
243
                ''.join(c.buffer))
 
244
        return d.addCallback(cbProduced)
 
245
 
 
246
    def testWildcard(self):
 
247
        cases = [
 
248
            ['foo/%gum/bar',
 
249
                ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
 
250
                ['foo/xgum/bar', 'foo/gum/bar'],
 
251
            ], ['foo/x%x/bar',
 
252
                ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
 
253
                ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar'],
 
254
            ], ['foo/xyz*abc/bar',
 
255
                ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
 
256
                ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
 
257
            ]
 
258
        ]
 
259
 
 
260
        for (wildcard, fail, succeed) in cases:
 
261
            wildcard = imap4.wildcardToRegexp(wildcard, '/')
 
262
            for x in fail:
 
263
                self.failIf(wildcard.match(x))
 
264
            for x in succeed:
 
265
                self.failUnless(wildcard.match(x))
 
266
 
 
267
    def testWildcardNoDelim(self):
 
268
        cases = [
 
269
            ['foo/%gum/bar',
 
270
                ['foo/bar', 'oo/lalagum/bar', 'foo/gumx/bar', 'foo/gum/baz'],
 
271
                ['foo/xgum/bar', 'foo/gum/bar', 'foo/x/gum/bar'],
 
272
            ], ['foo/x%x/bar',
 
273
                ['foo', 'bar', 'fuz fuz fuz', 'foo/*/bar', 'foo/xyz/bar', 'foo/xx/baz'],
 
274
                ['foo/xyx/bar', 'foo/xx/bar', 'foo/xxxxxxxxxxxxxx/bar', 'foo/x/x/bar'],
 
275
            ], ['foo/xyz*abc/bar',
 
276
                ['foo/xyz/bar', 'foo/abc/bar', 'foo/xyzab/cbar', 'foo/xyza/bcbar'],
 
277
                ['foo/xyzabc/bar', 'foo/xyz/abc/bar', 'foo/xyz/123/abc/bar'],
 
278
            ]
 
279
        ]
 
280
 
 
281
        for (wildcard, fail, succeed) in cases:
 
282
            wildcard = imap4.wildcardToRegexp(wildcard, None)
 
283
            for x in fail:
 
284
                self.failIf(wildcard.match(x), x)
 
285
            for x in succeed:
 
286
                self.failUnless(wildcard.match(x), x)
 
287
 
 
288
    def testHeaderFormatter(self):
 
289
        cases = [
 
290
            ({'Header1': 'Value1', 'Header2': 'Value2'}, 'Header2: Value2\r\nHeader1: Value1\r\n'),
 
291
        ]
 
292
 
 
293
        for (input, output) in cases:
 
294
            self.assertEquals(imap4._formatHeaders(input), output)
 
295
 
 
296
    def testMessageSet(self):
 
297
        m1 = MessageSet()
 
298
        m2 = MessageSet()
 
299
 
 
300
        self.assertEquals(m1, m2)
 
301
 
 
302
        m1 = m1 + (1, 3)
 
303
        self.assertEquals(len(m1), 3)
 
304
        self.assertEquals(list(m1), [1, 2, 3])
 
305
 
 
306
        m2 = m2 + (1, 3)
 
307
        self.assertEquals(m1, m2)
 
308
        self.assertEquals(list(m1 + m2), [1, 2, 3])
 
309
 
 
310
    def testQuotedSplitter(self):
 
311
        cases = [
 
312
            '''Hello World''',
 
313
            '''Hello "World!"''',
 
314
            '''World "Hello" "How are you?"''',
 
315
            '''"Hello world" How "are you?"''',
 
316
            '''foo bar "baz buz" NIL''',
 
317
            '''foo bar "baz buz" "NIL"''',
 
318
            '''foo NIL "baz buz" bar''',
 
319
            '''foo "NIL" "baz buz" bar''',
 
320
            '''"NIL" bar "baz buz" foo''',
 
321
        ]
 
322
 
 
323
        answers = [
 
324
            ['Hello', 'World'],
 
325
            ['Hello', 'World!'],
 
326
            ['World', 'Hello', 'How are you?'],
 
327
            ['Hello world', 'How', 'are you?'],
 
328
            ['foo', 'bar', 'baz buz', None],
 
329
            ['foo', 'bar', 'baz buz', 'NIL'],
 
330
            ['foo', None, 'baz buz', 'bar'],
 
331
            ['foo', 'NIL', 'baz buz', 'bar'],
 
332
            ['NIL', 'bar', 'baz buz', 'foo'],
 
333
        ]
 
334
 
 
335
        errors = [
 
336
            '"mismatched quote',
 
337
            'mismatched quote"',
 
338
            'mismatched"quote',
 
339
            '"oops here is" another"',
 
340
        ]
 
341
 
 
342
        for s in errors:
 
343
            self.assertRaises(imap4.MismatchedQuoting, imap4.splitQuoted, s)
 
344
 
 
345
        for (case, expected) in zip(cases, answers):
 
346
            self.assertEquals(imap4.splitQuoted(case), expected)
 
347
 
 
348
 
 
349
    def testStringCollapser(self):
 
350
        cases = [
 
351
            ['a', 'b', 'c', 'd', 'e'],
 
352
            ['a', ' ', '"', 'b', 'c', ' ', '"', ' ', 'd', 'e'],
 
353
            [['a', 'b', 'c'], 'd', 'e'],
 
354
            ['a', ['b', 'c', 'd'], 'e'],
 
355
            ['a', 'b', ['c', 'd', 'e']],
 
356
            ['"', 'a', ' ', '"', ['b', 'c', 'd'], '"', ' ', 'e', '"'],
 
357
            ['a', ['"', ' ', 'b', 'c', ' ', ' ', '"'], 'd', 'e'],
 
358
        ]
 
359
 
 
360
        answers = [
 
361
            ['abcde'],
 
362
            ['a', 'bc ', 'de'],
 
363
            [['abc'], 'de'],
 
364
            ['a', ['bcd'], 'e'],
 
365
            ['ab', ['cde']],
 
366
            ['a ', ['bcd'], ' e'],
 
367
            ['a', [' bc  '], 'de'],
 
368
        ]
 
369
 
 
370
        for (case, expected) in zip(cases, answers):
 
371
            self.assertEquals(imap4.collapseStrings(case), expected)
 
372
 
 
373
    def testParenParser(self):
 
374
        s = '\r\n'.join(['xx'] * 4)
 
375
        cases = [
 
376
            '(BODY.PEEK[HEADER.FIELDS.NOT (subject bcc cc)] {%d}\r\n%s)' % (len(s), s,),
 
377
 
 
378
#            '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
 
379
#            'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
 
380
#            '"IMAP4rev1 WG mtg summary and minutes" '
 
381
#            '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
 
382
#            '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
 
383
#            '(("Terry Gray" NIL "gray" "cac.washington.edu")) '
 
384
#            '((NIL NIL "imap" "cac.washington.edu")) '
 
385
#            '((NIL NIL "minutes" "CNRI.Reston.VA.US") '
 
386
#            '("John Klensin" NIL "KLENSIN" "INFOODS.MIT.EDU")) NIL NIL '
 
387
#            '"<B27397-0100000@cac.washington.edu>") '
 
388
#            'BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 92))',
 
389
 
 
390
            '(FLAGS (\Seen) INTERNALDATE "17-Jul-1996 02:44:25 -0700" '
 
391
            'RFC822.SIZE 4286 ENVELOPE ("Wed, 17 Jul 1996 02:23:25 -0700 (PDT)" '
 
392
            '"IMAP4rev1 WG mtg summary and minutes" '
 
393
            '(("Terry Gray" NIL gray cac.washington.edu)) '
 
394
            '(("Terry Gray" NIL gray cac.washington.edu)) '
 
395
            '(("Terry Gray" NIL gray cac.washington.edu)) '
 
396
            '((NIL NIL imap cac.washington.edu)) '
 
397
            '((NIL NIL minutes CNRI.Reston.VA.US) '
 
398
            '("John Klensin" NIL KLENSIN INFOODS.MIT.EDU)) NIL NIL '
 
399
            '<B27397-0100000@cac.washington.edu>) '
 
400
            'BODY (TEXT PLAIN (CHARSET US-ASCII) NIL NIL 7BIT 3028 92))',
 
401
        ]
 
402
 
 
403
        answers = [
 
404
            ['BODY.PEEK', ['HEADER.FIELDS.NOT', ['subject', 'bcc', 'cc']], s],
 
405
 
 
406
            ['FLAGS', [r'\Seen'], 'INTERNALDATE',
 
407
            '17-Jul-1996 02:44:25 -0700', 'RFC822.SIZE', '4286', 'ENVELOPE',
 
408
            ['Wed, 17 Jul 1996 02:23:25 -0700 (PDT)',
 
409
            'IMAP4rev1 WG mtg summary and minutes', [["Terry Gray", None,
 
410
            "gray", "cac.washington.edu"]], [["Terry Gray", None,
 
411
            "gray", "cac.washington.edu"]], [["Terry Gray", None,
 
412
            "gray", "cac.washington.edu"]], [[None, None, "imap",
 
413
            "cac.washington.edu"]], [[None, None, "minutes",
 
414
            "CNRI.Reston.VA.US"], ["John Klensin", None, "KLENSIN",
 
415
            "INFOODS.MIT.EDU"]], None, None,
 
416
            "<B27397-0100000@cac.washington.edu>"], "BODY", ["TEXT", "PLAIN",
 
417
            ["CHARSET", "US-ASCII"], None, None, "7BIT", "3028", "92"]],
 
418
        ]
 
419
 
 
420
        for (case, expected) in zip(cases, answers):
 
421
            self.assertEquals(imap4.parseNestedParens(case), [expected])
 
422
 
 
423
        # XXX This code used to work, but changes occurred within the
 
424
        # imap4.py module which made it no longer necessary for *all* of it
 
425
        # to work.  In particular, only the part that makes
 
426
        # 'BODY.PEEK[HEADER.FIELDS.NOT (Subject Bcc Cc)]' come out correctly
 
427
        # no longer needs to work.  So, I am loathe to delete the entire
 
428
        # section of the test. --exarkun
 
429
        #
 
430
 
 
431
#        for (case, expected) in zip(answers, cases):
 
432
#            self.assertEquals('(' + imap4.collapseNestedLists(case) + ')', expected)
 
433
 
 
434
    def testFetchParserSimple(self):
 
435
        cases = [
 
436
            ['ENVELOPE', 'Envelope'],
 
437
            ['FLAGS', 'Flags'],
 
438
            ['INTERNALDATE', 'InternalDate'],
 
439
            ['RFC822.HEADER', 'RFC822Header'],
 
440
            ['RFC822.SIZE', 'RFC822Size'],
 
441
            ['RFC822.TEXT', 'RFC822Text'],
 
442
            ['RFC822', 'RFC822'],
 
443
            ['UID', 'UID'],
 
444
            ['BODYSTRUCTURE', 'BodyStructure'],
 
445
        ]
 
446
 
 
447
        for (inp, outp) in cases:
 
448
            p = imap4._FetchParser()
 
449
            p.parseString(inp)
 
450
            self.assertEquals(len(p.result), 1)
 
451
            self.failUnless(isinstance(p.result[0], getattr(p, outp)))
 
452
 
 
453
    def testFetchParserMacros(self):
 
454
        cases = [
 
455
            ['ALL', (4, ['flags', 'internaldate', 'rfc822.size', 'envelope'])],
 
456
            ['FULL', (5, ['flags', 'internaldate', 'rfc822.size', 'envelope', 'body'])],
 
457
            ['FAST', (3, ['flags', 'internaldate', 'rfc822.size'])],
 
458
        ]
 
459
 
 
460
        for (inp, outp) in cases:
 
461
            p = imap4._FetchParser()
 
462
            p.parseString(inp)
 
463
            self.assertEquals(len(p.result), outp[0])
 
464
            p = [str(p).lower() for p in p.result]
 
465
            p.sort()
 
466
            outp[1].sort()
 
467
            self.assertEquals(p, outp[1])
 
468
 
 
469
    def testFetchParserBody(self):
 
470
        P = imap4._FetchParser
 
471
 
 
472
        p = P()
 
473
        p.parseString('BODY')
 
474
        self.assertEquals(len(p.result), 1)
 
475
        self.failUnless(isinstance(p.result[0], p.Body))
 
476
        self.assertEquals(p.result[0].peek, False)
 
477
        self.assertEquals(p.result[0].header, None)
 
478
        self.assertEquals(str(p.result[0]), 'BODY')
 
479
 
 
480
        p = P()
 
481
        p.parseString('BODY.PEEK')
 
482
        self.assertEquals(len(p.result), 1)
 
483
        self.failUnless(isinstance(p.result[0], p.Body))
 
484
        self.assertEquals(p.result[0].peek, True)
 
485
        self.assertEquals(str(p.result[0]), 'BODY')
 
486
 
 
487
        p = P()
 
488
        p.parseString('BODY[]')
 
489
        self.assertEquals(len(p.result), 1)
 
490
        self.failUnless(isinstance(p.result[0], p.Body))
 
491
        self.assertEquals(p.result[0].empty, True)
 
492
        self.assertEquals(str(p.result[0]), 'BODY[]')
 
493
 
 
494
        p = P()
 
495
        p.parseString('BODY[HEADER]')
 
496
        self.assertEquals(len(p.result), 1)
 
497
        self.failUnless(isinstance(p.result[0], p.Body))
 
498
        self.assertEquals(p.result[0].peek, False)
 
499
        self.failUnless(isinstance(p.result[0].header, p.Header))
 
500
        self.assertEquals(p.result[0].header.negate, True)
 
501
        self.assertEquals(p.result[0].header.fields, ())
 
502
        self.assertEquals(p.result[0].empty, False)
 
503
        self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
 
504
 
 
505
        p = P()
 
506
        p.parseString('BODY.PEEK[HEADER]')
 
507
        self.assertEquals(len(p.result), 1)
 
508
        self.failUnless(isinstance(p.result[0], p.Body))
 
509
        self.assertEquals(p.result[0].peek, True)
 
510
        self.failUnless(isinstance(p.result[0].header, p.Header))
 
511
        self.assertEquals(p.result[0].header.negate, True)
 
512
        self.assertEquals(p.result[0].header.fields, ())
 
513
        self.assertEquals(p.result[0].empty, False)
 
514
        self.assertEquals(str(p.result[0]), 'BODY[HEADER]')
 
515
 
 
516
        p = P()
 
517
        p.parseString('BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
 
518
        self.assertEquals(len(p.result), 1)
 
519
        self.failUnless(isinstance(p.result[0], p.Body))
 
520
        self.assertEquals(p.result[0].peek, False)
 
521
        self.failUnless(isinstance(p.result[0].header, p.Header))
 
522
        self.assertEquals(p.result[0].header.negate, False)
 
523
        self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
 
524
        self.assertEquals(p.result[0].empty, False)
 
525
        self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
 
526
 
 
527
        p = P()
 
528
        p.parseString('BODY.PEEK[HEADER.FIELDS (Subject Cc Message-Id)]')
 
529
        self.assertEquals(len(p.result), 1)
 
530
        self.failUnless(isinstance(p.result[0], p.Body))
 
531
        self.assertEquals(p.result[0].peek, True)
 
532
        self.failUnless(isinstance(p.result[0].header, p.Header))
 
533
        self.assertEquals(p.result[0].header.negate, False)
 
534
        self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
 
535
        self.assertEquals(p.result[0].empty, False)
 
536
        self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS (Subject Cc Message-Id)]')
 
537
 
 
538
        p = P()
 
539
        p.parseString('BODY.PEEK[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
 
540
        self.assertEquals(len(p.result), 1)
 
541
        self.failUnless(isinstance(p.result[0], p.Body))
 
542
        self.assertEquals(p.result[0].peek, True)
 
543
        self.failUnless(isinstance(p.result[0].header, p.Header))
 
544
        self.assertEquals(p.result[0].header.negate, True)
 
545
        self.assertEquals(p.result[0].header.fields, ['SUBJECT', 'CC', 'MESSAGE-ID'])
 
546
        self.assertEquals(p.result[0].empty, False)
 
547
        self.assertEquals(str(p.result[0]), 'BODY[HEADER.FIELDS.NOT (Subject Cc Message-Id)]')
 
548
 
 
549
        p = P()
 
550
        p.parseString('BODY[1.MIME]<10.50>')
 
551
        self.assertEquals(len(p.result), 1)
 
552
        self.failUnless(isinstance(p.result[0], p.Body))
 
553
        self.assertEquals(p.result[0].peek, False)
 
554
        self.failUnless(isinstance(p.result[0].mime, p.MIME))
 
555
        self.assertEquals(p.result[0].part, (0,))
 
556
        self.assertEquals(p.result[0].partialBegin, 10)
 
557
        self.assertEquals(p.result[0].partialLength, 50)
 
558
        self.assertEquals(p.result[0].empty, False)
 
559
        self.assertEquals(str(p.result[0]), 'BODY[1.MIME]<10.50>')
 
560
 
 
561
        p = P()
 
562
        p.parseString('BODY.PEEK[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
 
563
        self.assertEquals(len(p.result), 1)
 
564
        self.failUnless(isinstance(p.result[0], p.Body))
 
565
        self.assertEquals(p.result[0].peek, True)
 
566
        self.failUnless(isinstance(p.result[0].header, p.Header))
 
567
        self.assertEquals(p.result[0].part, (0, 2, 8, 10))
 
568
        self.assertEquals(p.result[0].header.fields, ['MESSAGE-ID', 'DATE'])
 
569
        self.assertEquals(p.result[0].partialBegin, 103)
 
570
        self.assertEquals(p.result[0].partialLength, 69)
 
571
        self.assertEquals(p.result[0].empty, False)
 
572
        self.assertEquals(str(p.result[0]), 'BODY[1.3.9.11.HEADER.FIELDS.NOT (Message-Id Date)]<103.69>')
 
573
 
 
574
 
 
575
    def testFiles(self):
 
576
        inputStructure = [
 
577
            'foo', 'bar', 'baz', StringIO('this is a file\r\n'), 'buz'
 
578
        ]
 
579
 
 
580
        output = '"foo" "bar" "baz" {16}\r\nthis is a file\r\n "buz"'
 
581
 
 
582
        self.assertEquals(imap4.collapseNestedLists(inputStructure), output)
 
583
 
 
584
    def testQuoteAvoider(self):
 
585
        input = [
 
586
            'foo', imap4.DontQuoteMe('bar'), "baz", StringIO('this is a file\r\n'),
 
587
            imap4.DontQuoteMe('buz'), ""
 
588
        ]
 
589
 
 
590
        output = '"foo" bar "baz" {16}\r\nthis is a file\r\n buz ""'
 
591
 
 
592
        self.assertEquals(imap4.collapseNestedLists(input), output)
 
593
 
 
594
    def testLiterals(self):
 
595
        cases = [
 
596
            ('({10}\r\n0123456789)', [['0123456789']]),
 
597
        ]
 
598
 
 
599
        for (case, expected) in cases:
 
600
            self.assertEquals(imap4.parseNestedParens(case), expected)
 
601
 
 
602
    def testQueryBuilder(self):
 
603
        inputs = [
 
604
            imap4.Query(flagged=1),
 
605
            imap4.Query(sorted=1, unflagged=1, deleted=1),
 
606
            imap4.Or(imap4.Query(flagged=1), imap4.Query(deleted=1)),
 
607
            imap4.Query(before='today'),
 
608
            imap4.Or(
 
609
                imap4.Query(deleted=1),
 
610
                imap4.Query(unseen=1),
 
611
                imap4.Query(new=1)
 
612
            ),
 
613
            imap4.Or(
 
614
                imap4.Not(
 
615
                    imap4.Or(
 
616
                        imap4.Query(sorted=1, since='yesterday', smaller=1000),
 
617
                        imap4.Query(sorted=1, before='tuesday', larger=10000),
 
618
                        imap4.Query(sorted=1, unseen=1, deleted=1, before='today'),
 
619
                        imap4.Not(
 
620
                            imap4.Query(subject='spam')
 
621
                        ),
 
622
                    ),
 
623
                ),
 
624
                imap4.Not(
 
625
                    imap4.Query(uid='1:5')
 
626
                ),
 
627
            )
 
628
        ]
 
629
 
 
630
        outputs = [
 
631
            'FLAGGED',
 
632
            '(DELETED UNFLAGGED)',
 
633
            '(OR FLAGGED DELETED)',
 
634
            '(BEFORE "today")',
 
635
            '(OR DELETED (OR UNSEEN NEW))',
 
636
            '(OR (NOT (OR (SINCE "yesterday" SMALLER 1000) ' # Continuing
 
637
            '(OR (BEFORE "tuesday" LARGER 10000) (OR (BEFORE ' # Some more
 
638
            '"today" DELETED UNSEEN) (NOT (SUBJECT "spam")))))) ' # And more
 
639
            '(NOT (UID 1:5)))',
 
640
        ]
 
641
 
 
642
        for (query, expected) in zip(inputs, outputs):
 
643
            self.assertEquals(query, expected)
 
644
 
 
645
    def testIdListParser(self):
 
646
        inputs = [
 
647
            '1:*',
 
648
            '5:*',
 
649
            '1:2,5:*',
 
650
            '1',
 
651
            '1,2',
 
652
            '1,3,5',
 
653
            '1:10',
 
654
            '1:10,11',
 
655
            '1:5,10:20',
 
656
            '1,5:10',
 
657
            '1,5:10,15:20',
 
658
            '1:10,15,20:25',
 
659
        ]
 
660
 
 
661
        outputs = [
 
662
            MessageSet(1, None),
 
663
            MessageSet(5, None),
 
664
            MessageSet(5, None) + MessageSet(1, 2),
 
665
            MessageSet(1),
 
666
            MessageSet(1, 2),
 
667
            MessageSet(1) + MessageSet(3) + MessageSet(5),
 
668
            MessageSet(1, 10),
 
669
            MessageSet(1, 11),
 
670
            MessageSet(1, 5) + MessageSet(10, 20),
 
671
            MessageSet(1) + MessageSet(5, 10),
 
672
            MessageSet(1) + MessageSet(5, 10) + MessageSet(15, 20),
 
673
            MessageSet(1, 10) + MessageSet(15) + MessageSet(20, 25),
 
674
        ]
 
675
 
 
676
        lengths = [
 
677
            None, None, None,
 
678
            1, 2, 3, 10, 11, 16, 7, 13, 17,
 
679
        ]
 
680
 
 
681
        for (input, expected) in zip(inputs, outputs):
 
682
            self.assertEquals(imap4.parseIdList(input), expected)
 
683
 
 
684
        for (input, expected) in zip(inputs, lengths):
 
685
            try:
 
686
                L = len(imap4.parseIdList(input))
 
687
            except TypeError:
 
688
                L = None
 
689
            self.assertEquals(L, expected,
 
690
                "len(%r) = %r != %r" % (input, L, expected))
 
691
 
 
692
class SimpleMailbox:
 
693
    implements(imap4.IMailboxInfo, imap4.IMailbox, imap4.ICloseableMailbox)
 
694
 
 
695
    flags = ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag')
 
696
    messages = []
 
697
    mUID = 0
 
698
    rw = 1
 
699
    closed = False
 
700
 
 
701
    def __init__(self):
 
702
        self.listeners = []
 
703
        self.addListener = self.listeners.append
 
704
        self.removeListener = self.listeners.remove
 
705
 
 
706
    def getFlags(self):
 
707
        return self.flags
 
708
 
 
709
    def getUIDValidity(self):
 
710
        return 42
 
711
 
 
712
    def getUIDNext(self):
 
713
        return len(self.messages) + 1
 
714
 
 
715
    def getMessageCount(self):
 
716
        return 9
 
717
 
 
718
    def getRecentCount(self):
 
719
        return 3
 
720
 
 
721
    def getUnseenCount(self):
 
722
        return 4
 
723
 
 
724
    def isWriteable(self):
 
725
        return self.rw
 
726
 
 
727
    def destroy(self):
 
728
        pass
 
729
 
 
730
    def getHierarchicalDelimiter(self):
 
731
        return '/'
 
732
 
 
733
    def requestStatus(self, names):
 
734
        r = {}
 
735
        if 'MESSAGES' in names:
 
736
            r['MESSAGES'] = self.getMessageCount()
 
737
        if 'RECENT' in names:
 
738
            r['RECENT'] = self.getRecentCount()
 
739
        if 'UIDNEXT' in names:
 
740
            r['UIDNEXT'] = self.getMessageCount() + 1
 
741
        if 'UIDVALIDITY' in names:
 
742
            r['UIDVALIDITY'] = self.getUID()
 
743
        if 'UNSEEN' in names:
 
744
            r['UNSEEN'] = self.getUnseenCount()
 
745
        return defer.succeed(r)
 
746
 
 
747
    def addMessage(self, message, flags, date = None):
 
748
        self.messages.append((message, flags, date, self.mUID))
 
749
        self.mUID += 1
 
750
        return defer.succeed(None)
 
751
 
 
752
    def expunge(self):
 
753
        delete = []
 
754
        for i in self.messages:
 
755
            if '\\Deleted' in i[1]:
 
756
                delete.append(i)
 
757
        for i in delete:
 
758
            self.messages.remove(i)
 
759
        return [i[3] for i in delete]
 
760
 
 
761
    def close(self):
 
762
        self.closed = True
 
763
 
 
764
class Account(imap4.MemoryAccount):
 
765
    mailboxFactory = SimpleMailbox
 
766
    def _emptyMailbox(self, name, id):
 
767
        return self.mailboxFactory()
 
768
 
 
769
    def select(self, name, rw=1):
 
770
        mbox = imap4.MemoryAccount.select(self, name)
 
771
        if mbox is not None:
 
772
            mbox.rw = rw
 
773
        return mbox
 
774
 
 
775
class SimpleServer(imap4.IMAP4Server):
 
776
    def __init__(self, *args, **kw):
 
777
        imap4.IMAP4Server.__init__(self, *args, **kw)
 
778
        realm = TestRealm()
 
779
        realm.theAccount = Account('testuser')
 
780
        portal = cred.portal.Portal(realm)
 
781
        c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
 
782
        self.checker = c
 
783
        self.portal = portal
 
784
        portal.registerChecker(c)
 
785
        self.timeoutTest = False
 
786
 
 
787
    def lineReceived(self, line):
 
788
        if self.timeoutTest:
 
789
            #Do not send a respones
 
790
            return
 
791
 
 
792
        imap4.IMAP4Server.lineReceived(self, line)
 
793
 
 
794
    _username = 'testuser'
 
795
    _password = 'password-test'
 
796
    def authenticateLogin(self, username, password):
 
797
        if username == self._username and password == self._password:
 
798
            return imap4.IAccount, self.theAccount, lambda: None
 
799
        raise cred.error.UnauthorizedLogin()
 
800
 
 
801
 
 
802
class SimpleClient(imap4.IMAP4Client):
 
803
    def __init__(self, deferred, contextFactory = None):
 
804
        imap4.IMAP4Client.__init__(self, contextFactory)
 
805
        self.deferred = deferred
 
806
        self.events = []
 
807
 
 
808
    def serverGreeting(self, caps):
 
809
        self.deferred.callback(None)
 
810
 
 
811
    def modeChanged(self, writeable):
 
812
        self.events.append(['modeChanged', writeable])
 
813
        self.transport.loseConnection()
 
814
 
 
815
    def flagsChanged(self, newFlags):
 
816
        self.events.append(['flagsChanged', newFlags])
 
817
        self.transport.loseConnection()
 
818
 
 
819
    def newMessages(self, exists, recent):
 
820
        self.events.append(['newMessages', exists, recent])
 
821
        self.transport.loseConnection()
 
822
 
 
823
    def fetchBodyParts(self, message, parts):
 
824
        """Fetch some parts of the body.
 
825
 
 
826
        @param message: message with parts to fetch
 
827
        @type message: C{str}
 
828
        @param parts: a list of int/str
 
829
        @type parts: C{list}
 
830
        """
 
831
        cmd = "%s (BODY[%s]" % (message, parts[0])
 
832
        for p in parts[1:]:
 
833
            cmd += " BODY[%s]" % p
 
834
        cmd += ")"
 
835
        d = self.sendCommand(imap4.Command("FETCH", cmd,
 
836
                                           wantResponse=("FETCH",)))
 
837
        d.addCallback(self.__cb_fetchBodyParts)
 
838
        return d
 
839
 
 
840
    def __cb_fetchBodyParts(self, (lines, last)):
 
841
        info = {}
 
842
        for line in lines:
 
843
            parts = line.split(None, 2)
 
844
            if len(parts) == 3:
 
845
                if parts[1] == "FETCH":
 
846
                    try:
 
847
                        mail_id = int(parts[0])
 
848
                    except ValueError:
 
849
                        raise imap4.IllegalServerResponse, line
 
850
                    else:
 
851
                        body_parts = imap4.parseNestedParens(parts[2])[0]
 
852
                        dict_parts = {}
 
853
                        for i in range(len(body_parts)/3):
 
854
                            dict_parts[body_parts[3*i+1][0]] = body_parts[3*i+2]
 
855
                        info[mail_id] = dict_parts
 
856
        return info
 
857
 
 
858
class IMAP4HelperMixin:
 
859
    serverCTX = None
 
860
    clientCTX = None
 
861
 
 
862
    def setUp(self):
 
863
        d = defer.Deferred()
 
864
        self.server = SimpleServer(contextFactory=self.serverCTX)
 
865
        self.client = SimpleClient(d, contextFactory=self.clientCTX)
 
866
        self.connected = d
 
867
 
 
868
        SimpleMailbox.messages = []
 
869
        theAccount = Account('testuser')
 
870
        theAccount.mboxType = SimpleMailbox
 
871
        SimpleServer.theAccount = theAccount
 
872
 
 
873
    def tearDown(self):
 
874
        del self.server
 
875
        del self.client
 
876
        del self.connected
 
877
 
 
878
    def _cbStopClient(self, ignore):
 
879
        self.client.transport.loseConnection()
 
880
 
 
881
    def _ebGeneral(self, failure):
 
882
        self.client.transport.loseConnection()
 
883
        self.server.transport.loseConnection()
 
884
        failure.printTraceback(open('failure.log', 'w'))
 
885
        failure.printTraceback()
 
886
        raise failure.value
 
887
 
 
888
    def loopback(self):
 
889
        return loopback.loopbackAsync(self.server, self.client)
 
890
 
 
891
class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
 
892
    def testCapability(self):
 
893
        caps = {}
 
894
        def getCaps():
 
895
            def gotCaps(c):
 
896
                caps.update(c)
 
897
                self.server.transport.loseConnection()
 
898
            return self.client.getCapabilities().addCallback(gotCaps)
 
899
        d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
 
900
        d = defer.gatherResults([self.loopback(), d1])
 
901
        expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
 
902
        return d.addCallback(lambda _: self.assertEquals(expected, caps))
 
903
 
 
904
    def testCapabilityWithAuth(self):
 
905
        caps = {}
 
906
        self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
 
907
        def getCaps():
 
908
            def gotCaps(c):
 
909
                caps.update(c)
 
910
                self.server.transport.loseConnection()
 
911
            return self.client.getCapabilities().addCallback(gotCaps)
 
912
        d1 = self.connected.addCallback(strip(getCaps)).addErrback(self._ebGeneral)
 
913
        d = defer.gatherResults([self.loopback(), d1])
 
914
 
 
915
        expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
 
916
                  'IDLE': None, 'AUTH': ['CRAM-MD5']}
 
917
 
 
918
        return d.addCallback(lambda _: self.assertEquals(expCap, caps))
 
919
 
 
920
    def testLogout(self):
 
921
        self.loggedOut = 0
 
922
        def logout():
 
923
            def setLoggedOut():
 
924
                self.loggedOut = 1
 
925
            self.client.logout().addCallback(strip(setLoggedOut))
 
926
        self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
 
927
        d = self.loopback()
 
928
        return d.addCallback(lambda _: self.assertEquals(self.loggedOut, 1))
 
929
 
 
930
    def testNoop(self):
 
931
        self.responses = None
 
932
        def noop():
 
933
            def setResponses(responses):
 
934
                self.responses = responses
 
935
                self.server.transport.loseConnection()
 
936
            self.client.noop().addCallback(setResponses)
 
937
        self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
 
938
        d = self.loopback()
 
939
        return d.addCallback(lambda _: self.assertEquals(self.responses, []))
 
940
 
 
941
    def testLogin(self):
 
942
        def login():
 
943
            d = self.client.login('testuser', 'password-test')
 
944
            d.addCallback(self._cbStopClient)
 
945
        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
 
946
        d = defer.gatherResults([d1, self.loopback()])
 
947
        return d.addCallback(self._cbTestLogin)
 
948
 
 
949
    def _cbTestLogin(self, ignored):
 
950
        self.assertEquals(self.server.account, SimpleServer.theAccount)
 
951
        self.assertEquals(self.server.state, 'auth')
 
952
 
 
953
    def testFailedLogin(self):
 
954
        def login():
 
955
            d = self.client.login('testuser', 'wrong-password')
 
956
            d.addBoth(self._cbStopClient)
 
957
 
 
958
        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
 
959
        d2 = self.loopback()
 
960
        d = defer.gatherResults([d1, d2])
 
961
        return d.addCallback(self._cbTestFailedLogin)
 
962
 
 
963
    def _cbTestFailedLogin(self, ignored):
 
964
        self.assertEquals(self.server.account, None)
 
965
        self.assertEquals(self.server.state, 'unauth')
 
966
 
 
967
 
 
968
    def testLoginRequiringQuoting(self):
 
969
        self.server._username = '{test}user'
 
970
        self.server._password = '{test}password'
 
971
 
 
972
        def login():
 
973
            d = self.client.login('{test}user', '{test}password')
 
974
            d.addBoth(self._cbStopClient)
 
975
 
 
976
        d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
 
977
        d = defer.gatherResults([self.loopback(), d1])
 
978
        return d.addCallback(self._cbTestLoginRequiringQuoting)
 
979
 
 
980
    def _cbTestLoginRequiringQuoting(self, ignored):
 
981
        self.assertEquals(self.server.account, SimpleServer.theAccount)
 
982
        self.assertEquals(self.server.state, 'auth')
 
983
 
 
984
 
 
985
    def testNamespace(self):
 
986
        self.namespaceArgs = None
 
987
        def login():
 
988
            return self.client.login('testuser', 'password-test')
 
989
        def namespace():
 
990
            def gotNamespace(args):
 
991
                self.namespaceArgs = args
 
992
                self._cbStopClient(None)
 
993
            return self.client.namespace().addCallback(gotNamespace)
 
994
 
 
995
        d1 = self.connected.addCallback(strip(login))
 
996
        d1.addCallback(strip(namespace))
 
997
        d1.addErrback(self._ebGeneral)
 
998
        d2 = self.loopback()
 
999
        d = defer.gatherResults([d1, d2])
 
1000
        d.addCallback(lambda _: self.assertEquals(self.namespaceArgs,
 
1001
                                                  [[['', '/']], [], []]))
 
1002
        return d
 
1003
 
 
1004
    def testSelect(self):
 
1005
        SimpleServer.theAccount.addMailbox('test-mailbox')
 
1006
        self.selectedArgs = None
 
1007
        def login():
 
1008
            return self.client.login('testuser', 'password-test')
 
1009
        def select():
 
1010
            def selected(args):
 
1011
                self.selectedArgs = args
 
1012
                self._cbStopClient(None)
 
1013
            d = self.client.select('test-mailbox')
 
1014
            d.addCallback(selected)
 
1015
            return d
 
1016
 
 
1017
        d1 = self.connected.addCallback(strip(login))
 
1018
        d1.addCallback(strip(select))
 
1019
        d1.addErrback(self._ebGeneral)
 
1020
        d2 = self.loopback()
 
1021
        return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
 
1022
 
 
1023
    def _cbTestSelect(self, ignored):
 
1024
        mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
 
1025
        self.assertEquals(self.server.mbox, mbox)
 
1026
        self.assertEquals(self.selectedArgs, {
 
1027
            'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
 
1028
            'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
 
1029
            'READ-WRITE': 1
 
1030
        })
 
1031
 
 
1032
    def testExamine(self):
 
1033
        SimpleServer.theAccount.addMailbox('test-mailbox')
 
1034
        self.examinedArgs = None
 
1035
        def login():
 
1036
            return self.client.login('testuser', 'password-test')
 
1037
        def examine():
 
1038
            def examined(args):
 
1039
                self.examinedArgs = args
 
1040
                self._cbStopClient(None)
 
1041
            d = self.client.examine('test-mailbox')
 
1042
            d.addCallback(examined)
 
1043
            return d
 
1044
 
 
1045
        d1 = self.connected.addCallback(strip(login))
 
1046
        d1.addCallback(strip(examine))
 
1047
        d1.addErrback(self._ebGeneral)
 
1048
        d2 = self.loopback()
 
1049
        d = defer.gatherResults([d1, d2])
 
1050
        return d.addCallback(self._cbTestExamine)
 
1051
 
 
1052
    def _cbTestExamine(self, ignored):
 
1053
        mbox = SimpleServer.theAccount.mailboxes['TEST-MAILBOX']
 
1054
        self.assertEquals(self.server.mbox, mbox)
 
1055
        self.assertEquals(self.examinedArgs, {
 
1056
            'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
 
1057
            'FLAGS': ('\\Flag1', 'Flag2', '\\AnotherSysFlag', 'LastFlag'),
 
1058
            'READ-WRITE': 0
 
1059
        })
 
1060
 
 
1061
    def testCreate(self):
 
1062
        succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
 
1063
        fail = ('testbox', 'test/box')
 
1064
 
 
1065
        def cb(): self.result.append(1)
 
1066
        def eb(failure): self.result.append(0)
 
1067
 
 
1068
        def login():
 
1069
            return self.client.login('testuser', 'password-test')
 
1070
        def create():
 
1071
            for name in succeed + fail:
 
1072
                d = self.client.create(name)
 
1073
                d.addCallback(strip(cb)).addErrback(eb)
 
1074
            d.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1075
 
 
1076
        self.result = []
 
1077
        d1 = self.connected.addCallback(strip(login)).addCallback(strip(create))
 
1078
        d2 = self.loopback()
 
1079
        d = defer.gatherResults([d1, d2])
 
1080
        return d.addCallback(self._cbTestCreate, succeed, fail)
 
1081
 
 
1082
    def _cbTestCreate(self, ignored, succeed, fail):
 
1083
        self.assertEquals(self.result, [1] * len(succeed) + [0] * len(fail))
 
1084
        mbox = SimpleServer.theAccount.mailboxes.keys()
 
1085
        answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
 
1086
        mbox.sort()
 
1087
        answers.sort()
 
1088
        self.assertEquals(mbox, [a.upper() for a in answers])
 
1089
 
 
1090
    def testDelete(self):
 
1091
        SimpleServer.theAccount.addMailbox('delete/me')
 
1092
 
 
1093
        def login():
 
1094
            return self.client.login('testuser', 'password-test')
 
1095
        def delete():
 
1096
            return self.client.delete('delete/me')
 
1097
        d1 = self.connected.addCallback(strip(login))
 
1098
        d1.addCallbacks(strip(delete), self._ebGeneral)
 
1099
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1100
        d2 = self.loopback()
 
1101
        d = defer.gatherResults([d1, d2])
 
1102
        d.addCallback(lambda _:
 
1103
                      self.assertEquals(SimpleServer.theAccount.mailboxes.keys(), []))
 
1104
        return d
 
1105
 
 
1106
    def testIllegalInboxDelete(self):
 
1107
        self.stashed = None
 
1108
        def login():
 
1109
            return self.client.login('testuser', 'password-test')
 
1110
        def delete():
 
1111
            return self.client.delete('inbox')
 
1112
        def stash(result):
 
1113
            self.stashed = result
 
1114
 
 
1115
        d1 = self.connected.addCallback(strip(login))
 
1116
        d1.addCallbacks(strip(delete), self._ebGeneral)
 
1117
        d1.addBoth(stash)
 
1118
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1119
        d2 = self.loopback()
 
1120
        d = defer.gatherResults([d1, d2])
 
1121
        d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
 
1122
                                                           failure.Failure)))
 
1123
        return d
 
1124
 
 
1125
 
 
1126
    def testNonExistentDelete(self):
 
1127
        def login():
 
1128
            return self.client.login('testuser', 'password-test')
 
1129
        def delete():
 
1130
            return self.client.delete('delete/me')
 
1131
        def deleteFailed(failure):
 
1132
            self.failure = failure
 
1133
 
 
1134
        self.failure = None
 
1135
        d1 = self.connected.addCallback(strip(login))
 
1136
        d1.addCallback(strip(delete)).addErrback(deleteFailed)
 
1137
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1138
        d2 = self.loopback()
 
1139
        d = defer.gatherResults([d1, d2])
 
1140
        d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
 
1141
                                                  'No such mailbox'))
 
1142
        return d
 
1143
    
 
1144
 
 
1145
    def testIllegalDelete(self):
 
1146
        m = SimpleMailbox()
 
1147
        m.flags = (r'\Noselect',)
 
1148
        SimpleServer.theAccount.addMailbox('delete', m)
 
1149
        SimpleServer.theAccount.addMailbox('delete/me')
 
1150
 
 
1151
        def login():
 
1152
            return self.client.login('testuser', 'password-test')
 
1153
        def delete():
 
1154
            return self.client.delete('delete')
 
1155
        def deleteFailed(failure):
 
1156
            self.failure = failure
 
1157
 
 
1158
        self.failure = None
 
1159
        d1 = self.connected.addCallback(strip(login))
 
1160
        d1.addCallback(strip(delete)).addErrback(deleteFailed)
 
1161
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1162
        d2 = self.loopback()
 
1163
        d = defer.gatherResults([d1, d2])
 
1164
        expected = "Hierarchically inferior mailboxes exist and \\Noselect is set"
 
1165
        d.addCallback(lambda _:
 
1166
                      self.assertEquals(str(self.failure.value), expected))
 
1167
        return d
 
1168
 
 
1169
    def testRename(self):
 
1170
        SimpleServer.theAccount.addMailbox('oldmbox')
 
1171
        def login():
 
1172
            return self.client.login('testuser', 'password-test')
 
1173
        def rename():
 
1174
            return self.client.rename('oldmbox', 'newname')
 
1175
 
 
1176
        d1 = self.connected.addCallback(strip(login))
 
1177
        d1.addCallbacks(strip(rename), self._ebGeneral)
 
1178
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1179
        d2 = self.loopback()
 
1180
        d = defer.gatherResults([d1, d2])
 
1181
        d.addCallback(lambda _:
 
1182
                      self.assertEquals(SimpleServer.theAccount.mailboxes.keys(),
 
1183
                                        ['NEWNAME']))
 
1184
        return d
 
1185
 
 
1186
    def testIllegalInboxRename(self):
 
1187
        self.stashed = None
 
1188
        def login():
 
1189
            return self.client.login('testuser', 'password-test')
 
1190
        def rename():
 
1191
            return self.client.rename('inbox', 'frotz')
 
1192
        def stash(stuff):
 
1193
            self.stashed = stuff
 
1194
 
 
1195
        d1 = self.connected.addCallback(strip(login))
 
1196
        d1.addCallbacks(strip(rename), self._ebGeneral)
 
1197
        d1.addBoth(stash)
 
1198
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1199
        d2 = self.loopback()
 
1200
        d = defer.gatherResults([d1, d2])
 
1201
        d.addCallback(lambda _:
 
1202
                      self.failUnless(isinstance(self.stashed, failure.Failure)))
 
1203
        return d
 
1204
 
 
1205
    def testHierarchicalRename(self):
 
1206
        SimpleServer.theAccount.create('oldmbox/m1')
 
1207
        SimpleServer.theAccount.create('oldmbox/m2')
 
1208
        def login():
 
1209
            return self.client.login('testuser', 'password-test')
 
1210
        def rename():
 
1211
            return self.client.rename('oldmbox', 'newname')
 
1212
 
 
1213
        d1 = self.connected.addCallback(strip(login))
 
1214
        d1.addCallbacks(strip(rename), self._ebGeneral)
 
1215
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1216
        d2 = self.loopback()
 
1217
        d = defer.gatherResults([d1, d2])
 
1218
        return d.addCallback(self._cbTestHierarchicalRename)
 
1219
 
 
1220
    def _cbTestHierarchicalRename(self, ignored):
 
1221
        mboxes = SimpleServer.theAccount.mailboxes.keys()
 
1222
        expected = ['newname', 'newname/m1', 'newname/m2']
 
1223
        mboxes.sort()
 
1224
        self.assertEquals(mboxes, [s.upper() for s in expected])
 
1225
 
 
1226
    def testSubscribe(self):
 
1227
        def login():
 
1228
            return self.client.login('testuser', 'password-test')
 
1229
        def subscribe():
 
1230
            return self.client.subscribe('this/mbox')
 
1231
 
 
1232
        d1 = self.connected.addCallback(strip(login))
 
1233
        d1.addCallbacks(strip(subscribe), self._ebGeneral)
 
1234
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1235
        d2 = self.loopback()
 
1236
        d = defer.gatherResults([d1, d2])
 
1237
        d.addCallback(lambda _:
 
1238
                      self.assertEquals(SimpleServer.theAccount.subscriptions,
 
1239
                                        ['THIS/MBOX']))
 
1240
        return d
 
1241
 
 
1242
    def testUnsubscribe(self):
 
1243
        SimpleServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
 
1244
        def login():
 
1245
            return self.client.login('testuser', 'password-test')
 
1246
        def unsubscribe():
 
1247
            return self.client.unsubscribe('this/mbox')
 
1248
 
 
1249
        d1 = self.connected.addCallback(strip(login))
 
1250
        d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
 
1251
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1252
        d2 = self.loopback()
 
1253
        d = defer.gatherResults([d1, d2])
 
1254
        d.addCallback(lambda _:
 
1255
                      self.assertEquals(SimpleServer.theAccount.subscriptions,
 
1256
                                        ['THAT/MBOX']))
 
1257
        return d
 
1258
 
 
1259
    def _listSetup(self, f):
 
1260
        SimpleServer.theAccount.addMailbox('root/subthing')
 
1261
        SimpleServer.theAccount.addMailbox('root/another-thing')
 
1262
        SimpleServer.theAccount.addMailbox('non-root/subthing')
 
1263
 
 
1264
        def login():
 
1265
            return self.client.login('testuser', 'password-test')
 
1266
        def listed(answers):
 
1267
            self.listed = answers
 
1268
 
 
1269
        self.listed = None
 
1270
        d1 = self.connected.addCallback(strip(login))
 
1271
        d1.addCallbacks(strip(f), self._ebGeneral)
 
1272
        d1.addCallbacks(listed, self._ebGeneral)
 
1273
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1274
        d2 = self.loopback()
 
1275
        return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
 
1276
 
 
1277
    def testList(self):
 
1278
        def list():
 
1279
            return self.client.list('root', '%')
 
1280
        d = self._listSetup(list)
 
1281
        d.addCallback(lambda listed: self.assertEquals(
 
1282
            sortNest(listed),
 
1283
            sortNest([
 
1284
                (SimpleMailbox.flags, "/", "ROOT/SUBTHING"),
 
1285
                (SimpleMailbox.flags, "/", "ROOT/ANOTHER-THING")
 
1286
            ])
 
1287
        ))
 
1288
        return d
 
1289
 
 
1290
    def testLSub(self):
 
1291
        SimpleServer.theAccount.subscribe('ROOT/SUBTHING')
 
1292
        def lsub():
 
1293
            return self.client.lsub('root', '%')
 
1294
        d = self._listSetup(lsub)
 
1295
        d.addCallback(self.assertEquals,
 
1296
                      [(SimpleMailbox.flags, "/", "ROOT/SUBTHING")])
 
1297
        return d
 
1298
 
 
1299
    def testStatus(self):
 
1300
        SimpleServer.theAccount.addMailbox('root/subthing')
 
1301
        def login():
 
1302
            return self.client.login('testuser', 'password-test')
 
1303
        def status():
 
1304
            return self.client.status('root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
 
1305
        def statused(result):
 
1306
            self.statused = result
 
1307
 
 
1308
        self.statused = None
 
1309
        d1 = self.connected.addCallback(strip(login))
 
1310
        d1.addCallbacks(strip(status), self._ebGeneral)
 
1311
        d1.addCallbacks(statused, self._ebGeneral)
 
1312
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1313
        d2 = self.loopback()
 
1314
        d = defer.gatherResults([d1, d2])
 
1315
        d.addCallback(lambda _: self.assertEquals(
 
1316
            self.statused,
 
1317
            {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
 
1318
        ))
 
1319
        return d
 
1320
 
 
1321
    def testFailedStatus(self):
 
1322
        def login():
 
1323
            return self.client.login('testuser', 'password-test')
 
1324
        def status():
 
1325
            return self.client.status('root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
 
1326
        def statused(result):
 
1327
            self.statused = result
 
1328
        def failed(failure):
 
1329
            self.failure = failure
 
1330
 
 
1331
        self.statused = self.failure = None
 
1332
        d1 = self.connected.addCallback(strip(login))
 
1333
        d1.addCallbacks(strip(status), self._ebGeneral)
 
1334
        d1.addCallbacks(statused, failed)
 
1335
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1336
        d2 = self.loopback()
 
1337
        return defer.gatherResults([d1, d2]).addCallback(self._cbTestFailedStatus)
 
1338
 
 
1339
    def _cbTestFailedStatus(self, ignored):
 
1340
        self.assertEquals(
 
1341
            self.statused, None
 
1342
        )
 
1343
        self.assertEquals(
 
1344
            self.failure.value.args,
 
1345
            ('Could not open mailbox',)
 
1346
        )
 
1347
 
 
1348
    def testFullAppend(self):
 
1349
        infile = util.sibpath(__file__, 'rfc822.message')
 
1350
        message = open(infile)
 
1351
        SimpleServer.theAccount.addMailbox('root/subthing')
 
1352
        def login():
 
1353
            return self.client.login('testuser', 'password-test')
 
1354
        def append():
 
1355
            return self.client.append(
 
1356
                'root/subthing',
 
1357
                message,
 
1358
                ('\\SEEN', '\\DELETED'),
 
1359
                'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
 
1360
            )
 
1361
 
 
1362
        d1 = self.connected.addCallback(strip(login))
 
1363
        d1.addCallbacks(strip(append), self._ebGeneral)
 
1364
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1365
        d2 = self.loopback()
 
1366
        d = defer.gatherResults([d1, d2])
 
1367
        return d.addCallback(self._cbTestFullAppend, infile)
 
1368
 
 
1369
    def _cbTestFullAppend(self, ignored, infile):
 
1370
        mb = SimpleServer.theAccount.mailboxes['ROOT/SUBTHING']
 
1371
        self.assertEquals(1, len(mb.messages))
 
1372
        self.assertEquals(
 
1373
            (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0),
 
1374
            mb.messages[0][1:]
 
1375
        )
 
1376
        self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
 
1377
 
 
1378
    def testPartialAppend(self):
 
1379
        infile = util.sibpath(__file__, 'rfc822.message')
 
1380
        message = open(infile)
 
1381
        SimpleServer.theAccount.addMailbox('PARTIAL/SUBTHING')
 
1382
        def login():
 
1383
            return self.client.login('testuser', 'password-test')
 
1384
        def append():
 
1385
            message = file(infile)
 
1386
            return self.client.sendCommand(
 
1387
                imap4.Command(
 
1388
                    'APPEND',
 
1389
                    'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile),
 
1390
                    (), self.client._IMAP4Client__cbContinueAppend, message
 
1391
                )
 
1392
            )
 
1393
        d1 = self.connected.addCallback(strip(login))
 
1394
        d1.addCallbacks(strip(append), self._ebGeneral)
 
1395
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1396
        d2 = self.loopback()
 
1397
        d = defer.gatherResults([d1, d2])
 
1398
        return d.addCallback(self._cbTestPartialAppend, infile)
 
1399
 
 
1400
    def _cbTestPartialAppend(self, ignored, infile):
 
1401
        mb = SimpleServer.theAccount.mailboxes['PARTIAL/SUBTHING']
 
1402
        self.assertEquals(1, len(mb.messages))
 
1403
        self.assertEquals(
 
1404
            (['\\SEEN'], 'Right now', 0),
 
1405
            mb.messages[0][1:]
 
1406
        )
 
1407
        self.assertEquals(open(infile).read(), mb.messages[0][0].getvalue())
 
1408
 
 
1409
    def testCheck(self):
 
1410
        SimpleServer.theAccount.addMailbox('root/subthing')
 
1411
        def login():
 
1412
            return self.client.login('testuser', 'password-test')
 
1413
        def select():
 
1414
            return self.client.select('root/subthing')
 
1415
        def check():
 
1416
            return self.client.check()
 
1417
 
 
1418
        d = self.connected.addCallback(strip(login))
 
1419
        d.addCallbacks(strip(select), self._ebGeneral)
 
1420
        d.addCallbacks(strip(check), self._ebGeneral)
 
1421
        d.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1422
        return self.loopback()
 
1423
 
 
1424
        # Okay, that was fun
 
1425
 
 
1426
    def testClose(self):
 
1427
        m = SimpleMailbox()
 
1428
        m.messages = [
 
1429
            ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
 
1430
            ('Message 2', ('AnotherFlag',), None, 1),
 
1431
            ('Message 3', ('\\Deleted',), None, 2),
 
1432
        ]
 
1433
        SimpleServer.theAccount.addMailbox('mailbox', m)
 
1434
        def login():
 
1435
            return self.client.login('testuser', 'password-test')
 
1436
        def select():
 
1437
            return self.client.select('mailbox')
 
1438
        def close():
 
1439
            return self.client.close()
 
1440
 
 
1441
        d = self.connected.addCallback(strip(login))
 
1442
        d.addCallbacks(strip(select), self._ebGeneral)
 
1443
        d.addCallbacks(strip(close), self._ebGeneral)
 
1444
        d.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1445
        d2 = self.loopback()
 
1446
        return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
 
1447
 
 
1448
    def _cbTestClose(self, ignored, m):
 
1449
        self.assertEquals(len(m.messages), 1)
 
1450
        self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
 
1451
        self.failUnless(m.closed)
 
1452
 
 
1453
    def testExpunge(self):
 
1454
        m = SimpleMailbox()
 
1455
        m.messages = [
 
1456
            ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
 
1457
            ('Message 2', ('AnotherFlag',), None, 1),
 
1458
            ('Message 3', ('\\Deleted',), None, 2),
 
1459
        ]
 
1460
        SimpleServer.theAccount.addMailbox('mailbox', m)
 
1461
        def login():
 
1462
            return self.client.login('testuser', 'password-test')
 
1463
        def select():
 
1464
            return self.client.select('mailbox')
 
1465
        def expunge():
 
1466
            return self.client.expunge()
 
1467
        def expunged(results):
 
1468
            self.failIf(self.server.mbox is None)
 
1469
            self.results = results
 
1470
 
 
1471
        self.results = None
 
1472
        d1 = self.connected.addCallback(strip(login))
 
1473
        d1.addCallbacks(strip(select), self._ebGeneral)
 
1474
        d1.addCallbacks(strip(expunge), self._ebGeneral)
 
1475
        d1.addCallbacks(expunged, self._ebGeneral)
 
1476
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1477
        d2 = self.loopback()
 
1478
        d = defer.gatherResults([d1, d2])
 
1479
        return d.addCallback(self._cbTestExpunge, m)
 
1480
 
 
1481
    def _cbTestExpunge(self, ignored, m):
 
1482
        self.assertEquals(len(m.messages), 1)
 
1483
        self.assertEquals(m.messages[0], ('Message 2', ('AnotherFlag',), None, 1))
 
1484
 
 
1485
        self.assertEquals(self.results, [0, 2])
 
1486
 
 
1487
class TestRealm:
 
1488
    theAccount = None
 
1489
 
 
1490
    def requestAvatar(self, avatarId, mind, *interfaces):
 
1491
        return imap4.IAccount, self.theAccount, lambda: None
 
1492
 
 
1493
class TestChecker:
 
1494
    credentialInterfaces = (cred.credentials.IUsernameHashedPassword, cred.credentials.IUsernamePassword)
 
1495
 
 
1496
    users = {
 
1497
        'testuser': 'secret'
 
1498
    }
 
1499
 
 
1500
    def requestAvatarId(self, credentials):
 
1501
        if credentials.username in self.users:
 
1502
            return defer.maybeDeferred(
 
1503
                credentials.checkPassword, self.users[credentials.username]
 
1504
        ).addCallback(self._cbCheck, credentials.username)
 
1505
 
 
1506
    def _cbCheck(self, result, username):
 
1507
        if result:
 
1508
            return username
 
1509
        raise cred.error.UnauthorizedLogin()
 
1510
 
 
1511
class AuthenticatorTestCase(IMAP4HelperMixin, unittest.TestCase):
 
1512
    def setUp(self):
 
1513
        IMAP4HelperMixin.setUp(self)
 
1514
 
 
1515
        realm = TestRealm()
 
1516
        realm.theAccount = Account('testuser')
 
1517
        portal = cred.portal.Portal(realm)
 
1518
        portal.registerChecker(TestChecker())
 
1519
        self.server.portal = portal
 
1520
 
 
1521
        self.authenticated = 0
 
1522
        self.account = realm.theAccount
 
1523
 
 
1524
    def testCramMD5(self):
 
1525
        self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
 
1526
        cAuth = imap4.CramMD5ClientAuthenticator('testuser')
 
1527
        self.client.registerAuthenticator(cAuth)
 
1528
 
 
1529
        def auth():
 
1530
            return self.client.authenticate('secret')
 
1531
        def authed():
 
1532
            self.authenticated = 1
 
1533
 
 
1534
        d1 = self.connected.addCallback(strip(auth))
 
1535
        d1.addCallbacks(strip(authed), self._ebGeneral)
 
1536
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1537
        d2 = self.loopback()
 
1538
        d = defer.gatherResults([d1, d2])
 
1539
        return d.addCallback(self._cbTestCramMD5)
 
1540
 
 
1541
    def _cbTestCramMD5(self, ignored):
 
1542
        self.assertEquals(self.authenticated, 1)
 
1543
        self.assertEquals(self.server.account, self.account)
 
1544
 
 
1545
    def testFailedCramMD5(self):
 
1546
        self.server.challengers['CRAM-MD5'] = cred.credentials.CramMD5Credentials
 
1547
        cAuth = imap4.CramMD5ClientAuthenticator('testuser')
 
1548
        self.client.registerAuthenticator(cAuth)
 
1549
 
 
1550
        def misauth():
 
1551
            return self.client.authenticate('not the secret')
 
1552
        def authed():
 
1553
            self.authenticated = 1
 
1554
        def misauthed():
 
1555
            self.authenticated = -1
 
1556
 
 
1557
        d1 = self.connected.addCallback(strip(misauth))
 
1558
        d1.addCallbacks(strip(authed), strip(misauthed))
 
1559
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1560
        d = defer.gatherResults([self.loopback(), d1])
 
1561
        return d.addCallback(self._cbTestFailedCramMD5)
 
1562
 
 
1563
    def _cbTestFailedCramMD5(self, ignored):
 
1564
        self.assertEquals(self.authenticated, -1)
 
1565
        self.assertEquals(self.server.account, None)
 
1566
 
 
1567
    def testLOGIN(self):
 
1568
        self.server.challengers['LOGIN'] = imap4.LOGINCredentials
 
1569
        cAuth = imap4.LOGINAuthenticator('testuser')
 
1570
        self.client.registerAuthenticator(cAuth)
 
1571
 
 
1572
        def auth():
 
1573
            return self.client.authenticate('secret')
 
1574
        def authed():
 
1575
            self.authenticated = 1
 
1576
 
 
1577
        d1 = self.connected.addCallback(strip(auth))
 
1578
        d1.addCallbacks(strip(authed), self._ebGeneral)
 
1579
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1580
        d = defer.gatherResults([self.loopback(), d1])
 
1581
        return d.addCallback(self._cbTestLOGIN)
 
1582
 
 
1583
    def _cbTestLOGIN(self, ignored):
 
1584
        self.assertEquals(self.authenticated, 1)
 
1585
        self.assertEquals(self.server.account, self.account)
 
1586
 
 
1587
    def testFailedLOGIN(self):
 
1588
        self.server.challengers['LOGIN'] = imap4.LOGINCredentials
 
1589
        cAuth = imap4.LOGINAuthenticator('testuser')
 
1590
        self.client.registerAuthenticator(cAuth)
 
1591
 
 
1592
        def misauth():
 
1593
            return self.client.authenticate('not the secret')
 
1594
        def authed():
 
1595
            self.authenticated = 1
 
1596
        def misauthed():
 
1597
            self.authenticated = -1
 
1598
 
 
1599
        d1 = self.connected.addCallback(strip(misauth))
 
1600
        d1.addCallbacks(strip(authed), strip(misauthed))
 
1601
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1602
        d = defer.gatherResults([self.loopback(), d1])
 
1603
        return d.addCallback(self._cbTestFailedLOGIN)
 
1604
 
 
1605
    def _cbTestFailedLOGIN(self, ignored):
 
1606
        self.assertEquals(self.authenticated, -1)
 
1607
        self.assertEquals(self.server.account, None)
 
1608
 
 
1609
    def testPLAIN(self):
 
1610
        self.server.challengers['PLAIN'] = imap4.PLAINCredentials
 
1611
        cAuth = imap4.PLAINAuthenticator('testuser')
 
1612
        self.client.registerAuthenticator(cAuth)
 
1613
 
 
1614
        def auth():
 
1615
            return self.client.authenticate('secret')
 
1616
        def authed():
 
1617
            self.authenticated = 1
 
1618
 
 
1619
        d1 = self.connected.addCallback(strip(auth))
 
1620
        d1.addCallbacks(strip(authed), self._ebGeneral)
 
1621
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1622
        d = defer.gatherResults([self.loopback(), d1])
 
1623
        return d.addCallback(self._cbTestPLAIN)
 
1624
 
 
1625
    def _cbTestPLAIN(self, ignored):
 
1626
        self.assertEquals(self.authenticated, 1)
 
1627
        self.assertEquals(self.server.account, self.account)
 
1628
 
 
1629
    def testFailedPLAIN(self):
 
1630
        self.server.challengers['PLAIN'] = imap4.PLAINCredentials
 
1631
        cAuth = imap4.PLAINAuthenticator('testuser')
 
1632
        self.client.registerAuthenticator(cAuth)
 
1633
 
 
1634
        def misauth():
 
1635
            return self.client.authenticate('not the secret')
 
1636
        def authed():
 
1637
            self.authenticated = 1
 
1638
        def misauthed():
 
1639
            self.authenticated = -1
 
1640
 
 
1641
        d1 = self.connected.addCallback(strip(misauth))
 
1642
        d1.addCallbacks(strip(authed), strip(misauthed))
 
1643
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
 
1644
        d = defer.gatherResults([self.loopback(), d1])
 
1645
        return d.addCallback(self._cbTestFailedPLAIN)
 
1646
 
 
1647
    def _cbTestFailedPLAIN(self, ignored):
 
1648
        self.assertEquals(self.authenticated, -1)
 
1649
        self.assertEquals(self.server.account, None)
 
1650
 
 
1651
 
 
1652
class UnsolicitedResponseTestCase(IMAP4HelperMixin, unittest.TestCase):
 
1653
    def testReadWrite(self):
 
1654
        def login():
 
1655
            return self.client.login('testuser', 'password-test')
 
1656
        def loggedIn():
 
1657
            self.server.modeChanged(1)
 
1658
 
 
1659
        d1 = self.connected.addCallback(strip(login))
 
1660
        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
 
1661
        d = defer.gatherResults([self.loopback(), d1])
 
1662
        return d.addCallback(self._cbTestReadWrite)
 
1663
 
 
1664
    def _cbTestReadWrite(self, ignored):
 
1665
        E = self.client.events
 
1666
        self.assertEquals(E, [['modeChanged', 1]])
 
1667
 
 
1668
    def testReadOnly(self):
 
1669
        def login():
 
1670
            return self.client.login('testuser', 'password-test')
 
1671
        def loggedIn():
 
1672
            self.server.modeChanged(0)
 
1673
 
 
1674
        d1 = self.connected.addCallback(strip(login))
 
1675
        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
 
1676
        d = defer.gatherResults([self.loopback(), d1])
 
1677
        return d.addCallback(self._cbTestReadOnly)
 
1678
 
 
1679
    def _cbTestReadOnly(self, ignored):
 
1680
        E = self.client.events
 
1681
        self.assertEquals(E, [['modeChanged', 0]])
 
1682
 
 
1683
    def testFlagChange(self):
 
1684
        flags = {
 
1685
            1: ['\\Answered', '\\Deleted'],
 
1686
            5: [],
 
1687
            10: ['\\Recent']
 
1688
        }
 
1689
        def login():
 
1690
            return self.client.login('testuser', 'password-test')
 
1691
        def loggedIn():
 
1692
            self.server.flagsChanged(flags)
 
1693
 
 
1694
        d1 = self.connected.addCallback(strip(login))
 
1695
        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
 
1696
        d = defer.gatherResults([self.loopback(), d1])
 
1697
        return d.addCallback(self._cbTestFlagChange, flags)
 
1698
 
 
1699
    def _cbTestFlagChange(self, ignored, flags):
 
1700
        E = self.client.events
 
1701
        expect = [['flagsChanged', {x[0]: x[1]}] for x in flags.items()]
 
1702
        E.sort()
 
1703
        expect.sort()
 
1704
        self.assertEquals(E, expect)
 
1705
 
 
1706
    def testNewMessages(self):
 
1707
        def login():
 
1708
            return self.client.login('testuser', 'password-test')
 
1709
        def loggedIn():
 
1710
            self.server.newMessages(10, None)
 
1711
 
 
1712
        d1 = self.connected.addCallback(strip(login))
 
1713
        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
 
1714
        d = defer.gatherResults([self.loopback(), d1])
 
1715
        return d.addCallback(self._cbTestNewMessages)
 
1716
 
 
1717
    def _cbTestNewMessages(self, ignored):
 
1718
        E = self.client.events
 
1719
        self.assertEquals(E, [['newMessages', 10, None]])
 
1720
 
 
1721
    def testNewRecentMessages(self):
 
1722
        def login():
 
1723
            return self.client.login('testuser', 'password-test')
 
1724
        def loggedIn():
 
1725
            self.server.newMessages(None, 10)
 
1726
 
 
1727
        d1 = self.connected.addCallback(strip(login))
 
1728
        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
 
1729
        d = defer.gatherResults([self.loopback(), d1])
 
1730
        return d.addCallback(self._cbTestNewRecentMessages)
 
1731
 
 
1732
    def _cbTestNewRecentMessages(self, ignored):
 
1733
        E = self.client.events
 
1734
        self.assertEquals(E, [['newMessages', None, 10]])
 
1735
 
 
1736
    def testNewMessagesAndRecent(self):
 
1737
        def login():
 
1738
            return self.client.login('testuser', 'password-test')
 
1739
        def loggedIn():
 
1740
            self.server.newMessages(20, 10)
 
1741
 
 
1742
        d1 = self.connected.addCallback(strip(login))
 
1743
        d1.addCallback(strip(loggedIn)).addErrback(self._ebGeneral)
 
1744
        d = defer.gatherResults([self.loopback(), d1])
 
1745
        return d.addCallback(self._cbTestNewMessagesAndRecent)
 
1746
 
 
1747
    def _cbTestNewMessagesAndRecent(self, ignored):
 
1748
        E = self.client.events
 
1749
        self.assertEquals(E, [['newMessages', 20, None], ['newMessages', None, 10]])
 
1750
 
 
1751
class HandCraftedTestCase(IMAP4HelperMixin, unittest.TestCase):
 
1752
    def testTrailingLiteral(self):
 
1753
        transport = StringTransport()
 
1754
        c = imap4.IMAP4Client()
 
1755
        c.makeConnection(transport)
 
1756
        c.lineReceived('* OK [IMAP4rev1]')
 
1757
 
 
1758
        def cbSelect(ignored):
 
1759
            d = c.fetchMessage('1')
 
1760
            c.dataReceived('* 1 FETCH (RFC822 {10}\r\n0123456789\r\n RFC822.SIZE 10)\r\n')
 
1761
            c.dataReceived('0003 OK FETCH\r\n')
 
1762
            return d
 
1763
 
 
1764
        def cbLogin(ignored):
 
1765
            d = c.select('inbox')
 
1766
            c.lineReceived('0002 OK SELECT')
 
1767
            d.addCallback(cbSelect)
 
1768
            return d
 
1769
 
 
1770
        d = c.login('blah', 'blah')
 
1771
        c.dataReceived('0001 OK LOGIN\r\n')
 
1772
        d.addCallback(cbLogin)
 
1773
        return d
 
1774
 
 
1775
    def testPathelogicalScatteringOfLiterals(self):
 
1776
        self.server.checker.addUser('testuser', 'password-test')
 
1777
        transport = StringTransport()
 
1778
        self.server.makeConnection(transport)
 
1779
 
 
1780
        transport.clear()
 
1781
        self.server.dataReceived("01 LOGIN {8}\r\n")
 
1782
        self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")
 
1783
 
 
1784
        transport.clear()
 
1785
        self.server.dataReceived("testuser {13}\r\n")
 
1786
        self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")
 
1787
 
 
1788
        transport.clear()
 
1789
        self.server.dataReceived("password-test\r\n")
 
1790
        self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
 
1791
        self.assertEquals(self.server.state, 'auth')
 
1792
 
 
1793
        self.server.connectionLost(error.ConnectionDone("Connection done."))
 
1794
 
 
1795
    def testUnsolicitedResponseMixedWithSolicitedResponse(self):
 
1796
 
 
1797
        class StillSimplerClient(imap4.IMAP4Client):
 
1798
            events = []
 
1799
            def flagsChanged(self, newFlags):
 
1800
                self.events.append(['flagsChanged', newFlags])
 
1801
 
 
1802
        transport = StringTransport()
 
1803
        c = StillSimplerClient()
 
1804
        c.makeConnection(transport)
 
1805
        c.lineReceived('* OK [IMAP4rev1]')
 
1806
 
 
1807
        def login():
 
1808
            d = c.login('blah', 'blah')
 
1809
            c.dataReceived('0001 OK LOGIN\r\n')
 
1810
            return d
 
1811
        def select():
 
1812
            d = c.select('inbox')
 
1813
            c.lineReceived('0002 OK SELECT')
 
1814
            return d
 
1815
        def fetch():
 
1816
            d = c.fetchSpecific('1:*',
 
1817
                headerType='HEADER.FIELDS',
 
1818
                headerArgs=['SUBJECT'])
 
1819
            c.dataReceived('* 1 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {38}\r\n')
 
1820
            c.dataReceived('Subject: Suprise for your woman...\r\n')
 
1821
            c.dataReceived('\r\n')
 
1822
            c.dataReceived(')\r\n')
 
1823
            c.dataReceived('* 1 FETCH (FLAGS (\Seen))\r\n')
 
1824
            c.dataReceived('* 2 FETCH (BODY[HEADER.FIELDS ("SUBJECT")] {75}\r\n')
 
1825
            c.dataReceived('Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n')
 
1826
            c.dataReceived('\r\n')
 
1827
            c.dataReceived(')\r\n')
 
1828
            c.dataReceived('0003 OK FETCH completed\r\n')
 
1829
            return d
 
1830
        def test(res):
 
1831
            self.assertEquals(res, {
 
1832
                1: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
 
1833
                    'Subject: Suprise for your woman...\r\n\r\n']],
 
1834
                2: [['BODY', ['HEADER.FIELDS', ['SUBJECT']],
 
1835
                    'Subject: What you been doing. Order your meds here . ,. handcuff madsen\r\n\r\n']]
 
1836
            })
 
1837
 
 
1838
            self.assertEquals(c.events, [['flagsChanged', {1: ['\\Seen']}]])
 
1839
 
 
1840
        return login(
 
1841
            ).addCallback(strip(select)
 
1842
            ).addCallback(strip(fetch)
 
1843
            ).addCallback(test)
 
1844
 
 
1845
class FakeyServer(imap4.IMAP4Server):
 
1846
    state = 'select'
 
1847
    timeout = None
 
1848
 
 
1849
    def sendServerGreeting(self):
 
1850
        pass
 
1851
 
 
1852
class FakeyMessage:
 
1853
    implements(imap4.IMessage)
 
1854
 
 
1855
    def __init__(self, headers, flags, date, body, uid, subpart):
 
1856
        self.headers = headers
 
1857
        self.flags = flags
 
1858
        self.body = StringIO(body)
 
1859
        self.size = len(body)
 
1860
        self.date = date
 
1861
        self.uid = uid
 
1862
        self.subpart = subpart
 
1863
 
 
1864
    def getHeaders(self, negate, *names):
 
1865
        self.got_headers = negate, names
 
1866
        return self.headers
 
1867
 
 
1868
    def getFlags(self):
 
1869
        return self.flags
 
1870
 
 
1871
    def getInternalDate(self):
 
1872
        return self.date
 
1873
 
 
1874
    def getBodyFile(self):
 
1875
        return self.body
 
1876
 
 
1877
    def getSize(self):
 
1878
        return self.size
 
1879
 
 
1880
    def getUID(self):
 
1881
        return self.uid
 
1882
 
 
1883
    def isMultipart(self):
 
1884
        return self.subpart is not None
 
1885
 
 
1886
    def getSubPart(self, part):
 
1887
        self.got_subpart = part
 
1888
        return self.subpart[part]
 
1889
 
 
1890
class NewStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
 
1891
    result = None
 
1892
    storeArgs = None
 
1893
 
 
1894
    def setUp(self):
 
1895
        self.received_messages = self.received_uid = None
 
1896
 
 
1897
        self.server = imap4.IMAP4Server()
 
1898
        self.server.state = 'select'
 
1899
        self.server.mbox = self
 
1900
        self.connected = defer.Deferred()
 
1901
        self.client = SimpleClient(self.connected)
 
1902
 
 
1903
    def addListener(self, x):
 
1904
        pass
 
1905
    def removeListener(self, x):
 
1906
        pass
 
1907
 
 
1908
    def store(self, *args, **kw):
 
1909
        self.storeArgs = args, kw
 
1910
        return self.response
 
1911
 
 
1912
    def _storeWork(self):
 
1913
        def connected():
 
1914
            return self.function(self.messages, self.flags, self.silent, self.uid)
 
1915
        def result(R):
 
1916
            self.result = R
 
1917
 
 
1918
        self.connected.addCallback(strip(connected)
 
1919
        ).addCallback(result
 
1920
        ).addCallback(self._cbStopClient
 
1921
        ).addErrback(self._ebGeneral)
 
1922
 
 
1923
        def check(ignored):
 
1924
            self.assertEquals(self.result, self.expected)
 
1925
            self.assertEquals(self.storeArgs, self.expectedArgs)
 
1926
        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
 
1927
        d.addCallback(check)
 
1928
        return d
 
1929
 
 
1930
    def testSetFlags(self, uid=0):
 
1931
        self.function = self.client.setFlags
 
1932
        self.messages = '1,5,9'
 
1933
        self.flags = ['\\A', '\\B', 'C']
 
1934
        self.silent = False
 
1935
        self.uid = uid
 
1936
        self.response = {
 
1937
            1: ['\\A', '\\B', 'C'],
 
1938
            5: ['\\A', '\\B', 'C'],
 
1939
            9: ['\\A', '\\B', 'C'],
 
1940
        }
 
1941
        self.expected = {
 
1942
            1: {'FLAGS': ['\\A', '\\B', 'C']},
 
1943
            5: {'FLAGS': ['\\A', '\\B', 'C']},
 
1944
            9: {'FLAGS': ['\\A', '\\B', 'C']},
 
1945
        }
 
1946
        msg = imap4.MessageSet()
 
1947
        msg.add(1)
 
1948
        msg.add(5)
 
1949
        msg.add(9)
 
1950
        self.expectedArgs = ((msg, ['\\A', '\\B', 'C'], 0), {'uid': 0})
 
1951
        return self._storeWork()
 
1952
 
 
1953
 
 
1954
class NewFetchTestCase(unittest.TestCase, IMAP4HelperMixin):
 
1955
    def setUp(self):
 
1956
        self.received_messages = self.received_uid = None
 
1957
        self.result = None
 
1958
 
 
1959
        self.server = imap4.IMAP4Server()
 
1960
        self.server.state = 'select'
 
1961
        self.server.mbox = self
 
1962
        self.connected = defer.Deferred()
 
1963
        self.client = SimpleClient(self.connected)
 
1964
 
 
1965
    def addListener(self, x):
 
1966
        pass
 
1967
    def removeListener(self, x):
 
1968
        pass
 
1969
 
 
1970
    def fetch(self, messages, uid):
 
1971
        self.received_messages = messages
 
1972
        self.received_uid = uid
 
1973
        return iter(zip(range(len(self.msgObjs)), self.msgObjs))
 
1974
 
 
1975
    def _fetchWork(self, uid):
 
1976
        if uid:
 
1977
            for (i, msg) in zip(range(len(self.msgObjs)), self.msgObjs):
 
1978
                self.expected[i]['UID'] = str(msg.getUID())
 
1979
 
 
1980
        def result(R):
 
1981
            self.result = R
 
1982
 
 
1983
        self.connected.addCallback(lambda _: self.function(self.messages, uid)
 
1984
        ).addCallback(result
 
1985
        ).addCallback(self._cbStopClient
 
1986
        ).addErrback(self._ebGeneral)
 
1987
 
 
1988
        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
 
1989
        d.addCallback(lambda x : self.assertEquals(self.result, self.expected))
 
1990
        return d
 
1991
 
 
1992
    def testFetchUID(self):
 
1993
        self.function = lambda m, u: self.client.fetchUID(m)
 
1994
 
 
1995
        self.messages = '7'
 
1996
        self.msgObjs = [
 
1997
            FakeyMessage({}, (), '', '', 12345, None),
 
1998
            FakeyMessage({}, (), '', '', 999, None),
 
1999
            FakeyMessage({}, (), '', '', 10101, None),
 
2000
        ]
 
2001
        self.expected = {
 
2002
            0: {'UID': '12345'},
 
2003
            1: {'UID': '999'},
 
2004
            2: {'UID': '10101'},
 
2005
        }
 
2006
        return self._fetchWork(0)
 
2007
 
 
2008
    def testFetchFlags(self, uid=0):
 
2009
        self.function = self.client.fetchFlags
 
2010
        self.messages = '9'
 
2011
        self.msgObjs = [
 
2012
            FakeyMessage({}, ['FlagA', 'FlagB', '\\FlagC'], '', '', 54321, None),
 
2013
            FakeyMessage({}, ['\\FlagC', 'FlagA', 'FlagB'], '', '', 12345, None),
 
2014
        ]
 
2015
        self.expected = {
 
2016
            0: {'FLAGS': ['FlagA', 'FlagB', '\\FlagC']},
 
2017
            1: {'FLAGS': ['\\FlagC', 'FlagA', 'FlagB']},
 
2018
        }
 
2019
        return self._fetchWork(uid)
 
2020
 
 
2021
    def testFetchFlagsUID(self):
 
2022
        return self.testFetchFlags(1)
 
2023
 
 
2024
    def testFetchInternalDate(self, uid=0):
 
2025
        self.function = self.client.fetchInternalDate
 
2026
        self.messages = '13'
 
2027
        self.msgObjs = [
 
2028
            FakeyMessage({}, (), 'Fri, 02 Nov 2003 21:25:10 GMT', '', 23232, None),
 
2029
            FakeyMessage({}, (), 'Thu, 29 Dec 2013 11:31:52 EST', '', 101, None),
 
2030
            FakeyMessage({}, (), 'Mon, 10 Mar 1992 02:44:30 CST', '', 202, None),
 
2031
            FakeyMessage({}, (), 'Sat, 11 Jan 2000 14:40:24 PST', '', 303, None),
 
2032
        ]
 
2033
        self.expected = {
 
2034
            0: {'INTERNALDATE': '02-Nov-2003 21:25:10 +0000'},
 
2035
            1: {'INTERNALDATE': '29-Dec-2013 11:31:52 -0500'},
 
2036
            2: {'INTERNALDATE': '10-Mar-1992 02:44:30 -0600'},
 
2037
            3: {'INTERNALDATE': '11-Jan-2000 14:40:24 -0800'},
 
2038
        }
 
2039
        return self._fetchWork(uid)
 
2040
 
 
2041
    def testFetchInternalDateUID(self):
 
2042
        return self.testFetchInternalDate(1)
 
2043
 
 
2044
    def testFetchEnvelope(self, uid=0):
 
2045
        self.function = self.client.fetchEnvelope
 
2046
        self.messages = '15'
 
2047
        self.msgObjs = [
 
2048
            FakeyMessage({
 
2049
                'from': 'user@domain', 'to': 'resu@domain',
 
2050
                'date': 'thursday', 'subject': 'it is a message',
 
2051
                'message-id': 'id-id-id-yayaya'}, (), '', '', 65656,
 
2052
                None),
 
2053
        ]
 
2054
        self.expected = {
 
2055
            0: {'ENVELOPE':
 
2056
                ['thursday', 'it is a message',
 
2057
                    [[None, None, 'user', 'domain']],
 
2058
                    [[None, None, 'user', 'domain']],
 
2059
                    [[None, None, 'user', 'domain']],
 
2060
                    [[None, None, 'resu', 'domain']],
 
2061
                    None, None, None, 'id-id-id-yayaya']
 
2062
            }
 
2063
        }
 
2064
        return self._fetchWork(uid)
 
2065
 
 
2066
    def testFetchEnvelopeUID(self):
 
2067
        return self.testFetchEnvelope(1)
 
2068
 
 
2069
    def testFetchBodyStructure(self, uid=0):
 
2070
        self.function = self.client.fetchBodyStructure
 
2071
        self.messages = '3:9,10:*'
 
2072
        self.msgObjs = [FakeyMessage({
 
2073
                'content-type': 'text/plain; name=thing; key="value"',
 
2074
                'content-id': 'this-is-the-content-id',
 
2075
                'content-description': 'describing-the-content-goes-here!',
 
2076
                'content-transfer-encoding': '8BIT',
 
2077
            }, (), '', 'Body\nText\nGoes\nHere\n', 919293, None)]
 
2078
        self.expected = {0: {'BODYSTRUCTURE': [
 
2079
            'text', 'plain', [['name', 'thing'], ['key', 'value']],
 
2080
            'this-is-the-content-id', 'describing-the-content-goes-here!',
 
2081
            '8BIT', '20', '4', None, None, None]}}
 
2082
        return self._fetchWork(uid)
 
2083
 
 
2084
    def testFetchBodyStructureUID(self):
 
2085
        return self.testFetchBodyStructure(1)
 
2086
    
 
2087
    def testFetchSimplifiedBody(self, uid=0):
 
2088
        self.function = self.client.fetchSimplifiedBody
 
2089
        self.messages = '21'
 
2090
        self.msgObjs = [FakeyMessage({}, (), '', 'Yea whatever', 91825,
 
2091
            [FakeyMessage({'content-type': 'image/jpg'}, (), '',
 
2092
                'Body Body Body', None, None
 
2093
            )]
 
2094
        )]
 
2095
        self.expected = {0:
 
2096
            {'BODY':
 
2097
                [None, None, [], None, None, None,
 
2098
                    '12'
 
2099
                ]
 
2100
            }
 
2101
        }
 
2102
 
 
2103
        return self._fetchWork(uid)
 
2104
 
 
2105
    def testFetchSimplifiedBodyUID(self):
 
2106
        return self.testFetchSimplifiedBody(1)
 
2107
 
 
2108
    def testFetchSimplifiedBodyText(self, uid=0):
 
2109
        self.function = self.client.fetchSimplifiedBody
 
2110
        self.messages = '21'
 
2111
        self.msgObjs = [FakeyMessage({'content-type': 'text/plain'},
 
2112
            (), '', 'Yea whatever', 91825, None)]
 
2113
        self.expected = {0:
 
2114
            {'BODY':
 
2115
                ['text', 'plain', [], None, None, None,
 
2116
                    '12', '1'
 
2117
                ]
 
2118
            }
 
2119
        }
 
2120
 
 
2121
        return self._fetchWork(uid)
 
2122
 
 
2123
    def testFetchSimplifiedBodyTextUID(self):
 
2124
        return self.testFetchSimplifiedBodyText(1)
 
2125
 
 
2126
    def testFetchSimplifiedBodyRFC822(self, uid=0):
 
2127
        self.function = self.client.fetchSimplifiedBody
 
2128
        self.messages = '21'
 
2129
        self.msgObjs = [FakeyMessage({'content-type': 'message/rfc822'},
 
2130
            (), '', 'Yea whatever', 91825,
 
2131
            [FakeyMessage({'content-type': 'image/jpg'}, (), '',
 
2132
                'Body Body Body', None, None
 
2133
            )]
 
2134
        )]
 
2135
        self.expected = {0:
 
2136
            {'BODY':
 
2137
                ['message', 'rfc822', [], None, None, None,
 
2138
                    '12', [None, None, [[None, None, None]],
 
2139
                    [[None, None, None]], None, None, None,
 
2140
                    None, None, None], ['image', 'jpg', [],
 
2141
                    None, None, None, '14'], '1'
 
2142
                ]
 
2143
            }
 
2144
        }
 
2145
 
 
2146
        return self._fetchWork(uid)
 
2147
 
 
2148
    def testFetchSimplifiedBodyRFC822UID(self):
 
2149
        return self.testFetchSimplifiedBodyRFC822(1)
 
2150
 
 
2151
    def testFetchMessage(self, uid=0):
 
2152
        self.function = self.client.fetchMessage
 
2153
        self.messages = '1,3,7,10101'
 
2154
        self.msgObjs = [
 
2155
            FakeyMessage({'Header': 'Value'}, (), '', 'BODY TEXT\r\n', 91, None),
 
2156
        ]
 
2157
        self.expected = {
 
2158
            0: {'RFC822': 'Header: Value\r\n\r\nBODY TEXT\r\n'}
 
2159
        }
 
2160
        return self._fetchWork(uid)
 
2161
 
 
2162
    def testFetchMessageUID(self):
 
2163
        return self.testFetchMessage(1)
 
2164
 
 
2165
    def testFetchHeaders(self, uid=0):
 
2166
        self.function = self.client.fetchHeaders
 
2167
        self.messages = '9,6,2'
 
2168
        self.msgObjs = [
 
2169
            FakeyMessage({'H1': 'V1', 'H2': 'V2'}, (), '', '', 99, None),
 
2170
        ]
 
2171
        self.expected = {
 
2172
            0: {'RFC822.HEADER': imap4._formatHeaders({'H1': 'V1', 'H2': 'V2'})},
 
2173
        }
 
2174
        return self._fetchWork(uid)
 
2175
 
 
2176
    def testFetchHeadersUID(self):
 
2177
        return self.testFetchHeaders(1)
 
2178
 
 
2179
    def testFetchBody(self, uid=0):
 
2180
        self.function = self.client.fetchBody
 
2181
        self.messages = '1,2,3,4,5,6,7'
 
2182
        self.msgObjs = [
 
2183
            FakeyMessage({'Header': 'Value'}, (), '', 'Body goes here\r\n', 171, None),
 
2184
        ]
 
2185
        self.expected = {
 
2186
            0: {'RFC822.TEXT': 'Body goes here\r\n'},
 
2187
        }
 
2188
        return self._fetchWork(uid)
 
2189
 
 
2190
    def testFetchBodyUID(self):
 
2191
        return self.testFetchBody(1)
 
2192
 
 
2193
    def testFetchBodyParts(self):
 
2194
        self.function = self.client.fetchBodyParts
 
2195
        self.messages = '1'
 
2196
        parts = [1, 2]
 
2197
        outerBody = ''
 
2198
        innerBody1 = 'Contained body message text.  Squarge.'
 
2199
        innerBody2 = 'Secondary <i>message</i> text of squarge body.'
 
2200
        headers = util.OrderedDict()
 
2201
        headers['from'] = 'sender@host'
 
2202
        headers['to'] = 'recipient@domain'
 
2203
        headers['subject'] = 'booga booga boo'
 
2204
        headers['content-type'] = 'multipart/alternative; boundary="xyz"'
 
2205
        innerHeaders = util.OrderedDict()
 
2206
        innerHeaders['subject'] = 'this is subject text'
 
2207
        innerHeaders['content-type'] = 'text/plain'
 
2208
        innerHeaders2 = util.OrderedDict()
 
2209
        innerHeaders2['subject'] = '<b>this is subject</b>'
 
2210
        innerHeaders2['content-type'] = 'text/html'
 
2211
        self.msgObjs = [FakeyMessage(
 
2212
            headers, (), None, outerBody, 123,
 
2213
            [FakeyMessage(innerHeaders, (), None, innerBody1, None, None),
 
2214
             FakeyMessage(innerHeaders2, (), None, innerBody2, None, None)])]
 
2215
        self.expected = {
 
2216
            0: {'1': innerBody1, '2': innerBody2},
 
2217
        }
 
2218
 
 
2219
        def result(R):
 
2220
            self.result = R
 
2221
 
 
2222
        self.connected.addCallback(lambda _: self.function(self.messages, parts))
 
2223
        self.connected.addCallback(result)
 
2224
        self.connected.addCallback(self._cbStopClient)
 
2225
        self.connected.addErrback(self._ebGeneral)
 
2226
 
 
2227
        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
 
2228
        d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
 
2229
        return d
 
2230
 
 
2231
 
 
2232
    def test_fetchBodyPartOfNonMultipart(self):
 
2233
        """
 
2234
        Single-part messages have an implicit first part which clients
 
2235
        should be able to retrieve explicitly.  Test that a client
 
2236
        requesting part 1 of a text/plain message receives the body of the
 
2237
        text/plain part.
 
2238
        """
 
2239
        self.function = self.client.fetchBodyParts
 
2240
        self.messages = '1'
 
2241
        parts = [1]
 
2242
        outerBody = 'DA body'
 
2243
        headers = util.OrderedDict()
 
2244
        headers['from'] = 'sender@host'
 
2245
        headers['to'] = 'recipient@domain'
 
2246
        headers['subject'] = 'booga booga boo'
 
2247
        headers['content-type'] = 'text/plain'
 
2248
        self.msgObjs = [FakeyMessage(
 
2249
            headers, (), None, outerBody, 123, None)]
 
2250
 
 
2251
        self.expected = {
 
2252
            0: {'1': outerBody},
 
2253
        }
 
2254
 
 
2255
        def result(R):
 
2256
            self.result = R
 
2257
 
 
2258
        self.connected.addCallback(lambda _: self.function(self.messages, parts))
 
2259
        self.connected.addCallback(result)
 
2260
        self.connected.addCallback(self._cbStopClient)
 
2261
        self.connected.addErrback(self._ebGeneral)
 
2262
 
 
2263
        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
 
2264
        d.addCallback(lambda ign: self.assertEquals(self.result, self.expected))
 
2265
        return d
 
2266
 
 
2267
 
 
2268
    def testFetchSize(self, uid=0):
 
2269
        self.function = self.client.fetchSize
 
2270
        self.messages = '1:100,2:*'
 
2271
        self.msgObjs = [
 
2272
            FakeyMessage({}, (), '', 'x' * 20, 123, None),
 
2273
        ]
 
2274
        self.expected = {
 
2275
            0: {'RFC822.SIZE': '20'},
 
2276
        }
 
2277
        return self._fetchWork(uid)
 
2278
 
 
2279
    def testFetchSizeUID(self):
 
2280
        return self.testFetchSize(1)
 
2281
 
 
2282
    def testFetchFull(self, uid=0):
 
2283
        self.function = self.client.fetchFull
 
2284
        self.messages = '1,3'
 
2285
        self.msgObjs = [
 
2286
            FakeyMessage({}, ('\\XYZ', '\\YZX', 'Abc'),
 
2287
                'Sun, 25 Jul 2010 06:20:30 -0400 (EDT)',
 
2288
                'xyz' * 2, 654, None),
 
2289
            FakeyMessage({}, ('\\One', '\\Two', 'Three'),
 
2290
                'Mon, 14 Apr 2003 19:43:44 -0400',
 
2291
                'abc' * 4, 555, None),
 
2292
        ]
 
2293
        self.expected = {
 
2294
            0: {'FLAGS': ['\\XYZ', '\\YZX', 'Abc'],
 
2295
                'INTERNALDATE': '25-Jul-2010 06:20:30 -0400',
 
2296
                'RFC822.SIZE': '6',
 
2297
                'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
 
2298
                'BODY': [None, None, [], None, None, None, '6']},
 
2299
            1: {'FLAGS': ['\\One', '\\Two', 'Three'],
 
2300
                'INTERNALDATE': '14-Apr-2003 19:43:44 -0400',
 
2301
                'RFC822.SIZE': '12',
 
2302
                'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
 
2303
                'BODY': [None, None, [], None, None, None, '12']},
 
2304
        }
 
2305
        return self._fetchWork(uid)
 
2306
 
 
2307
    def testFetchFullUID(self):
 
2308
        return self.testFetchFull(1)
 
2309
 
 
2310
    def testFetchAll(self, uid=0):
 
2311
        self.function = self.client.fetchAll
 
2312
        self.messages = '1,2:3'
 
2313
        self.msgObjs = [
 
2314
            FakeyMessage({}, (), 'Mon, 14 Apr 2003 19:43:44 +0400',
 
2315
                'Lalala', 10101, None),
 
2316
            FakeyMessage({}, (), 'Tue, 15 Apr 2003 19:43:44 +0200',
 
2317
                'Alalal', 20202, None),
 
2318
        ]
 
2319
        self.expected = {
 
2320
            0: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
 
2321
                'RFC822.SIZE': '6',
 
2322
                'INTERNALDATE': '14-Apr-2003 19:43:44 +0400',
 
2323
                'FLAGS': []},
 
2324
            1: {'ENVELOPE': [None, None, [[None, None, None]], [[None, None, None]], None, None, None, None, None, None],
 
2325
                'RFC822.SIZE': '6',
 
2326
                'INTERNALDATE': '15-Apr-2003 19:43:44 +0200',
 
2327
                'FLAGS': []},
 
2328
        }
 
2329
        return self._fetchWork(uid)
 
2330
 
 
2331
    def testFetchAllUID(self):
 
2332
        return self.testFetchAll(1)
 
2333
 
 
2334
    def testFetchFast(self, uid=0):
 
2335
        self.function = self.client.fetchFast
 
2336
        self.messages = '1'
 
2337
        self.msgObjs = [
 
2338
            FakeyMessage({}, ('\\X',), '19 Mar 2003 19:22:21 -0500', '', 9, None),
 
2339
        ]
 
2340
        self.expected = {
 
2341
            0: {'FLAGS': ['\\X'],
 
2342
                'INTERNALDATE': '19-Mar-2003 19:22:21 -0500',
 
2343
                'RFC822.SIZE': '0'},
 
2344
        }
 
2345
        return self._fetchWork(uid)
 
2346
 
 
2347
    def testFetchFastUID(self):
 
2348
        return self.testFetchFast(1)
 
2349
 
 
2350
 
 
2351
class FetchSearchStoreTestCase(unittest.TestCase, IMAP4HelperMixin):
 
2352
    implements(imap4.ISearchableMailbox)
 
2353
 
 
2354
    def setUp(self):
 
2355
        self.expected = self.result = None
 
2356
        self.server_received_query = None
 
2357
        self.server_received_uid = None
 
2358
        self.server_received_parts = None
 
2359
        self.server_received_messages = None
 
2360
 
 
2361
        self.server = imap4.IMAP4Server()
 
2362
        self.server.state = 'select'
 
2363
        self.server.mbox = self
 
2364
        self.connected = defer.Deferred()
 
2365
        self.client = SimpleClient(self.connected)
 
2366
 
 
2367
    def search(self, query, uid):
 
2368
        self.server_received_query = query
 
2369
        self.server_received_uid = uid
 
2370
        return self.expected
 
2371
 
 
2372
    def addListener(self, *a, **kw):
 
2373
        pass
 
2374
    removeListener = addListener
 
2375
 
 
2376
    def _searchWork(self, uid):
 
2377
        def search():
 
2378
            return self.client.search(self.query, uid=uid)
 
2379
        def result(R):
 
2380
            self.result = R
 
2381
 
 
2382
        self.connected.addCallback(strip(search)
 
2383
        ).addCallback(result
 
2384
        ).addCallback(self._cbStopClient
 
2385
        ).addErrback(self._ebGeneral)
 
2386
 
 
2387
        def check(ignored):
 
2388
            # Ensure no short-circuiting wierdness is going on
 
2389
            self.failIf(self.result is self.expected)
 
2390
 
 
2391
            self.assertEquals(self.result, self.expected)
 
2392
            self.assertEquals(self.uid, self.server_received_uid)
 
2393
            self.assertEquals(
 
2394
                imap4.parseNestedParens(self.query),
 
2395
                self.server_received_query
 
2396
            )
 
2397
        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
 
2398
        d.addCallback(check)
 
2399
        return d
 
2400
 
 
2401
    def testSearch(self):
 
2402
        self.query = imap4.Or(
 
2403
            imap4.Query(header=('subject', 'substring')),
 
2404
            imap4.Query(larger=1024, smaller=4096),
 
2405
        )
 
2406
        self.expected = [1, 4, 5, 7]
 
2407
        self.uid = 0
 
2408
        return self._searchWork(0)
 
2409
 
 
2410
    def testUIDSearch(self):
 
2411
        self.query = imap4.Or(
 
2412
            imap4.Query(header=('subject', 'substring')),
 
2413
            imap4.Query(larger=1024, smaller=4096),
 
2414
        )
 
2415
        self.uid = 1
 
2416
        self.expected = [1, 2, 3]
 
2417
        return self._searchWork(1)
 
2418
 
 
2419
    def getUID(self, msg):
 
2420
        try:
 
2421
            return self.expected[msg]['UID']
 
2422
        except (TypeError, IndexError):
 
2423
            return self.expected[msg-1]
 
2424
        except KeyError:
 
2425
            return 42
 
2426
 
 
2427
    def fetch(self, messages, uid):
 
2428
        self.server_received_uid = uid
 
2429
        self.server_received_messages = str(messages)
 
2430
        return self.expected
 
2431
 
 
2432
    def _fetchWork(self, fetch):
 
2433
        def result(R):
 
2434
            self.result = R
 
2435
 
 
2436
        self.connected.addCallback(strip(fetch)
 
2437
        ).addCallback(result
 
2438
        ).addCallback(self._cbStopClient
 
2439
        ).addErrback(self._ebGeneral)
 
2440
 
 
2441
        def check(ignored):
 
2442
            # Ensure no short-circuiting wierdness is going on
 
2443
            self.failIf(self.result is self.expected)
 
2444
 
 
2445
            self.parts and self.parts.sort()
 
2446
            self.server_received_parts and self.server_received_parts.sort()
 
2447
 
 
2448
            if self.uid:
 
2449
                for (k, v) in self.expected.items():
 
2450
                    v['UID'] = str(k)
 
2451
 
 
2452
            self.assertEquals(self.result, self.expected)
 
2453
            self.assertEquals(self.uid, self.server_received_uid)
 
2454
            self.assertEquals(self.parts, self.server_received_parts)
 
2455
            self.assertEquals(imap4.parseIdList(self.messages),
 
2456
                              imap4.parseIdList(self.server_received_messages))
 
2457
 
 
2458
        d = loopback.loopbackTCP(self.server, self.client, noisy=False)
 
2459
        d.addCallback(check)
 
2460
        return d
 
2461
 
 
2462
class FakeMailbox:
 
2463
    def __init__(self):
 
2464
        self.args = []
 
2465
    def addMessage(self, body, flags, date):
 
2466
        self.args.append((body, flags, date))
 
2467
        return defer.succeed(None)
 
2468
 
 
2469
class FeaturefulMessage:
 
2470
    implements(imap4.IMessageFile)
 
2471
 
 
2472
    def getFlags(self):
 
2473
        return 'flags'
 
2474
 
 
2475
    def getInternalDate(self):
 
2476
        return 'internaldate'
 
2477
 
 
2478
    def open(self):
 
2479
        return StringIO("open")
 
2480
 
 
2481
class MessageCopierMailbox:
 
2482
    implements(imap4.IMessageCopier)
 
2483
 
 
2484
    def __init__(self):
 
2485
        self.msgs = []
 
2486
 
 
2487
    def copy(self, msg):
 
2488
        self.msgs.append(msg)
 
2489
        return len(self.msgs)
 
2490
 
 
2491
class CopyWorkerTestCase(unittest.TestCase):
 
2492
    def testFeaturefulMessage(self):
 
2493
        s = imap4.IMAP4Server()
 
2494
 
 
2495
        # Yes.  I am grabbing this uber-non-public method to test it.
 
2496
        # It is complex.  It needs to be tested directly!
 
2497
        # Perhaps it should be refactored, simplified, or split up into
 
2498
        # not-so-private components, but that is a task for another day.
 
2499
 
 
2500
        # Ha ha! Addendum!  Soon it will be split up, and this test will
 
2501
        # be re-written to just use the default adapter for IMailbox to
 
2502
        # IMessageCopier and call .copy on that adapter.
 
2503
        f = s._IMAP4Server__cbCopy
 
2504
 
 
2505
        m = FakeMailbox()
 
2506
        d = f([(i, FeaturefulMessage()) for i in range(1, 11)], 'tag', m)
 
2507
 
 
2508
        def cbCopy(results):
 
2509
            for a in m.args:
 
2510
                self.assertEquals(a[0].read(), "open")
 
2511
                self.assertEquals(a[1], "flags")
 
2512
                self.assertEquals(a[2], "internaldate")
 
2513
 
 
2514
            for (status, result) in results:
 
2515
                self.failUnless(status)
 
2516
                self.assertEquals(result, None)
 
2517
 
 
2518
        return d.addCallback(cbCopy)
 
2519
 
 
2520
 
 
2521
    def testUnfeaturefulMessage(self):
 
2522
        s = imap4.IMAP4Server()
 
2523
 
 
2524
        # See above comment
 
2525
        f = s._IMAP4Server__cbCopy
 
2526
 
 
2527
        m = FakeMailbox()
 
2528
        msgs = [FakeyMessage({'Header-Counter': str(i)}, (), 'Date', 'Body %d' % (i,), i + 10, None) for i in range(1, 11)]
 
2529
        d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
 
2530
 
 
2531
        def cbCopy(results):
 
2532
            seen = []
 
2533
            for a in m.args:
 
2534
                seen.append(a[0].read())
 
2535
                self.assertEquals(a[1], ())
 
2536
                self.assertEquals(a[2], "Date")
 
2537
 
 
2538
            seen.sort()
 
2539
            exp = ["Header-Counter: %d\r\n\r\nBody %d" % (i, i) for i in range(1, 11)]
 
2540
            exp.sort()
 
2541
            self.assertEquals(seen, exp)
 
2542
 
 
2543
            for (status, result) in results:
 
2544
                self.failUnless(status)
 
2545
                self.assertEquals(result, None)
 
2546
 
 
2547
        return d.addCallback(cbCopy)
 
2548
 
 
2549
    def testMessageCopier(self):
 
2550
        s = imap4.IMAP4Server()
 
2551
 
 
2552
        # See above comment
 
2553
        f = s._IMAP4Server__cbCopy
 
2554
 
 
2555
        m = MessageCopierMailbox()
 
2556
        msgs = [object() for i in range(1, 11)]
 
2557
        d = f([im for im in zip(range(1, 11), msgs)], 'tag', m)
 
2558
 
 
2559
        def cbCopy(results):
 
2560
            self.assertEquals(results, zip([1] * 10, range(1, 11)))
 
2561
            for (orig, new) in zip(msgs, m.msgs):
 
2562
                self.assertIdentical(orig, new)
 
2563
 
 
2564
        return d.addCallback(cbCopy)
 
2565
 
 
2566
 
 
2567
class TLSTestCase(IMAP4HelperMixin, unittest.TestCase):
 
2568
    serverCTX = ServerTLSContext and ServerTLSContext()
 
2569
    clientCTX = ClientTLSContext and ClientTLSContext()
 
2570
 
 
2571
    def loopback(self):
 
2572
        return loopback.loopbackTCP(self.server, self.client, noisy=False)
 
2573
 
 
2574
    def testAPileOfThings(self):
 
2575
        SimpleServer.theAccount.addMailbox('inbox')
 
2576
        called = []
 
2577
        def login():
 
2578
            called.append(None)
 
2579
            return self.client.login('testuser', 'password-test')
 
2580
        def list():
 
2581
            called.append(None)
 
2582
            return self.client.list('inbox', '%')
 
2583
        def status():
 
2584
            called.append(None)
 
2585
            return self.client.status('inbox', 'UIDNEXT')
 
2586
        def examine():
 
2587
            called.append(None)
 
2588
            return self.client.examine('inbox')
 
2589
        def logout():
 
2590
            called.append(None)
 
2591
            return self.client.logout()
 
2592
 
 
2593
        self.client.requireTransportSecurity = True
 
2594
 
 
2595
        methods = [login, list, status, examine, logout]
 
2596
        map(self.connected.addCallback, map(strip, methods))
 
2597
        self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
 
2598
        def check(ignored):
 
2599
            self.assertEquals(self.server.startedTLS, True)
 
2600
            self.assertEquals(self.client.startedTLS, True)
 
2601
            self.assertEquals(len(called), len(methods))        
 
2602
        d = self.loopback()
 
2603
        d.addCallback(check)
 
2604
        return d
 
2605
 
 
2606
    def testLoginLogin(self):
 
2607
        self.server.checker.addUser('testuser', 'password-test')
 
2608
        success = []
 
2609
        self.client.registerAuthenticator(imap4.LOGINAuthenticator('testuser'))
 
2610
        self.connected.addCallback(
 
2611
                lambda _: self.client.authenticate('password-test')
 
2612
            ).addCallback(
 
2613
                lambda _: self.client.logout()
 
2614
            ).addCallback(success.append
 
2615
            ).addCallback(self._cbStopClient
 
2616
            ).addErrback(self._ebGeneral)
 
2617
 
 
2618
        d = self.loopback()
 
2619
        d.addCallback(lambda x : self.assertEquals(len(success), 1))
 
2620
        return d
 
2621
 
 
2622
    def testStartTLS(self):
 
2623
        success = []
 
2624
        self.connected.addCallback(lambda _: self.client.startTLS())
 
2625
        self.connected.addCallback(lambda _: self.assertNotEquals(-1, self.client.transport.__class__.__name__.find('TLS')))
 
2626
        self.connected.addCallback(self._cbStopClient)
 
2627
        self.connected.addCallback(success.append)
 
2628
        self.connected.addErrback(self._ebGeneral)
 
2629
 
 
2630
        d = self.loopback()
 
2631
        d.addCallback(lambda x : self.failUnless(success))
 
2632
        return d
 
2633
 
 
2634
    def testFailedStartTLS(self):
 
2635
        failure = []
 
2636
        def breakServerTLS(ign):
 
2637
            self.server.canStartTLS = False
 
2638
 
 
2639
        self.connected.addCallback(breakServerTLS)
 
2640
        self.connected.addCallback(lambda ign: self.client.startTLS())
 
2641
        self.connected.addErrback(lambda err: failure.append(err.trap(imap4.IMAP4Exception)))
 
2642
        self.connected.addCallback(self._cbStopClient)
 
2643
        self.connected.addErrback(self._ebGeneral)
 
2644
 
 
2645
        def check(ignored):
 
2646
            self.failUnless(failure)
 
2647
            self.assertIdentical(failure[0], imap4.IMAP4Exception)
 
2648
        return self.loopback().addCallback(check)
 
2649
        
 
2650
class SlowMailbox(SimpleMailbox):
 
2651
    howSlow = 2
 
2652
 
 
2653
    # Not a very nice implementation of fetch(), but it'll
 
2654
    # do for the purposes of testing.
 
2655
    def fetch(self, messages, uid):
 
2656
        d = defer.Deferred()
 
2657
        reactor.callLater(self.howSlow, d.callback, ())
 
2658
        return d
 
2659
 
 
2660
class Timeout(IMAP4HelperMixin, unittest.TestCase):
 
2661
    def testServerTimeout(self):
 
2662
        self.server.timeoutTest = True
 
2663
        self.client.timeout = 5 #seconds
 
2664
        self.selectedArgs = None
 
2665
 
 
2666
        def login():
 
2667
            d = self.client.login('testuser', 'password-test')
 
2668
            d.addErrback(timedOut)
 
2669
            return d
 
2670
 
 
2671
        def timedOut(failure):
 
2672
            self._cbStopClient(None)
 
2673
            failure.trap(error.TimeoutError)
 
2674
 
 
2675
        d = self.connected.addCallback(strip(login))
 
2676
        d.addErrback(self._ebGeneral)
 
2677
        return defer.gatherResults([d, self.loopback()])
 
2678
 
 
2679
    def testLongFetchDoesntTimeout(self):
 
2680
        SimpleServer.theAccount.mailboxFactory = SlowMailbox
 
2681
        SimpleServer.theAccount.addMailbox('mailbox-test')
 
2682
 
 
2683
        self.server.setTimeout(0.1)
 
2684
        self.stillConnected = False
 
2685
 
 
2686
        def login():
 
2687
            return self.client.login('testuser', 'password-test')
 
2688
        def select():
 
2689
            return self.client.select('mailbox-test')
 
2690
        def fetch():
 
2691
            return self.client.fetchUID('1:*')
 
2692
        def stillConnected():
 
2693
            self.stillConnected = not self.server.transport.disconnecting
 
2694
 
 
2695
        d1 = self.connected.addCallback(strip(login))
 
2696
        d1.addCallback(strip(select))
 
2697
        d1.addCallback(strip(fetch))
 
2698
        d1.addCallback(strip(stillConnected))
 
2699
        d1.addCallback(self._cbStopClient)
 
2700
        d1.addErrback(self._ebGeneral)
 
2701
        d = defer.gatherResults([d1, self.loopback()])
 
2702
        return d.addCallback(lambda _: self.failUnless(self.stillConnected))
 
2703
 
 
2704
    def testIdleClientDoesDisconnect(self):
 
2705
        from twisted.test.time_helpers import Clock
 
2706
        c = Clock()
 
2707
        c.install()
 
2708
        try:
 
2709
            # Hook up our server protocol
 
2710
            transport = StringTransportWithDisconnection()
 
2711
            transport.protocol = self.server
 
2712
            self.server.makeConnection(transport)
 
2713
 
 
2714
            # Make sure we can notice when the connection goes away
 
2715
            lost = []
 
2716
            connLost = self.server.connectionLost
 
2717
            self.server.connectionLost = lambda reason: (lost.append(None), connLost(reason))[1]
 
2718
 
 
2719
            # 2/3rds of the idle timeout elapses...
 
2720
            c.pump(reactor, [0.0] + [self.server.timeOut / 3.0] * 2)
 
2721
            self.failIf(lost, lost)
 
2722
 
 
2723
            # Now some more
 
2724
            c.pump(reactor, [0.0, self.server.timeOut / 2.0])
 
2725
            self.failUnless(lost)
 
2726
        finally:
 
2727
            c.uninstall()
 
2728
 
 
2729
 
 
2730
 
 
2731
class Disconnection(unittest.TestCase):
 
2732
    def testClientDisconnectFailsDeferreds(self):
 
2733
        c = imap4.IMAP4Client()
 
2734
        t = StringTransportWithDisconnection()
 
2735
        c.makeConnection(t)
 
2736
        d = self.assertFailure(c.login('testuser', 'example.com'), error.ConnectionDone)
 
2737
        c.connectionLost(error.ConnectionDone("Connection closed"))
 
2738
        return d
 
2739
 
 
2740
 
 
2741
 
 
2742
class SynchronousMailbox(object):
 
2743
    """
 
2744
    Trivial, in-memory mailbox implementation which can produce a message
 
2745
    synchronously.
 
2746
    """
 
2747
    def __init__(self, messages):
 
2748
        self.messages = messages
 
2749
 
 
2750
 
 
2751
    def fetch(self, msgset, uid):
 
2752
        assert not uid, "Cannot handle uid requests."
 
2753
        for msg in msgset:
 
2754
            yield msg, self.messages[msg - 1]
 
2755
 
 
2756
 
 
2757
 
 
2758
class StringTransportConsumer(StringTransport):
 
2759
    producer = None
 
2760
    streaming = None
 
2761
 
 
2762
    def registerProducer(self, producer, streaming):
 
2763
        self.producer = producer
 
2764
        self.streaming = streaming
 
2765
 
 
2766
 
 
2767
 
 
2768
class Pipelining(unittest.TestCase):
 
2769
    """
 
2770
    Tests for various aspects of the IMAP4 server's pipelining support.
 
2771
    """
 
2772
    messages = [
 
2773
        FakeyMessage({}, [], '', '0', None, None),
 
2774
        FakeyMessage({}, [], '', '1', None, None),
 
2775
        FakeyMessage({}, [], '', '2', None, None),
 
2776
        ]
 
2777
 
 
2778
    def setUp(self):
 
2779
        self.iterators = []
 
2780
 
 
2781
        self.transport = StringTransportConsumer()
 
2782
        self.server = imap4.IMAP4Server(None, None, self.iterateInReactor)
 
2783
        self.server.makeConnection(self.transport)
 
2784
 
 
2785
 
 
2786
    def iterateInReactor(self, iterator):
 
2787
        d = defer.Deferred()
 
2788
        self.iterators.append((iterator, d))
 
2789
        return d
 
2790
 
 
2791
 
 
2792
    def tearDown(self):
 
2793
        self.server.connectionLost(failure.Failure(error.ConnectionDone()))
 
2794
 
 
2795
 
 
2796
    def test_synchronousFetch(self):
 
2797
        """
 
2798
        Test that pipelined FETCH commands which can be responded to
 
2799
        synchronously are responded to correctly.
 
2800
        """
 
2801
        mailbox = SynchronousMailbox(self.messages)
 
2802
 
 
2803
        # Skip over authentication and folder selection
 
2804
        self.server.state = 'select'
 
2805
        self.server.mbox = mailbox
 
2806
 
 
2807
        # Get rid of any greeting junk
 
2808
        self.transport.clear()
 
2809
 
 
2810
        # Here's some pipelined stuff
 
2811
        self.server.dataReceived(
 
2812
            '01 FETCH 1 BODY[]\r\n'
 
2813
            '02 FETCH 2 BODY[]\r\n'
 
2814
            '03 FETCH 3 BODY[]\r\n')
 
2815
 
 
2816
        # Flush anything the server has scheduled to run
 
2817
        while self.iterators:
 
2818
            for e in self.iterators[0][0]:
 
2819
                break
 
2820
            else:
 
2821
                self.iterators.pop(0)[1].callback(None)
 
2822
 
 
2823
        # The bodies are empty because we aren't simulating a transport
 
2824
        # exactly correctly (we have StringTransportConsumer but we never
 
2825
        # call resumeProducing on its producer).  It doesn't matter: just
 
2826
        # make sure the surrounding structure is okay, and that no
 
2827
        # exceptions occurred.
 
2828
        self.assertEquals(
 
2829
            self.transport.value(),
 
2830
            '* 1 FETCH (BODY[] )\r\n'
 
2831
            '01 OK FETCH completed\r\n'
 
2832
            '* 2 FETCH (BODY[] )\r\n'
 
2833
            '02 OK FETCH completed\r\n'
 
2834
            '* 3 FETCH (BODY[] )\r\n'
 
2835
            '03 OK FETCH completed\r\n')
 
2836
 
 
2837
 
 
2838
 
 
2839
if ClientTLSContext is None:
 
2840
    for case in (TLSTestCase,):
 
2841
        case.skip = "OpenSSL not present"
 
2842
elif interfaces.IReactorSSL(reactor, None) is None:
 
2843
    for case in (TLSTestCase,):
 
2844
        case.skip = "Reactor doesn't support SSL"