2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
15
from twisted.trial import unittest
18
from zope.interface import providedBy
20
from twisted.trial import unittest, util as tutil
21
from twisted.mail import smtp
22
from twisted.mail import pop3
23
from twisted.names import dns
24
from twisted.protocols import basic
25
from twisted.internet import protocol
26
from twisted.internet import defer
27
from twisted.internet import reactor
28
from twisted.internet import interfaces
29
from twisted.internet.error import DNSLookupError, CannotListenError
30
from twisted.internet import address
31
from twisted.python import components
32
from twisted.python import failure
33
from twisted.python import util
35
from twisted import mail
36
import twisted.mail.mail
37
import twisted.mail.maildir
38
import twisted.mail.relay
39
import twisted.mail.relaymanager
40
import twisted.mail.protocols
41
import twisted.mail.alias
43
from twisted import cred
44
import twisted.cred.credentials
45
import twisted.cred.checkers
46
import twisted.cred.portal
48
# Since we run a couple of processes, we need SignalMixin from test_process
49
from twisted.test import test_process
51
from twisted.test.proto_helpers import LineSendingProtocol
53
class DomainWithDefaultsTestCase(unittest.TestCase):
54
def testMethods(self):
55
d = dict([(x, x + 10) for x in range(10)])
56
d = mail.mail.DomainWithDefaultDict(d, 'Default')
58
self.assertEquals(len(d), 10)
59
self.assertEquals(list(iter(d)), range(10))
60
self.assertEquals(list(d.iterkeys()), list(iter(d)))
62
items = list(d.iteritems())
64
self.assertEquals(items, [(x, x + 10) for x in range(10)])
66
values = list(d.itervalues())
68
self.assertEquals(values, range(10, 20))
72
self.assertEquals(items, [(x, x + 10) for x in range(10)])
76
self.assertEquals(values, range(10, 20))
79
self.assertEquals(d[x], x + 10)
80
self.assertEquals(d.get(x), x + 10)
81
self.failUnless(x in d)
82
self.failUnless(d.has_key(x))
86
self.assertEquals(len(d), 7)
87
self.assertEquals(d[2], 'Default')
88
self.assertEquals(d[4], 'Default')
89
self.assertEquals(d[6], 'Default')
91
d.update({'a': None, 'b': (), 'c': '*'})
92
self.assertEquals(len(d), 10)
93
self.assertEquals(d['a'], None)
94
self.assertEquals(d['b'], ())
95
self.assertEquals(d['c'], '*')
98
self.assertEquals(len(d), 0)
100
self.assertEquals(d.setdefault('key', 'value'), 'value')
101
self.assertEquals(d['key'], 'value')
103
self.assertEquals(d.popitem(), ('key', 'value'))
104
self.assertEquals(len(d), 0)
106
class BounceTestCase(unittest.TestCase):
108
self.domain = mail.mail.BounceDomain()
110
def testExists(self):
    """Asking the bounce domain about any user raises smtp.AddressError."""
    lookup = self.domain.exists
    self.assertRaises(smtp.AddressError, lookup, "any user")
115
self.domain.willRelay("random q emailer", "protocol"),
119
def testMessage(self):
    """Calling startMessage on the bounce domain raises AssertionError."""
    starter = self.domain.startMessage
    self.assertRaises(AssertionError, starter, "whomever")
122
def testAddUser(self):
    """After addUser, exists() for that user still raises SMTPBadRcpt."""
    bounce = self.domain
    bounce.addUser("bob", "password")
    self.assertRaises(smtp.SMTPBadRcpt, bounce.exists, "bob")
126
class FileMessageTestCase(unittest.TestCase):
128
self.name = "fileMessage.testFile"
129
self.final = "final.fileMessage.testFile"
130
self.f = file(self.name, 'w')
131
self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
143
os.remove(self.final)
147
def testFinalName(self):
    """Signal end-of-message and check the outcome in _cbFinalName."""
    finished = self.fp.eomReceived()
    return finished.addCallback(self._cbFinalName)
150
def _cbFinalName(self, result):
    """
    Callback for testFinalName: the result is the final name, the
    underlying file object is closed, and the initial path no longer exists.
    """
    self.assertEquals(result, self.final)
    isClosed = self.f.closed
    self.failUnless(isClosed)
    leftover = os.path.exists(self.name)
    self.failIf(leftover)
155
def testContents(self):
    """
    Feed three lines to the message, signal end-of-message, and check that
    the final file holds exactly those lines, each newline-terminated.
    """
    contents = "first line\nsecond line\nthird line\n"
    for line in contents.splitlines():
        self.fp.lineReceived(line)
    self.fp.eomReceived()
    # Read through an explicitly closed handle instead of leaving an
    # anonymous file object open until it is garbage collected.
    written = file(self.final)
    try:
        data = written.read()
    finally:
        written.close()
    self.assertEquals(data, contents)
162
def testInterrupted(self):
    """
    Losing the connection part-way through a message leaves neither the
    initial file nor the final file on disk.
    """
    partial = "first line\nsecond line\n"
    for received in partial.splitlines():
        self.fp.lineReceived(received)
    self.fp.connectionLost()
    for path in (self.name, self.final):
        self.failIf(os.path.exists(path))
170
class MailServiceTestCase(unittest.TestCase):
172
self.service = mail.mail.MailService()
174
def testFactories(self):
    """
    Each factory accessor on the service returns a ServerFactory whose
    buildProtocol() produces an instance of the matching protocol class.
    """
    addr = ('127.0.0.1', 12345)

    # failUnless(x, y) treats y as the failure message, so the protocol
    # class must be checked with isinstance() to be verified at all.
    f = self.service.getPOP3Factory()
    self.failUnless(isinstance(f, protocol.ServerFactory))
    self.failUnless(isinstance(f.buildProtocol(addr), pop3.POP3))

    f = self.service.getSMTPFactory()
    self.failUnless(isinstance(f, protocol.ServerFactory))
    self.failUnless(isinstance(f.buildProtocol(addr), smtp.SMTP))

    f = self.service.getESMTPFactory()
    self.failUnless(isinstance(f, protocol.ServerFactory))
    self.failUnless(isinstance(f.buildProtocol(addr), smtp.ESMTP))
187
def testPortals(self):
190
self.service.portals['domain'] = o1
191
self.service.portals[''] = o2
193
self.failUnless(self.service.lookupPortal('domain') is o1)
194
self.failUnless(self.service.defaultPortal() is o2)
196
class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
200
def osopen(self, fn, attr, mode):
202
return os.open(fn, attr, mode)
204
raise OSError(errno.EPERM, "Faked Permission Problem")
205
def oswrite(self, fh, data):
207
return os.write(fh, data)
209
raise OSError(errno.ENOSPC, "Faked Space problem")
210
def osrename(self, oldname, newname):
211
if self._renamestate:
212
return os.rename(oldname, newname)
214
raise OSError(errno.EPERM, "Faked Permission Problem")
216
class MaildirAppendStringTestCase(unittest.TestCase):
218
self.d = self.mktemp()
219
mail.maildir.initializeMaildir(self.d)
222
shutil.rmtree(self.d)
224
def testAppend(self):
225
mbox = mail.maildir.MaildirMailbox(self.d)
226
mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
227
for i in xrange(1, 11):
229
unittest.wait(mbox.appendMessage("X" * i)),
231
self.assertEquals(len(mbox.listMessages()),
233
self.assertEquals(len(mbox.getMessage(5).read()), 6)
234
# test in the right order: last to first error location.
235
mbox.AppendFactory._renamestate = False
236
self.failUnless(isinstance(unittest.deferredError(mbox.appendMessage("TEST")),
238
mbox.AppendFactory._renamestate = True
239
mbox.AppendFactory._writestate = False
240
self.failUnless(isinstance(unittest.deferredError(mbox.appendMessage("TEST")),
242
mbox.AppendFactory._writestate = True
243
mbox.AppendFactory._openstate = False
244
self.failUnless(isinstance(unittest.deferredError(mbox.appendMessage("TEST")),
246
mbox.AppendFactory._openstate = True
249
class MaildirAppendFileTestCase(unittest.TestCase):
251
self.d = self.mktemp()
252
mail.maildir.initializeMaildir(self.d)
255
shutil.rmtree(self.d)
257
def testAppend(self):
258
mbox = mail.maildir.MaildirMailbox(self.d)
259
for i in xrange(1, 11):
260
temp = tempfile.TemporaryFile()
264
unittest.wait(mbox.appendMessage(temp)),
267
self.assertEquals(len(mbox.listMessages()),
269
self.assertEquals(len(mbox.getMessage(5).read()), 6)
272
class MaildirTestCase(unittest.TestCase):
274
self.d = self.mktemp()
275
mail.maildir.initializeMaildir(self.d)
278
shutil.rmtree(self.d)
280
def testInitializer(self):
282
trash = os.path.join(d, '.Trash')
284
self.failUnless(os.path.exists(d) and os.path.isdir(d))
285
self.failUnless(os.path.exists(os.path.join(d, 'new')))
286
self.failUnless(os.path.exists(os.path.join(d, 'cur')))
287
self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
288
self.failUnless(os.path.isdir(os.path.join(d, 'new')))
289
self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
290
self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
292
self.failUnless(os.path.exists(os.path.join(trash, 'new')))
293
self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
294
self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
295
self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
296
self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
297
self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
299
def testMailbox(self):
301
n = mail.maildir._generateMaildirName
302
msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
304
# Toss a few files into the mailbox
307
f = file(j(self.d, f), 'w')
312
mb = mail.maildir.MaildirMailbox(self.d)
313
self.assertEquals(mb.listMessages(), range(1, 11))
314
self.assertEquals(mb.listMessages(1), 2)
315
self.assertEquals(mb.listMessages(5), 6)
317
self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
318
self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
326
p, f = os.path.split(msgs[5])
329
self.assertEquals(mb.listMessages(5), 0)
330
self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
331
self.failIf(os.path.exists(j(self.d, msgs[5])))
333
mb.undeleteMessages()
334
self.assertEquals(mb.listMessages(5), 6)
335
self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
336
self.failUnless(os.path.exists(j(self.d, msgs[5])))
338
class MaildirDirdbmDomainTestCase(unittest.TestCase):
340
self.P = self.mktemp()
341
self.S = mail.mail.MailService()
342
self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
345
shutil.rmtree(self.P)
347
def testAddUser(self):
348
toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
353
self.failUnless(u in self.D.dbm)
354
self.assertEquals(self.D.dbm[u], p)
355
self.failUnless(os.path.exists(os.path.join(self.P, u)))
357
def testCredentials(self):
    """
    The domain exposes exactly one credentials checker; it provides
    ICredentialsChecker and lists IUsernamePassword among its interfaces.
    """
    checkers = self.D.getCredentialsCheckers()

    self.assertEquals(len(checkers), 1)
    only = checkers[0]
    self.failUnless(cred.checkers.ICredentialsChecker.providedBy(only))
    accepted = only.credentialInterfaces
    self.failUnless(cred.credentials.IUsernamePassword in accepted)
364
def testRequestAvatar(self):
365
class ISomething(components.Interface):
368
self.D.addUser('user', 'password')
371
self.D.requestAvatar, 'user', None, ISomething
374
t = self.D.requestAvatar('user', None, pop3.IMailbox)
375
self.assertEquals(len(t), 3)
376
self.failUnless(t[0] is pop3.IMailbox)
377
self.failUnless(pop3.IMailbox.providedBy(t[1]))
381
def testRequestAvatarId(self):
382
self.D.addUser('user', 'password')
383
database = self.D.getCredentialsCheckers()[0]
385
creds = cred.credentials.UsernamePassword('user', 'wrong password')
387
cred.error.UnauthorizedLogin,
388
database.requestAvatarId, creds
391
creds = cred.credentials.UsernamePassword('user', 'password')
392
self.assertEquals(database.requestAvatarId(creds), 'user')
394
class ServiceDomainTestCase(unittest.TestCase):
396
self.S = mail.mail.MailService()
397
self.D = mail.protocols.DomainDeliveryBase(self.S, None)
398
self.D.service = self.S
399
self.D.protocolName = 'TEST'
400
self.D.host = 'hostname'
402
self.tmpdir = self.mktemp()
403
domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
404
domain.addUser('user', 'password')
405
self.S.domains['test.domain'] = domain
408
shutil.rmtree(self.tmpdir)
410
def testReceivedHeader(self):
411
hdr = self.D.receivedHeader(
412
('remotehost', '123.232.101.234'),
413
smtp.Address('<someguy@somplace>'),
416
fp = StringIO.StringIO(hdr)
417
m = rfc822.Message(fp)
418
self.assertEquals(len(m.items()), 1)
419
self.failUnless(m.has_key('Received'))
421
def testValidateTo(self):
422
user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
423
return defer.maybeDeferred(self.D.validateTo, user
424
).addCallback(self._cbValidateTo
427
def _cbValidateTo(self, result):
    """Callback for testValidateTo: the validation result is callable."""
    isCallable = callable(result)
    self.failUnless(isCallable)
430
def testValidateToBadUsername(self):
431
user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
432
return unittest.assertFailure(
433
defer.maybeDeferred(self.D.validateTo, user),
436
def testValidateToBadDomain(self):
437
user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
438
return unittest.assertFailure(
439
defer.maybeDeferred(self.D.validateTo, user),
442
def testValidateFrom(self):
443
helo = ('hostname', '127.0.0.1')
444
origin = smtp.Address('<user@hostname>')
445
self.failUnless(self.D.validateFrom(helo, origin) is origin)
447
helo = ('hostname', '1.2.3.4')
448
origin = smtp.Address('<user@hostname>')
449
self.failUnless(self.D.validateFrom(helo, origin) is origin)
451
helo = ('hostname', '1.2.3.4')
452
origin = smtp.Address('<>')
453
self.failUnless(self.D.validateFrom(helo, origin) is origin)
457
self.D.validateFrom, None, origin
460
class VirtualPOP3TestCase(unittest.TestCase):
462
self.tmpdir = self.mktemp()
463
self.S = mail.mail.MailService()
464
self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
465
self.D.addUser('user', 'password')
466
self.S.domains['test.domain'] = self.D
468
portal = cred.portal.Portal(self.D)
469
map(portal.registerChecker, self.D.getCredentialsCheckers())
470
self.S.portals[''] = self.S.portals['test.domain'] = portal
472
self.P = mail.protocols.VirtualPOP3()
473
self.P.service = self.S
474
self.P.magic = '<unit test magic>'
477
shutil.rmtree(self.tmpdir)
479
def testAuthenticateAPOP(self):
480
resp = md5.new(self.P.magic + 'password').hexdigest()
481
return self.P.authenticateUserAPOP('user', resp
482
).addCallback(self._cbAuthenticateAPOP
485
def _cbAuthenticateAPOP(self, result):
486
self.assertEquals(len(result), 3)
487
self.assertEquals(result[0], pop3.IMailbox)
488
self.failUnless(pop3.IMailbox.providedBy(result[1]))
491
def testAuthenticateIncorrectUserAPOP(self):
    """
    APOP authentication for the user name 'resu', with a digest built from
    the magic string plus 'password', fails with UnauthorizedLogin.
    """
    digest = md5.new(self.P.magic + 'password').hexdigest()
    attempt = self.P.authenticateUserAPOP('resu', digest)
    return unittest.assertFailure(attempt, cred.error.UnauthorizedLogin)
497
def testAuthenticateIncorrectResponseAPOP(self):
    """
    APOP authentication for 'user' with a digest of unrelated text fails
    with UnauthorizedLogin.
    """
    digest = md5.new('wrong digest').hexdigest()
    attempt = self.P.authenticateUserAPOP('user', digest)
    return unittest.assertFailure(attempt, cred.error.UnauthorizedLogin)
503
def testAuthenticatePASS(self):
504
return self.P.authenticateUserPASS('user', 'password'
505
).addCallback(self._cbAuthenticatePASS
508
def _cbAuthenticatePASS(self, result):
509
self.assertEquals(len(result), 3)
510
self.assertEquals(result[0], pop3.IMailbox)
511
self.failUnless(pop3.IMailbox.providedBy(result[1]))
514
def testAuthenticateBadUserPASS(self):
    """
    PASS authentication for the user name 'resu' fails with
    UnauthorizedLogin.
    """
    attempt = self.P.authenticateUserPASS('resu', 'password')
    return unittest.assertFailure(attempt, cred.error.UnauthorizedLogin)
519
def testAuthenticateBadPasswordPASS(self):
    """
    PASS authentication for 'user' with the password 'wrong password'
    fails with UnauthorizedLogin.
    """
    attempt = self.P.authenticateUserPASS('user', 'wrong password')
    return unittest.assertFailure(attempt, cred.error.UnauthorizedLogin)
524
class empty(smtp.User):
528
class RelayTestCase(unittest.TestCase):
529
def testExists(self):
530
service = mail.mail.MailService()
531
domain = mail.relay.DomainQueuer(service)
534
address.UNIXAddress('/var/run/mail-relay'),
535
address.IPv4Address('TCP', '127.0.0.1', 12345),
539
address.IPv4Address('TCP', '192.168.2.1', 62),
540
address.IPv4Address('TCP', '1.2.3.4', 1943),
545
user.orig = 'user@host'
546
user.dest = 'tsoh@resu'
547
user.protocol = empty()
548
user.protocol.transport = empty()
549
user.protocol.transport.getPeer = lambda: peer
551
self.failUnless(callable(domain.exists(user)))
553
for peer in dontRelay:
555
user.orig = 'some@place'
556
user.protocol = empty()
557
user.protocol.transport = empty()
558
user.protocol.transport.getPeer = lambda: peer
559
user.dest = 'who@cares'
561
self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
563
class RelayerTestCase(unittest.TestCase):
565
self.tmpdir = self.mktemp()
566
os.mkdir(self.tmpdir)
567
self.messageFiles = []
569
name = os.path.join(self.tmpdir, 'body-%d' % (i,))
570
f = file(name + '-H', 'w')
571
pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
574
f = file(name + '-D', 'w')
577
self.messageFiles.append(name)
579
self.R = mail.relay.RelayerMixin()
580
self.R.loadMessages(self.messageFiles)
583
shutil.rmtree(self.tmpdir)
585
def testMailFrom(self):
587
self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
588
self.R.sentMail(250, None, None, None, None)
589
self.assertEquals(self.R.getMailFrom(), None)
591
def testMailTo(self):
593
self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
594
self.R.sentMail(250, None, None, None, None)
595
self.assertEquals(self.R.getMailTo(), None)
597
def testMailData(self):
599
name = os.path.join(self.tmpdir, 'body-%d' % (i,))
600
self.assertEquals(self.R.getMailData().read(), name)
601
self.R.sentMail(250, None, None, None, None)
602
self.assertEquals(self.R.getMailData(), None)
610
def notifySuccess(self, factory, message):
    """Append the (factory, message) pair to self.success."""
    entry = (factory, message)
    self.success.append(entry)
613
def notifyFailure(self, factory, message):
    """Append the (factory, message) pair to self.failure."""
    entry = (factory, message)
    self.failure.append(entry)
616
def notifyDone(self, factory):
    """Append the factory to self.done."""
    finished = self.done
    finished.append(factory)
619
class ManagedRelayerTestCase(unittest.TestCase):
621
self.manager = Manager()
622
self.messages = range(0, 20, 2)
623
self.factory = object()
624
self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
625
self.relay.messages = self.messages[:]
626
self.relay.names = self.messages[:]
627
self.relay.factory = self.factory
629
def testSuccessfulSentMail(self):
630
for i in self.messages:
631
self.relay.sentMail(250, None, None, None, None)
634
self.manager.success,
635
[(self.factory, m) for m in self.messages]
638
def testFailedSentMail(self):
639
for i in self.messages:
640
self.relay.sentMail(550, None, None, None, None)
643
self.manager.failure,
644
[(self.factory, m) for m in self.messages]
647
def testConnectionLost(self):
    """
    Reporting a lost connection to the relay results in the manager's
    done list holding exactly the relay's factory.
    """
    reason = failure.Failure(Exception())
    self.relay.connectionLost(reason)
    expected = [self.factory]
    self.assertEquals(self.manager.done, expected)
651
class DirectoryQueueTestCase(unittest.TestCase):
653
# This is almost a test case itself.
654
self.tmpdir = self.mktemp()
655
os.mkdir(self.tmpdir)
656
self.queue = mail.relaymanager.Queue(self.tmpdir)
657
self.queue.noisy = False
659
hdrF, msgF = self.queue.createNewMessage()
660
pickle.dump(['header', m], hdrF)
662
msgF.lineReceived('body: %d' % (m,))
664
self.queue.readDirectory()
667
shutil.rmtree(self.tmpdir)
669
def testWaiting(self):
    """
    The queue starts with 25 waiting messages; marking one as relaying
    drops the waiting count to 24, and marking it waiting again restores 25.
    """
    queue = self.queue

    def waitingCount():
        return len(queue.getWaiting())

    self.failUnless(queue.hasWaiting())
    self.assertEquals(waitingCount(), 25)

    pending = queue.getWaiting()
    queue.setRelaying(pending[0])
    self.assertEquals(waitingCount(), 24)

    queue.setWaiting(pending[0])
    self.assertEquals(waitingCount(), 25)
680
def testRelaying(self):
681
for m in self.queue.getWaiting():
682
self.queue.setRelaying(m)
684
len(self.queue.getRelayed()),
685
25 - len(self.queue.getWaiting())
688
self.failIf(self.queue.hasWaiting())
690
relayed = self.queue.getRelayed()
691
self.queue.setWaiting(relayed[0])
692
self.assertEquals(len(self.queue.getWaiting()), 1)
693
self.assertEquals(len(self.queue.getRelayed()), 24)
696
msg = self.queue.getWaiting()[0]
697
self.queue.setRelaying(msg)
700
self.assertEquals(len(self.queue.getWaiting()), 24)
701
self.assertEquals(len(self.queue.getRelayed()), 0)
703
self.failIf(msg in self.queue.getWaiting())
704
self.failIf(msg in self.queue.getRelayed())
706
def testEnvelope(self):
709
for msg in self.queue.getWaiting():
710
envelopes.append(self.queue.getEnvelope(msg))
719
from twisted.names import server
720
from twisted.names import client
721
from twisted.names import common
723
class TestAuthority(common.ResolverBase):
725
common.ResolverBase.__init__(self)
728
def _lookup(self, name, cls, type, timeout = None):
729
if name in self.addresses and type == dns.MX:
731
for a in self.addresses[name]:
733
name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
736
return defer.succeed((results, [], []))
737
return defer.fail(failure.Failure(dns.DomainError(name)))
740
self.auth = TestAuthority()
741
factory = server.DNSServerFactory([self.auth])
742
protocol = dns.DNSDatagramProtocol(factory)
744
self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
745
portNumber = self.port.getHost().port
748
self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
749
except CannotListenError:
750
self.port.stopListening()
753
self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
755
def tearDownDNS(self):
756
self.port.stopListening()
757
self.udpPort.stopListening()
759
self.resolver._parseCall.cancel()
763
class MXTestCase(unittest.TestCase):
766
self.mx = mail.relaymanager.MXCalculator(self.resolver)
771
def testSimpleSuccess(self):
    """
    Register a single exchange for 'test.domain' with the test authority,
    look it up, and hand the result to _cbSimpleSuccess.
    """
    self.auth.addresses['test.domain'] = ['the.email.test.domain']
    lookup = self.mx.getMX('test.domain')
    return lookup.addCallback(self._cbSimpleSuccess)
775
def _cbSimpleSuccess(self, mx):
    """
    Callback for testSimpleSuccess: the record has preference 0 and names
    the exchange that was registered.
    """
    preference = mx.preference
    self.assertEquals(preference, 0)
    exchange = str(mx.exchange)
    self.assertEquals(exchange, 'the.email.test.domain')
779
def testSimpleFailure(self):
    """
    With fallbackToDomain switched off, looking up 'test.domain' (which
    this test does not register) fails with IOError.
    """
    self.mx.fallbackToDomain = False
    lookup = self.mx.getMX('test.domain')
    return unittest.assertFailure(lookup, IOError)
783
def testSimpleFailureWithFallback(self):
    """
    Without touching fallbackToDomain, looking up 'test.domain' (which this
    test does not register) fails with DNSLookupError.
    """
    lookup = self.mx.getMX('test.domain')
    return unittest.assertFailure(lookup, DNSLookupError)
786
def testManyRecords(self):
787
self.auth.addresses['test.domain'] = [
788
'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
790
return self.mx.getMX('test.domain'
791
).addCallback(self._cbManyRecordsSuccessfulLookup
794
def _cbManyRecordsSuccessfulLookup(self, mx):
795
self.failUnless(str(mx.exchange).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
796
self.mx.markBad(str(mx.exchange))
797
return self.mx.getMX('test.domain'
798
).addCallback(self._cbManyRecordsDifferentResult, mx
801
def _cbManyRecordsDifferentResult(self, nextMX, mx):
802
self.assertNotEqual(str(mx.exchange), str(nextMX.exchange))
803
self.mx.markBad(str(nextMX.exchange))
805
return self.mx.getMX('test.domain'
806
).addCallback(self._cbManyRecordsLastResult, mx, nextMX
809
def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
810
self.assertNotEqual(str(mx.exchange), str(lastMX.exchange))
811
self.assertNotEqual(str(nextMX.exchange), str(lastMX.exchange))
813
self.mx.markBad(str(lastMX.exchange))
814
self.mx.markGood(str(nextMX.exchange))
816
return self.mx.getMX('test.domain'
817
).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
820
def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
    """
    Final callback of the many-records chain: the exchange returned this
    time has the same name as nextMX's exchange.
    """
    repeated = str(againMX.exchange)
    expected = str(nextMX.exchange)
    self.assertEqual(repeated, expected)
823
class LiveFireExercise(unittest.TestCase):
824
if interfaces.IReactorUDP(reactor, default=None) is None:
825
skip = "UDP support is required to determining MX records"
830
'domainDir', 'insertionDomain', 'insertionQueue',
831
'destinationDomain', 'destinationQueue'
836
for d in self.tmpdirs:
837
if os.path.exists(d):
840
def testLocalDelivery(self):
841
service = mail.mail.MailService()
842
service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
843
domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
844
domain.addUser('user', 'password')
845
service.domains['test.domain'] = domain
846
service.portals['test.domain'] = cred.portal.Portal(domain)
847
service.portals[''] = service.portals['test.domain']
848
map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
850
service.setQueue(mail.relay.DomainQueuer(service))
851
manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
852
helper = mail.relaymanager.RelayStateHelper(manager, 1)
854
f = service.getSMTPFactory()
856
self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
858
client = LineSendingProtocol([
860
'MAIL FROM: <user@hostname>',
861
'RCPT TO: <user@test.domain>',
863
'This is the message',
869
f = protocol.ClientFactory()
870
f.protocol = lambda: client
871
f.clientConnectionLost = lambda *args: done.append(None)
872
reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
875
while len(done) == 0 and i < 1000:
876
reactor.iterate(0.01)
879
self.failUnless(done)
881
mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
882
msg = mbox.getMessage(0).read()
883
self.failIfEqual(msg.find('This is the message'), -1)
885
self.smtpServer.stopListening()
887
def testRelayDelivery(self):
888
# Here is the service we will connect to and send mail from
889
insServ = mail.mail.MailService()
890
insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
891
domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
892
insServ.domains['insertion.domain'] = domain
893
insServ.portals['insertion.domain'] = cred.portal.Portal(domain)
894
os.mkdir('insertionQueue')
895
insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
896
insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
897
manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
898
manager.fArgs += ('test.identity.hostname',)
899
helper = mail.relaymanager.RelayStateHelper(manager, 1)
900
# Yoink! Now the internet obeys OUR every whim!
901
manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
902
# And this is our whim.
903
self.auth.addresses['destination.domain'] = ['localhost']
905
f = insServ.getSMTPFactory()
906
self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
908
# Here is the service the previous one will connect to for final
910
destServ = mail.mail.MailService()
911
destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
912
domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
913
domain.addUser('user', 'password')
914
destServ.domains['destination.domain'] = domain
915
destServ.portals['destination.domain'] = cred.portal.Portal(domain)
916
os.mkdir('destinationQueue')
917
destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
918
manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
919
helper = mail.relaymanager.RelayStateHelper(manager, 1)
920
helper.startService()
922
f = destServ.getSMTPFactory()
923
self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
925
# Update the port number the *first* relay will connect to, because we can't use
927
manager.PORT = self.destServer.getHost().port
929
client = LineSendingProtocol([
931
'MAIL FROM: <user@wherever>',
932
'RCPT TO: <user@destination.domain>',
934
'This is the message',
940
f = protocol.ClientFactory()
941
f.protocol = lambda: client
942
f.clientConnectionLost = lambda *args: done.append(None)
943
reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
946
while len(done) == 0 and i < 1000:
947
reactor.iterate(0.01)
950
self.failUnless(done)
952
# First part of the delivery is done. Poke the queue manually now
953
# so we don't have to wait for the queue to be flushed.
956
for i in range(1000):
957
reactor.iterate(0.01)
959
mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
960
msg = mbox.getMessage(0).read()
961
self.failIfEqual(msg.find('This is the message'), -1)
963
self.insServer.stopListening()
964
self.destServer.stopListening()
967
aliasFile = StringIO.StringIO("""\
970
testuser: address1,address2, address3,
971
continuation@address, |/bin/process/this
973
usertwo:thisaddress,thataddress, lastaddress
974
lastuser: :/includable, /filename, |/program, address
977
class LineBufferMessage:
983
def lineReceived(self, line):
    """Append the received line to self.lines."""
    buffered = self.lines
    buffered.append(line)
986
def eomReceived(self):
988
return defer.succeed('<Whatever>')
990
def connectionLost(self):
993
class AliasTestCase(unittest.TestCase):
998
'After a blank line',
1005
def testHandle(self):
1008
'user: another@host\n',
1009
'nextuser: |/bin/program\n',
1011
'moreusers: :/etc/include/filename\n',
1012
'multiuser: first@host, second@host,last@anotherhost',
1016
mail.alias.handle(result, l, 'TestCase', None)
1018
self.assertEquals(result['user'], ['another@host', 'me@again'])
1019
self.assertEquals(result['nextuser'], ['|/bin/program'])
1020
self.assertEquals(result['moreusers'], [':/etc/include/filename'])
1021
self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
1023
def testFileLoader(self):
1024
domains = {'': object()}
1025
result = mail.alias.loadAliasFile(domains, fp=aliasFile)
1027
self.assertEquals(len(result), 3)
1029
group = result['testuser']
1031
for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
1032
self.failIfEqual(s.find(a), -1)
1033
self.assertEquals(len(group), 5)
1035
group = result['usertwo']
1037
for a in ('thisaddress', 'thataddress', 'lastaddress'):
1038
self.failIfEqual(s.find(a), -1)
1039
self.assertEquals(len(group), 3)
1041
group = result['lastuser']
1043
self.failUnlessEqual(s.find('/includable'), -1)
1044
for a in ('/filename', 'program', 'address'):
1045
self.failIfEqual(s.find(a), -1, '%s not found' % a)
1046
self.assertEquals(len(group), 3)
1048
def testMultiWrapper(self):
1049
msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
1050
msg = mail.alias.MultiWrapper(msgs)
1052
for L in self.lines:
1054
return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
1056
def _cbMultiWrapper(self, ignored, msgs):
1058
self.failUnless(m.eom)
1060
self.assertEquals(self.lines, m.lines)
1062
def testFileAlias(self):
1063
tmpfile = self.mktemp()
1064
a = mail.alias.FileAlias(tmpfile, None, None)
1065
m = a.createMessageReceiver()
1067
for l in self.lines:
1069
return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
1071
def _cbTestFileAlias(self, ignored, tmpfile):
    """
    Callback for testFileAlias: the file at tmpfile holds self.lines, one
    per line, once each line's last character is stripped.

    @param ignored: result of the preceding Deferred; unused.
    @param tmpfile: path of the file to read back.
    """
    # Close the handle explicitly rather than leaking it until collection.
    written = file(tmpfile)
    try:
        lines = written.readlines()
    finally:
        written.close()
    self.assertEquals([L[:-1] for L in lines], self.lines)
1076
class DummyProcess(object):
    """Placeholder object restricted by __slots__ to one attribute, onEnd."""

    __slots__ = ['onEnd']
1079
class ProcessAliasTestCase(test_process.SignalMixin, unittest.TestCase):
1084
'After a blank line',
1088
def setUpClass(self):
1089
self.DNSNAME = smtp.DNSNAME
1092
def tearDownClass(self):
1093
smtp.DNSNAME = self.DNSNAME
1100
def testProcessAlias(self):
1101
path = util.sibpath(__file__, 'process.alias.sh')
1102
a = mail.alias.ProcessAlias(path, None, None)
1103
m = a.createMessageReceiver()
1105
for l in self.lines:
1107
return m.eomReceived().addCallback(self._cbProcessAlias)
1109
def _cbProcessAlias(self, ignored):
    """
    Callback for testProcessAlias: 'process.alias.out' holds self.lines,
    one per line, once each line's last character is stripped.

    @param ignored: result of the preceding Deferred; unused.
    """
    # Close the handle explicitly rather than leaking it until collection.
    output = file('process.alias.out')
    try:
        lines = output.readlines()
    finally:
        output.close()
    self.assertEquals([L[:-1] for L in lines], self.lines)
1113
def testAliasResolution(self):
1115
domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
1116
A1 = mail.alias.AliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
1117
A2 = mail.alias.AliasGroup(['user2', 'user3'], domain, 'alias2')
1118
A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1125
r1 = map(str, A1.resolve(aliases).objs)
1127
expected = map(str, [
1128
mail.alias.AddressAlias('user1', None, None),
1129
mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1130
mail.alias.FileWrapper('/file'),
1133
self.assertEquals(r1, expected)
1135
r2 = map(str, A2.resolve(aliases).objs)
1137
expected = map(str, [
1138
mail.alias.AddressAlias('user2', None, None),
1139
mail.alias.AddressAlias('user3', None, None)
1142
self.assertEquals(r2, expected)
1144
r3 = map(str, A3.resolve(aliases).objs)
1146
expected = map(str, [
1147
mail.alias.AddressAlias('user1', None, None),
1148
mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1149
mail.alias.FileWrapper('/file'),
1152
self.assertEquals(r3, expected)
1154
def testCyclicAlias(self):
1156
domain = {'': TestDomain(aliases, [])}
1157
A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
1158
A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
1159
A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1166
self.assertEquals(aliases['alias1'].resolve(aliases), None)
1167
self.assertEquals(aliases['alias2'].resolve(aliases), None)
1168
self.assertEquals(aliases['alias3'].resolve(aliases), None)
1170
A4 = mail.alias.AliasGroup(['|echo', 'alias1'], domain, 'alias4')
1171
aliases['alias4'] = A4
1173
r = map(str, A4.resolve(aliases).objs)
1175
expected = map(str, [
1176
mail.alias.MessageWrapper(DummyProcess(), 'echo')
1179
if interfaces.IReactorProcess(reactor, default=None) is None:
    # Attach the skip message to the class, as the other skip sites in this
    # module do. Rebinding the name ProcessAliasTestCase to a string would
    # replace the test case with a plain str instead of marking it skipped.
    ProcessAliasTestCase.skip = "IReactorProcess not supported"
1183
def __init__(self, aliases, users):
1184
self.aliases = aliases
1187
def exists(self, user, memo=None):
1188
user = user.dest.local
1189
if user in self.users:
1190
return lambda: mail.alias.AddressAlias(user, None, None)
1192
a = self.aliases[user]
1194
raise smtp.SMTPBadRcpt(user)
1196
aliases = a.resolve(self.aliases, memo)
1198
return lambda: aliases
1199
raise smtp.SMTPBadRcpt(user)
1202
from twisted.python.runtime import platformType
1204
if platformType != "posix":
    # Off posix, set a skip message on every TestCase subclass bound in
    # this namespace.
    for o in locals().values():
        if not isinstance(o, (types.ClassType, type)):
            continue
        if issubclass(o, unittest.TestCase):
            o.skip = "twisted.mail only works on posix"