~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/mail/test/test_mail.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for large portions of L{twisted.mail}.
 
6
"""
 
7
 
 
8
import os
 
9
import errno
 
10
import shutil
 
11
import pickle
 
12
import StringIO
 
13
import rfc822
 
14
import tempfile
 
15
import signal
 
16
 
 
17
from zope.interface import Interface, implements
 
18
 
 
19
from twisted.trial import unittest
 
20
from twisted.mail import smtp
 
21
from twisted.mail import pop3
 
22
from twisted.names import dns
 
23
from twisted.internet import protocol
 
24
from twisted.internet import defer
 
25
from twisted.internet.defer import Deferred
 
26
from twisted.internet import reactor
 
27
from twisted.internet import interfaces
 
28
from twisted.internet import task
 
29
from twisted.internet.error import DNSLookupError, CannotListenError
 
30
from twisted.internet.error import ProcessDone, ProcessTerminated
 
31
from twisted.internet import address
 
32
from twisted.python import failure
 
33
from twisted.python.filepath import FilePath
 
34
from twisted.python.hashlib import md5
 
35
 
 
36
from twisted import mail
 
37
import twisted.mail.mail
 
38
import twisted.mail.maildir
 
39
import twisted.mail.relay
 
40
import twisted.mail.relaymanager
 
41
import twisted.mail.protocols
 
42
import twisted.mail.alias
 
43
 
 
44
from twisted.names.error import DNSNameError
 
45
from twisted.names.dns import RRHeader, Record_CNAME, Record_MX
 
46
 
 
47
from twisted import cred
 
48
import twisted.cred.credentials
 
49
import twisted.cred.checkers
 
50
import twisted.cred.portal
 
51
 
 
52
from twisted.test.proto_helpers import LineSendingProtocol
 
53
 
 
54
class DomainWithDefaultsTestCase(unittest.TestCase):
 
55
    def testMethods(self):
 
56
        d = dict([(x, x + 10) for x in range(10)])
 
57
        d = mail.mail.DomainWithDefaultDict(d, 'Default')
 
58
 
 
59
        self.assertEquals(len(d), 10)
 
60
        self.assertEquals(list(iter(d)), range(10))
 
61
        self.assertEquals(list(d.iterkeys()), list(iter(d)))
 
62
 
 
63
        items = list(d.iteritems())
 
64
        items.sort()
 
65
        self.assertEquals(items, [(x, x + 10) for x in range(10)])
 
66
 
 
67
        values = list(d.itervalues())
 
68
        values.sort()
 
69
        self.assertEquals(values, range(10, 20))
 
70
 
 
71
        items = d.items()
 
72
        items.sort()
 
73
        self.assertEquals(items, [(x, x + 10) for x in range(10)])
 
74
 
 
75
        values = d.values()
 
76
        values.sort()
 
77
        self.assertEquals(values, range(10, 20))
 
78
 
 
79
        for x in range(10):
 
80
            self.assertEquals(d[x], x + 10)
 
81
            self.assertEquals(d.get(x), x + 10)
 
82
            self.failUnless(x in d)
 
83
            self.failUnless(d.has_key(x))
 
84
 
 
85
        del d[2], d[4], d[6]
 
86
 
 
87
        self.assertEquals(len(d), 7)
 
88
        self.assertEquals(d[2], 'Default')
 
89
        self.assertEquals(d[4], 'Default')
 
90
        self.assertEquals(d[6], 'Default')
 
91
 
 
92
        d.update({'a': None, 'b': (), 'c': '*'})
 
93
        self.assertEquals(len(d), 10)
 
94
        self.assertEquals(d['a'], None)
 
95
        self.assertEquals(d['b'], ())
 
96
        self.assertEquals(d['c'], '*')
 
97
 
 
98
        d.clear()
 
99
        self.assertEquals(len(d), 0)
 
100
 
 
101
        self.assertEquals(d.setdefault('key', 'value'), 'value')
 
102
        self.assertEquals(d['key'], 'value')
 
103
 
 
104
        self.assertEquals(d.popitem(), ('key', 'value'))
 
105
        self.assertEquals(len(d), 0)
 
106
 
 
107
        dcopy = d.copy()
 
108
        self.assertEquals(d.domains, dcopy.domains)
 
109
        self.assertEquals(d.default, dcopy.default)
 
110
 
 
111
 
 
112
    def _stringificationTest(self, stringifier):
 
113
        """
 
114
        Assert that the class name of a L{mail.mail.DomainWithDefaultDict}
 
115
        instance and the string-formatted underlying domain dictionary both
 
116
        appear in the string produced by the given string-returning function.
 
117
 
 
118
        @type stringifier: one-argument callable
 
119
        @param stringifier: either C{str} or C{repr}, to be used to get a
 
120
            string to make assertions against.
 
121
        """
 
122
        domain = mail.mail.DomainWithDefaultDict({}, 'Default')
 
123
        self.assertIn(domain.__class__.__name__, stringifier(domain))
 
124
        domain['key'] = 'value'
 
125
        self.assertIn(str({'key': 'value'}), stringifier(domain))
 
126
 
 
127
 
 
128
    def test_str(self):
 
129
        """
 
130
        L{DomainWithDefaultDict.__str__} should return a string including
 
131
        the class name and the domain mapping held by the instance.
 
132
        """
 
133
        self._stringificationTest(str)
 
134
 
 
135
 
 
136
    def test_repr(self):
 
137
        """
 
138
        L{DomainWithDefaultDict.__repr__} should return a string including
 
139
        the class name and the domain mapping held by the instance.
 
140
        """
 
141
        self._stringificationTest(repr)
 
142
 
 
143
 
 
144
 
 
145
class BounceTestCase(unittest.TestCase):
 
146
    def setUp(self):
 
147
        self.domain = mail.mail.BounceDomain()
 
148
 
 
149
    def testExists(self):
 
150
        self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
 
151
 
 
152
    def testRelay(self):
 
153
        self.assertEquals(
 
154
            self.domain.willRelay("random q emailer", "protocol"),
 
155
            False
 
156
        )
 
157
 
 
158
    def testMessage(self):
 
159
        self.assertRaises(NotImplementedError, self.domain.startMessage, "whomever")
 
160
 
 
161
    def testAddUser(self):
 
162
        self.domain.addUser("bob", "password")
 
163
        self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
 
164
 
 
165
class FileMessageTestCase(unittest.TestCase):
 
166
    def setUp(self):
 
167
        self.name = "fileMessage.testFile"
 
168
        self.final = "final.fileMessage.testFile"
 
169
        self.f = file(self.name, 'w')
 
170
        self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
 
171
 
 
172
    def tearDown(self):
 
173
        try:
 
174
            self.f.close()
 
175
        except:
 
176
            pass
 
177
        try:
 
178
            os.remove(self.name)
 
179
        except:
 
180
            pass
 
181
        try:
 
182
            os.remove(self.final)
 
183
        except:
 
184
            pass
 
185
 
 
186
    def testFinalName(self):
 
187
        return self.fp.eomReceived().addCallback(self._cbFinalName)
 
188
 
 
189
    def _cbFinalName(self, result):
 
190
        self.assertEquals(result, self.final)
 
191
        self.failUnless(self.f.closed)
 
192
        self.failIf(os.path.exists(self.name))
 
193
 
 
194
    def testContents(self):
 
195
        contents = "first line\nsecond line\nthird line\n"
 
196
        for line in contents.splitlines():
 
197
            self.fp.lineReceived(line)
 
198
        self.fp.eomReceived()
 
199
        self.assertEquals(file(self.final).read(), contents)
 
200
 
 
201
    def testInterrupted(self):
 
202
        contents = "first line\nsecond line\n"
 
203
        for line in contents.splitlines():
 
204
            self.fp.lineReceived(line)
 
205
        self.fp.connectionLost()
 
206
        self.failIf(os.path.exists(self.name))
 
207
        self.failIf(os.path.exists(self.final))
 
208
 
 
209
class MailServiceTestCase(unittest.TestCase):
 
210
    def setUp(self):
 
211
        self.service = mail.mail.MailService()
 
212
 
 
213
    def testFactories(self):
 
214
        f = self.service.getPOP3Factory()
 
215
        self.failUnless(isinstance(f, protocol.ServerFactory))
 
216
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
 
217
 
 
218
        f = self.service.getSMTPFactory()
 
219
        self.failUnless(isinstance(f, protocol.ServerFactory))
 
220
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
 
221
 
 
222
        f = self.service.getESMTPFactory()
 
223
        self.failUnless(isinstance(f, protocol.ServerFactory))
 
224
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
 
225
 
 
226
    def testPortals(self):
 
227
        o1 = object()
 
228
        o2 = object()
 
229
        self.service.portals['domain'] = o1
 
230
        self.service.portals[''] = o2
 
231
 
 
232
        self.failUnless(self.service.lookupPortal('domain') is o1)
 
233
        self.failUnless(self.service.defaultPortal() is o2)
 
234
 
 
235
 
 
236
class StringListMailboxTests(unittest.TestCase):
 
237
    """
 
238
    Tests for L{StringListMailbox}, an in-memory only implementation of
 
239
    L{pop3.IMailbox}.
 
240
    """
 
241
    def test_listOneMessage(self):
 
242
        """
 
243
        L{StringListMailbox.listMessages} returns the length of the message at
 
244
        the offset into the mailbox passed to it.
 
245
        """
 
246
        mailbox = mail.maildir.StringListMailbox(["abc", "ab", "a"])
 
247
        self.assertEqual(mailbox.listMessages(0), 3)
 
248
        self.assertEqual(mailbox.listMessages(1), 2)
 
249
        self.assertEqual(mailbox.listMessages(2), 1)
 
250
 
 
251
 
 
252
    def test_listAllMessages(self):
 
253
        """
 
254
        L{StringListMailbox.listMessages} returns a list of the lengths of all
 
255
        messages if not passed an index.
 
256
        """
 
257
        mailbox = mail.maildir.StringListMailbox(["a", "abc", "ab"])
 
258
        self.assertEqual(mailbox.listMessages(), [1, 3, 2])
 
259
 
 
260
 
 
261
    def test_getMessage(self):
 
262
        """
 
263
        L{StringListMailbox.getMessage} returns a file-like object from which
 
264
        the contents of the message at the given offset into the mailbox can be
 
265
        read.
 
266
        """
 
267
        mailbox = mail.maildir.StringListMailbox(["foo", "real contents"])
 
268
        self.assertEqual(mailbox.getMessage(1).read(), "real contents")
 
269
 
 
270
 
 
271
    def test_getUidl(self):
 
272
        """
 
273
        L{StringListMailbox.getUidl} returns a unique identifier for the
 
274
        message at the given offset into the mailbox.
 
275
        """
 
276
        mailbox = mail.maildir.StringListMailbox(["foo", "bar"])
 
277
        self.assertNotEqual(mailbox.getUidl(0), mailbox.getUidl(1))
 
278
 
 
279
 
 
280
    def test_deleteMessage(self):
 
281
        """
 
282
        L{StringListMailbox.deleteMessage} marks a message for deletion causing
 
283
        further requests for its length to return 0.
 
284
        """
 
285
        mailbox = mail.maildir.StringListMailbox(["foo"])
 
286
        mailbox.deleteMessage(0)
 
287
        self.assertEqual(mailbox.listMessages(0), 0)
 
288
        self.assertEqual(mailbox.listMessages(), [0])
 
289
 
 
290
 
 
291
    def test_undeleteMessages(self):
 
292
        """
 
293
        L{StringListMailbox.undeleteMessages} causes any messages marked for
 
294
        deletion to be returned to their original state.
 
295
        """
 
296
        mailbox = mail.maildir.StringListMailbox(["foo"])
 
297
        mailbox.deleteMessage(0)
 
298
        mailbox.undeleteMessages()
 
299
        self.assertEqual(mailbox.listMessages(0), 3)
 
300
        self.assertEqual(mailbox.listMessages(), [3])
 
301
 
 
302
 
 
303
    def test_sync(self):
 
304
        """
 
305
        L{StringListMailbox.sync} causes any messages as marked for deletion to
 
306
        be permanently deleted.
 
307
        """
 
308
        mailbox = mail.maildir.StringListMailbox(["foo"])
 
309
        mailbox.deleteMessage(0)
 
310
        mailbox.sync()
 
311
        mailbox.undeleteMessages()
 
312
        self.assertEqual(mailbox.listMessages(0), 0)
 
313
        self.assertEqual(mailbox.listMessages(), [0])
 
314
 
 
315
 
 
316
 
 
317
class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
 
318
    _openstate = True
 
319
    _writestate = True
 
320
    _renamestate = True
 
321
    def osopen(self, fn, attr, mode):
 
322
        if self._openstate:
 
323
            return os.open(fn, attr, mode)
 
324
        else:
 
325
            raise OSError(errno.EPERM, "Faked Permission Problem")
 
326
    def oswrite(self, fh, data):
 
327
        if self._writestate:
 
328
            return os.write(fh, data)
 
329
        else:
 
330
            raise OSError(errno.ENOSPC, "Faked Space problem")
 
331
    def osrename(self, oldname, newname):
 
332
        if self._renamestate:
 
333
            return os.rename(oldname, newname)
 
334
        else:
 
335
            raise OSError(errno.EPERM, "Faked Permission Problem")
 
336
 
 
337
class MaildirAppendStringTestCase(unittest.TestCase):
 
338
    def setUp(self):
 
339
        self.d = self.mktemp()
 
340
        mail.maildir.initializeMaildir(self.d)
 
341
 
 
342
    def tearDown(self):
 
343
        shutil.rmtree(self.d)
 
344
 
 
345
    def _append(self, ignored, mbox):
 
346
        d = mbox.appendMessage('TEST')
 
347
        return self.assertFailure(d, Exception)
 
348
 
 
349
    def _setState(self, ignored, mbox, rename=None, write=None, open=None):
 
350
        if rename is not None:
 
351
            mbox.AppendFactory._renameState = rename
 
352
        if write is not None:
 
353
            mbox.AppendFactory._writeState = write
 
354
        if open is not None:
 
355
            mbox.AppendFactory._openstate = open
 
356
 
 
357
    def testAppend(self):
 
358
        mbox = mail.maildir.MaildirMailbox(self.d)
 
359
        mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
 
360
        ds = []
 
361
        for i in xrange(1, 11):
 
362
            ds.append(mbox.appendMessage("X" * i))
 
363
            ds[-1].addCallback(self.assertEqual, None)
 
364
        d = defer.gatherResults(ds)
 
365
        d.addCallback(self._cbTestAppend, mbox)
 
366
        return d
 
367
 
 
368
    def _cbTestAppend(self, result, mbox):
 
369
        self.assertEquals(len(mbox.listMessages()),
 
370
                          10)
 
371
        self.assertEquals(len(mbox.getMessage(5).read()), 6)
 
372
        # test in the right order: last to first error location.
 
373
        mbox.AppendFactory._renamestate = False
 
374
        d = self._append(None, mbox)
 
375
        d.addCallback(self._setState, mbox, rename=True, write=False)
 
376
        d.addCallback(self._append, mbox)
 
377
        d.addCallback(self._setState, mbox, write=True, open=False)
 
378
        d.addCallback(self._append, mbox)
 
379
        d.addCallback(self._setState, mbox, open=True)
 
380
        return d
 
381
 
 
382
 
 
383
class MaildirAppendFileTestCase(unittest.TestCase):
 
384
    def setUp(self):
 
385
        self.d = self.mktemp()
 
386
        mail.maildir.initializeMaildir(self.d)
 
387
 
 
388
    def tearDown(self):
 
389
        shutil.rmtree(self.d)
 
390
 
 
391
    def testAppend(self):
 
392
        mbox = mail.maildir.MaildirMailbox(self.d)
 
393
        ds = []
 
394
        def _check(res, t):
 
395
            t.close()
 
396
            self.assertEqual(res, None)
 
397
        for i in xrange(1, 11):
 
398
            temp = tempfile.TemporaryFile()
 
399
            temp.write("X" * i)
 
400
            temp.seek(0,0)
 
401
            ds.append(mbox.appendMessage(temp))
 
402
            ds[-1].addCallback(_check, temp)
 
403
        return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox)
 
404
 
 
405
    def _cbTestAppend(self, result, mbox):
 
406
        self.assertEquals(len(mbox.listMessages()),
 
407
                          10)
 
408
        self.assertEquals(len(mbox.getMessage(5).read()), 6)
 
409
 
 
410
 
 
411
class MaildirTestCase(unittest.TestCase):
 
412
    def setUp(self):
 
413
        self.d = self.mktemp()
 
414
        mail.maildir.initializeMaildir(self.d)
 
415
 
 
416
    def tearDown(self):
 
417
        shutil.rmtree(self.d)
 
418
 
 
419
    def testInitializer(self):
 
420
        d = self.d
 
421
        trash = os.path.join(d, '.Trash')
 
422
 
 
423
        self.failUnless(os.path.exists(d) and os.path.isdir(d))
 
424
        self.failUnless(os.path.exists(os.path.join(d, 'new')))
 
425
        self.failUnless(os.path.exists(os.path.join(d, 'cur')))
 
426
        self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
 
427
        self.failUnless(os.path.isdir(os.path.join(d, 'new')))
 
428
        self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
 
429
        self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
 
430
 
 
431
        self.failUnless(os.path.exists(os.path.join(trash, 'new')))
 
432
        self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
 
433
        self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
 
434
        self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
 
435
        self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
 
436
        self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
 
437
 
 
438
 
 
439
    def test_nameGenerator(self):
 
440
        """
 
441
        Each call to L{_MaildirNameGenerator.generate} returns a unique
 
442
        string suitable for use as the basename of a new message file.  The
 
443
        names are ordered such that those generated earlier sort less than
 
444
        those generated later.
 
445
        """
 
446
        clock = task.Clock()
 
447
        clock.advance(0.05)
 
448
        generator = mail.maildir._MaildirNameGenerator(clock)
 
449
 
 
450
        firstName = generator.generate()
 
451
        clock.advance(0.05)
 
452
        secondName = generator.generate()
 
453
 
 
454
        self.assertTrue(firstName < secondName)
 
455
 
 
456
 
 
457
    def test_mailbox(self):
 
458
        """
 
459
        Exercise the methods of L{IMailbox} as implemented by
 
460
        L{MaildirMailbox}.
 
461
        """
 
462
        j = os.path.join
 
463
        n = mail.maildir._generateMaildirName
 
464
        msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
 
465
 
 
466
        # Toss a few files into the mailbox
 
467
        i = 1
 
468
        for f in msgs:
 
469
            fObj = file(j(self.d, f), 'w')
 
470
            fObj.write('x' * i)
 
471
            fObj.close()
 
472
            i = i + 1
 
473
 
 
474
        mb = mail.maildir.MaildirMailbox(self.d)
 
475
        self.assertEquals(mb.listMessages(), range(1, 11))
 
476
        self.assertEquals(mb.listMessages(1), 2)
 
477
        self.assertEquals(mb.listMessages(5), 6)
 
478
 
 
479
        self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
 
480
        self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
 
481
 
 
482
        d = {}
 
483
        for i in range(10):
 
484
            u = mb.getUidl(i)
 
485
            self.failIf(u in d)
 
486
            d[u] = None
 
487
 
 
488
        p, f = os.path.split(msgs[5])
 
489
 
 
490
        mb.deleteMessage(5)
 
491
        self.assertEquals(mb.listMessages(5), 0)
 
492
        self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
 
493
        self.failIf(os.path.exists(j(self.d, msgs[5])))
 
494
 
 
495
        mb.undeleteMessages()
 
496
        self.assertEquals(mb.listMessages(5), 6)
 
497
        self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
 
498
        self.failUnless(os.path.exists(j(self.d, msgs[5])))
 
499
 
 
500
class MaildirDirdbmDomainTestCase(unittest.TestCase):
 
501
    def setUp(self):
 
502
        self.P = self.mktemp()
 
503
        self.S = mail.mail.MailService()
 
504
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
 
505
 
 
506
    def tearDown(self):
 
507
        shutil.rmtree(self.P)
 
508
 
 
509
    def testAddUser(self):
 
510
        toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
 
511
        for (u, p) in toAdd:
 
512
            self.D.addUser(u, p)
 
513
 
 
514
        for (u, p) in toAdd:
 
515
            self.failUnless(u in self.D.dbm)
 
516
            self.assertEquals(self.D.dbm[u], p)
 
517
            self.failUnless(os.path.exists(os.path.join(self.P, u)))
 
518
 
 
519
    def testCredentials(self):
 
520
        creds = self.D.getCredentialsCheckers()
 
521
 
 
522
        self.assertEquals(len(creds), 1)
 
523
        self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
 
524
        self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
 
525
 
 
526
    def testRequestAvatar(self):
 
527
        class ISomething(Interface):
 
528
            pass
 
529
 
 
530
        self.D.addUser('user', 'password')
 
531
        self.assertRaises(
 
532
            NotImplementedError,
 
533
            self.D.requestAvatar, 'user', None, ISomething
 
534
        )
 
535
 
 
536
        t = self.D.requestAvatar('user', None, pop3.IMailbox)
 
537
        self.assertEquals(len(t), 3)
 
538
        self.failUnless(t[0] is pop3.IMailbox)
 
539
        self.failUnless(pop3.IMailbox.providedBy(t[1]))
 
540
 
 
541
        t[2]()
 
542
 
 
543
    def testRequestAvatarId(self):
 
544
        self.D.addUser('user', 'password')
 
545
        database = self.D.getCredentialsCheckers()[0]
 
546
 
 
547
        creds = cred.credentials.UsernamePassword('user', 'wrong password')
 
548
        self.assertRaises(
 
549
            cred.error.UnauthorizedLogin,
 
550
            database.requestAvatarId, creds
 
551
        )
 
552
 
 
553
        creds = cred.credentials.UsernamePassword('user', 'password')
 
554
        self.assertEquals(database.requestAvatarId(creds), 'user')
 
555
 
 
556
 
 
557
class StubAliasableDomain(object):
 
558
    """
 
559
    Minimal testable implementation of IAliasableDomain.
 
560
    """
 
561
    implements(mail.mail.IAliasableDomain)
 
562
 
 
563
    def exists(self, user):
 
564
        """
 
565
        No test coverage for invocations of this method on domain objects,
 
566
        so we just won't implement it.
 
567
        """
 
568
        raise NotImplementedError()
 
569
 
 
570
 
 
571
    def addUser(self, user, password):
 
572
        """
 
573
        No test coverage for invocations of this method on domain objects,
 
574
        so we just won't implement it.
 
575
        """
 
576
        raise NotImplementedError()
 
577
 
 
578
 
 
579
    def getCredentialsCheckers(self):
 
580
        """
 
581
        This needs to succeed in order for other tests to complete
 
582
        successfully, but we don't actually assert anything about its
 
583
        behavior.  Return an empty list.  Sometime later we should return
 
584
        something else and assert that a portal got set up properly.
 
585
        """
 
586
        return []
 
587
 
 
588
 
 
589
    def setAliasGroup(self, aliases):
 
590
        """
 
591
        Just record the value so the test can check it later.
 
592
        """
 
593
        self.aliasGroup = aliases
 
594
 
 
595
 
 
596
class ServiceDomainTestCase(unittest.TestCase):
 
597
    def setUp(self):
 
598
        self.S = mail.mail.MailService()
 
599
        self.D = mail.protocols.DomainDeliveryBase(self.S, None)
 
600
        self.D.service = self.S
 
601
        self.D.protocolName = 'TEST'
 
602
        self.D.host = 'hostname'
 
603
 
 
604
        self.tmpdir = self.mktemp()
 
605
        domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
 
606
        domain.addUser('user', 'password')
 
607
        self.S.addDomain('test.domain', domain)
 
608
 
 
609
    def tearDown(self):
 
610
        shutil.rmtree(self.tmpdir)
 
611
 
 
612
 
 
613
    def testAddAliasableDomain(self):
 
614
        """
 
615
        Test that adding an IAliasableDomain to a mail service properly sets
 
616
        up alias group references and such.
 
617
        """
 
618
        aliases = object()
 
619
        domain = StubAliasableDomain()
 
620
        self.S.aliases = aliases
 
621
        self.S.addDomain('example.com', domain)
 
622
        self.assertIdentical(domain.aliasGroup, aliases)
 
623
 
 
624
 
 
625
    def testReceivedHeader(self):
 
626
         hdr = self.D.receivedHeader(
 
627
             ('remotehost', '123.232.101.234'),
 
628
             smtp.Address('<someguy@somplace>'),
 
629
             ['user@host.name']
 
630
         )
 
631
         fp = StringIO.StringIO(hdr)
 
632
         m = rfc822.Message(fp)
 
633
         self.assertEquals(len(m.items()), 1)
 
634
         self.failUnless(m.has_key('Received'))
 
635
 
 
636
    def testValidateTo(self):
 
637
        user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
 
638
        return defer.maybeDeferred(self.D.validateTo, user
 
639
            ).addCallback(self._cbValidateTo
 
640
            )
 
641
 
 
642
    def _cbValidateTo(self, result):
 
643
        self.failUnless(callable(result))
 
644
 
 
645
    def testValidateToBadUsername(self):
 
646
        user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
 
647
        return self.assertFailure(
 
648
            defer.maybeDeferred(self.D.validateTo, user),
 
649
            smtp.SMTPBadRcpt)
 
650
 
 
651
    def testValidateToBadDomain(self):
 
652
        user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
 
653
        return self.assertFailure(
 
654
            defer.maybeDeferred(self.D.validateTo, user),
 
655
            smtp.SMTPBadRcpt)
 
656
 
 
657
    def testValidateFrom(self):
 
658
        helo = ('hostname', '127.0.0.1')
 
659
        origin = smtp.Address('<user@hostname>')
 
660
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
 
661
 
 
662
        helo = ('hostname', '1.2.3.4')
 
663
        origin = smtp.Address('<user@hostname>')
 
664
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
 
665
 
 
666
        helo = ('hostname', '1.2.3.4')
 
667
        origin = smtp.Address('<>')
 
668
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
 
669
 
 
670
        self.assertRaises(
 
671
            smtp.SMTPBadSender,
 
672
            self.D.validateFrom, None, origin
 
673
        )
 
674
 
 
675
class VirtualPOP3TestCase(unittest.TestCase):
 
676
    def setUp(self):
 
677
        self.tmpdir = self.mktemp()
 
678
        self.S = mail.mail.MailService()
 
679
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
 
680
        self.D.addUser('user', 'password')
 
681
        self.S.addDomain('test.domain', self.D)
 
682
 
 
683
        portal = cred.portal.Portal(self.D)
 
684
        map(portal.registerChecker, self.D.getCredentialsCheckers())
 
685
        self.S.portals[''] = self.S.portals['test.domain'] = portal
 
686
 
 
687
        self.P = mail.protocols.VirtualPOP3()
 
688
        self.P.service = self.S
 
689
        self.P.magic = '<unit test magic>'
 
690
 
 
691
    def tearDown(self):
 
692
        shutil.rmtree(self.tmpdir)
 
693
 
 
694
    def testAuthenticateAPOP(self):
 
695
        resp = md5(self.P.magic + 'password').hexdigest()
 
696
        return self.P.authenticateUserAPOP('user', resp
 
697
            ).addCallback(self._cbAuthenticateAPOP
 
698
            )
 
699
 
 
700
    def _cbAuthenticateAPOP(self, result):
 
701
        self.assertEquals(len(result), 3)
 
702
        self.assertEquals(result[0], pop3.IMailbox)
 
703
        self.failUnless(pop3.IMailbox.providedBy(result[1]))
 
704
        result[2]()
 
705
 
 
706
    def testAuthenticateIncorrectUserAPOP(self):
 
707
        resp = md5(self.P.magic + 'password').hexdigest()
 
708
        return self.assertFailure(
 
709
            self.P.authenticateUserAPOP('resu', resp),
 
710
            cred.error.UnauthorizedLogin)
 
711
 
 
712
    def testAuthenticateIncorrectResponseAPOP(self):
 
713
        resp = md5('wrong digest').hexdigest()
 
714
        return self.assertFailure(
 
715
            self.P.authenticateUserAPOP('user', resp),
 
716
            cred.error.UnauthorizedLogin)
 
717
 
 
718
    def testAuthenticatePASS(self):
 
719
        return self.P.authenticateUserPASS('user', 'password'
 
720
            ).addCallback(self._cbAuthenticatePASS
 
721
            )
 
722
 
 
723
    def _cbAuthenticatePASS(self, result):
 
724
        self.assertEquals(len(result), 3)
 
725
        self.assertEquals(result[0], pop3.IMailbox)
 
726
        self.failUnless(pop3.IMailbox.providedBy(result[1]))
 
727
        result[2]()
 
728
 
 
729
    def testAuthenticateBadUserPASS(self):
 
730
        return self.assertFailure(
 
731
            self.P.authenticateUserPASS('resu', 'password'),
 
732
            cred.error.UnauthorizedLogin)
 
733
 
 
734
    def testAuthenticateBadPasswordPASS(self):
 
735
        return self.assertFailure(
 
736
            self.P.authenticateUserPASS('user', 'wrong password'),
 
737
            cred.error.UnauthorizedLogin)
 
738
 
 
739
class empty(smtp.User):
 
740
    def __init__(self):
 
741
        pass
 
742
 
 
743
class RelayTestCase(unittest.TestCase):
 
744
    def testExists(self):
 
745
        service = mail.mail.MailService()
 
746
        domain = mail.relay.DomainQueuer(service)
 
747
 
 
748
        doRelay = [
 
749
            address.UNIXAddress('/var/run/mail-relay'),
 
750
            address.IPv4Address('TCP', '127.0.0.1', 12345),
 
751
        ]
 
752
 
 
753
        dontRelay = [
 
754
            address.IPv4Address('TCP', '192.168.2.1', 62),
 
755
            address.IPv4Address('TCP', '1.2.3.4', 1943),
 
756
        ]
 
757
 
 
758
        for peer in doRelay:
 
759
            user = empty()
 
760
            user.orig = 'user@host'
 
761
            user.dest = 'tsoh@resu'
 
762
            user.protocol = empty()
 
763
            user.protocol.transport = empty()
 
764
            user.protocol.transport.getPeer = lambda: peer
 
765
 
 
766
            self.failUnless(callable(domain.exists(user)))
 
767
 
 
768
        for peer in dontRelay:
 
769
            user = empty()
 
770
            user.orig = 'some@place'
 
771
            user.protocol = empty()
 
772
            user.protocol.transport = empty()
 
773
            user.protocol.transport.getPeer = lambda: peer
 
774
            user.dest = 'who@cares'
 
775
 
 
776
            self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
 
777
 
 
778
class RelayerTestCase(unittest.TestCase):
 
779
    def setUp(self):
 
780
        self.tmpdir = self.mktemp()
 
781
        os.mkdir(self.tmpdir)
 
782
        self.messageFiles = []
 
783
        for i in range(10):
 
784
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
 
785
            f = file(name + '-H', 'w')
 
786
            pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
 
787
            f.close()
 
788
 
 
789
            f = file(name + '-D', 'w')
 
790
            f.write(name)
 
791
            f.seek(0, 0)
 
792
            self.messageFiles.append(name)
 
793
 
 
794
        self.R = mail.relay.RelayerMixin()
 
795
        self.R.loadMessages(self.messageFiles)
 
796
 
 
797
    def tearDown(self):
 
798
        shutil.rmtree(self.tmpdir)
 
799
 
 
800
    def testMailFrom(self):
 
801
        for i in range(10):
 
802
            self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
 
803
            self.R.sentMail(250, None, None, None, None)
 
804
        self.assertEquals(self.R.getMailFrom(), None)
 
805
 
 
806
    def testMailTo(self):
 
807
        for i in range(10):
 
808
            self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
 
809
            self.R.sentMail(250, None, None, None, None)
 
810
        self.assertEquals(self.R.getMailTo(), None)
 
811
 
 
812
    def testMailData(self):
 
813
        for i in range(10):
 
814
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
 
815
            self.assertEquals(self.R.getMailData().read(), name)
 
816
            self.R.sentMail(250, None, None, None, None)
 
817
        self.assertEquals(self.R.getMailData(), None)
 
818
 
 
819
class Manager:
 
820
    def __init__(self):
 
821
        self.success = []
 
822
        self.failure = []
 
823
        self.done = []
 
824
 
 
825
    def notifySuccess(self, factory, message):
 
826
        self.success.append((factory, message))
 
827
 
 
828
    def notifyFailure(self, factory, message):
 
829
        self.failure.append((factory, message))
 
830
 
 
831
    def notifyDone(self, factory):
 
832
        self.done.append(factory)
 
833
 
 
834
class ManagedRelayerTestCase(unittest.TestCase):
 
835
    def setUp(self):
 
836
        self.manager = Manager()
 
837
        self.messages = range(0, 20, 2)
 
838
        self.factory = object()
 
839
        self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
 
840
        self.relay.messages = self.messages[:]
 
841
        self.relay.names = self.messages[:]
 
842
        self.relay.factory = self.factory
 
843
 
 
844
    def testSuccessfulSentMail(self):
 
845
        for i in self.messages:
 
846
            self.relay.sentMail(250, None, None, None, None)
 
847
 
 
848
        self.assertEquals(
 
849
            self.manager.success,
 
850
            [(self.factory, m) for m in self.messages]
 
851
        )
 
852
 
 
853
    def testFailedSentMail(self):
 
854
        for i in self.messages:
 
855
            self.relay.sentMail(550, None, None, None, None)
 
856
 
 
857
        self.assertEquals(
 
858
            self.manager.failure,
 
859
            [(self.factory, m) for m in self.messages]
 
860
        )
 
861
 
 
862
    def testConnectionLost(self):
 
863
        self.relay.connectionLost(failure.Failure(Exception()))
 
864
        self.assertEquals(self.manager.done, [self.factory])
 
865
 
 
866
class DirectoryQueueTestCase(unittest.TestCase):
 
867
    def setUp(self):
 
868
        # This is almost a test case itself.
 
869
        self.tmpdir = self.mktemp()
 
870
        os.mkdir(self.tmpdir)
 
871
        self.queue = mail.relaymanager.Queue(self.tmpdir)
 
872
        self.queue.noisy = False
 
873
        for m in range(25):
 
874
            hdrF, msgF = self.queue.createNewMessage()
 
875
            pickle.dump(['header', m], hdrF)
 
876
            hdrF.close()
 
877
            msgF.lineReceived('body: %d' % (m,))
 
878
            msgF.eomReceived()
 
879
        self.queue.readDirectory()
 
880
 
 
881
    def tearDown(self):
 
882
        shutil.rmtree(self.tmpdir)
 
883
 
 
884
    def testWaiting(self):
 
885
        self.failUnless(self.queue.hasWaiting())
 
886
        self.assertEquals(len(self.queue.getWaiting()), 25)
 
887
 
 
888
        waiting = self.queue.getWaiting()
 
889
        self.queue.setRelaying(waiting[0])
 
890
        self.assertEquals(len(self.queue.getWaiting()), 24)
 
891
 
 
892
        self.queue.setWaiting(waiting[0])
 
893
        self.assertEquals(len(self.queue.getWaiting()), 25)
 
894
 
 
895
    def testRelaying(self):
 
896
        for m in self.queue.getWaiting():
 
897
            self.queue.setRelaying(m)
 
898
            self.assertEquals(
 
899
                len(self.queue.getRelayed()),
 
900
                25 - len(self.queue.getWaiting())
 
901
            )
 
902
 
 
903
        self.failIf(self.queue.hasWaiting())
 
904
 
 
905
        relayed = self.queue.getRelayed()
 
906
        self.queue.setWaiting(relayed[0])
 
907
        self.assertEquals(len(self.queue.getWaiting()), 1)
 
908
        self.assertEquals(len(self.queue.getRelayed()), 24)
 
909
 
 
910
    def testDone(self):
 
911
        msg = self.queue.getWaiting()[0]
 
912
        self.queue.setRelaying(msg)
 
913
        self.queue.done(msg)
 
914
 
 
915
        self.assertEquals(len(self.queue.getWaiting()), 24)
 
916
        self.assertEquals(len(self.queue.getRelayed()), 0)
 
917
 
 
918
        self.failIf(msg in self.queue.getWaiting())
 
919
        self.failIf(msg in self.queue.getRelayed())
 
920
 
 
921
    def testEnvelope(self):
 
922
        envelopes = []
 
923
 
 
924
        for msg in self.queue.getWaiting():
 
925
            envelopes.append(self.queue.getEnvelope(msg))
 
926
 
 
927
        envelopes.sort()
 
928
        for i in range(25):
 
929
            self.assertEquals(
 
930
                envelopes.pop(0),
 
931
                ['header', i]
 
932
            )
 
933
 
 
934
from twisted.names import server
 
935
from twisted.names import client
 
936
from twisted.names import common
 
937
 
 
938
class TestAuthority(common.ResolverBase):
 
939
    def __init__(self):
 
940
        common.ResolverBase.__init__(self)
 
941
        self.addresses = {}
 
942
 
 
943
    def _lookup(self, name, cls, type, timeout = None):
 
944
        if name in self.addresses and type == dns.MX:
 
945
            results = []
 
946
            for a in self.addresses[name]:
 
947
                hdr = dns.RRHeader(
 
948
                    name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
 
949
                )
 
950
                results.append(hdr)
 
951
            return defer.succeed((results, [], []))
 
952
        return defer.fail(failure.Failure(dns.DomainError(name)))
 
953
 
 
954
def setUpDNS(self):
 
955
    self.auth = TestAuthority()
 
956
    factory = server.DNSServerFactory([self.auth])
 
957
    protocol = dns.DNSDatagramProtocol(factory)
 
958
    while 1:
 
959
        self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
 
960
        portNumber = self.port.getHost().port
 
961
 
 
962
        try:
 
963
            self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
 
964
        except CannotListenError:
 
965
            self.port.stopListening()
 
966
        else:
 
967
            break
 
968
    self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
 
969
 
 
970
 
 
971
def tearDownDNS(self):
 
972
    dl = []
 
973
    dl.append(defer.maybeDeferred(self.port.stopListening))
 
974
    dl.append(defer.maybeDeferred(self.udpPort.stopListening))
 
975
    if self.resolver.protocol.transport is not None:
 
976
        dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListening))
 
977
    try:
 
978
        self.resolver._parseCall.cancel()
 
979
    except:
 
980
        pass
 
981
    return defer.DeferredList(dl)
 
982
 
 
983
class MXTestCase(unittest.TestCase):
 
984
    """
 
985
    Tests for L{mail.relaymanager.MXCalculator}.
 
986
    """
 
987
    def setUp(self):
 
988
        setUpDNS(self)
 
989
        self.clock = task.Clock()
 
990
        self.mx = mail.relaymanager.MXCalculator(self.resolver, self.clock)
 
991
 
 
992
    def tearDown(self):
 
993
        return tearDownDNS(self)
 
994
 
 
995
 
 
996
    def test_defaultClock(self):
 
997
        """
 
998
        L{MXCalculator}'s default clock is C{twisted.internet.reactor}.
 
999
        """
 
1000
        self.assertIdentical(
 
1001
            mail.relaymanager.MXCalculator(self.resolver).clock,
 
1002
            reactor)
 
1003
 
 
1004
 
 
1005
    def testSimpleSuccess(self):
 
1006
        self.auth.addresses['test.domain'] = ['the.email.test.domain']
 
1007
        return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
 
1008
 
 
1009
    def _cbSimpleSuccess(self, mx):
 
1010
        self.assertEquals(mx.preference, 0)
 
1011
        self.assertEquals(str(mx.name), 'the.email.test.domain')
 
1012
 
 
1013
    def testSimpleFailure(self):
 
1014
        self.mx.fallbackToDomain = False
 
1015
        return self.assertFailure(self.mx.getMX('test.domain'), IOError)
 
1016
 
 
1017
    def testSimpleFailureWithFallback(self):
 
1018
        return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
 
1019
 
 
1020
 
 
1021
    def _exchangeTest(self, domain, records, correctMailExchange):
 
1022
        """
 
1023
        Issue an MX request for the given domain and arrange for it to be
 
1024
        responded to with the given records.  Verify that the resulting mail
 
1025
        exchange is the indicated host.
 
1026
 
 
1027
        @type domain: C{str}
 
1028
        @type records: C{list} of L{RRHeader}
 
1029
        @type correctMailExchange: C{str}
 
1030
        @rtype: L{Deferred}
 
1031
        """
 
1032
        class DummyResolver(object):
 
1033
            def lookupMailExchange(self, name):
 
1034
                if name == domain:
 
1035
                    return defer.succeed((
 
1036
                            records,
 
1037
                            [],
 
1038
                            []))
 
1039
                return defer.fail(DNSNameError(domain))
 
1040
 
 
1041
        self.mx.resolver = DummyResolver()
 
1042
        d = self.mx.getMX(domain)
 
1043
        def gotMailExchange(record):
 
1044
            self.assertEqual(str(record.name), correctMailExchange)
 
1045
        d.addCallback(gotMailExchange)
 
1046
        return d
 
1047
 
 
1048
 
 
1049
    def test_mailExchangePreference(self):
 
1050
        """
 
1051
        The MX record with the lowest preference is returned by
 
1052
        L{MXCalculator.getMX}.
 
1053
        """
 
1054
        domain = "example.com"
 
1055
        good = "good.example.com"
 
1056
        bad = "bad.example.com"
 
1057
 
 
1058
        records = [
 
1059
            RRHeader(name=domain,
 
1060
                     type=Record_MX.TYPE,
 
1061
                     payload=Record_MX(1, bad)),
 
1062
            RRHeader(name=domain,
 
1063
                     type=Record_MX.TYPE,
 
1064
                     payload=Record_MX(0, good)),
 
1065
            RRHeader(name=domain,
 
1066
                     type=Record_MX.TYPE,
 
1067
                     payload=Record_MX(2, bad))]
 
1068
        return self._exchangeTest(domain, records, good)
 
1069
 
 
1070
 
 
1071
    def test_badExchangeExcluded(self):
 
1072
        """
 
1073
        L{MXCalculator.getMX} returns the MX record with the lowest preference
 
1074
        which is not also marked as bad.
 
1075
        """
 
1076
        domain = "example.com"
 
1077
        good = "good.example.com"
 
1078
        bad = "bad.example.com"
 
1079
 
 
1080
        records = [
 
1081
            RRHeader(name=domain,
 
1082
                     type=Record_MX.TYPE,
 
1083
                     payload=Record_MX(0, bad)),
 
1084
            RRHeader(name=domain,
 
1085
                     type=Record_MX.TYPE,
 
1086
                     payload=Record_MX(1, good))]
 
1087
        self.mx.markBad(bad)
 
1088
        return self._exchangeTest(domain, records, good)
 
1089
 
 
1090
 
 
1091
    def test_fallbackForAllBadExchanges(self):
 
1092
        """
 
1093
        L{MXCalculator.getMX} returns the MX record with the lowest preference
 
1094
        if all the MX records in the response have been marked bad.
 
1095
        """
 
1096
        domain = "example.com"
 
1097
        bad = "bad.example.com"
 
1098
        worse = "worse.example.com"
 
1099
 
 
1100
        records = [
 
1101
            RRHeader(name=domain,
 
1102
                     type=Record_MX.TYPE,
 
1103
                     payload=Record_MX(0, bad)),
 
1104
            RRHeader(name=domain,
 
1105
                     type=Record_MX.TYPE,
 
1106
                     payload=Record_MX(1, worse))]
 
1107
        self.mx.markBad(bad)
 
1108
        self.mx.markBad(worse)
 
1109
        return self._exchangeTest(domain, records, bad)
 
1110
 
 
1111
 
 
1112
    def test_badExchangeExpires(self):
 
1113
        """
 
1114
        L{MXCalculator.getMX} returns the MX record with the lowest preference
 
1115
        if it was last marked bad longer than L{MXCalculator.timeOutBadMX}
 
1116
        seconds ago.
 
1117
        """
 
1118
        domain = "example.com"
 
1119
        good = "good.example.com"
 
1120
        previouslyBad = "bad.example.com"
 
1121
 
 
1122
        records = [
 
1123
            RRHeader(name=domain,
 
1124
                     type=Record_MX.TYPE,
 
1125
                     payload=Record_MX(0, previouslyBad)),
 
1126
            RRHeader(name=domain,
 
1127
                     type=Record_MX.TYPE,
 
1128
                     payload=Record_MX(1, good))]
 
1129
        self.mx.markBad(previouslyBad)
 
1130
        self.clock.advance(self.mx.timeOutBadMX)
 
1131
        return self._exchangeTest(domain, records, previouslyBad)
 
1132
 
 
1133
 
 
1134
    def test_goodExchangeUsed(self):
 
1135
        """
 
1136
        L{MXCalculator.getMX} returns the MX record with the lowest preference
 
1137
        if it was marked good after it was marked bad.
 
1138
        """
 
1139
        domain = "example.com"
 
1140
        good = "good.example.com"
 
1141
        previouslyBad = "bad.example.com"
 
1142
 
 
1143
        records = [
 
1144
            RRHeader(name=domain,
 
1145
                     type=Record_MX.TYPE,
 
1146
                     payload=Record_MX(0, previouslyBad)),
 
1147
            RRHeader(name=domain,
 
1148
                     type=Record_MX.TYPE,
 
1149
                     payload=Record_MX(1, good))]
 
1150
        self.mx.markBad(previouslyBad)
 
1151
        self.mx.markGood(previouslyBad)
 
1152
        self.clock.advance(self.mx.timeOutBadMX)
 
1153
        return self._exchangeTest(domain, records, previouslyBad)
 
1154
 
 
1155
 
 
1156
    def test_successWithoutResults(self):
 
1157
        """
 
1158
        If an MX lookup succeeds but the result set is empty,
 
1159
        L{MXCalculator.getMX} should try to look up an I{A} record for the
 
1160
        requested name and call back its returned Deferred with that
 
1161
        address.
 
1162
        """
 
1163
        ip = '1.2.3.4'
 
1164
        domain = 'example.org'
 
1165
 
 
1166
        class DummyResolver(object):
 
1167
            """
 
1168
            Fake resolver which will respond to an MX lookup with an empty
 
1169
            result set.
 
1170
 
 
1171
            @ivar mx: A dictionary mapping hostnames to three-tuples of
 
1172
                results to be returned from I{MX} lookups.
 
1173
 
 
1174
            @ivar a: A dictionary mapping hostnames to addresses to be
 
1175
                returned from I{A} lookups.
 
1176
            """
 
1177
            mx = {domain: ([], [], [])}
 
1178
            a = {domain: ip}
 
1179
 
 
1180
            def lookupMailExchange(self, domain):
 
1181
                return defer.succeed(self.mx[domain])
 
1182
 
 
1183
            def getHostByName(self, domain):
 
1184
                return defer.succeed(self.a[domain])
 
1185
 
 
1186
        self.mx.resolver = DummyResolver()
 
1187
        d = self.mx.getMX(domain)
 
1188
        d.addCallback(self.assertEqual, Record_MX(name=ip))
 
1189
        return d
 
1190
 
 
1191
 
 
1192
    def test_failureWithSuccessfulFallback(self):
 
1193
        """
 
1194
        Test that if the MX record lookup fails, fallback is enabled, and an A
 
1195
        record is available for the name, then the Deferred returned by
 
1196
        L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
 
1197
        gives the address in the A record for the name.
 
1198
        """
 
1199
        class DummyResolver(object):
 
1200
            """
 
1201
            Fake resolver which will fail an MX lookup but then succeed a
 
1202
            getHostByName call.
 
1203
            """
 
1204
            def lookupMailExchange(self, domain):
 
1205
                return defer.fail(DNSNameError())
 
1206
 
 
1207
            def getHostByName(self, domain):
 
1208
                return defer.succeed("1.2.3.4")
 
1209
 
 
1210
        self.mx.resolver = DummyResolver()
 
1211
        d = self.mx.getMX("domain")
 
1212
        d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
 
1213
        return d
 
1214
 
 
1215
 
 
1216
    def test_cnameWithoutGlueRecords(self):
 
1217
        """
 
1218
        If an MX lookup returns a single CNAME record as a result, MXCalculator
 
1219
        will perform an MX lookup for the canonical name indicated and return
 
1220
        the MX record which results.
 
1221
        """
 
1222
        alias = "alias.example.com"
 
1223
        canonical = "canonical.example.com"
 
1224
        exchange = "mail.example.com"
 
1225
 
 
1226
        class DummyResolver(object):
 
1227
            """
 
1228
            Fake resolver which will return a CNAME for an MX lookup of a name
 
1229
            which is an alias and an MX for an MX lookup of the canonical name.
 
1230
            """
 
1231
            def lookupMailExchange(self, domain):
 
1232
                if domain == alias:
 
1233
                    return defer.succeed((
 
1234
                            [RRHeader(name=domain,
 
1235
                                      type=Record_CNAME.TYPE,
 
1236
                                      payload=Record_CNAME(canonical))],
 
1237
                            [], []))
 
1238
                elif domain == canonical:
 
1239
                    return defer.succeed((
 
1240
                            [RRHeader(name=domain,
 
1241
                                      type=Record_MX.TYPE,
 
1242
                                      payload=Record_MX(0, exchange))],
 
1243
                            [], []))
 
1244
                else:
 
1245
                    return defer.fail(DNSNameError(domain))
 
1246
 
 
1247
        self.mx.resolver = DummyResolver()
 
1248
        d = self.mx.getMX(alias)
 
1249
        d.addCallback(self.assertEqual, Record_MX(name=exchange))
 
1250
        return d
 
1251
 
 
1252
 
 
1253
    def test_cnameChain(self):
 
1254
        """
 
1255
        If L{MXCalculator.getMX} encounters a CNAME chain which is longer than
 
1256
        the length specified, the returned L{Deferred} should errback with
 
1257
        L{CanonicalNameChainTooLong}.
 
1258
        """
 
1259
        class DummyResolver(object):
 
1260
            """
 
1261
            Fake resolver which generates a CNAME chain of infinite length in
 
1262
            response to MX lookups.
 
1263
            """
 
1264
            chainCounter = 0
 
1265
 
 
1266
            def lookupMailExchange(self, domain):
 
1267
                self.chainCounter += 1
 
1268
                name = 'x-%d.example.com' % (self.chainCounter,)
 
1269
                return defer.succeed((
 
1270
                        [RRHeader(name=domain,
 
1271
                                  type=Record_CNAME.TYPE,
 
1272
                                  payload=Record_CNAME(name))],
 
1273
                        [], []))
 
1274
 
 
1275
        cnameLimit = 3
 
1276
        self.mx.resolver = DummyResolver()
 
1277
        d = self.mx.getMX("mail.example.com", cnameLimit)
 
1278
        self.assertFailure(
 
1279
            d, twisted.mail.relaymanager.CanonicalNameChainTooLong)
 
1280
        def cbChainTooLong(error):
 
1281
            self.assertEqual(error.args[0], Record_CNAME("x-%d.example.com" % (cnameLimit + 1,)))
 
1282
            self.assertEqual(self.mx.resolver.chainCounter, cnameLimit + 1)
 
1283
        d.addCallback(cbChainTooLong)
 
1284
        return d
 
1285
 
 
1286
 
 
1287
    def test_cnameWithGlueRecords(self):
 
1288
        """
 
1289
        If an MX lookup returns a CNAME and the MX record for the CNAME, the
 
1290
        L{Deferred} returned by L{MXCalculator.getMX} should be called back
 
1291
        with the name from the MX record without further lookups being
 
1292
        attempted.
 
1293
        """
 
1294
        lookedUp = []
 
1295
        alias = "alias.example.com"
 
1296
        canonical = "canonical.example.com"
 
1297
        exchange = "mail.example.com"
 
1298
 
 
1299
        class DummyResolver(object):
 
1300
            def lookupMailExchange(self, domain):
 
1301
                if domain != alias or lookedUp:
 
1302
                    # Don't give back any results for anything except the alias
 
1303
                    # or on any request after the first.
 
1304
                    return ([], [], [])
 
1305
                return defer.succeed((
 
1306
                        [RRHeader(name=alias,
 
1307
                                  type=Record_CNAME.TYPE,
 
1308
                                  payload=Record_CNAME(canonical)),
 
1309
                         RRHeader(name=canonical,
 
1310
                                  type=Record_MX.TYPE,
 
1311
                                  payload=Record_MX(name=exchange))],
 
1312
                        [], []))
 
1313
 
 
1314
        self.mx.resolver = DummyResolver()
 
1315
        d = self.mx.getMX(alias)
 
1316
        d.addCallback(self.assertEqual, Record_MX(name=exchange))
 
1317
        return d
 
1318
 
 
1319
 
 
1320
    def test_cnameLoopWithGlueRecords(self):
 
1321
        """
 
1322
        If an MX lookup returns two CNAME records which point to each other,
 
1323
        the loop should be detected and the L{Deferred} returned by
 
1324
        L{MXCalculator.getMX} should be errbacked with L{CanonicalNameLoop}.
 
1325
        """
 
1326
        firstAlias = "cname1.example.com"
 
1327
        secondAlias = "cname2.example.com"
 
1328
 
 
1329
        class DummyResolver(object):
 
1330
            def lookupMailExchange(self, domain):
 
1331
                return defer.succeed((
 
1332
                        [RRHeader(name=firstAlias,
 
1333
                                  type=Record_CNAME.TYPE,
 
1334
                                  payload=Record_CNAME(secondAlias)),
 
1335
                         RRHeader(name=secondAlias,
 
1336
                                  type=Record_CNAME.TYPE,
 
1337
                                  payload=Record_CNAME(firstAlias))],
 
1338
                        [], []))
 
1339
 
 
1340
        self.mx.resolver = DummyResolver()
 
1341
        d = self.mx.getMX(firstAlias)
 
1342
        self.assertFailure(d, twisted.mail.relaymanager.CanonicalNameLoop)
 
1343
        return d
 
1344
 
 
1345
 
 
1346
    def testManyRecords(self):
 
1347
        self.auth.addresses['test.domain'] = [
 
1348
            'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
 
1349
        ]
 
1350
        return self.mx.getMX('test.domain'
 
1351
            ).addCallback(self._cbManyRecordsSuccessfulLookup
 
1352
            )
 
1353
 
 
1354
    def _cbManyRecordsSuccessfulLookup(self, mx):
 
1355
        self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
 
1356
        self.mx.markBad(str(mx.name))
 
1357
        return self.mx.getMX('test.domain'
 
1358
            ).addCallback(self._cbManyRecordsDifferentResult, mx
 
1359
            )
 
1360
 
 
1361
    def _cbManyRecordsDifferentResult(self, nextMX, mx):
 
1362
        self.assertNotEqual(str(mx.name), str(nextMX.name))
 
1363
        self.mx.markBad(str(nextMX.name))
 
1364
 
 
1365
        return self.mx.getMX('test.domain'
 
1366
            ).addCallback(self._cbManyRecordsLastResult, mx, nextMX
 
1367
            )
 
1368
 
 
1369
    def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
 
1370
        self.assertNotEqual(str(mx.name), str(lastMX.name))
 
1371
        self.assertNotEqual(str(nextMX.name), str(lastMX.name))
 
1372
 
 
1373
        self.mx.markBad(str(lastMX.name))
 
1374
        self.mx.markGood(str(nextMX.name))
 
1375
 
 
1376
        return self.mx.getMX('test.domain'
 
1377
            ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
 
1378
            )
 
1379
 
 
1380
    def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
 
1381
        self.assertEqual(str(againMX.name), str(nextMX.name))
 
1382
 
 
1383
class LiveFireExercise(unittest.TestCase):
 
1384
    if interfaces.IReactorUDP(reactor, None) is None:
 
1385
        skip = "UDP support is required to determining MX records"
 
1386
 
 
1387
    def setUp(self):
 
1388
        setUpDNS(self)
 
1389
        self.tmpdirs = [
 
1390
            'domainDir', 'insertionDomain', 'insertionQueue',
 
1391
            'destinationDomain', 'destinationQueue'
 
1392
        ]
 
1393
 
 
1394
    def tearDown(self):
 
1395
        for d in self.tmpdirs:
 
1396
            if os.path.exists(d):
 
1397
                shutil.rmtree(d)
 
1398
        return tearDownDNS(self)
 
1399
 
 
1400
    def testLocalDelivery(self):
 
1401
        service = mail.mail.MailService()
 
1402
        service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
 
1403
        domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
 
1404
        domain.addUser('user', 'password')
 
1405
        service.addDomain('test.domain', domain)
 
1406
        service.portals[''] = service.portals['test.domain']
 
1407
        map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
 
1408
 
 
1409
        service.setQueue(mail.relay.DomainQueuer(service))
 
1410
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
 
1411
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
 
1412
 
 
1413
        f = service.getSMTPFactory()
 
1414
 
 
1415
        self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
 
1416
 
 
1417
        client = LineSendingProtocol([
 
1418
            'HELO meson',
 
1419
            'MAIL FROM: <user@hostname>',
 
1420
            'RCPT TO: <user@test.domain>',
 
1421
            'DATA',
 
1422
            'This is the message',
 
1423
            '.',
 
1424
            'QUIT'
 
1425
        ])
 
1426
 
 
1427
        done = Deferred()
 
1428
        f = protocol.ClientFactory()
 
1429
        f.protocol = lambda: client
 
1430
        f.clientConnectionLost = lambda *args: done.callback(None)
 
1431
        reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
 
1432
 
 
1433
        def finished(ign):
 
1434
            mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
 
1435
            msg = mbox.getMessage(0).read()
 
1436
            self.failIfEqual(msg.find('This is the message'), -1)
 
1437
 
 
1438
            return self.smtpServer.stopListening()
 
1439
        done.addCallback(finished)
 
1440
        return done
 
1441
 
 
1442
 
 
1443
    def testRelayDelivery(self):
 
1444
        # Here is the service we will connect to and send mail from
 
1445
        insServ = mail.mail.MailService()
 
1446
        insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
 
1447
        domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
 
1448
        insServ.addDomain('insertion.domain', domain)
 
1449
        os.mkdir('insertionQueue')
 
1450
        insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
 
1451
        insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
 
1452
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
 
1453
        manager.fArgs += ('test.identity.hostname',)
 
1454
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
 
1455
        # Yoink!  Now the internet obeys OUR every whim!
 
1456
        manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
 
1457
        # And this is our whim.
 
1458
        self.auth.addresses['destination.domain'] = ['127.0.0.1']
 
1459
 
 
1460
        f = insServ.getSMTPFactory()
 
1461
        self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
 
1462
 
 
1463
        # Here is the service the previous one will connect to for final
 
1464
        # delivery
 
1465
        destServ = mail.mail.MailService()
 
1466
        destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
 
1467
        domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
 
1468
        domain.addUser('user', 'password')
 
1469
        destServ.addDomain('destination.domain', domain)
 
1470
        os.mkdir('destinationQueue')
 
1471
        destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
 
1472
        manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
 
1473
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
 
1474
        helper.startService()
 
1475
 
 
1476
        f = destServ.getSMTPFactory()
 
1477
        self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
 
1478
 
 
1479
        # Update the port number the *first* relay will connect to, because we can't use
 
1480
        # port 25
 
1481
        manager.PORT = self.destServer.getHost().port
 
1482
 
 
1483
        client = LineSendingProtocol([
 
1484
            'HELO meson',
 
1485
            'MAIL FROM: <user@wherever>',
 
1486
            'RCPT TO: <user@destination.domain>',
 
1487
            'DATA',
 
1488
            'This is the message',
 
1489
            '.',
 
1490
            'QUIT'
 
1491
        ])
 
1492
 
 
1493
        done = Deferred()
 
1494
        f = protocol.ClientFactory()
 
1495
        f.protocol = lambda: client
 
1496
        f.clientConnectionLost = lambda *args: done.callback(None)
 
1497
        reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
 
1498
 
 
1499
        def finished(ign):
 
1500
            # First part of the delivery is done.  Poke the queue manually now
 
1501
            # so we don't have to wait for the queue to be flushed.
 
1502
            delivery = manager.checkState()
 
1503
            def delivered(ign):
 
1504
                mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
 
1505
                msg = mbox.getMessage(0).read()
 
1506
                self.failIfEqual(msg.find('This is the message'), -1)
 
1507
 
 
1508
                self.insServer.stopListening()
 
1509
                self.destServer.stopListening()
 
1510
                helper.stopService()
 
1511
            delivery.addCallback(delivered)
 
1512
            return delivery
 
1513
        done.addCallback(finished)
 
1514
        return done
 
1515
 
 
1516
 
 
1517
aliasFile = StringIO.StringIO("""\
 
1518
# Here's a comment
 
1519
   # woop another one
 
1520
testuser:                   address1,address2, address3,
 
1521
    continuation@address, |/bin/process/this
 
1522
 
 
1523
usertwo:thisaddress,thataddress, lastaddress
 
1524
lastuser:       :/includable, /filename, |/program, address
 
1525
""")
 
1526
 
 
1527
class LineBufferMessage:
 
1528
    def __init__(self):
 
1529
        self.lines = []
 
1530
        self.eom = False
 
1531
        self.lost = False
 
1532
 
 
1533
    def lineReceived(self, line):
 
1534
        self.lines.append(line)
 
1535
 
 
1536
    def eomReceived(self):
 
1537
        self.eom = True
 
1538
        return defer.succeed('<Whatever>')
 
1539
 
 
1540
    def connectionLost(self):
 
1541
        self.lost = True
 
1542
 
 
1543
class AliasTestCase(unittest.TestCase):
 
1544
    lines = [
 
1545
        'First line',
 
1546
        'Next line',
 
1547
        '',
 
1548
        'After a blank line',
 
1549
        'Last line'
 
1550
    ]
 
1551
 
 
1552
    def setUp(self):
 
1553
        aliasFile.seek(0)
 
1554
 
 
1555
    def testHandle(self):
 
1556
        result = {}
 
1557
        lines = [
 
1558
            'user:  another@host\n',
 
1559
            'nextuser:  |/bin/program\n',
 
1560
            'user:  me@again\n',
 
1561
            'moreusers: :/etc/include/filename\n',
 
1562
            'multiuser: first@host, second@host,last@anotherhost',
 
1563
        ]
 
1564
 
 
1565
        for l in lines:
 
1566
            mail.alias.handle(result, l, 'TestCase', None)
 
1567
 
 
1568
        self.assertEquals(result['user'], ['another@host', 'me@again'])
 
1569
        self.assertEquals(result['nextuser'], ['|/bin/program'])
 
1570
        self.assertEquals(result['moreusers'], [':/etc/include/filename'])
 
1571
        self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
 
1572
 
 
1573
    def testFileLoader(self):
 
1574
        domains = {'': object()}
 
1575
        result = mail.alias.loadAliasFile(domains, fp=aliasFile)
 
1576
 
 
1577
        self.assertEquals(len(result), 3)
 
1578
 
 
1579
        group = result['testuser']
 
1580
        s = str(group)
 
1581
        for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
 
1582
            self.failIfEqual(s.find(a), -1)
 
1583
        self.assertEquals(len(group), 5)
 
1584
 
 
1585
        group = result['usertwo']
 
1586
        s = str(group)
 
1587
        for a in ('thisaddress', 'thataddress', 'lastaddress'):
 
1588
            self.failIfEqual(s.find(a), -1)
 
1589
        self.assertEquals(len(group), 3)
 
1590
 
 
1591
        group = result['lastuser']
 
1592
        s = str(group)
 
1593
        self.failUnlessEqual(s.find('/includable'), -1)
 
1594
        for a in ('/filename', 'program', 'address'):
 
1595
            self.failIfEqual(s.find(a), -1, '%s not found' % a)
 
1596
        self.assertEquals(len(group), 3)
 
1597
 
 
1598
    def testMultiWrapper(self):
 
1599
        msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
 
1600
        msg = mail.alias.MultiWrapper(msgs)
 
1601
 
 
1602
        for L in self.lines:
 
1603
            msg.lineReceived(L)
 
1604
        return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
 
1605
 
 
1606
    def _cbMultiWrapper(self, ignored, msgs):
 
1607
        for m in msgs:
 
1608
            self.failUnless(m.eom)
 
1609
            self.failIf(m.lost)
 
1610
            self.assertEquals(self.lines, m.lines)
 
1611
 
 
1612
    def testFileAlias(self):
 
1613
        tmpfile = self.mktemp()
 
1614
        a = mail.alias.FileAlias(tmpfile, None, None)
 
1615
        m = a.createMessageReceiver()
 
1616
 
 
1617
        for l in self.lines:
 
1618
            m.lineReceived(l)
 
1619
        return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
 
1620
 
 
1621
    def _cbTestFileAlias(self, ignored, tmpfile):
 
1622
        lines = file(tmpfile).readlines()
 
1623
        self.assertEquals([L[:-1] for L in lines], self.lines)
 
1624
 
 
1625
 
 
1626
 
 
1627
class DummyProcess(object):
 
1628
    __slots__ = ['onEnd']
 
1629
 
 
1630
 
 
1631
 
 
1632
class MockProcessAlias(mail.alias.ProcessAlias):
 
1633
    """
 
1634
    A alias processor that doesn't actually launch processes.
 
1635
    """
 
1636
 
 
1637
    def spawnProcess(self, proto, program, path):
 
1638
        """
 
1639
        Don't spawn a process.
 
1640
        """
 
1641
 
 
1642
 
 
1643
 
 
1644
class MockAliasGroup(mail.alias.AliasGroup):
 
1645
    """
 
1646
    An alias group using C{MockProcessAlias}.
 
1647
    """
 
1648
    processAliasFactory = MockProcessAlias
 
1649
 
 
1650
 
 
1651
 
 
1652
class StubProcess(object):
 
1653
    """
 
1654
    Fake implementation of L{IProcessTransport}.
 
1655
 
 
1656
    @ivar signals: A list of all the signals which have been sent to this fake
 
1657
        process.
 
1658
    """
 
1659
    def __init__(self):
 
1660
        self.signals = []
 
1661
 
 
1662
 
 
1663
    def loseConnection(self):
 
1664
        """
 
1665
        No-op implementation of disconnection.
 
1666
        """
 
1667
 
 
1668
 
 
1669
    def signalProcess(self, signal):
 
1670
        """
 
1671
        Record a signal sent to this process for later inspection.
 
1672
        """
 
1673
        self.signals.append(signal)
 
1674
 
 
1675
 
 
1676
 
 
1677
class ProcessAliasTestCase(unittest.TestCase):
 
1678
    """
 
1679
    Tests for alias resolution.
 
1680
    """
 
1681
    if interfaces.IReactorProcess(reactor, None) is None:
 
1682
        skip = "IReactorProcess not supported"
 
1683
 
 
1684
    lines = [
 
1685
        'First line',
 
1686
        'Next line',
 
1687
        '',
 
1688
        'After a blank line',
 
1689
        'Last line'
 
1690
    ]
 
1691
 
 
1692
    def exitStatus(self, code):
 
1693
        """
 
1694
        Construct a status from the given exit code.
 
1695
 
 
1696
        @type code: L{int} between 0 and 255 inclusive.
 
1697
        @param code: The exit status which the code will represent.
 
1698
 
 
1699
        @rtype: L{int}
 
1700
        @return: A status integer for the given exit code.
 
1701
        """
 
1702
        # /* Macros for constructing status values.  */
 
1703
        # #define __W_EXITCODE(ret, sig)  ((ret) << 8 | (sig))
 
1704
        status = (code << 8) | 0
 
1705
 
 
1706
        # Sanity check
 
1707
        self.assertTrue(os.WIFEXITED(status))
 
1708
        self.assertEqual(os.WEXITSTATUS(status), code)
 
1709
        self.assertFalse(os.WIFSIGNALED(status))
 
1710
 
 
1711
        return status
 
1712
 
 
1713
 
 
1714
    def signalStatus(self, signal):
 
1715
        """
 
1716
        Construct a status from the given signal.
 
1717
 
 
1718
        @type signal: L{int} between 0 and 255 inclusive.
 
1719
        @param signal: The signal number which the status will represent.
 
1720
 
 
1721
        @rtype: L{int}
 
1722
        @return: A status integer for the given signal.
 
1723
        """
 
1724
        # /* If WIFSIGNALED(STATUS), the terminating signal.  */
 
1725
        # #define __WTERMSIG(status)      ((status) & 0x7f)
 
1726
        # /* Nonzero if STATUS indicates termination by a signal.  */
 
1727
        # #define __WIFSIGNALED(status) \
 
1728
        #    (((signed char) (((status) & 0x7f) + 1) >> 1) > 0)
 
1729
        status = signal
 
1730
 
 
1731
        # Sanity check
 
1732
        self.assertTrue(os.WIFSIGNALED(status))
 
1733
        self.assertEqual(os.WTERMSIG(status), signal)
 
1734
        self.assertFalse(os.WIFEXITED(status))
 
1735
 
 
1736
        return status
 
1737
 
 
1738
 
 
1739
    def setUp(self):
 
1740
        """
 
1741
        Replace L{smtp.DNSNAME} with a well-known value.
 
1742
        """
 
1743
        self.DNSNAME = smtp.DNSNAME
 
1744
        smtp.DNSNAME = ''
 
1745
 
 
1746
 
 
1747
    def tearDown(self):
 
1748
        """
 
1749
        Restore the original value of L{smtp.DNSNAME}.
 
1750
        """
 
1751
        smtp.DNSNAME = self.DNSNAME
 
1752
 
 
1753
 
 
1754
    def test_processAlias(self):
 
1755
        """
 
1756
        Standard call to C{mail.alias.ProcessAlias}: check that the specified
 
1757
        script is called, and that the input is correctly transferred to it.
 
1758
        """
 
1759
        sh = FilePath(self.mktemp())
 
1760
        sh.setContent("""\
 
1761
#!/bin/sh
 
1762
rm -f process.alias.out
 
1763
while read i; do
 
1764
    echo $i >> process.alias.out
 
1765
done""")
 
1766
        os.chmod(sh.path, 0700)
 
1767
        a = mail.alias.ProcessAlias(sh.path, None, None)
 
1768
        m = a.createMessageReceiver()
 
1769
 
 
1770
        for l in self.lines:
 
1771
            m.lineReceived(l)
 
1772
 
 
1773
        def _cbProcessAlias(ignored):
 
1774
            lines = file('process.alias.out').readlines()
 
1775
            self.assertEquals([L[:-1] for L in lines], self.lines)
 
1776
 
 
1777
        return m.eomReceived().addCallback(_cbProcessAlias)
 
1778
 
 
1779
 
 
1780
    def test_processAliasTimeout(self):
 
1781
        """
 
1782
        If the alias child process does not exit within a particular period of
 
1783
        time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should
 
1784
        fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the
 
1785
        child process..
 
1786
        """
 
1787
        reactor = task.Clock()
 
1788
        transport = StubProcess()
 
1789
        proto = mail.alias.ProcessAliasProtocol()
 
1790
        proto.makeConnection(transport)
 
1791
 
 
1792
        receiver = mail.alias.MessageWrapper(proto, None, reactor)
 
1793
        d = receiver.eomReceived()
 
1794
        reactor.advance(receiver.completionTimeout)
 
1795
        def timedOut(ignored):
 
1796
            self.assertEqual(transport.signals, ['KILL'])
 
1797
            # Now that it has been killed, disconnect the protocol associated
 
1798
            # with it.
 
1799
            proto.processEnded(
 
1800
                ProcessTerminated(self.signalStatus(signal.SIGKILL)))
 
1801
        self.assertFailure(d, mail.alias.ProcessAliasTimeout)
 
1802
        d.addCallback(timedOut)
 
1803
        return d
 
1804
 
 
1805
 
 
1806
    def test_earlyProcessTermination(self):
 
1807
        """
 
1808
        If the process associated with an L{mail.alias.MessageWrapper} exits
 
1809
        before I{eomReceived} is called, the L{Deferred} returned by
 
1810
        I{eomReceived} should fail.
 
1811
        """
 
1812
        transport = StubProcess()
 
1813
        protocol = mail.alias.ProcessAliasProtocol()
 
1814
        protocol.makeConnection(transport)
 
1815
        receiver = mail.alias.MessageWrapper(protocol, None, None)
 
1816
        protocol.processEnded(failure.Failure(ProcessDone(0)))
 
1817
        return self.assertFailure(receiver.eomReceived(), ProcessDone)
 
1818
 
 
1819
 
 
1820
    def _terminationTest(self, status):
 
1821
        """
 
1822
        Verify that if the process associated with an
 
1823
        L{mail.alias.MessageWrapper} exits with the given status, the
 
1824
        L{Deferred} returned by I{eomReceived} fails with L{ProcessTerminated}.
 
1825
        """
 
1826
        transport = StubProcess()
 
1827
        protocol = mail.alias.ProcessAliasProtocol()
 
1828
        protocol.makeConnection(transport)
 
1829
        receiver = mail.alias.MessageWrapper(protocol, None, None)
 
1830
        protocol.processEnded(
 
1831
            failure.Failure(ProcessTerminated(status)))
 
1832
        return self.assertFailure(receiver.eomReceived(), ProcessTerminated)
 
1833
 
 
1834
 
 
1835
    def test_errorProcessTermination(self):
 
1836
        """
 
1837
        If the process associated with an L{mail.alias.MessageWrapper} exits
 
1838
        with a non-zero exit code, the L{Deferred} returned by I{eomReceived}
 
1839
        should fail.
 
1840
        """
 
1841
        return self._terminationTest(self.exitStatus(1))
 
1842
 
 
1843
 
 
1844
    def test_signalProcessTermination(self):
 
1845
        """
 
1846
        If the process associated with an L{mail.alias.MessageWrapper} exits
 
1847
        because it received a signal, the L{Deferred} returned by
 
1848
        I{eomReceived} should fail.
 
1849
        """
 
1850
        return self._terminationTest(self.signalStatus(signal.SIGHUP))
 
1851
 
 
1852
 
 
1853
    def test_aliasResolution(self):
 
1854
        """
 
1855
        Check that the C{resolve} method of alias processors produce the correct
 
1856
        set of objects:
 
1857
            - direct alias with L{mail.alias.AddressAlias} if a simple input is passed
 
1858
            - aliases in a file with L{mail.alias.FileWrapper} if an input in the format
 
1859
              '/file' is given
 
1860
            - aliases resulting of a process call wrapped by L{mail.alias.MessageWrapper}
 
1861
              if the format is '|process'
 
1862
        """
 
1863
        aliases = {}
 
1864
        domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
 
1865
        A1 = MockAliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
 
1866
        A2 = MockAliasGroup(['user2', 'user3'], domain, 'alias2')
 
1867
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
 
1868
        aliases.update({
 
1869
            'alias1': A1,
 
1870
            'alias2': A2,
 
1871
            'alias3': A3,
 
1872
        })
 
1873
 
 
1874
        res1 = A1.resolve(aliases)
 
1875
        r1 = map(str, res1.objs)
 
1876
        r1.sort()
 
1877
        expected = map(str, [
 
1878
            mail.alias.AddressAlias('user1', None, None),
 
1879
            mail.alias.MessageWrapper(DummyProcess(), 'echo'),
 
1880
            mail.alias.FileWrapper('/file'),
 
1881
        ])
 
1882
        expected.sort()
 
1883
        self.assertEquals(r1, expected)
 
1884
 
 
1885
        res2 = A2.resolve(aliases)
 
1886
        r2 = map(str, res2.objs)
 
1887
        r2.sort()
 
1888
        expected = map(str, [
 
1889
            mail.alias.AddressAlias('user2', None, None),
 
1890
            mail.alias.AddressAlias('user3', None, None)
 
1891
        ])
 
1892
        expected.sort()
 
1893
        self.assertEquals(r2, expected)
 
1894
 
 
1895
        res3 = A3.resolve(aliases)
 
1896
        r3 = map(str, res3.objs)
 
1897
        r3.sort()
 
1898
        expected = map(str, [
 
1899
            mail.alias.AddressAlias('user1', None, None),
 
1900
            mail.alias.MessageWrapper(DummyProcess(), 'echo'),
 
1901
            mail.alias.FileWrapper('/file'),
 
1902
        ])
 
1903
        expected.sort()
 
1904
        self.assertEquals(r3, expected)
 
1905
 
 
1906
 
 
1907
    def test_cyclicAlias(self):
 
1908
        """
 
1909
        Check that a cycle in alias resolution is correctly handled.
 
1910
        """
 
1911
        aliases = {}
 
1912
        domain = {'': TestDomain(aliases, [])}
 
1913
        A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
 
1914
        A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
 
1915
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
 
1916
        aliases.update({
 
1917
            'alias1': A1,
 
1918
            'alias2': A2,
 
1919
            'alias3': A3
 
1920
        })
 
1921
 
 
1922
        self.assertEquals(aliases['alias1'].resolve(aliases), None)
 
1923
        self.assertEquals(aliases['alias2'].resolve(aliases), None)
 
1924
        self.assertEquals(aliases['alias3'].resolve(aliases), None)
 
1925
 
 
1926
        A4 = MockAliasGroup(['|echo', 'alias1'], domain, 'alias4')
 
1927
        aliases['alias4'] = A4
 
1928
 
 
1929
        res = A4.resolve(aliases)
 
1930
        r = map(str, res.objs)
 
1931
        r.sort()
 
1932
        expected = map(str, [
 
1933
            mail.alias.MessageWrapper(DummyProcess(), 'echo')
 
1934
        ])
 
1935
        expected.sort()
 
1936
        self.assertEquals(r, expected)
 
1937
 
 
1938
 
 
1939
 
 
1940
 
 
1941
 
 
1942
 
 
1943
class TestDomain:
 
1944
    def __init__(self, aliases, users):
 
1945
        self.aliases = aliases
 
1946
        self.users = users
 
1947
 
 
1948
    def exists(self, user, memo=None):
 
1949
        user = user.dest.local
 
1950
        if user in self.users:
 
1951
            return lambda: mail.alias.AddressAlias(user, None, None)
 
1952
        try:
 
1953
            a = self.aliases[user]
 
1954
        except:
 
1955
            raise smtp.SMTPBadRcpt(user)
 
1956
        else:
 
1957
            aliases = a.resolve(self.aliases, memo)
 
1958
            if aliases:
 
1959
                return lambda: aliases
 
1960
            raise smtp.SMTPBadRcpt(user)
 
1961
 
 
1962
 
 
1963
from twisted.python.runtime import platformType
 
1964
import types
 
1965
if platformType != "posix":
 
1966
    for o in locals().values():
 
1967
        if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
 
1968
            o.skip = "twisted.mail only works on posix"