2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
15
from twisted.trial import unittest
18
from zope.interface import providedBy, Interface, implements
20
from twisted.trial import unittest
21
from twisted.mail import smtp
22
from twisted.mail import pop3
23
from twisted.names import dns
24
from twisted.protocols import basic
25
from twisted.internet import protocol
26
from twisted.internet import defer
27
from twisted.internet.defer import Deferred
28
from twisted.internet import reactor
29
from twisted.internet import interfaces
30
from twisted.internet.error import DNSLookupError, CannotListenError
31
from twisted.internet import address
32
from twisted.python import failure
33
from twisted.python import util
35
from twisted import mail
36
import twisted.mail.mail
37
import twisted.mail.maildir
38
import twisted.mail.relay
39
import twisted.mail.relaymanager
40
import twisted.mail.protocols
41
import twisted.mail.alias
43
from twisted.names.error import DNSNameError
44
from twisted.names.dns import Record_MX
46
from twisted import cred
47
import twisted.cred.credentials
48
import twisted.cred.checkers
49
import twisted.cred.portal
51
# Since we run a couple processes, we need SignalMixin from test_process
52
from twisted.test import test_process
54
from twisted.test.proto_helpers import LineSendingProtocol
56
class DomainWithDefaultsTestCase(unittest.TestCase):
57
def testMethods(self):
58
d = dict([(x, x + 10) for x in range(10)])
59
d = mail.mail.DomainWithDefaultDict(d, 'Default')
61
self.assertEquals(len(d), 10)
62
self.assertEquals(list(iter(d)), range(10))
63
self.assertEquals(list(d.iterkeys()), list(iter(d)))
65
items = list(d.iteritems())
67
self.assertEquals(items, [(x, x + 10) for x in range(10)])
69
values = list(d.itervalues())
71
self.assertEquals(values, range(10, 20))
75
self.assertEquals(items, [(x, x + 10) for x in range(10)])
79
self.assertEquals(values, range(10, 20))
82
self.assertEquals(d[x], x + 10)
83
self.assertEquals(d.get(x), x + 10)
84
self.failUnless(x in d)
85
self.failUnless(d.has_key(x))
89
self.assertEquals(len(d), 7)
90
self.assertEquals(d[2], 'Default')
91
self.assertEquals(d[4], 'Default')
92
self.assertEquals(d[6], 'Default')
94
d.update({'a': None, 'b': (), 'c': '*'})
95
self.assertEquals(len(d), 10)
96
self.assertEquals(d['a'], None)
97
self.assertEquals(d['b'], ())
98
self.assertEquals(d['c'], '*')
101
self.assertEquals(len(d), 0)
103
self.assertEquals(d.setdefault('key', 'value'), 'value')
104
self.assertEquals(d['key'], 'value')
106
self.assertEquals(d.popitem(), ('key', 'value'))
107
self.assertEquals(len(d), 0)
109
class BounceTestCase(unittest.TestCase):
111
self.domain = mail.mail.BounceDomain()
113
def testExists(self):
114
self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
118
self.domain.willRelay("random q emailer", "protocol"),
122
def testMessage(self):
123
self.assertRaises(AssertionError, self.domain.startMessage, "whomever")
125
def testAddUser(self):
126
self.domain.addUser("bob", "password")
127
self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
129
class FileMessageTestCase(unittest.TestCase):
131
self.name = "fileMessage.testFile"
132
self.final = "final.fileMessage.testFile"
133
self.f = file(self.name, 'w')
134
self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
146
os.remove(self.final)
150
def testFinalName(self):
151
return self.fp.eomReceived().addCallback(self._cbFinalName)
153
def _cbFinalName(self, result):
154
self.assertEquals(result, self.final)
155
self.failUnless(self.f.closed)
156
self.failIf(os.path.exists(self.name))
158
def testContents(self):
159
contents = "first line\nsecond line\nthird line\n"
160
for line in contents.splitlines():
161
self.fp.lineReceived(line)
162
self.fp.eomReceived()
163
self.assertEquals(file(self.final).read(), contents)
165
def testInterrupted(self):
166
contents = "first line\nsecond line\n"
167
for line in contents.splitlines():
168
self.fp.lineReceived(line)
169
self.fp.connectionLost()
170
self.failIf(os.path.exists(self.name))
171
self.failIf(os.path.exists(self.final))
173
class MailServiceTestCase(unittest.TestCase):
175
self.service = mail.mail.MailService()
177
def testFactories(self):
178
f = self.service.getPOP3Factory()
179
self.failUnless(isinstance(f, protocol.ServerFactory))
180
self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
182
f = self.service.getSMTPFactory()
183
self.failUnless(isinstance(f, protocol.ServerFactory))
184
self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
186
f = self.service.getESMTPFactory()
187
self.failUnless(isinstance(f, protocol.ServerFactory))
188
self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
190
def testPortals(self):
193
self.service.portals['domain'] = o1
194
self.service.portals[''] = o2
196
self.failUnless(self.service.lookupPortal('domain') is o1)
197
self.failUnless(self.service.defaultPortal() is o2)
199
class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
203
def osopen(self, fn, attr, mode):
205
return os.open(fn, attr, mode)
207
raise OSError(errno.EPERM, "Faked Permission Problem")
208
def oswrite(self, fh, data):
210
return os.write(fh, data)
212
raise OSError(errno.ENOSPC, "Faked Space problem")
213
def osrename(self, oldname, newname):
214
if self._renamestate:
215
return os.rename(oldname, newname)
217
raise OSError(errno.EPERM, "Faked Permission Problem")
219
class MaildirAppendStringTestCase(unittest.TestCase):
221
self.d = self.mktemp()
222
mail.maildir.initializeMaildir(self.d)
225
shutil.rmtree(self.d)
227
def _append(self, ignored, mbox):
228
d = mbox.appendMessage('TEST')
229
return self.assertFailure(d, Exception)
231
def _setState(self, ignored, mbox, rename=None, write=None, open=None):
232
if rename is not None:
233
mbox.AppendFactory._renameState = rename
234
if write is not None:
235
mbox.AppendFactory._writeState = write
237
mbox.AppendFactory._openstate = open
239
def testAppend(self):
240
mbox = mail.maildir.MaildirMailbox(self.d)
241
mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
243
for i in xrange(1, 11):
244
ds.append(mbox.appendMessage("X" * i))
245
ds[-1].addCallback(self.assertEqual, None)
246
d = defer.gatherResults(ds)
247
d.addCallback(self._cbTestAppend, mbox)
250
def _cbTestAppend(self, result, mbox):
251
self.assertEquals(len(mbox.listMessages()),
253
self.assertEquals(len(mbox.getMessage(5).read()), 6)
254
# test in the right order: last to first error location.
255
mbox.AppendFactory._renamestate = False
256
d = self._append(None, mbox)
257
d.addCallback(self._setState, mbox, rename=True, write=False)
258
d.addCallback(self._append, mbox)
259
d.addCallback(self._setState, mbox, write=True, open=False)
260
d.addCallback(self._append, mbox)
261
d.addCallback(self._setState, mbox, open=True)
265
class MaildirAppendFileTestCase(unittest.TestCase):
267
self.d = self.mktemp()
268
mail.maildir.initializeMaildir(self.d)
271
shutil.rmtree(self.d)
273
def testAppend(self):
274
mbox = mail.maildir.MaildirMailbox(self.d)
278
self.assertEqual(res, None)
279
for i in xrange(1, 11):
280
temp = tempfile.TemporaryFile()
283
ds.append(mbox.appendMessage(temp))
284
ds[-1].addCallback(_check, temp)
285
return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox)
287
def _cbTestAppend(self, result, mbox):
288
self.assertEquals(len(mbox.listMessages()),
290
self.assertEquals(len(mbox.getMessage(5).read()), 6)
293
class MaildirTestCase(unittest.TestCase):
295
self.d = self.mktemp()
296
mail.maildir.initializeMaildir(self.d)
299
shutil.rmtree(self.d)
301
def testInitializer(self):
303
trash = os.path.join(d, '.Trash')
305
self.failUnless(os.path.exists(d) and os.path.isdir(d))
306
self.failUnless(os.path.exists(os.path.join(d, 'new')))
307
self.failUnless(os.path.exists(os.path.join(d, 'cur')))
308
self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
309
self.failUnless(os.path.isdir(os.path.join(d, 'new')))
310
self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
311
self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
313
self.failUnless(os.path.exists(os.path.join(trash, 'new')))
314
self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
315
self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
316
self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
317
self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
318
self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
320
def testMailbox(self):
322
n = mail.maildir._generateMaildirName
323
msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
325
# Toss a few files into the mailbox
328
f = file(j(self.d, f), 'w')
333
mb = mail.maildir.MaildirMailbox(self.d)
334
self.assertEquals(mb.listMessages(), range(1, 11))
335
self.assertEquals(mb.listMessages(1), 2)
336
self.assertEquals(mb.listMessages(5), 6)
338
self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
339
self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
347
p, f = os.path.split(msgs[5])
350
self.assertEquals(mb.listMessages(5), 0)
351
self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
352
self.failIf(os.path.exists(j(self.d, msgs[5])))
354
mb.undeleteMessages()
355
self.assertEquals(mb.listMessages(5), 6)
356
self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
357
self.failUnless(os.path.exists(j(self.d, msgs[5])))
359
class MaildirDirdbmDomainTestCase(unittest.TestCase):
361
self.P = self.mktemp()
362
self.S = mail.mail.MailService()
363
self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
366
shutil.rmtree(self.P)
368
def testAddUser(self):
369
toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
374
self.failUnless(u in self.D.dbm)
375
self.assertEquals(self.D.dbm[u], p)
376
self.failUnless(os.path.exists(os.path.join(self.P, u)))
378
def testCredentials(self):
379
creds = self.D.getCredentialsCheckers()
381
self.assertEquals(len(creds), 1)
382
self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
383
self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
385
def testRequestAvatar(self):
386
class ISomething(Interface):
389
self.D.addUser('user', 'password')
392
self.D.requestAvatar, 'user', None, ISomething
395
t = self.D.requestAvatar('user', None, pop3.IMailbox)
396
self.assertEquals(len(t), 3)
397
self.failUnless(t[0] is pop3.IMailbox)
398
self.failUnless(pop3.IMailbox.providedBy(t[1]))
402
def testRequestAvatarId(self):
403
self.D.addUser('user', 'password')
404
database = self.D.getCredentialsCheckers()[0]
406
creds = cred.credentials.UsernamePassword('user', 'wrong password')
408
cred.error.UnauthorizedLogin,
409
database.requestAvatarId, creds
412
creds = cred.credentials.UsernamePassword('user', 'password')
413
self.assertEquals(database.requestAvatarId(creds), 'user')
416
class StubAliasableDomain(object):
418
Minimal testable implementation of IAliasableDomain.
420
implements(mail.mail.IAliasableDomain)
422
def exists(self, user):
424
No test coverage for invocations of this method on domain objects,
425
so we just won't implement it.
427
raise NotImplementedError()
430
def addUser(self, user, password):
432
No test coverage for invocations of this method on domain objects,
433
so we just won't implement it.
435
raise NotImplementedError()
438
def getCredentialsCheckers(self):
440
This needs to succeed in order for other tests to complete
441
successfully, but we don't actually assert anything about its
442
behavior. Return an empty list. Sometime later we should return
443
something else and assert that a portal got set up properly.
448
def setAliasGroup(self, aliases):
450
Just record the value so the test can check it later.
452
self.aliasGroup = aliases
455
class ServiceDomainTestCase(unittest.TestCase):
457
self.S = mail.mail.MailService()
458
self.D = mail.protocols.DomainDeliveryBase(self.S, None)
459
self.D.service = self.S
460
self.D.protocolName = 'TEST'
461
self.D.host = 'hostname'
463
self.tmpdir = self.mktemp()
464
domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
465
domain.addUser('user', 'password')
466
self.S.addDomain('test.domain', domain)
469
shutil.rmtree(self.tmpdir)
472
def testAddAliasableDomain(self):
474
Test that adding an IAliasableDomain to a mail service properly sets
475
up alias group references and such.
478
domain = StubAliasableDomain()
479
self.S.aliases = aliases
480
self.S.addDomain('example.com', domain)
481
self.assertIdentical(domain.aliasGroup, aliases)
484
def testReceivedHeader(self):
485
hdr = self.D.receivedHeader(
486
('remotehost', '123.232.101.234'),
487
smtp.Address('<someguy@somplace>'),
490
fp = StringIO.StringIO(hdr)
491
m = rfc822.Message(fp)
492
self.assertEquals(len(m.items()), 1)
493
self.failUnless(m.has_key('Received'))
495
def testValidateTo(self):
496
user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
497
return defer.maybeDeferred(self.D.validateTo, user
498
).addCallback(self._cbValidateTo
501
def _cbValidateTo(self, result):
502
self.failUnless(callable(result))
504
def testValidateToBadUsername(self):
505
user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
506
return self.assertFailure(
507
defer.maybeDeferred(self.D.validateTo, user),
510
def testValidateToBadDomain(self):
511
user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
512
return self.assertFailure(
513
defer.maybeDeferred(self.D.validateTo, user),
516
def testValidateFrom(self):
517
helo = ('hostname', '127.0.0.1')
518
origin = smtp.Address('<user@hostname>')
519
self.failUnless(self.D.validateFrom(helo, origin) is origin)
521
helo = ('hostname', '1.2.3.4')
522
origin = smtp.Address('<user@hostname>')
523
self.failUnless(self.D.validateFrom(helo, origin) is origin)
525
helo = ('hostname', '1.2.3.4')
526
origin = smtp.Address('<>')
527
self.failUnless(self.D.validateFrom(helo, origin) is origin)
531
self.D.validateFrom, None, origin
534
class VirtualPOP3TestCase(unittest.TestCase):
536
self.tmpdir = self.mktemp()
537
self.S = mail.mail.MailService()
538
self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
539
self.D.addUser('user', 'password')
540
self.S.addDomain('test.domain', self.D)
542
portal = cred.portal.Portal(self.D)
543
map(portal.registerChecker, self.D.getCredentialsCheckers())
544
self.S.portals[''] = self.S.portals['test.domain'] = portal
546
self.P = mail.protocols.VirtualPOP3()
547
self.P.service = self.S
548
self.P.magic = '<unit test magic>'
551
shutil.rmtree(self.tmpdir)
553
def testAuthenticateAPOP(self):
554
resp = md5.new(self.P.magic + 'password').hexdigest()
555
return self.P.authenticateUserAPOP('user', resp
556
).addCallback(self._cbAuthenticateAPOP
559
def _cbAuthenticateAPOP(self, result):
560
self.assertEquals(len(result), 3)
561
self.assertEquals(result[0], pop3.IMailbox)
562
self.failUnless(pop3.IMailbox.providedBy(result[1]))
565
def testAuthenticateIncorrectUserAPOP(self):
566
resp = md5.new(self.P.magic + 'password').hexdigest()
567
return self.assertFailure(
568
self.P.authenticateUserAPOP('resu', resp),
569
cred.error.UnauthorizedLogin)
571
def testAuthenticateIncorrectResponseAPOP(self):
572
resp = md5.new('wrong digest').hexdigest()
573
return self.assertFailure(
574
self.P.authenticateUserAPOP('user', resp),
575
cred.error.UnauthorizedLogin)
577
def testAuthenticatePASS(self):
578
return self.P.authenticateUserPASS('user', 'password'
579
).addCallback(self._cbAuthenticatePASS
582
def _cbAuthenticatePASS(self, result):
583
self.assertEquals(len(result), 3)
584
self.assertEquals(result[0], pop3.IMailbox)
585
self.failUnless(pop3.IMailbox.providedBy(result[1]))
588
def testAuthenticateBadUserPASS(self):
589
return self.assertFailure(
590
self.P.authenticateUserPASS('resu', 'password'),
591
cred.error.UnauthorizedLogin)
593
def testAuthenticateBadPasswordPASS(self):
594
return self.assertFailure(
595
self.P.authenticateUserPASS('user', 'wrong password'),
596
cred.error.UnauthorizedLogin)
598
class empty(smtp.User):
602
class RelayTestCase(unittest.TestCase):
603
def testExists(self):
604
service = mail.mail.MailService()
605
domain = mail.relay.DomainQueuer(service)
608
address.UNIXAddress('/var/run/mail-relay'),
609
address.IPv4Address('TCP', '127.0.0.1', 12345),
613
address.IPv4Address('TCP', '192.168.2.1', 62),
614
address.IPv4Address('TCP', '1.2.3.4', 1943),
619
user.orig = 'user@host'
620
user.dest = 'tsoh@resu'
621
user.protocol = empty()
622
user.protocol.transport = empty()
623
user.protocol.transport.getPeer = lambda: peer
625
self.failUnless(callable(domain.exists(user)))
627
for peer in dontRelay:
629
user.orig = 'some@place'
630
user.protocol = empty()
631
user.protocol.transport = empty()
632
user.protocol.transport.getPeer = lambda: peer
633
user.dest = 'who@cares'
635
self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
637
class RelayerTestCase(unittest.TestCase):
639
self.tmpdir = self.mktemp()
640
os.mkdir(self.tmpdir)
641
self.messageFiles = []
643
name = os.path.join(self.tmpdir, 'body-%d' % (i,))
644
f = file(name + '-H', 'w')
645
pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
648
f = file(name + '-D', 'w')
651
self.messageFiles.append(name)
653
self.R = mail.relay.RelayerMixin()
654
self.R.loadMessages(self.messageFiles)
657
shutil.rmtree(self.tmpdir)
659
def testMailFrom(self):
661
self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
662
self.R.sentMail(250, None, None, None, None)
663
self.assertEquals(self.R.getMailFrom(), None)
665
def testMailTo(self):
667
self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
668
self.R.sentMail(250, None, None, None, None)
669
self.assertEquals(self.R.getMailTo(), None)
671
def testMailData(self):
673
name = os.path.join(self.tmpdir, 'body-%d' % (i,))
674
self.assertEquals(self.R.getMailData().read(), name)
675
self.R.sentMail(250, None, None, None, None)
676
self.assertEquals(self.R.getMailData(), None)
684
def notifySuccess(self, factory, message):
685
self.success.append((factory, message))
687
def notifyFailure(self, factory, message):
688
self.failure.append((factory, message))
690
def notifyDone(self, factory):
691
self.done.append(factory)
693
class ManagedRelayerTestCase(unittest.TestCase):
695
self.manager = Manager()
696
self.messages = range(0, 20, 2)
697
self.factory = object()
698
self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
699
self.relay.messages = self.messages[:]
700
self.relay.names = self.messages[:]
701
self.relay.factory = self.factory
703
def testSuccessfulSentMail(self):
704
for i in self.messages:
705
self.relay.sentMail(250, None, None, None, None)
708
self.manager.success,
709
[(self.factory, m) for m in self.messages]
712
def testFailedSentMail(self):
713
for i in self.messages:
714
self.relay.sentMail(550, None, None, None, None)
717
self.manager.failure,
718
[(self.factory, m) for m in self.messages]
721
def testConnectionLost(self):
722
self.relay.connectionLost(failure.Failure(Exception()))
723
self.assertEquals(self.manager.done, [self.factory])
725
class DirectoryQueueTestCase(unittest.TestCase):
727
# This is almost a test case itself.
728
self.tmpdir = self.mktemp()
729
os.mkdir(self.tmpdir)
730
self.queue = mail.relaymanager.Queue(self.tmpdir)
731
self.queue.noisy = False
733
hdrF, msgF = self.queue.createNewMessage()
734
pickle.dump(['header', m], hdrF)
736
msgF.lineReceived('body: %d' % (m,))
738
self.queue.readDirectory()
741
shutil.rmtree(self.tmpdir)
743
def testWaiting(self):
744
self.failUnless(self.queue.hasWaiting())
745
self.assertEquals(len(self.queue.getWaiting()), 25)
747
waiting = self.queue.getWaiting()
748
self.queue.setRelaying(waiting[0])
749
self.assertEquals(len(self.queue.getWaiting()), 24)
751
self.queue.setWaiting(waiting[0])
752
self.assertEquals(len(self.queue.getWaiting()), 25)
754
def testRelaying(self):
755
for m in self.queue.getWaiting():
756
self.queue.setRelaying(m)
758
len(self.queue.getRelayed()),
759
25 - len(self.queue.getWaiting())
762
self.failIf(self.queue.hasWaiting())
764
relayed = self.queue.getRelayed()
765
self.queue.setWaiting(relayed[0])
766
self.assertEquals(len(self.queue.getWaiting()), 1)
767
self.assertEquals(len(self.queue.getRelayed()), 24)
770
msg = self.queue.getWaiting()[0]
771
self.queue.setRelaying(msg)
774
self.assertEquals(len(self.queue.getWaiting()), 24)
775
self.assertEquals(len(self.queue.getRelayed()), 0)
777
self.failIf(msg in self.queue.getWaiting())
778
self.failIf(msg in self.queue.getRelayed())
780
def testEnvelope(self):
783
for msg in self.queue.getWaiting():
784
envelopes.append(self.queue.getEnvelope(msg))
793
from twisted.names import server
794
from twisted.names import client
795
from twisted.names import common
797
class TestAuthority(common.ResolverBase):
799
common.ResolverBase.__init__(self)
802
def _lookup(self, name, cls, type, timeout = None):
803
if name in self.addresses and type == dns.MX:
805
for a in self.addresses[name]:
807
name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
810
return defer.succeed((results, [], []))
811
return defer.fail(failure.Failure(dns.DomainError(name)))
814
self.auth = TestAuthority()
815
factory = server.DNSServerFactory([self.auth])
816
protocol = dns.DNSDatagramProtocol(factory)
818
self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
819
portNumber = self.port.getHost().port
822
self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
823
except CannotListenError:
824
self.port.stopListening()
827
self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
830
def tearDownDNS(self):
832
dl.append(defer.maybeDeferred(self.port.stopListening))
833
dl.append(defer.maybeDeferred(self.udpPort.stopListening))
834
if self.resolver.protocol.transport is not None:
835
dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListening))
837
self.resolver._parseCall.cancel()
840
return defer.DeferredList(dl)
842
class MXTestCase(unittest.TestCase):
845
self.mx = mail.relaymanager.MXCalculator(self.resolver)
848
return tearDownDNS(self)
850
def testSimpleSuccess(self):
851
self.auth.addresses['test.domain'] = ['the.email.test.domain']
852
return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
854
def _cbSimpleSuccess(self, mx):
855
self.assertEquals(mx.preference, 0)
856
self.assertEquals(str(mx.name), 'the.email.test.domain')
858
def testSimpleFailure(self):
859
self.mx.fallbackToDomain = False
860
return self.assertFailure(self.mx.getMX('test.domain'), IOError)
862
def testSimpleFailureWithFallback(self):
863
return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
866
def test_failureWithSuccessfulFallback(self):
868
Test that if the MX record lookup fails, fallback is enabled, and an A
869
record is available for the name, then the Deferred returned by
870
L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
871
gives the address in the A record for the name.
873
class DummyResolver(object):
875
Fake resolver which will fail an MX lookup but then succeed a
878
def lookupMailExchange(self, domain):
879
return defer.fail(DNSNameError())
881
def getHostByName(self, domain):
882
return defer.succeed("1.2.3.4")
884
self.mx.resolver = DummyResolver()
885
d = self.mx.getMX("domain")
886
d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
890
def testManyRecords(self):
891
self.auth.addresses['test.domain'] = [
892
'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
894
return self.mx.getMX('test.domain'
895
).addCallback(self._cbManyRecordsSuccessfulLookup
898
def _cbManyRecordsSuccessfulLookup(self, mx):
899
self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
900
self.mx.markBad(str(mx.name))
901
return self.mx.getMX('test.domain'
902
).addCallback(self._cbManyRecordsDifferentResult, mx
905
def _cbManyRecordsDifferentResult(self, nextMX, mx):
906
self.assertNotEqual(str(mx.name), str(nextMX.name))
907
self.mx.markBad(str(nextMX.name))
909
return self.mx.getMX('test.domain'
910
).addCallback(self._cbManyRecordsLastResult, mx, nextMX
913
def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
914
self.assertNotEqual(str(mx.name), str(lastMX.name))
915
self.assertNotEqual(str(nextMX.name), str(lastMX.name))
917
self.mx.markBad(str(lastMX.name))
918
self.mx.markGood(str(nextMX.name))
920
return self.mx.getMX('test.domain'
921
).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
924
def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
925
self.assertEqual(str(againMX.name), str(nextMX.name))
927
class LiveFireExercise(unittest.TestCase):
928
if interfaces.IReactorUDP(reactor, None) is None:
929
skip = "UDP support is required to determining MX records"
934
'domainDir', 'insertionDomain', 'insertionQueue',
935
'destinationDomain', 'destinationQueue'
939
for d in self.tmpdirs:
940
if os.path.exists(d):
942
return tearDownDNS(self)
944
def testLocalDelivery(self):
945
service = mail.mail.MailService()
946
service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
947
domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
948
domain.addUser('user', 'password')
949
service.addDomain('test.domain', domain)
950
service.portals[''] = service.portals['test.domain']
951
map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
953
service.setQueue(mail.relay.DomainQueuer(service))
954
manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
955
helper = mail.relaymanager.RelayStateHelper(manager, 1)
957
f = service.getSMTPFactory()
959
self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
961
client = LineSendingProtocol([
963
'MAIL FROM: <user@hostname>',
964
'RCPT TO: <user@test.domain>',
966
'This is the message',
972
f = protocol.ClientFactory()
973
f.protocol = lambda: client
974
f.clientConnectionLost = lambda *args: done.callback(None)
975
reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
978
mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
979
msg = mbox.getMessage(0).read()
980
self.failIfEqual(msg.find('This is the message'), -1)
982
return self.smtpServer.stopListening()
983
done.addCallback(finished)
987
def testRelayDelivery(self):
988
# Here is the service we will connect to and send mail from
989
insServ = mail.mail.MailService()
990
insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
991
domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
992
insServ.addDomain('insertion.domain', domain)
993
os.mkdir('insertionQueue')
994
insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
995
insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
996
manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
997
manager.fArgs += ('test.identity.hostname',)
998
helper = mail.relaymanager.RelayStateHelper(manager, 1)
999
# Yoink! Now the internet obeys OUR every whim!
1000
manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
1001
# And this is our whim.
1002
self.auth.addresses['destination.domain'] = ['127.0.0.1']
1004
f = insServ.getSMTPFactory()
1005
self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1007
# Here is the service the previous one will connect to for final
1009
destServ = mail.mail.MailService()
1010
destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1011
domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
1012
domain.addUser('user', 'password')
1013
destServ.addDomain('destination.domain', domain)
1014
os.mkdir('destinationQueue')
1015
destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
1016
manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
1017
helper = mail.relaymanager.RelayStateHelper(manager, 1)
1018
helper.startService()
1020
f = destServ.getSMTPFactory()
1021
self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1023
# Update the port number the *first* relay will connect to, because we can't use
1025
manager.PORT = self.destServer.getHost().port
1027
client = LineSendingProtocol([
1029
'MAIL FROM: <user@wherever>',
1030
'RCPT TO: <user@destination.domain>',
1032
'This is the message',
1038
f = protocol.ClientFactory()
1039
f.protocol = lambda: client
1040
f.clientConnectionLost = lambda *args: done.callback(None)
1041
reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
1044
# First part of the delivery is done. Poke the queue manually now
1045
# so we don't have to wait for the queue to be flushed.
1046
delivery = manager.checkState()
1048
mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
1049
msg = mbox.getMessage(0).read()
1050
self.failIfEqual(msg.find('This is the message'), -1)
1052
self.insServer.stopListening()
1053
self.destServer.stopListening()
1054
helper.stopService()
1055
delivery.addCallback(delivered)
1057
done.addCallback(finished)
1061
aliasFile = StringIO.StringIO("""\
1064
testuser: address1,address2, address3,
1065
continuation@address, |/bin/process/this
1067
usertwo:thisaddress,thataddress, lastaddress
1068
lastuser: :/includable, /filename, |/program, address
1071
class LineBufferMessage:
1077
def lineReceived(self, line):
1078
self.lines.append(line)
1080
def eomReceived(self):
1082
return defer.succeed('<Whatever>')
1084
def connectionLost(self):
1087
class AliasTestCase(unittest.TestCase):
1092
'After a blank line',
1099
def testHandle(self):
1102
'user: another@host\n',
1103
'nextuser: |/bin/program\n',
1105
'moreusers: :/etc/include/filename\n',
1106
'multiuser: first@host, second@host,last@anotherhost',
1110
mail.alias.handle(result, l, 'TestCase', None)
1112
self.assertEquals(result['user'], ['another@host', 'me@again'])
1113
self.assertEquals(result['nextuser'], ['|/bin/program'])
1114
self.assertEquals(result['moreusers'], [':/etc/include/filename'])
1115
self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
1117
def testFileLoader(self):
1118
domains = {'': object()}
1119
result = mail.alias.loadAliasFile(domains, fp=aliasFile)
1121
self.assertEquals(len(result), 3)
1123
group = result['testuser']
1125
for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
1126
self.failIfEqual(s.find(a), -1)
1127
self.assertEquals(len(group), 5)
1129
group = result['usertwo']
1131
for a in ('thisaddress', 'thataddress', 'lastaddress'):
1132
self.failIfEqual(s.find(a), -1)
1133
self.assertEquals(len(group), 3)
1135
group = result['lastuser']
1137
self.failUnlessEqual(s.find('/includable'), -1)
1138
for a in ('/filename', 'program', 'address'):
1139
self.failIfEqual(s.find(a), -1, '%s not found' % a)
1140
self.assertEquals(len(group), 3)
1142
def testMultiWrapper(self):
1143
msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
1144
msg = mail.alias.MultiWrapper(msgs)
1146
for L in self.lines:
1148
return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
1150
def _cbMultiWrapper(self, ignored, msgs):
1152
self.failUnless(m.eom)
1154
self.assertEquals(self.lines, m.lines)
1156
def testFileAlias(self):
1157
tmpfile = self.mktemp()
1158
a = mail.alias.FileAlias(tmpfile, None, None)
1159
m = a.createMessageReceiver()
1161
for l in self.lines:
1163
return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
1165
def _cbTestFileAlias(self, ignored, tmpfile):
1166
lines = file(tmpfile).readlines()
1167
self.assertEquals([L[:-1] for L in lines], self.lines)
1170
class DummyProcess(object):
1171
__slots__ = ['onEnd']
1173
class ProcessAliasTestCase(test_process.SignalMixin, unittest.TestCase):
1178
'After a blank line',
1182
def setUpClass(self):
1183
self.DNSNAME = smtp.DNSNAME
1186
def tearDownClass(self):
1187
smtp.DNSNAME = self.DNSNAME
1189
def testProcessAlias(self):
1190
path = util.sibpath(__file__, 'process.alias.sh')
1191
a = mail.alias.ProcessAlias(path, None, None)
1192
m = a.createMessageReceiver()
1194
for l in self.lines:
1196
return m.eomReceived().addCallback(self._cbProcessAlias)
1198
def _cbProcessAlias(self, ignored):
1199
lines = file('process.alias.out').readlines()
1200
self.assertEquals([L[:-1] for L in lines], self.lines)
1202
def testAliasResolution(self):
1204
domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
1205
A1 = mail.alias.AliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
1206
A2 = mail.alias.AliasGroup(['user2', 'user3'], domain, 'alias2')
1207
A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1214
r1 = map(str, A1.resolve(aliases).objs)
1216
expected = map(str, [
1217
mail.alias.AddressAlias('user1', None, None),
1218
mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1219
mail.alias.FileWrapper('/file'),
1222
self.assertEquals(r1, expected)
1224
r2 = map(str, A2.resolve(aliases).objs)
1226
expected = map(str, [
1227
mail.alias.AddressAlias('user2', None, None),
1228
mail.alias.AddressAlias('user3', None, None)
1231
self.assertEquals(r2, expected)
1233
r3 = map(str, A3.resolve(aliases).objs)
1235
expected = map(str, [
1236
mail.alias.AddressAlias('user1', None, None),
1237
mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1238
mail.alias.FileWrapper('/file'),
1241
self.assertEquals(r3, expected)
1243
def testCyclicAlias(self):
1245
domain = {'': TestDomain(aliases, [])}
1246
A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
1247
A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
1248
A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1255
self.assertEquals(aliases['alias1'].resolve(aliases), None)
1256
self.assertEquals(aliases['alias2'].resolve(aliases), None)
1257
self.assertEquals(aliases['alias3'].resolve(aliases), None)
1259
A4 = mail.alias.AliasGroup(['|echo', 'alias1'], domain, 'alias4')
1260
aliases['alias4'] = A4
1262
r = map(str, A4.resolve(aliases).objs)
1264
expected = map(str, [
1265
mail.alias.MessageWrapper(DummyProcess(), 'echo')
1268
if interfaces.IReactorProcess(reactor, None) is None:
1269
ProcessAliasTestCase = "IReactorProcess not supported"
1272
def __init__(self, aliases, users):
1273
self.aliases = aliases
1276
def exists(self, user, memo=None):
1277
user = user.dest.local
1278
if user in self.users:
1279
return lambda: mail.alias.AddressAlias(user, None, None)
1281
a = self.aliases[user]
1283
raise smtp.SMTPBadRcpt(user)
1285
aliases = a.resolve(self.aliases, memo)
1287
return lambda: aliases
1288
raise smtp.SMTPBadRcpt(user)
1291
from twisted.python.runtime import platformType
1293
if platformType != "posix":
1294
for o in locals().values():
1295
if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
1296
o.skip = "twisted.mail only works on posix"