1
# -*- test-case-name: twisted.mail.test.test_mail -*-
2
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Infrastructure for relaying mail through a smart host.
8
Today, internet e-mail has stopped being peer-to-peer for many reasons,
9
spam (unsolicited bulk mail) among them. Instead, most nodes on the
10
internet send all e-mail to a single computer, usually the ISP's, though
11
sometimes other schemes, such as SMTP-after-POP, are used. This computer
12
is supposedly permanently up and traceable, and will do the work of
13
figuring out MXs and connecting to them. This kind of configuration
14
is usually termed "smart host", since the host we are connecting to
15
is "smart" (and will find MXs and connect to them) rather than just
16
accepting mail for a small set of domains.
18
The classes here are meant to facilitate support for such a configuration
19
for the twisted.mail SMTP server.
27
import cPickle as pickle
31
from twisted.python import log
32
from twisted.python.failure import Failure
33
from twisted.python.compat import set
34
from twisted.mail import relay
35
from twisted.mail import bounce
36
from twisted.internet import protocol
37
from twisted.internet.defer import Deferred, DeferredList
38
from twisted.internet.error import DNSLookupError
39
from twisted.mail import smtp
40
from twisted.application import internet
42
class ManagedRelayerMixin:
43
"""SMTP Relayer which notifies a manager
45
Notify the manager about successful mail, failed mail
46
and broken connections
49
def __init__(self, manager):
50
self.manager = manager
52
def sentMail(self, code, resp, numOk, addresses, log):
53
"""called when e-mail has been sent
55
we will always get 0 or 1 addresses.
57
message = self.names[0]
58
if code in smtp.SUCCESS:
59
self.manager.notifySuccess(self.factory, message)
61
self.manager.notifyFailure(self.factory, message)
65
def connectionLost(self, reason):
66
"""called when connection is broken
68
notify manager we will try to send no more e-mail
70
self.manager.notifyDone(self.factory)
72
class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer):
    """
    Plain-SMTP relayer that reports its progress to a manager object.
    """

    def __init__(self, messages, manager, *args, **kw):
        """
        @type messages: C{list} of C{str}
        @param messages: Filenames of messages to relay

        @param manager: The object told about relaying progress. Through
            L{ManagedRelayerMixin} it receives C{notifySuccess},
            C{notifyFailure} and C{notifyDone} calls.

        Any further positional and keyword arguments are passed unchanged
        to C{relay.SMTPRelayer.__init__}.
        """
        # The mixin only records the manager; the relayer does the real setup.
        ManagedRelayerMixin.__init__(self, manager)
        relay.SMTPRelayer.__init__(self, messages, *args, **kw)
84
class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer):
    """
    ESMTP relayer that reports its progress to a manager object.
    """

    def __init__(self, messages, manager, *args, **kw):
        """
        @type messages: C{list} of C{str}
        @param messages: Filenames of messages to relay

        @param manager: The object told about relaying progress. Through
            L{ManagedRelayerMixin} it receives C{notifySuccess},
            C{notifyFailure} and C{notifyDone} calls.

        Any further positional and keyword arguments are passed unchanged
        to C{relay.ESMTPRelayer.__init__}.
        """
        # The mixin only records the manager; the relayer does the real setup.
        ManagedRelayerMixin.__init__(self, manager)
        relay.ESMTPRelayer.__init__(self, messages, *args, **kw)
96
class SMTPManagedRelayerFactory(protocol.ClientFactory):
    """
    Client factory producing managed relayers.

    Every relayer it builds is given the same list of messages and the same
    manager.
    """
    protocol = SMTPManagedRelayer

    def __init__(self, messages, manager, *args, **kw):
        """
        @param messages: Filenames of messages to relay.
        @param manager: The object notified about relaying progress; it must
            support at least C{notifyNoConnection} and C{notifyDone}.

        Extra positional and keyword arguments are kept and forwarded to the
        protocol constructor by L{buildProtocol}.
        """
        self.messages = messages
        self.manager = manager
        # Kept for buildProtocol, which forwards them to the relayer;
        # subclasses read these attributes as well.
        self.pArgs = args
        self.pKwArgs = kw

    def buildProtocol(self, addr):
        """
        Build a relayer for a new connection and attach this factory to it.

        @return: The new relayer instance.
        """
        relayer = self.protocol(self.messages, self.manager, *self.pArgs,
                                **self.pKwArgs)
        # The relayer reports back to the manager using its factory as key.
        relayer.factory = self
        return relayer

    def clientConnectionFailed(self, connector, reason):
        """
        Called when the connection could not be made.

        The manager is told that no connection was possible (it may prefer
        another host in that case) and then that this factory is done.
        """
        self.manager.notifyNoConnection(self)
        self.manager.notifyDone(self)
119
class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
    """
    Client factory producing managed ESMTP relayers.
    """
    protocol = ESMTPManagedRelayer

    def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
        """
        @param messages: Filenames of messages to relay.
        @param manager: The object notified about relaying progress.
        @param secret: Either a false value, or a callable that is invoked
            with the connection address. Its result is handed to each new
            relayer.
        @param contextFactory: Passed unchanged to each new relayer.

        Extra positional and keyword arguments are forwarded as in
        L{SMTPManagedRelayerFactory}.
        """
        self.secret = secret
        self.contextFactory = contextFactory
        SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)

    def buildProtocol(self, addr):
        """
        Build an ESMTP relayer for a new connection.

        @return: The new relayer instance, with its C{factory} set to this
            factory.
        """
        # A false secret is passed through as-is; otherwise it is computed
        # per connection from the peer address.
        s = self.secret and self.secret(addr)
        relayer = self.protocol(self.messages, self.manager, s,
                                self.contextFactory, *self.pArgs,
                                **self.pKwArgs)
        relayer.factory = self
        return relayer
135
"""A queue of ougoing emails."""
139
def __init__(self, directory):
140
self.directory = directory
149
def __getstate__(self):
150
"""(internal) delete volatile state"""
151
return {'directory' : self.directory}
153
def __setstate__(self, state):
154
"""(internal) restore volatile state"""
155
self.__dict__.update(state)
158
def readDirectory(self):
159
"""Read the messages directory.
161
look for new messages.
163
for message in os.listdir(self.directory):
164
# Skip non data files
165
if message[-2:]!='-D':
167
self.addMessage(message[:-2])
169
def getWaiting(self):
170
return self.waiting.keys()
172
def hasWaiting(self):
173
return len(self.waiting) > 0
175
def getRelayed(self):
176
return self.relayed.keys()
178
def setRelaying(self, message):
179
del self.waiting[message]
180
self.relayed[message] = 1
182
def setWaiting(self, message):
183
del self.relayed[message]
184
self.waiting[message] = 1
186
def addMessage(self, message):
187
if message not in self.relayed:
188
self.waiting[message] = 1
190
log.msg('Set ' + message + ' waiting')
192
def done(self, message):
193
"""Remove message to from queue."""
194
message = os.path.basename(message)
195
os.remove(self.getPath(message) + '-D')
196
os.remove(self.getPath(message) + '-H')
197
del self.relayed[message]
199
def getPath(self, message):
200
"""Get the path in the filesystem of a message."""
201
return os.path.join(self.directory, message)
203
def getEnvelope(self, message):
204
return pickle.load(self.getEnvelopeFile(message))
206
def getEnvelopeFile(self, message):
207
return open(os.path.join(self.directory, message+'-H'), 'rb')
209
def createNewMessage(self):
210
"""Create a new message in the queue.
212
Return a tuple - file-like object for headers, and ISMTPMessage.
214
fname = "%s_%s_%s_%s" % (os.getpid(), time.time(), self.n, id(self))
216
headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
217
tempFilename = os.path.join(self.directory, fname+'-C')
218
finalFilename = os.path.join(self.directory, fname+'-D')
219
messageFile = open(tempFilename, 'wb')
221
from twisted.mail.mail import FileMessage
222
return headerFile,FileMessage(messageFile, tempFilename, finalFilename)
225
class _AttemptManager(object):
227
Manage the state of a single attempt to flush the relay queue.
229
def __init__(self, manager):
230
self.manager = manager
231
self._completionDeferreds = []
234
def getCompletionDeferred(self):
235
self._completionDeferreds.append(Deferred())
236
return self._completionDeferreds[-1]
239
def _finish(self, relay, message):
240
self.manager.managed[relay].remove(os.path.basename(message))
241
self.manager.queue.done(message)
244
def notifySuccess(self, relay, message):
245
"""a relay sent a message successfully
247
Mark it as sent in our lists
249
if self.manager.queue.noisy:
250
log.msg("success sending %s, removing from queue" % message)
251
self._finish(relay, message)
254
def notifyFailure(self, relay, message):
255
"""Relaying the message has failed."""
256
if self.manager.queue.noisy:
257
log.msg("could not relay "+message)
258
# Moshe - Bounce E-mail here
259
# Be careful: if it's a bounced bounce, silently
261
message = os.path.basename(message)
262
fp = self.manager.queue.getEnvelopeFile(message)
263
from_, to = pickle.load(fp)
265
from_, to, bounceMessage = bounce.generateBounce(open(self.manager.queue.getPath(message)+'-D'), from_, to)
266
fp, outgoingMessage = self.manager.queue.createNewMessage()
267
pickle.dump([from_, to], fp)
269
for line in bounceMessage.splitlines():
270
outgoingMessage.lineReceived(line)
271
outgoingMessage.eomReceived()
272
self._finish(relay, self.manager.queue.getPath(message))
275
def notifyDone(self, relay):
276
"""A relaying SMTP client is disconnected.
278
unmark all pending messages under this relay's responsibility
279
as being relayed, and remove the relay.
281
for message in self.manager.managed.get(relay, ()):
282
if self.manager.queue.noisy:
283
log.msg("Setting " + message + " waiting")
284
self.manager.queue.setWaiting(message)
286
del self.manager.managed[relay]
289
notifications = self._completionDeferreds
290
self._completionDeferreds = None
291
for d in notifications:
295
def notifyNoConnection(self, relay):
296
"""Relaying SMTP client couldn't connect.
298
Useful because it tells us our upstream server is unavailable.
302
msgs = self.manager.managed[relay]
304
log.msg("notifyNoConnection passed unknown relay!")
307
if self.manager.queue.noisy:
308
log.msg("Backing off on delivery of " + str(msgs))
309
def setWaiting(queue, messages):
310
map(queue.setWaiting, messages)
311
from twisted.internet import reactor
312
reactor.callLater(30, setWaiting, self.manager.queue, msgs)
313
del self.manager.managed[relay]
317
class SmartHostSMTPRelayingManager:
318
"""Manage SMTP Relayers
320
Manage SMTP relayers, keeping track of the existing connections,
321
each connection's responsibility in term of messages. Create
322
more relayers if the need arises.
324
Someone should press .checkState periodically
326
@ivar fArgs: Additional positional arguments used to instantiate
329
@ivar fKwArgs: Additional keyword arguments used to instantiate
332
@ivar factory: A callable which returns a ClientFactory suitable for
333
making SMTP connections.
336
factory = SMTPManagedRelayerFactory
342
def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
344
@type queue: Any implementor of C{IQueue}
345
@param queue: The object used to queue messages on their way to
348
@type maxConnections: C{int}
349
@param maxConnections: The maximum number of SMTP connections to
350
allow to be opened at any given time.
352
@type maxMessagesPerConnection: C{int}
353
@param maxMessagesPerConnection: The maximum number of messages a
354
relayer will be given responsibility for.
356
Default values are meant for a small box with 1-5 users.
358
self.maxConnections = maxConnections
359
self.maxMessagesPerConnection = maxMessagesPerConnection
360
self.managed = {} # SMTP clients we're managing
365
def __getstate__(self):
366
"""(internal) delete volatile state"""
367
dct = self.__dict__.copy()
371
def __setstate__(self, state):
372
"""(internal) restore volatile state"""
373
self.__dict__.update(state)
376
def checkState(self):
378
Synchronize with the state of the world, and maybe launch a new
381
Call me periodically to check I am still up to date.
383
@return: None or a Deferred which fires when all of the SMTP clients
384
started by this call have disconnected.
386
self.queue.readDirectory()
387
if (len(self.managed) >= self.maxConnections):
389
if not self.queue.hasWaiting():
392
return self._checkStateMX()
394
def _checkStateMX(self):
395
nextMessages = self.queue.getWaiting()
396
nextMessages.reverse()
399
for msg in nextMessages:
400
from_, to = self.queue.getEnvelope(msg)
401
name, addr = rfc822.parseaddr(to)
402
parts = addr.split('@', 1)
404
log.err("Illegal message destination: " + to)
408
self.queue.setRelaying(msg)
409
exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
410
if len(exchanges) >= (self.maxConnections - len(self.managed)):
413
if self.mxcalc is None:
414
self.mxcalc = MXCalculator()
417
for (domain, msgs) in exchanges.iteritems():
418
manager = _AttemptManager(self)
419
factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
420
self.managed[factory] = map(os.path.basename, msgs)
421
relayAttemptDeferred = manager.getCompletionDeferred()
422
connectSetupDeferred = self.mxcalc.getMX(domain)
423
connectSetupDeferred.addCallback(lambda mx: str(mx.name))
424
connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
425
connectSetupDeferred.addErrback(lambda err: (relayAttemptDeferred.errback(err), err)[1])
426
connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
427
relays.append(relayAttemptDeferred)
428
return DeferredList(relays)
431
def _cbExchange(self, address, port, factory):
432
from twisted.internet import reactor
433
reactor.connectTCP(address, port, factory)
435
def _ebExchange(self, failure, factory, domain):
436
log.err('Error setting up managed relay factory for ' + domain)
438
def setWaiting(queue, messages):
439
map(queue.setWaiting, messages)
440
from twisted.internet import reactor
441
reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
442
del self.managed[factory]
444
class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
    """
    Relaying manager that builds L{ESMTPManagedRelayerFactory} instances
    instead of plain SMTP relayer factories.

    The base class calls C{self.factory(msgs, manager, *self.fArgs,
    **self.fKwArgs)}. The ESMTP factory's additional C{secret} and
    C{contextFactory} arguments must therefore be supplied through C{fArgs}
    or C{fKwArgs}.
    """
    factory = ESMTPManagedRelayerFactory
447
def _checkState(manager):
450
def RelayStateHelper(manager, delay):
    """
    Create a service that periodically checks the relaying state.

    @param manager: The relaying manager handed to C{_checkState} on each
        tick.
    @param delay: The interval given to the timer (seconds, as
        C{TimerService} interprets it).
    @return: A C{internet.TimerService} that calls C{_checkState(manager)}
        every C{delay} seconds once started.
    """
    return internet.TimerService(delay, _checkState, manager)
455
class CanonicalNameLoop(Exception):
    """
    Raised when an MX lookup finds a set of CNAME records that form a
    cycle, so resolution was abandoned.

    L{MXCalculator} constructs it with the offending record as its only
    argument.
    """
462
class CanonicalNameChainTooLong(Exception):
    """
    Raised when an MX lookup encounters too many CNAME records pointing to
    other CNAME records, so resolution was abandoned.

    The limit is the C{maximumCanonicalChainLength} argument of
    C{MXCalculator.getMX}. The offending record is passed as the only
    argument.
    """
472
A utility for looking up mail exchange hosts and tracking whether they are
475
@ivar clock: L{IReactorTime} provider which will be used to decide when to
476
retry mail exchanges which have not been working.
478
timeOutBadMX = 60 * 60 # One hour
479
fallbackToDomain = True
481
def __init__(self, resolver=None, clock=None):
484
from twisted.names.client import createResolver
485
resolver = createResolver()
486
self.resolver = resolver
488
from twisted.internet import reactor as clock
492
def markBad(self, mx):
493
"""Indicate a given mx host is not currently functioning.
496
@param mx: The hostname of the host which is down.
498
self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX
500
def markGood(self, mx):
501
"""Indicate a given mx host is back online.
504
@param mx: The hostname of the host which is up.
511
def getMX(self, domain, maximumCanonicalChainLength=3):
513
Find an MX record for the given domain.
516
@param domain: The domain name for which to look up an MX record.
518
@type maximumCanonicalChainLength: C{int}
519
@param maximumCanonicalChainLength: The maximum number of unique CNAME
520
records to follow while looking up the MX record.
522
@return: A L{Deferred} which is called back with a string giving the
523
name in the found MX record or which is errbacked if no MX record
526
mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
527
mailExchangeDeferred.addCallback(self._filterRecords)
528
mailExchangeDeferred.addCallback(
529
self._cbMX, domain, maximumCanonicalChainLength)
530
mailExchangeDeferred.addErrback(self._ebMX, domain)
531
return mailExchangeDeferred
534
def _filterRecords(self, records):
536
Convert a DNS response (a three-tuple of lists of RRHeaders) into a
537
mapping from record names to lists of corresponding record payloads.
540
for answer in records[0]:
541
recordBag.setdefault(str(answer.name), []).append(answer.payload)
545
def _cbMX(self, answers, domain, cnamesLeft):
547
Try to find the MX host from the given DNS information.
549
This will attempt to resolve CNAME results. It can recognize loops
550
and will give up on non-cyclic chains after a specified number of
553
# Do this import here so that relaymanager.py doesn't depend on
554
# twisted.names, only MXCalculator will.
555
from twisted.names import dns, error
559
# Examine the answers for the domain we asked about
560
pertinentRecords = answers.get(domain, [])
561
while pertinentRecords:
562
record = pertinentRecords.pop()
564
# If it's a CNAME, we'll need to do some more processing
565
if record.TYPE == dns.CNAME:
567
# Remember that this name was an alias.
568
seenAliases.add(domain)
570
canonicalName = str(record.name)
571
# See if we have some local records which might be relevant.
572
if canonicalName in answers:
574
# Make sure it isn't a loop contained entirely within the
575
# results we have here.
576
if canonicalName in seenAliases:
577
return Failure(CanonicalNameLoop(record))
579
pertinentRecords = answers[canonicalName]
583
# Request more information from the server.
584
return self.getMX(canonicalName, cnamesLeft - 1)
587
return Failure(CanonicalNameChainTooLong(record))
589
# If it's an MX, collect it.
590
if record.TYPE == dns.MX:
591
exchanges.append((record.preference, record))
595
for (preference, record) in exchanges:
596
host = str(record.name)
597
if host not in self.badMXs:
599
t = self.clock.seconds() - self.badMXs[host]
601
del self.badMXs[host]
603
return exchanges[0][1]
605
# Treat no answers the same as an error - jump to the errback to try
606
# to look up an A record. This provides behavior described as a
607
# special case in RFC 974 in the section headed I{Interpreting the
610
error.DNSNameError("No MX records for %r" % (domain,)))
613
def _ebMX(self, failure, domain):
614
from twisted.names import error, dns
616
if self.fallbackToDomain:
617
failure.trap(error.DNSNameError)
618
log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,))
620
# Alright, I admit, this is a bit icky.
621
d = self.resolver.getHostByName(domain)
622
def cbResolved(addr):
623
return dns.Record_MX(name=addr)
625
err.trap(error.DNSNameError)
626
raise DNSLookupError()
627
d.addCallbacks(cbResolved, ebResolved)
629
elif failure.check(error.DNSNameError):
630
raise IOError("No MX found for %r" % (domain,))