1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for large portions of L{twisted.mail}.
17
from zope.interface import Interface, implements
19
from twisted.trial import unittest
20
from twisted.mail import smtp
21
from twisted.mail import pop3
22
from twisted.names import dns
23
from twisted.internet import protocol
24
from twisted.internet import defer
25
from twisted.internet.defer import Deferred
26
from twisted.internet import reactor
27
from twisted.internet import interfaces
28
from twisted.internet import task
29
from twisted.internet.error import DNSLookupError, CannotListenError
30
from twisted.internet.error import ProcessDone, ProcessTerminated
31
from twisted.internet import address
32
from twisted.python import failure
33
from twisted.python.filepath import FilePath
34
from twisted.python.hashlib import md5
36
from twisted import mail
37
import twisted.mail.mail
38
import twisted.mail.maildir
39
import twisted.mail.relay
40
import twisted.mail.relaymanager
41
import twisted.mail.protocols
42
import twisted.mail.alias
44
from twisted.names.error import DNSNameError
45
from twisted.names.dns import RRHeader, Record_CNAME, Record_MX
47
from twisted import cred
48
import twisted.cred.credentials
49
import twisted.cred.checkers
50
import twisted.cred.portal
52
from twisted.test.proto_helpers import LineSendingProtocol
54
class DomainWithDefaultsTestCase(unittest.TestCase):
55
def testMethods(self):
56
d = dict([(x, x + 10) for x in range(10)])
57
d = mail.mail.DomainWithDefaultDict(d, 'Default')
59
self.assertEquals(len(d), 10)
60
self.assertEquals(list(iter(d)), range(10))
61
self.assertEquals(list(d.iterkeys()), list(iter(d)))
63
items = list(d.iteritems())
65
self.assertEquals(items, [(x, x + 10) for x in range(10)])
67
values = list(d.itervalues())
69
self.assertEquals(values, range(10, 20))
73
self.assertEquals(items, [(x, x + 10) for x in range(10)])
77
self.assertEquals(values, range(10, 20))
80
self.assertEquals(d[x], x + 10)
81
self.assertEquals(d.get(x), x + 10)
82
self.failUnless(x in d)
83
self.failUnless(d.has_key(x))
87
self.assertEquals(len(d), 7)
88
self.assertEquals(d[2], 'Default')
89
self.assertEquals(d[4], 'Default')
90
self.assertEquals(d[6], 'Default')
92
d.update({'a': None, 'b': (), 'c': '*'})
93
self.assertEquals(len(d), 10)
94
self.assertEquals(d['a'], None)
95
self.assertEquals(d['b'], ())
96
self.assertEquals(d['c'], '*')
99
self.assertEquals(len(d), 0)
101
self.assertEquals(d.setdefault('key', 'value'), 'value')
102
self.assertEquals(d['key'], 'value')
104
self.assertEquals(d.popitem(), ('key', 'value'))
105
self.assertEquals(len(d), 0)
108
self.assertEquals(d.domains, dcopy.domains)
109
self.assertEquals(d.default, dcopy.default)
112
def _stringificationTest(self, stringifier):
114
Assert that the class name of a L{mail.mail.DomainWithDefaultDict}
115
instance and the string-formatted underlying domain dictionary both
116
appear in the string produced by the given string-returning function.
118
@type stringifier: one-argument callable
119
@param stringifier: either C{str} or C{repr}, to be used to get a
120
string to make assertions against.
122
domain = mail.mail.DomainWithDefaultDict({}, 'Default')
123
self.assertIn(domain.__class__.__name__, stringifier(domain))
124
domain['key'] = 'value'
125
self.assertIn(str({'key': 'value'}), stringifier(domain))
130
L{DomainWithDefaultDict.__str__} should return a string including
131
the class name and the domain mapping held by the instance.
133
self._stringificationTest(str)
138
L{DomainWithDefaultDict.__repr__} should return a string including
139
the class name and the domain mapping held by the instance.
141
self._stringificationTest(repr)
145
class BounceTestCase(unittest.TestCase):
147
self.domain = mail.mail.BounceDomain()
149
def testExists(self):
150
self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
154
self.domain.willRelay("random q emailer", "protocol"),
158
def testMessage(self):
159
self.assertRaises(NotImplementedError, self.domain.startMessage, "whomever")
161
def testAddUser(self):
162
self.domain.addUser("bob", "password")
163
self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
165
class FileMessageTestCase(unittest.TestCase):
167
self.name = "fileMessage.testFile"
168
self.final = "final.fileMessage.testFile"
169
self.f = file(self.name, 'w')
170
self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
182
os.remove(self.final)
186
def testFinalName(self):
187
return self.fp.eomReceived().addCallback(self._cbFinalName)
189
def _cbFinalName(self, result):
190
self.assertEquals(result, self.final)
191
self.failUnless(self.f.closed)
192
self.failIf(os.path.exists(self.name))
194
def testContents(self):
195
contents = "first line\nsecond line\nthird line\n"
196
for line in contents.splitlines():
197
self.fp.lineReceived(line)
198
self.fp.eomReceived()
199
self.assertEquals(file(self.final).read(), contents)
201
def testInterrupted(self):
202
contents = "first line\nsecond line\n"
203
for line in contents.splitlines():
204
self.fp.lineReceived(line)
205
self.fp.connectionLost()
206
self.failIf(os.path.exists(self.name))
207
self.failIf(os.path.exists(self.final))
209
class MailServiceTestCase(unittest.TestCase):
211
self.service = mail.mail.MailService()
213
def testFactories(self):
214
f = self.service.getPOP3Factory()
215
self.failUnless(isinstance(f, protocol.ServerFactory))
216
self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
218
f = self.service.getSMTPFactory()
219
self.failUnless(isinstance(f, protocol.ServerFactory))
220
self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
222
f = self.service.getESMTPFactory()
223
self.failUnless(isinstance(f, protocol.ServerFactory))
224
self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
226
def testPortals(self):
229
self.service.portals['domain'] = o1
230
self.service.portals[''] = o2
232
self.failUnless(self.service.lookupPortal('domain') is o1)
233
self.failUnless(self.service.defaultPortal() is o2)
236
class StringListMailboxTests(unittest.TestCase):
238
Tests for L{StringListMailbox}, an in-memory only implementation of
241
def test_listOneMessage(self):
243
L{StringListMailbox.listMessages} returns the length of the message at
244
the offset into the mailbox passed to it.
246
mailbox = mail.maildir.StringListMailbox(["abc", "ab", "a"])
247
self.assertEqual(mailbox.listMessages(0), 3)
248
self.assertEqual(mailbox.listMessages(1), 2)
249
self.assertEqual(mailbox.listMessages(2), 1)
252
def test_listAllMessages(self):
254
L{StringListMailbox.listMessages} returns a list of the lengths of all
255
messages if not passed an index.
257
mailbox = mail.maildir.StringListMailbox(["a", "abc", "ab"])
258
self.assertEqual(mailbox.listMessages(), [1, 3, 2])
261
def test_getMessage(self):
263
L{StringListMailbox.getMessage} returns a file-like object from which
264
the contents of the message at the given offset into the mailbox can be
267
mailbox = mail.maildir.StringListMailbox(["foo", "real contents"])
268
self.assertEqual(mailbox.getMessage(1).read(), "real contents")
271
def test_getUidl(self):
273
L{StringListMailbox.getUidl} returns a unique identifier for the
274
message at the given offset into the mailbox.
276
mailbox = mail.maildir.StringListMailbox(["foo", "bar"])
277
self.assertNotEqual(mailbox.getUidl(0), mailbox.getUidl(1))
280
def test_deleteMessage(self):
282
L{StringListMailbox.deleteMessage} marks a message for deletion causing
283
further requests for its length to return 0.
285
mailbox = mail.maildir.StringListMailbox(["foo"])
286
mailbox.deleteMessage(0)
287
self.assertEqual(mailbox.listMessages(0), 0)
288
self.assertEqual(mailbox.listMessages(), [0])
291
def test_undeleteMessages(self):
293
L{StringListMailbox.undeleteMessages} causes any messages marked for
294
deletion to be returned to their original state.
296
mailbox = mail.maildir.StringListMailbox(["foo"])
297
mailbox.deleteMessage(0)
298
mailbox.undeleteMessages()
299
self.assertEqual(mailbox.listMessages(0), 3)
300
self.assertEqual(mailbox.listMessages(), [3])
305
L{StringListMailbox.sync} causes any messages as marked for deletion to
306
be permanently deleted.
308
mailbox = mail.maildir.StringListMailbox(["foo"])
309
mailbox.deleteMessage(0)
311
mailbox.undeleteMessages()
312
self.assertEqual(mailbox.listMessages(0), 0)
313
self.assertEqual(mailbox.listMessages(), [0])
317
class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
321
def osopen(self, fn, attr, mode):
323
return os.open(fn, attr, mode)
325
raise OSError(errno.EPERM, "Faked Permission Problem")
326
def oswrite(self, fh, data):
328
return os.write(fh, data)
330
raise OSError(errno.ENOSPC, "Faked Space problem")
331
def osrename(self, oldname, newname):
332
if self._renamestate:
333
return os.rename(oldname, newname)
335
raise OSError(errno.EPERM, "Faked Permission Problem")
337
class MaildirAppendStringTestCase(unittest.TestCase):
339
self.d = self.mktemp()
340
mail.maildir.initializeMaildir(self.d)
343
shutil.rmtree(self.d)
345
def _append(self, ignored, mbox):
346
d = mbox.appendMessage('TEST')
347
return self.assertFailure(d, Exception)
349
def _setState(self, ignored, mbox, rename=None, write=None, open=None):
350
if rename is not None:
351
mbox.AppendFactory._renameState = rename
352
if write is not None:
353
mbox.AppendFactory._writeState = write
355
mbox.AppendFactory._openstate = open
357
def testAppend(self):
358
mbox = mail.maildir.MaildirMailbox(self.d)
359
mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
361
for i in xrange(1, 11):
362
ds.append(mbox.appendMessage("X" * i))
363
ds[-1].addCallback(self.assertEqual, None)
364
d = defer.gatherResults(ds)
365
d.addCallback(self._cbTestAppend, mbox)
368
def _cbTestAppend(self, result, mbox):
369
self.assertEquals(len(mbox.listMessages()),
371
self.assertEquals(len(mbox.getMessage(5).read()), 6)
372
# test in the right order: last to first error location.
373
mbox.AppendFactory._renamestate = False
374
d = self._append(None, mbox)
375
d.addCallback(self._setState, mbox, rename=True, write=False)
376
d.addCallback(self._append, mbox)
377
d.addCallback(self._setState, mbox, write=True, open=False)
378
d.addCallback(self._append, mbox)
379
d.addCallback(self._setState, mbox, open=True)
383
class MaildirAppendFileTestCase(unittest.TestCase):
385
self.d = self.mktemp()
386
mail.maildir.initializeMaildir(self.d)
389
shutil.rmtree(self.d)
391
def testAppend(self):
392
mbox = mail.maildir.MaildirMailbox(self.d)
396
self.assertEqual(res, None)
397
for i in xrange(1, 11):
398
temp = tempfile.TemporaryFile()
401
ds.append(mbox.appendMessage(temp))
402
ds[-1].addCallback(_check, temp)
403
return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox)
405
def _cbTestAppend(self, result, mbox):
406
self.assertEquals(len(mbox.listMessages()),
408
self.assertEquals(len(mbox.getMessage(5).read()), 6)
411
class MaildirTestCase(unittest.TestCase):
413
self.d = self.mktemp()
414
mail.maildir.initializeMaildir(self.d)
417
shutil.rmtree(self.d)
419
def testInitializer(self):
421
trash = os.path.join(d, '.Trash')
423
self.failUnless(os.path.exists(d) and os.path.isdir(d))
424
self.failUnless(os.path.exists(os.path.join(d, 'new')))
425
self.failUnless(os.path.exists(os.path.join(d, 'cur')))
426
self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
427
self.failUnless(os.path.isdir(os.path.join(d, 'new')))
428
self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
429
self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
431
self.failUnless(os.path.exists(os.path.join(trash, 'new')))
432
self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
433
self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
434
self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
435
self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
436
self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
439
def test_nameGenerator(self):
441
Each call to L{_MaildirNameGenerator.generate} returns a unique
442
string suitable for use as the basename of a new message file. The
443
names are ordered such that those generated earlier sort less than
444
those generated later.
448
generator = mail.maildir._MaildirNameGenerator(clock)
450
firstName = generator.generate()
452
secondName = generator.generate()
454
self.assertTrue(firstName < secondName)
457
def test_mailbox(self):
459
Exercise the methods of L{IMailbox} as implemented by
463
n = mail.maildir._generateMaildirName
464
msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
466
# Toss a few files into the mailbox
469
fObj = file(j(self.d, f), 'w')
474
mb = mail.maildir.MaildirMailbox(self.d)
475
self.assertEquals(mb.listMessages(), range(1, 11))
476
self.assertEquals(mb.listMessages(1), 2)
477
self.assertEquals(mb.listMessages(5), 6)
479
self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
480
self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
488
p, f = os.path.split(msgs[5])
491
self.assertEquals(mb.listMessages(5), 0)
492
self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
493
self.failIf(os.path.exists(j(self.d, msgs[5])))
495
mb.undeleteMessages()
496
self.assertEquals(mb.listMessages(5), 6)
497
self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
498
self.failUnless(os.path.exists(j(self.d, msgs[5])))
500
class MaildirDirdbmDomainTestCase(unittest.TestCase):
502
self.P = self.mktemp()
503
self.S = mail.mail.MailService()
504
self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
507
shutil.rmtree(self.P)
509
def testAddUser(self):
510
toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
515
self.failUnless(u in self.D.dbm)
516
self.assertEquals(self.D.dbm[u], p)
517
self.failUnless(os.path.exists(os.path.join(self.P, u)))
519
def testCredentials(self):
520
creds = self.D.getCredentialsCheckers()
522
self.assertEquals(len(creds), 1)
523
self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
524
self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
526
def testRequestAvatar(self):
527
class ISomething(Interface):
530
self.D.addUser('user', 'password')
533
self.D.requestAvatar, 'user', None, ISomething
536
t = self.D.requestAvatar('user', None, pop3.IMailbox)
537
self.assertEquals(len(t), 3)
538
self.failUnless(t[0] is pop3.IMailbox)
539
self.failUnless(pop3.IMailbox.providedBy(t[1]))
543
def testRequestAvatarId(self):
544
self.D.addUser('user', 'password')
545
database = self.D.getCredentialsCheckers()[0]
547
creds = cred.credentials.UsernamePassword('user', 'wrong password')
549
cred.error.UnauthorizedLogin,
550
database.requestAvatarId, creds
553
creds = cred.credentials.UsernamePassword('user', 'password')
554
self.assertEquals(database.requestAvatarId(creds), 'user')
557
class StubAliasableDomain(object):
559
Minimal testable implementation of IAliasableDomain.
561
implements(mail.mail.IAliasableDomain)
563
def exists(self, user):
565
No test coverage for invocations of this method on domain objects,
566
so we just won't implement it.
568
raise NotImplementedError()
571
def addUser(self, user, password):
573
No test coverage for invocations of this method on domain objects,
574
so we just won't implement it.
576
raise NotImplementedError()
579
def getCredentialsCheckers(self):
581
This needs to succeed in order for other tests to complete
582
successfully, but we don't actually assert anything about its
583
behavior. Return an empty list. Sometime later we should return
584
something else and assert that a portal got set up properly.
589
def setAliasGroup(self, aliases):
591
Just record the value so the test can check it later.
593
self.aliasGroup = aliases
596
class ServiceDomainTestCase(unittest.TestCase):
598
self.S = mail.mail.MailService()
599
self.D = mail.protocols.DomainDeliveryBase(self.S, None)
600
self.D.service = self.S
601
self.D.protocolName = 'TEST'
602
self.D.host = 'hostname'
604
self.tmpdir = self.mktemp()
605
domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
606
domain.addUser('user', 'password')
607
self.S.addDomain('test.domain', domain)
610
shutil.rmtree(self.tmpdir)
613
def testAddAliasableDomain(self):
615
Test that adding an IAliasableDomain to a mail service properly sets
616
up alias group references and such.
619
domain = StubAliasableDomain()
620
self.S.aliases = aliases
621
self.S.addDomain('example.com', domain)
622
self.assertIdentical(domain.aliasGroup, aliases)
625
def testReceivedHeader(self):
626
hdr = self.D.receivedHeader(
627
('remotehost', '123.232.101.234'),
628
smtp.Address('<someguy@somplace>'),
631
fp = StringIO.StringIO(hdr)
632
m = rfc822.Message(fp)
633
self.assertEquals(len(m.items()), 1)
634
self.failUnless(m.has_key('Received'))
636
def testValidateTo(self):
637
user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
638
return defer.maybeDeferred(self.D.validateTo, user
639
).addCallback(self._cbValidateTo
642
def _cbValidateTo(self, result):
643
self.failUnless(callable(result))
645
def testValidateToBadUsername(self):
646
user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
647
return self.assertFailure(
648
defer.maybeDeferred(self.D.validateTo, user),
651
def testValidateToBadDomain(self):
652
user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
653
return self.assertFailure(
654
defer.maybeDeferred(self.D.validateTo, user),
657
def testValidateFrom(self):
658
helo = ('hostname', '127.0.0.1')
659
origin = smtp.Address('<user@hostname>')
660
self.failUnless(self.D.validateFrom(helo, origin) is origin)
662
helo = ('hostname', '1.2.3.4')
663
origin = smtp.Address('<user@hostname>')
664
self.failUnless(self.D.validateFrom(helo, origin) is origin)
666
helo = ('hostname', '1.2.3.4')
667
origin = smtp.Address('<>')
668
self.failUnless(self.D.validateFrom(helo, origin) is origin)
672
self.D.validateFrom, None, origin
675
class VirtualPOP3TestCase(unittest.TestCase):
677
self.tmpdir = self.mktemp()
678
self.S = mail.mail.MailService()
679
self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
680
self.D.addUser('user', 'password')
681
self.S.addDomain('test.domain', self.D)
683
portal = cred.portal.Portal(self.D)
684
map(portal.registerChecker, self.D.getCredentialsCheckers())
685
self.S.portals[''] = self.S.portals['test.domain'] = portal
687
self.P = mail.protocols.VirtualPOP3()
688
self.P.service = self.S
689
self.P.magic = '<unit test magic>'
692
shutil.rmtree(self.tmpdir)
694
def testAuthenticateAPOP(self):
695
resp = md5(self.P.magic + 'password').hexdigest()
696
return self.P.authenticateUserAPOP('user', resp
697
).addCallback(self._cbAuthenticateAPOP
700
def _cbAuthenticateAPOP(self, result):
701
self.assertEquals(len(result), 3)
702
self.assertEquals(result[0], pop3.IMailbox)
703
self.failUnless(pop3.IMailbox.providedBy(result[1]))
706
def testAuthenticateIncorrectUserAPOP(self):
707
resp = md5(self.P.magic + 'password').hexdigest()
708
return self.assertFailure(
709
self.P.authenticateUserAPOP('resu', resp),
710
cred.error.UnauthorizedLogin)
712
def testAuthenticateIncorrectResponseAPOP(self):
713
resp = md5('wrong digest').hexdigest()
714
return self.assertFailure(
715
self.P.authenticateUserAPOP('user', resp),
716
cred.error.UnauthorizedLogin)
718
def testAuthenticatePASS(self):
719
return self.P.authenticateUserPASS('user', 'password'
720
).addCallback(self._cbAuthenticatePASS
723
def _cbAuthenticatePASS(self, result):
724
self.assertEquals(len(result), 3)
725
self.assertEquals(result[0], pop3.IMailbox)
726
self.failUnless(pop3.IMailbox.providedBy(result[1]))
729
def testAuthenticateBadUserPASS(self):
730
return self.assertFailure(
731
self.P.authenticateUserPASS('resu', 'password'),
732
cred.error.UnauthorizedLogin)
734
def testAuthenticateBadPasswordPASS(self):
735
return self.assertFailure(
736
self.P.authenticateUserPASS('user', 'wrong password'),
737
cred.error.UnauthorizedLogin)
739
class empty(smtp.User):
743
class RelayTestCase(unittest.TestCase):
744
def testExists(self):
745
service = mail.mail.MailService()
746
domain = mail.relay.DomainQueuer(service)
749
address.UNIXAddress('/var/run/mail-relay'),
750
address.IPv4Address('TCP', '127.0.0.1', 12345),
754
address.IPv4Address('TCP', '192.168.2.1', 62),
755
address.IPv4Address('TCP', '1.2.3.4', 1943),
760
user.orig = 'user@host'
761
user.dest = 'tsoh@resu'
762
user.protocol = empty()
763
user.protocol.transport = empty()
764
user.protocol.transport.getPeer = lambda: peer
766
self.failUnless(callable(domain.exists(user)))
768
for peer in dontRelay:
770
user.orig = 'some@place'
771
user.protocol = empty()
772
user.protocol.transport = empty()
773
user.protocol.transport.getPeer = lambda: peer
774
user.dest = 'who@cares'
776
self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
778
class RelayerTestCase(unittest.TestCase):
780
self.tmpdir = self.mktemp()
781
os.mkdir(self.tmpdir)
782
self.messageFiles = []
784
name = os.path.join(self.tmpdir, 'body-%d' % (i,))
785
f = file(name + '-H', 'w')
786
pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
789
f = file(name + '-D', 'w')
792
self.messageFiles.append(name)
794
self.R = mail.relay.RelayerMixin()
795
self.R.loadMessages(self.messageFiles)
798
shutil.rmtree(self.tmpdir)
800
def testMailFrom(self):
802
self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
803
self.R.sentMail(250, None, None, None, None)
804
self.assertEquals(self.R.getMailFrom(), None)
806
def testMailTo(self):
808
self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
809
self.R.sentMail(250, None, None, None, None)
810
self.assertEquals(self.R.getMailTo(), None)
812
def testMailData(self):
814
name = os.path.join(self.tmpdir, 'body-%d' % (i,))
815
self.assertEquals(self.R.getMailData().read(), name)
816
self.R.sentMail(250, None, None, None, None)
817
self.assertEquals(self.R.getMailData(), None)
825
def notifySuccess(self, factory, message):
826
self.success.append((factory, message))
828
def notifyFailure(self, factory, message):
829
self.failure.append((factory, message))
831
def notifyDone(self, factory):
832
self.done.append(factory)
834
class ManagedRelayerTestCase(unittest.TestCase):
836
self.manager = Manager()
837
self.messages = range(0, 20, 2)
838
self.factory = object()
839
self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
840
self.relay.messages = self.messages[:]
841
self.relay.names = self.messages[:]
842
self.relay.factory = self.factory
844
def testSuccessfulSentMail(self):
845
for i in self.messages:
846
self.relay.sentMail(250, None, None, None, None)
849
self.manager.success,
850
[(self.factory, m) for m in self.messages]
853
def testFailedSentMail(self):
854
for i in self.messages:
855
self.relay.sentMail(550, None, None, None, None)
858
self.manager.failure,
859
[(self.factory, m) for m in self.messages]
862
def testConnectionLost(self):
863
self.relay.connectionLost(failure.Failure(Exception()))
864
self.assertEquals(self.manager.done, [self.factory])
866
class DirectoryQueueTestCase(unittest.TestCase):
868
# This is almost a test case itself.
869
self.tmpdir = self.mktemp()
870
os.mkdir(self.tmpdir)
871
self.queue = mail.relaymanager.Queue(self.tmpdir)
872
self.queue.noisy = False
874
hdrF, msgF = self.queue.createNewMessage()
875
pickle.dump(['header', m], hdrF)
877
msgF.lineReceived('body: %d' % (m,))
879
self.queue.readDirectory()
882
shutil.rmtree(self.tmpdir)
884
def testWaiting(self):
885
self.failUnless(self.queue.hasWaiting())
886
self.assertEquals(len(self.queue.getWaiting()), 25)
888
waiting = self.queue.getWaiting()
889
self.queue.setRelaying(waiting[0])
890
self.assertEquals(len(self.queue.getWaiting()), 24)
892
self.queue.setWaiting(waiting[0])
893
self.assertEquals(len(self.queue.getWaiting()), 25)
895
def testRelaying(self):
896
for m in self.queue.getWaiting():
897
self.queue.setRelaying(m)
899
len(self.queue.getRelayed()),
900
25 - len(self.queue.getWaiting())
903
self.failIf(self.queue.hasWaiting())
905
relayed = self.queue.getRelayed()
906
self.queue.setWaiting(relayed[0])
907
self.assertEquals(len(self.queue.getWaiting()), 1)
908
self.assertEquals(len(self.queue.getRelayed()), 24)
911
msg = self.queue.getWaiting()[0]
912
self.queue.setRelaying(msg)
915
self.assertEquals(len(self.queue.getWaiting()), 24)
916
self.assertEquals(len(self.queue.getRelayed()), 0)
918
self.failIf(msg in self.queue.getWaiting())
919
self.failIf(msg in self.queue.getRelayed())
921
def testEnvelope(self):
924
for msg in self.queue.getWaiting():
925
envelopes.append(self.queue.getEnvelope(msg))
934
from twisted.names import server
935
from twisted.names import client
936
from twisted.names import common
938
class TestAuthority(common.ResolverBase):
940
common.ResolverBase.__init__(self)
943
def _lookup(self, name, cls, type, timeout = None):
944
if name in self.addresses and type == dns.MX:
946
for a in self.addresses[name]:
948
name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
951
return defer.succeed((results, [], []))
952
return defer.fail(failure.Failure(dns.DomainError(name)))
955
self.auth = TestAuthority()
956
factory = server.DNSServerFactory([self.auth])
957
protocol = dns.DNSDatagramProtocol(factory)
959
self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
960
portNumber = self.port.getHost().port
963
self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
964
except CannotListenError:
965
self.port.stopListening()
968
self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
971
def tearDownDNS(self):
973
dl.append(defer.maybeDeferred(self.port.stopListening))
974
dl.append(defer.maybeDeferred(self.udpPort.stopListening))
975
if self.resolver.protocol.transport is not None:
976
dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListening))
978
self.resolver._parseCall.cancel()
981
return defer.DeferredList(dl)
983
class MXTestCase(unittest.TestCase):
985
Tests for L{mail.relaymanager.MXCalculator}.
989
self.clock = task.Clock()
990
self.mx = mail.relaymanager.MXCalculator(self.resolver, self.clock)
993
return tearDownDNS(self)
996
def test_defaultClock(self):
998
L{MXCalculator}'s default clock is C{twisted.internet.reactor}.
1000
self.assertIdentical(
1001
mail.relaymanager.MXCalculator(self.resolver).clock,
1005
def testSimpleSuccess(self):
1006
self.auth.addresses['test.domain'] = ['the.email.test.domain']
1007
return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
1009
def _cbSimpleSuccess(self, mx):
1010
self.assertEquals(mx.preference, 0)
1011
self.assertEquals(str(mx.name), 'the.email.test.domain')
1013
def testSimpleFailure(self):
1014
self.mx.fallbackToDomain = False
1015
return self.assertFailure(self.mx.getMX('test.domain'), IOError)
1017
def testSimpleFailureWithFallback(self):
1018
return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
1021
def _exchangeTest(self, domain, records, correctMailExchange):
1023
Issue an MX request for the given domain and arrange for it to be
1024
responded to with the given records. Verify that the resulting mail
1025
exchange is the indicated host.
1027
@type domain: C{str}
1028
@type records: C{list} of L{RRHeader}
1029
@type correctMailExchange: C{str}
1032
class DummyResolver(object):
1033
def lookupMailExchange(self, name):
1035
return defer.succeed((
1039
return defer.fail(DNSNameError(domain))
1041
self.mx.resolver = DummyResolver()
1042
d = self.mx.getMX(domain)
1043
def gotMailExchange(record):
1044
self.assertEqual(str(record.name), correctMailExchange)
1045
d.addCallback(gotMailExchange)
1049
def test_mailExchangePreference(self):
1051
The MX record with the lowest preference is returned by
1052
L{MXCalculator.getMX}.
1054
domain = "example.com"
1055
good = "good.example.com"
1056
bad = "bad.example.com"
1059
RRHeader(name=domain,
1060
type=Record_MX.TYPE,
1061
payload=Record_MX(1, bad)),
1062
RRHeader(name=domain,
1063
type=Record_MX.TYPE,
1064
payload=Record_MX(0, good)),
1065
RRHeader(name=domain,
1066
type=Record_MX.TYPE,
1067
payload=Record_MX(2, bad))]
1068
return self._exchangeTest(domain, records, good)
1071
def test_badExchangeExcluded(self):
1073
L{MXCalculator.getMX} returns the MX record with the lowest preference
1074
which is not also marked as bad.
1076
domain = "example.com"
1077
good = "good.example.com"
1078
bad = "bad.example.com"
1081
RRHeader(name=domain,
1082
type=Record_MX.TYPE,
1083
payload=Record_MX(0, bad)),
1084
RRHeader(name=domain,
1085
type=Record_MX.TYPE,
1086
payload=Record_MX(1, good))]
1087
self.mx.markBad(bad)
1088
return self._exchangeTest(domain, records, good)
1091
def test_fallbackForAllBadExchanges(self):
1093
L{MXCalculator.getMX} returns the MX record with the lowest preference
1094
if all the MX records in the response have been marked bad.
1096
domain = "example.com"
1097
bad = "bad.example.com"
1098
worse = "worse.example.com"
1101
RRHeader(name=domain,
1102
type=Record_MX.TYPE,
1103
payload=Record_MX(0, bad)),
1104
RRHeader(name=domain,
1105
type=Record_MX.TYPE,
1106
payload=Record_MX(1, worse))]
1107
self.mx.markBad(bad)
1108
self.mx.markBad(worse)
1109
return self._exchangeTest(domain, records, bad)
1112
def test_badExchangeExpires(self):
1114
L{MXCalculator.getMX} returns the MX record with the lowest preference
1115
if it was last marked bad longer than L{MXCalculator.timeOutBadMX}
1118
domain = "example.com"
1119
good = "good.example.com"
1120
previouslyBad = "bad.example.com"
1123
RRHeader(name=domain,
1124
type=Record_MX.TYPE,
1125
payload=Record_MX(0, previouslyBad)),
1126
RRHeader(name=domain,
1127
type=Record_MX.TYPE,
1128
payload=Record_MX(1, good))]
1129
self.mx.markBad(previouslyBad)
1130
self.clock.advance(self.mx.timeOutBadMX)
1131
return self._exchangeTest(domain, records, previouslyBad)
1134
def test_goodExchangeUsed(self):
1136
L{MXCalculator.getMX} returns the MX record with the lowest preference
1137
if it was marked good after it was marked bad.
1139
domain = "example.com"
1140
good = "good.example.com"
1141
previouslyBad = "bad.example.com"
1144
RRHeader(name=domain,
1145
type=Record_MX.TYPE,
1146
payload=Record_MX(0, previouslyBad)),
1147
RRHeader(name=domain,
1148
type=Record_MX.TYPE,
1149
payload=Record_MX(1, good))]
1150
self.mx.markBad(previouslyBad)
1151
self.mx.markGood(previouslyBad)
1152
self.clock.advance(self.mx.timeOutBadMX)
1153
return self._exchangeTest(domain, records, previouslyBad)
1156
def test_successWithoutResults(self):
1158
If an MX lookup succeeds but the result set is empty,
1159
L{MXCalculator.getMX} should try to look up an I{A} record for the
1160
requested name and call back its returned Deferred with that
1164
domain = 'example.org'
1166
class DummyResolver(object):
1168
Fake resolver which will respond to an MX lookup with an empty
1171
@ivar mx: A dictionary mapping hostnames to three-tuples of
1172
results to be returned from I{MX} lookups.
1174
@ivar a: A dictionary mapping hostnames to addresses to be
1175
returned from I{A} lookups.
1177
mx = {domain: ([], [], [])}
1180
def lookupMailExchange(self, domain):
1181
return defer.succeed(self.mx[domain])
1183
def getHostByName(self, domain):
1184
return defer.succeed(self.a[domain])
1186
self.mx.resolver = DummyResolver()
1187
d = self.mx.getMX(domain)
1188
d.addCallback(self.assertEqual, Record_MX(name=ip))
1192
def test_failureWithSuccessfulFallback(self):
1194
Test that if the MX record lookup fails, fallback is enabled, and an A
1195
record is available for the name, then the Deferred returned by
1196
L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
1197
gives the address in the A record for the name.
1199
class DummyResolver(object):
1201
Fake resolver which will fail an MX lookup but then succeed a
1204
def lookupMailExchange(self, domain):
1205
return defer.fail(DNSNameError())
1207
def getHostByName(self, domain):
1208
return defer.succeed("1.2.3.4")
1210
self.mx.resolver = DummyResolver()
1211
d = self.mx.getMX("domain")
1212
d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
1216
def test_cnameWithoutGlueRecords(self):
1218
If an MX lookup returns a single CNAME record as a result, MXCalculator
1219
will perform an MX lookup for the canonical name indicated and return
1220
the MX record which results.
1222
alias = "alias.example.com"
1223
canonical = "canonical.example.com"
1224
exchange = "mail.example.com"
1226
class DummyResolver(object):
1228
Fake resolver which will return a CNAME for an MX lookup of a name
1229
which is an alias and an MX for an MX lookup of the canonical name.
1231
def lookupMailExchange(self, domain):
1233
return defer.succeed((
1234
[RRHeader(name=domain,
1235
type=Record_CNAME.TYPE,
1236
payload=Record_CNAME(canonical))],
1238
elif domain == canonical:
1239
return defer.succeed((
1240
[RRHeader(name=domain,
1241
type=Record_MX.TYPE,
1242
payload=Record_MX(0, exchange))],
1245
return defer.fail(DNSNameError(domain))
1247
self.mx.resolver = DummyResolver()
1248
d = self.mx.getMX(alias)
1249
d.addCallback(self.assertEqual, Record_MX(name=exchange))
1253
def test_cnameChain(self):
1255
If L{MXCalculator.getMX} encounters a CNAME chain which is longer than
1256
the length specified, the returned L{Deferred} should errback with
1257
L{CanonicalNameChainTooLong}.
1259
class DummyResolver(object):
1261
Fake resolver which generates a CNAME chain of infinite length in
1262
response to MX lookups.
1266
def lookupMailExchange(self, domain):
1267
self.chainCounter += 1
1268
name = 'x-%d.example.com' % (self.chainCounter,)
1269
return defer.succeed((
1270
[RRHeader(name=domain,
1271
type=Record_CNAME.TYPE,
1272
payload=Record_CNAME(name))],
1276
self.mx.resolver = DummyResolver()
1277
d = self.mx.getMX("mail.example.com", cnameLimit)
1279
d, twisted.mail.relaymanager.CanonicalNameChainTooLong)
1280
def cbChainTooLong(error):
1281
self.assertEqual(error.args[0], Record_CNAME("x-%d.example.com" % (cnameLimit + 1,)))
1282
self.assertEqual(self.mx.resolver.chainCounter, cnameLimit + 1)
1283
d.addCallback(cbChainTooLong)
1287
def test_cnameWithGlueRecords(self):
1289
If an MX lookup returns a CNAME and the MX record for the CNAME, the
1290
L{Deferred} returned by L{MXCalculator.getMX} should be called back
1291
with the name from the MX record without further lookups being
1295
alias = "alias.example.com"
1296
canonical = "canonical.example.com"
1297
exchange = "mail.example.com"
1299
class DummyResolver(object):
1300
def lookupMailExchange(self, domain):
1301
if domain != alias or lookedUp:
1302
# Don't give back any results for anything except the alias
1303
# or on any request after the first.
1305
return defer.succeed((
1306
[RRHeader(name=alias,
1307
type=Record_CNAME.TYPE,
1308
payload=Record_CNAME(canonical)),
1309
RRHeader(name=canonical,
1310
type=Record_MX.TYPE,
1311
payload=Record_MX(name=exchange))],
1314
self.mx.resolver = DummyResolver()
1315
d = self.mx.getMX(alias)
1316
d.addCallback(self.assertEqual, Record_MX(name=exchange))
1320
def test_cnameLoopWithGlueRecords(self):
1322
If an MX lookup returns two CNAME records which point to each other,
1323
the loop should be detected and the L{Deferred} returned by
1324
L{MXCalculator.getMX} should be errbacked with L{CanonicalNameLoop}.
1326
firstAlias = "cname1.example.com"
1327
secondAlias = "cname2.example.com"
1329
class DummyResolver(object):
1330
def lookupMailExchange(self, domain):
1331
return defer.succeed((
1332
[RRHeader(name=firstAlias,
1333
type=Record_CNAME.TYPE,
1334
payload=Record_CNAME(secondAlias)),
1335
RRHeader(name=secondAlias,
1336
type=Record_CNAME.TYPE,
1337
payload=Record_CNAME(firstAlias))],
1340
self.mx.resolver = DummyResolver()
1341
d = self.mx.getMX(firstAlias)
1342
self.assertFailure(d, twisted.mail.relaymanager.CanonicalNameLoop)
1346
def testManyRecords(self):
1347
self.auth.addresses['test.domain'] = [
1348
'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
1350
return self.mx.getMX('test.domain'
1351
).addCallback(self._cbManyRecordsSuccessfulLookup
1354
def _cbManyRecordsSuccessfulLookup(self, mx):
1355
self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
1356
self.mx.markBad(str(mx.name))
1357
return self.mx.getMX('test.domain'
1358
).addCallback(self._cbManyRecordsDifferentResult, mx
1361
def _cbManyRecordsDifferentResult(self, nextMX, mx):
1362
self.assertNotEqual(str(mx.name), str(nextMX.name))
1363
self.mx.markBad(str(nextMX.name))
1365
return self.mx.getMX('test.domain'
1366
).addCallback(self._cbManyRecordsLastResult, mx, nextMX
1369
def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
1370
self.assertNotEqual(str(mx.name), str(lastMX.name))
1371
self.assertNotEqual(str(nextMX.name), str(lastMX.name))
1373
self.mx.markBad(str(lastMX.name))
1374
self.mx.markGood(str(nextMX.name))
1376
return self.mx.getMX('test.domain'
1377
).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
1380
def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
1381
self.assertEqual(str(againMX.name), str(nextMX.name))
1383
class LiveFireExercise(unittest.TestCase):
1384
if interfaces.IReactorUDP(reactor, None) is None:
1385
skip = "UDP support is required to determining MX records"
1390
'domainDir', 'insertionDomain', 'insertionQueue',
1391
'destinationDomain', 'destinationQueue'
1395
for d in self.tmpdirs:
1396
if os.path.exists(d):
1398
return tearDownDNS(self)
1400
def testLocalDelivery(self):
1401
service = mail.mail.MailService()
1402
service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1403
domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
1404
domain.addUser('user', 'password')
1405
service.addDomain('test.domain', domain)
1406
service.portals[''] = service.portals['test.domain']
1407
map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
1409
service.setQueue(mail.relay.DomainQueuer(service))
1410
manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
1411
helper = mail.relaymanager.RelayStateHelper(manager, 1)
1413
f = service.getSMTPFactory()
1415
self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1417
client = LineSendingProtocol([
1419
'MAIL FROM: <user@hostname>',
1420
'RCPT TO: <user@test.domain>',
1422
'This is the message',
1428
f = protocol.ClientFactory()
1429
f.protocol = lambda: client
1430
f.clientConnectionLost = lambda *args: done.callback(None)
1431
reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
1434
mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
1435
msg = mbox.getMessage(0).read()
1436
self.failIfEqual(msg.find('This is the message'), -1)
1438
return self.smtpServer.stopListening()
1439
done.addCallback(finished)
1443
def testRelayDelivery(self):
1444
# Here is the service we will connect to and send mail from
1445
insServ = mail.mail.MailService()
1446
insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1447
domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
1448
insServ.addDomain('insertion.domain', domain)
1449
os.mkdir('insertionQueue')
1450
insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
1451
insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
1452
manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
1453
manager.fArgs += ('test.identity.hostname',)
1454
helper = mail.relaymanager.RelayStateHelper(manager, 1)
1455
# Yoink! Now the internet obeys OUR every whim!
1456
manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
1457
# And this is our whim.
1458
self.auth.addresses['destination.domain'] = ['127.0.0.1']
1460
f = insServ.getSMTPFactory()
1461
self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1463
# Here is the service the previous one will connect to for final
1465
destServ = mail.mail.MailService()
1466
destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1467
domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
1468
domain.addUser('user', 'password')
1469
destServ.addDomain('destination.domain', domain)
1470
os.mkdir('destinationQueue')
1471
destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
1472
manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
1473
helper = mail.relaymanager.RelayStateHelper(manager, 1)
1474
helper.startService()
1476
f = destServ.getSMTPFactory()
1477
self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1479
# Update the port number the *first* relay will connect to, because we can't use
1481
manager.PORT = self.destServer.getHost().port
1483
client = LineSendingProtocol([
1485
'MAIL FROM: <user@wherever>',
1486
'RCPT TO: <user@destination.domain>',
1488
'This is the message',
1494
f = protocol.ClientFactory()
1495
f.protocol = lambda: client
1496
f.clientConnectionLost = lambda *args: done.callback(None)
1497
reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
1500
# First part of the delivery is done. Poke the queue manually now
1501
# so we don't have to wait for the queue to be flushed.
1502
delivery = manager.checkState()
1504
mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
1505
msg = mbox.getMessage(0).read()
1506
self.failIfEqual(msg.find('This is the message'), -1)
1508
self.insServer.stopListening()
1509
self.destServer.stopListening()
1510
helper.stopService()
1511
delivery.addCallback(delivered)
1513
done.addCallback(finished)
1517
aliasFile = StringIO.StringIO("""\
1520
testuser: address1,address2, address3,
1521
continuation@address, |/bin/process/this
1523
usertwo:thisaddress,thataddress, lastaddress
1524
lastuser: :/includable, /filename, |/program, address
1527
class LineBufferMessage:
1533
def lineReceived(self, line):
1534
self.lines.append(line)
1536
def eomReceived(self):
1538
return defer.succeed('<Whatever>')
1540
def connectionLost(self):
1543
class AliasTestCase(unittest.TestCase):
1548
'After a blank line',
1555
def testHandle(self):
1558
'user: another@host\n',
1559
'nextuser: |/bin/program\n',
1561
'moreusers: :/etc/include/filename\n',
1562
'multiuser: first@host, second@host,last@anotherhost',
1566
mail.alias.handle(result, l, 'TestCase', None)
1568
self.assertEquals(result['user'], ['another@host', 'me@again'])
1569
self.assertEquals(result['nextuser'], ['|/bin/program'])
1570
self.assertEquals(result['moreusers'], [':/etc/include/filename'])
1571
self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
1573
def testFileLoader(self):
1574
domains = {'': object()}
1575
result = mail.alias.loadAliasFile(domains, fp=aliasFile)
1577
self.assertEquals(len(result), 3)
1579
group = result['testuser']
1581
for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
1582
self.failIfEqual(s.find(a), -1)
1583
self.assertEquals(len(group), 5)
1585
group = result['usertwo']
1587
for a in ('thisaddress', 'thataddress', 'lastaddress'):
1588
self.failIfEqual(s.find(a), -1)
1589
self.assertEquals(len(group), 3)
1591
group = result['lastuser']
1593
self.failUnlessEqual(s.find('/includable'), -1)
1594
for a in ('/filename', 'program', 'address'):
1595
self.failIfEqual(s.find(a), -1, '%s not found' % a)
1596
self.assertEquals(len(group), 3)
1598
def testMultiWrapper(self):
1599
msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
1600
msg = mail.alias.MultiWrapper(msgs)
1602
for L in self.lines:
1604
return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
1606
def _cbMultiWrapper(self, ignored, msgs):
1608
self.failUnless(m.eom)
1610
self.assertEquals(self.lines, m.lines)
1612
def testFileAlias(self):
1613
tmpfile = self.mktemp()
1614
a = mail.alias.FileAlias(tmpfile, None, None)
1615
m = a.createMessageReceiver()
1617
for l in self.lines:
1619
return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
1621
def _cbTestFileAlias(self, ignored, tmpfile):
1622
lines = file(tmpfile).readlines()
1623
self.assertEquals([L[:-1] for L in lines], self.lines)
1627
class DummyProcess(object):
1628
__slots__ = ['onEnd']
1632
class MockProcessAlias(mail.alias.ProcessAlias):
1634
A alias processor that doesn't actually launch processes.
1637
def spawnProcess(self, proto, program, path):
1639
Don't spawn a process.
1644
class MockAliasGroup(mail.alias.AliasGroup):
1646
An alias group using C{MockProcessAlias}.
1648
processAliasFactory = MockProcessAlias
1652
class StubProcess(object):
1654
Fake implementation of L{IProcessTransport}.
1656
@ivar signals: A list of all the signals which have been sent to this fake
1663
def loseConnection(self):
1665
No-op implementation of disconnection.
1669
def signalProcess(self, signal):
1671
Record a signal sent to this process for later inspection.
1673
self.signals.append(signal)
1677
class ProcessAliasTestCase(unittest.TestCase):
1679
Tests for alias resolution.
1681
if interfaces.IReactorProcess(reactor, None) is None:
1682
skip = "IReactorProcess not supported"
1688
'After a blank line',
1692
def exitStatus(self, code):
1694
Construct a status from the given exit code.
1696
@type code: L{int} between 0 and 255 inclusive.
1697
@param code: The exit status which the code will represent.
1700
@return: A status integer for the given exit code.
1702
# /* Macros for constructing status values. */
1703
# #define __W_EXITCODE(ret, sig) ((ret) << 8 | (sig))
1704
status = (code << 8) | 0
1707
self.assertTrue(os.WIFEXITED(status))
1708
self.assertEqual(os.WEXITSTATUS(status), code)
1709
self.assertFalse(os.WIFSIGNALED(status))
1714
def signalStatus(self, signal):
1716
Construct a status from the given signal.
1718
@type signal: L{int} between 0 and 255 inclusive.
1719
@param signal: The signal number which the status will represent.
1722
@return: A status integer for the given signal.
1724
# /* If WIFSIGNALED(STATUS), the terminating signal. */
1725
# #define __WTERMSIG(status) ((status) & 0x7f)
1726
# /* Nonzero if STATUS indicates termination by a signal. */
1727
# #define __WIFSIGNALED(status) \
1728
# (((signed char) (((status) & 0x7f) + 1) >> 1) > 0)
1732
self.assertTrue(os.WIFSIGNALED(status))
1733
self.assertEqual(os.WTERMSIG(status), signal)
1734
self.assertFalse(os.WIFEXITED(status))
1741
Replace L{smtp.DNSNAME} with a well-known value.
1743
self.DNSNAME = smtp.DNSNAME
1749
Restore the original value of L{smtp.DNSNAME}.
1751
smtp.DNSNAME = self.DNSNAME
1754
def test_processAlias(self):
1756
Standard call to C{mail.alias.ProcessAlias}: check that the specified
1757
script is called, and that the input is correctly transferred to it.
1759
sh = FilePath(self.mktemp())
1762
rm -f process.alias.out
1764
echo $i >> process.alias.out
1766
os.chmod(sh.path, 0700)
1767
a = mail.alias.ProcessAlias(sh.path, None, None)
1768
m = a.createMessageReceiver()
1770
for l in self.lines:
1773
def _cbProcessAlias(ignored):
1774
lines = file('process.alias.out').readlines()
1775
self.assertEquals([L[:-1] for L in lines], self.lines)
1777
return m.eomReceived().addCallback(_cbProcessAlias)
1780
def test_processAliasTimeout(self):
1782
If the alias child process does not exit within a particular period of
1783
time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should
1784
fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the
1787
reactor = task.Clock()
1788
transport = StubProcess()
1789
proto = mail.alias.ProcessAliasProtocol()
1790
proto.makeConnection(transport)
1792
receiver = mail.alias.MessageWrapper(proto, None, reactor)
1793
d = receiver.eomReceived()
1794
reactor.advance(receiver.completionTimeout)
1795
def timedOut(ignored):
1796
self.assertEqual(transport.signals, ['KILL'])
1797
# Now that it has been killed, disconnect the protocol associated
1800
ProcessTerminated(self.signalStatus(signal.SIGKILL)))
1801
self.assertFailure(d, mail.alias.ProcessAliasTimeout)
1802
d.addCallback(timedOut)
1806
def test_earlyProcessTermination(self):
1808
If the process associated with an L{mail.alias.MessageWrapper} exits
1809
before I{eomReceived} is called, the L{Deferred} returned by
1810
I{eomReceived} should fail.
1812
transport = StubProcess()
1813
protocol = mail.alias.ProcessAliasProtocol()
1814
protocol.makeConnection(transport)
1815
receiver = mail.alias.MessageWrapper(protocol, None, None)
1816
protocol.processEnded(failure.Failure(ProcessDone(0)))
1817
return self.assertFailure(receiver.eomReceived(), ProcessDone)
1820
def _terminationTest(self, status):
1822
Verify that if the process associated with an
1823
L{mail.alias.MessageWrapper} exits with the given status, the
1824
L{Deferred} returned by I{eomReceived} fails with L{ProcessTerminated}.
1826
transport = StubProcess()
1827
protocol = mail.alias.ProcessAliasProtocol()
1828
protocol.makeConnection(transport)
1829
receiver = mail.alias.MessageWrapper(protocol, None, None)
1830
protocol.processEnded(
1831
failure.Failure(ProcessTerminated(status)))
1832
return self.assertFailure(receiver.eomReceived(), ProcessTerminated)
1835
def test_errorProcessTermination(self):
1837
If the process associated with an L{mail.alias.MessageWrapper} exits
1838
with a non-zero exit code, the L{Deferred} returned by I{eomReceived}
1841
return self._terminationTest(self.exitStatus(1))
1844
def test_signalProcessTermination(self):
1846
If the process associated with an L{mail.alias.MessageWrapper} exits
1847
because it received a signal, the L{Deferred} returned by
1848
I{eomReceived} should fail.
1850
return self._terminationTest(self.signalStatus(signal.SIGHUP))
1853
def test_aliasResolution(self):
1855
Check that the C{resolve} method of alias processors produce the correct
1857
- direct alias with L{mail.alias.AddressAlias} if a simple input is passed
1858
- aliases in a file with L{mail.alias.FileWrapper} if an input in the format
1860
- aliases resulting of a process call wrapped by L{mail.alias.MessageWrapper}
1861
if the format is '|process'
1864
domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
1865
A1 = MockAliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
1866
A2 = MockAliasGroup(['user2', 'user3'], domain, 'alias2')
1867
A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1874
res1 = A1.resolve(aliases)
1875
r1 = map(str, res1.objs)
1877
expected = map(str, [
1878
mail.alias.AddressAlias('user1', None, None),
1879
mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1880
mail.alias.FileWrapper('/file'),
1883
self.assertEquals(r1, expected)
1885
res2 = A2.resolve(aliases)
1886
r2 = map(str, res2.objs)
1888
expected = map(str, [
1889
mail.alias.AddressAlias('user2', None, None),
1890
mail.alias.AddressAlias('user3', None, None)
1893
self.assertEquals(r2, expected)
1895
res3 = A3.resolve(aliases)
1896
r3 = map(str, res3.objs)
1898
expected = map(str, [
1899
mail.alias.AddressAlias('user1', None, None),
1900
mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1901
mail.alias.FileWrapper('/file'),
1904
self.assertEquals(r3, expected)
1907
def test_cyclicAlias(self):
1909
Check that a cycle in alias resolution is correctly handled.
1912
domain = {'': TestDomain(aliases, [])}
1913
A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
1914
A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
1915
A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1922
self.assertEquals(aliases['alias1'].resolve(aliases), None)
1923
self.assertEquals(aliases['alias2'].resolve(aliases), None)
1924
self.assertEquals(aliases['alias3'].resolve(aliases), None)
1926
A4 = MockAliasGroup(['|echo', 'alias1'], domain, 'alias4')
1927
aliases['alias4'] = A4
1929
res = A4.resolve(aliases)
1930
r = map(str, res.objs)
1932
expected = map(str, [
1933
mail.alias.MessageWrapper(DummyProcess(), 'echo')
1936
self.assertEquals(r, expected)
1944
def __init__(self, aliases, users):
1945
self.aliases = aliases
1948
def exists(self, user, memo=None):
1949
user = user.dest.local
1950
if user in self.users:
1951
return lambda: mail.alias.AddressAlias(user, None, None)
1953
a = self.aliases[user]
1955
raise smtp.SMTPBadRcpt(user)
1957
aliases = a.resolve(self.aliases, memo)
1959
return lambda: aliases
1960
raise smtp.SMTPBadRcpt(user)
1963
from twisted.python.runtime import platformType
1965
if platformType != "posix":
1966
for o in locals().values():
1967
if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
1968
o.skip = "twisted.mail only works on posix"