~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/mail/test/test_smtp.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
"""
6
 
Test cases for twisted.mail.smtp module.
7
 
"""
8
 
 
9
 
import time
10
 
from zope.interface import implements
11
 
 
12
 
from twisted.trial import unittest, util
13
 
from twisted import protocols
14
 
from twisted import internet
15
 
from twisted.protocols import loopback
16
 
from twisted.mail import smtp
17
 
from twisted.internet import defer, protocol, reactor, interfaces, address, error
18
 
from twisted.test.test_protocols import StringIOWithoutClosing
19
 
from twisted.test.proto_helpers import StringTransport
20
 
 
21
 
from twisted import cred
22
 
import twisted.cred.error
23
 
import twisted.cred.portal
24
 
import twisted.cred.checkers
25
 
import twisted.cred.credentials
26
 
 
27
 
from twisted.cred.portal import IRealm, Portal
28
 
from twisted.cred.checkers import ICredentialsChecker, AllowAnonymousAccess
29
 
from twisted.cred.credentials import IAnonymous
30
 
from twisted.cred.error import UnauthorizedLogin
31
 
 
32
 
from twisted.mail import imap4
33
 
 
34
 
 
35
 
try:
36
 
    from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
37
 
except ImportError:
38
 
    ClientTLSContext = ServerTLSContext = None
39
 
 
40
 
import re
41
 
 
42
 
try:
43
 
    from cStringIO import StringIO
44
 
except ImportError:
45
 
    from StringIO import StringIO
46
 
 
47
 
def spameater(*spam, **eggs):
48
 
    return None
49
 
 
50
 
class DummyMessage:
51
 
 
52
 
    def __init__(self, domain, user):
53
 
        self.domain = domain
54
 
        self.user = user
55
 
        self.buffer = []
56
 
 
57
 
    def lineReceived(self, line):
58
 
        # Throw away the generated Received: header
59
 
        if not re.match('Received: From yyy.com \(\[.*\]\) by localhost;', line):
60
 
            self.buffer.append(line)
61
 
 
62
 
    def eomReceived(self):
63
 
        message = '\n'.join(self.buffer) + '\n'
64
 
        self.domain.messages[self.user.dest.local].append(message)
65
 
        deferred = defer.Deferred()
66
 
        deferred.callback("saved")
67
 
        return deferred
68
 
 
69
 
 
70
 
class DummyDomain:
71
 
 
72
 
   def __init__(self, names):
73
 
       self.messages = {}
74
 
       for name in names:
75
 
           self.messages[name] = []
76
 
 
77
 
   def exists(self, user):
78
 
       if self.messages.has_key(user.dest.local):
79
 
           return defer.succeed(lambda: self.startMessage(user))
80
 
       return defer.fail(smtp.SMTPBadRcpt(user))
81
 
 
82
 
   def startMessage(self, user):
83
 
       return DummyMessage(self, user)
84
 
 
85
 
class SMTPTestCase(unittest.TestCase):
86
 
 
87
 
    messages = [('foo@bar.com', ['foo@baz.com', 'qux@baz.com'], '''\
88
 
Subject: urgent\015
89
 
\015
90
 
Someone set up us the bomb!\015
91
 
''')]
92
 
 
93
 
    mbox = {'foo': ['Subject: urgent\n\nSomeone set up us the bomb!\n']}
94
 
 
95
 
    def setUp(self):
96
 
        self.factory = smtp.SMTPFactory()
97
 
        self.factory.domains = {}
98
 
        self.factory.domains['baz.com'] = DummyDomain(['foo'])
99
 
        self.output = StringIOWithoutClosing()
100
 
        self.transport = internet.protocol.FileWrapper(self.output)
101
 
 
102
 
    def testMessages(self):
103
 
        from twisted.mail import protocols
104
 
        protocol =  protocols.DomainSMTP()
105
 
        protocol.service = self.factory
106
 
        protocol.factory = self.factory
107
 
        protocol.receivedHeader = spameater
108
 
        protocol.makeConnection(self.transport)
109
 
        protocol.lineReceived('HELO yyy.com')
110
 
        for message in self.messages:
111
 
            protocol.lineReceived('MAIL FROM:<%s>' % message[0])
112
 
            for target in message[1]:
113
 
                protocol.lineReceived('RCPT TO:<%s>' % target)
114
 
            protocol.lineReceived('DATA')
115
 
            protocol.dataReceived(message[2])
116
 
            protocol.lineReceived('.')
117
 
        protocol.lineReceived('QUIT')
118
 
        if self.mbox != self.factory.domains['baz.com'].messages:
119
 
            raise AssertionError(self.factory.domains['baz.com'].messages)
120
 
        protocol.setTimeout(None)
121
 
        
122
 
    testMessages.suppress = [util.suppress(message='DomainSMTP', category=DeprecationWarning)]
123
 
 
124
 
mail = '''\
125
 
Subject: hello
126
 
 
127
 
Goodbye
128
 
'''
129
 
 
130
 
class MyClient:
131
 
    def __init__(self):
132
 
        self.mail = 'moshez@foo.bar', ['moshez@foo.bar'], mail
133
 
 
134
 
    def getMailFrom(self):
135
 
        return self.mail[0]
136
 
 
137
 
    def getMailTo(self):
138
 
        return self.mail[1]
139
 
 
140
 
    def getMailData(self):
141
 
        return StringIO(self.mail[2])
142
 
 
143
 
    def sentMail(self, code, resp, numOk, addresses, log):
144
 
        self.mail = None, None, None
145
 
 
146
 
class MySMTPClient(MyClient, smtp.SMTPClient):
147
 
    def __init__(self):
148
 
        smtp.SMTPClient.__init__(self, 'foo.baz')
149
 
        MyClient.__init__(self)
150
 
 
151
 
class MyESMTPClient(MyClient, smtp.ESMTPClient):
152
 
    def __init__(self, secret = '', contextFactory = None):
153
 
        smtp.ESMTPClient.__init__(self, secret, contextFactory, 'foo.baz')
154
 
        MyClient.__init__(self)
155
 
 
156
 
class LoopbackMixin:
157
 
    def loopback(self, server, client):
158
 
        return loopback.loopbackTCP(server, client)
159
 
 
160
 
class LoopbackTestCase(LoopbackMixin):
161
 
    def testMessages(self):
162
 
        factory = smtp.SMTPFactory()
163
 
        factory.domains = {}
164
 
        factory.domains['foo.bar'] = DummyDomain(['moshez'])
165
 
        from twisted.mail.protocols import DomainSMTP
166
 
        protocol =  DomainSMTP()
167
 
        protocol.service = factory
168
 
        protocol.factory = factory
169
 
        clientProtocol = self.clientClass()
170
 
        return self.loopback(protocol, clientProtocol)
171
 
    testMessages.suppress = [util.suppress(message='DomainSMTP', category=DeprecationWarning)]
172
 
 
173
 
class LoopbackSMTPTestCase(LoopbackTestCase, unittest.TestCase):
174
 
    clientClass = MySMTPClient
175
 
 
176
 
class LoopbackESMTPTestCase(LoopbackTestCase, unittest.TestCase):
177
 
    clientClass = MyESMTPClient
178
 
 
179
 
 
180
 
class FakeSMTPServer(protocols.basic.LineReceiver):
181
 
 
182
 
    clientData = [
183
 
        '220 hello', '250 nice to meet you',
184
 
        '250 great', '250 great', '354 go on, lad'
185
 
    ]
186
 
 
187
 
    def connectionMade(self):
188
 
        self.buffer = []
189
 
        self.clientData = self.clientData[:]
190
 
        self.clientData.reverse()
191
 
        self.sendLine(self.clientData.pop())
192
 
 
193
 
    def lineReceived(self, line):
194
 
        self.buffer.append(line)
195
 
        if line == "QUIT":
196
 
            self.transport.write("221 see ya around\r\n")
197
 
            self.transport.loseConnection()
198
 
        elif line == ".":
199
 
            self.transport.write("250 gotcha\r\n")
200
 
        elif line == "RSET":
201
 
            self.transport.loseConnection()
202
 
 
203
 
        if self.clientData:
204
 
            self.sendLine(self.clientData.pop())
205
 
 
206
 
 
207
 
class SMTPClientTestCase(unittest.TestCase, LoopbackMixin):
208
 
 
209
 
    expected_output = [
210
 
        'HELO foo.baz', 'MAIL FROM:<moshez@foo.bar>',
211
 
        'RCPT TO:<moshez@foo.bar>', 'DATA',
212
 
        'Subject: hello', '', 'Goodbye', '.', 'RSET'
213
 
    ]
214
 
 
215
 
    def testMessages(self):
216
 
        # this test is disabled temporarily
217
 
        client = MySMTPClient()
218
 
        server = FakeSMTPServer()
219
 
        d = self.loopback(server, client)
220
 
        d.addCallback(lambda x :
221
 
                      self.assertEquals(server.buffer, self.expected_output))
222
 
        return d
223
 
 
224
 
class DummySMTPMessage:
225
 
 
226
 
    def __init__(self, protocol, users):
227
 
        self.protocol = protocol
228
 
        self.users = users
229
 
        self.buffer = []
230
 
 
231
 
    def lineReceived(self, line):
232
 
        self.buffer.append(line)
233
 
 
234
 
    def eomReceived(self):
235
 
        message = '\n'.join(self.buffer) + '\n'
236
 
        helo, origin = self.users[0].helo[0], str(self.users[0].orig)
237
 
        recipients = []
238
 
        for user in self.users:
239
 
            recipients.append(str(user))
240
 
        self.protocol.message[tuple(recipients)] = (helo, origin, recipients, message)
241
 
        return defer.succeed("saved")
242
 
 
243
 
class DummyProto:
244
 
    def connectionMade(self):
245
 
        self.dummyMixinBase.connectionMade(self)
246
 
        self.message = {}
247
 
 
248
 
    def startMessage(self, users):
249
 
        return DummySMTPMessage(self, users)
250
 
 
251
 
    def receivedHeader(*spam):
252
 
        return None
253
 
 
254
 
    def validateTo(self, user):
255
 
        self.delivery = DummyDelivery()
256
 
        return lambda: self.startMessage([user])
257
 
 
258
 
    def validateFrom(self, helo, origin):
259
 
        return origin
260
 
 
261
 
class DummySMTP(DummyProto, smtp.SMTP):
262
 
    dummyMixinBase = smtp.SMTP
263
 
 
264
 
class DummyESMTP(DummyProto, smtp.ESMTP):
265
 
    dummyMixinBase = smtp.ESMTP
266
 
 
267
 
class AnotherTestCase:
268
 
    serverClass = None
269
 
    clientClass = None
270
 
 
271
 
    messages = [ ('foo.com', 'moshez@foo.com', ['moshez@bar.com'],
272
 
                  'moshez@foo.com', ['moshez@bar.com'], '''\
273
 
From: Moshe
274
 
To: Moshe
275
 
 
276
 
Hi,
277
 
how are you?
278
 
'''),
279
 
                 ('foo.com', 'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'],
280
 
                  'tttt@rrr.com', ['uuu@ooo', 'yyy@eee'], '''\
281
 
Subject: pass
282
 
 
283
 
..rrrr..
284
 
'''),
285
 
                 ('foo.com', '@this,@is,@ignored:foo@bar.com',
286
 
                  ['@ignore,@this,@too:bar@foo.com'],
287
 
                  'foo@bar.com', ['bar@foo.com'], '''\
288
 
Subject: apa
289
 
To: foo
290
 
 
291
 
123
292
 
.
293
 
456
294
 
'''),
295
 
              ]
296
 
 
297
 
    data = [
298
 
        ('', '220.*\r\n$', None, None),
299
 
        ('HELO foo.com\r\n', '250.*\r\n$', None, None),
300
 
        ('RSET\r\n', '250.*\r\n$', None, None),
301
 
        ]
302
 
    for helo_, from_, to_, realfrom, realto, msg in messages:
303
 
        data.append(('MAIL FROM:<%s>\r\n' % from_, '250.*\r\n',
304
 
                     None, None))
305
 
        for rcpt in to_:
306
 
            data.append(('RCPT TO:<%s>\r\n' % rcpt, '250.*\r\n',
307
 
                         None, None))
308
 
 
309
 
        data.append(('DATA\r\n','354.*\r\n',
310
 
                     msg, ('250.*\r\n',
311
 
                           (helo_, realfrom, realto, msg))))
312
 
 
313
 
 
314
 
    def testBuffer(self):
315
 
        output = StringIOWithoutClosing()
316
 
        a = self.serverClass()
317
 
        class fooFactory:
318
 
            domain = 'foo.com'
319
 
 
320
 
        a.factory = fooFactory()
321
 
        a.makeConnection(protocol.FileWrapper(output))
322
 
        for (send, expect, msg, msgexpect) in self.data:
323
 
            if send:
324
 
                a.dataReceived(send)
325
 
            data = output.getvalue()
326
 
            output.truncate(0)
327
 
            if not re.match(expect, data):
328
 
                raise AssertionError, (send, expect, data)
329
 
            if data[:3] == '354':
330
 
                for line in msg.splitlines():
331
 
                    if line and line[0] == '.':
332
 
                        line = '.' + line
333
 
                    a.dataReceived(line + '\r\n')
334
 
                a.dataReceived('.\r\n')
335
 
                # Special case for DATA. Now we want a 250, and then
336
 
                # we compare the messages
337
 
                data = output.getvalue()
338
 
                output.truncate()
339
 
                resp, msgdata = msgexpect
340
 
                if not re.match(resp, data):
341
 
                    raise AssertionError, (resp, data)
342
 
                for recip in msgdata[2]:
343
 
                    expected = list(msgdata[:])
344
 
                    expected[2] = [recip]
345
 
                    self.assertEquals(
346
 
                        a.message[(recip,)],
347
 
                        tuple(expected)
348
 
                    )
349
 
        a.setTimeout(None)
350
 
 
351
 
 
352
 
class AnotherESMTPTestCase(AnotherTestCase, unittest.TestCase):
353
 
    serverClass = DummyESMTP
354
 
    clientClass = MyESMTPClient
355
 
 
356
 
class AnotherSMTPTestCase(AnotherTestCase, unittest.TestCase):
357
 
    serverClass = DummySMTP
358
 
    clientClass = MySMTPClient
359
 
 
360
 
 
361
 
 
362
 
class DummyChecker:
363
 
    implements(cred.checkers.ICredentialsChecker)
364
 
 
365
 
    users = {
366
 
        'testuser': 'testpassword'
367
 
    }
368
 
 
369
 
    credentialInterfaces = (cred.credentials.IUsernameHashedPassword,)
370
 
 
371
 
    def requestAvatarId(self, credentials):
372
 
        return defer.maybeDeferred(
373
 
            credentials.checkPassword, self.users[credentials.username]
374
 
        ).addCallback(self._cbCheck, credentials.username)
375
 
 
376
 
    def _cbCheck(self, result, username):
377
 
        if result:
378
 
            return username
379
 
        raise cred.error.UnauthorizedLogin()
380
 
 
381
 
class DummyDelivery:
382
 
    implements(smtp.IMessageDelivery)
383
 
 
384
 
    def validateTo(self, user):
385
 
        return user
386
 
 
387
 
    def validateFrom(self, helo, origin):
388
 
        return origin
389
 
 
390
 
    def receivedHeader(*args):
391
 
        return None
392
 
 
393
 
class DummyRealm:
394
 
    def requestAvatar(self, avatarId, mind, *interfaces):
395
 
        return smtp.IMessageDelivery, DummyDelivery(), lambda: None
396
 
 
397
 
class AuthTestCase(unittest.TestCase, LoopbackMixin):
398
 
    def testAuth(self):
399
 
        realm = DummyRealm()
400
 
        p = cred.portal.Portal(realm)
401
 
        p.registerChecker(DummyChecker())
402
 
 
403
 
        server = DummyESMTP({'CRAM-MD5': cred.credentials.CramMD5Credentials})
404
 
        server.portal = p
405
 
        client = MyESMTPClient('testpassword')
406
 
 
407
 
        cAuth = imap4.CramMD5ClientAuthenticator('testuser')
408
 
        client.registerAuthenticator(cAuth)
409
 
 
410
 
        d = self.loopback(server, client)
411
 
        d.addCallback(lambda x : self.assertEquals(server.authenticated, 1))
412
 
        return d
413
 
 
414
 
class SMTPHelperTestCase(unittest.TestCase):
415
 
    def testMessageID(self):
416
 
        d = {}
417
 
        for i in range(1000):
418
 
            m = smtp.messageid('testcase')
419
 
            self.failIf(m in d)
420
 
            d[m] = None
421
 
 
422
 
    def testQuoteAddr(self):
423
 
        cases = [
424
 
            ['user@host.name', '<user@host.name>'],
425
 
            ['"User Name" <user@host.name>', '<user@host.name>'],
426
 
            [smtp.Address('someguy@someplace'), '<someguy@someplace>'],
427
 
            ['', '<>'],
428
 
            [smtp.Address(''), '<>'],
429
 
        ]
430
 
 
431
 
        for (c, e) in cases:
432
 
            self.assertEquals(smtp.quoteaddr(c), e)
433
 
 
434
 
    def testUser(self):
435
 
        u = smtp.User('user@host', 'helo.host.name', None, None)
436
 
        self.assertEquals(str(u), 'user@host')
437
 
 
438
 
    def testXtextEncoding(self):
439
 
        cases = [
440
 
            ('Hello world', 'Hello+20world'),
441
 
            ('Hello+world', 'Hello+2Bworld'),
442
 
            ('\0\1\2\3\4\5', '+00+01+02+03+04+05'),
443
 
            ('e=mc2@example.com', 'e+3Dmc2@example.com')
444
 
        ]
445
 
 
446
 
        for (case, expected) in cases:
447
 
            self.assertEquals(case.encode('xtext'), expected)
448
 
            self.assertEquals(expected.decode('xtext'), case)
449
 
 
450
 
 
451
 
class NoticeTLSClient(MyESMTPClient):
452
 
    tls = False
453
 
 
454
 
    def esmtpState_starttls(self, code, resp):
455
 
        MyESMTPClient.esmtpState_starttls(self, code, resp)
456
 
        self.tls = True
457
 
 
458
 
class TLSTestCase(unittest.TestCase, LoopbackMixin):
459
 
    def testTLS(self):
460
 
        clientCTX = ClientTLSContext()
461
 
        serverCTX = ServerTLSContext()
462
 
 
463
 
        client = NoticeTLSClient(contextFactory=clientCTX)
464
 
        server = DummyESMTP(contextFactory=serverCTX)
465
 
 
466
 
        def check(ignored):
467
 
            self.assertEquals(client.tls, True)
468
 
            self.assertEquals(server.startedTLS, True)
469
 
 
470
 
        return self.loopback(server, client).addCallback(check)
471
 
 
472
 
if ClientTLSContext is None:
473
 
    for case in (TLSTestCase,):
474
 
        case.skip = "OpenSSL not present"
475
 
 
476
 
if not interfaces.IReactorSSL.providedBy(reactor):
477
 
    for case in (TLSTestCase,):
478
 
        case.skip = "Reactor doesn't support SSL"
479
 
 
480
 
class EmptyLineTestCase(unittest.TestCase):
481
 
    def testEmptyLineSyntaxError(self):
482
 
        proto = smtp.SMTP()
483
 
        output = StringIOWithoutClosing()
484
 
        transport = internet.protocol.FileWrapper(output)
485
 
        proto.makeConnection(transport)
486
 
        proto.lineReceived('')
487
 
        proto.setTimeout(None)
488
 
 
489
 
        out = output.getvalue().splitlines()
490
 
        self.assertEquals(len(out), 2)
491
 
        self.failUnless(out[0].startswith('220'))
492
 
        self.assertEquals(out[1], "500 Error: bad syntax")
493
 
 
494
 
class TimeoutTestCase(unittest.TestCase, LoopbackMixin):
495
 
    def _timeoutTest(self, onDone, clientFactory):
496
 
        before = time.time()
497
 
 
498
 
        client = clientFactory.buildProtocol(
499
 
            address.IPv4Address('TCP', 'example.net', 25))
500
 
        server = protocol.Protocol()
501
 
 
502
 
        def check(ignored):
503
 
            after = time.time()
504
 
            self.failIf(after - before > 1.0)
505
 
            return self.assertFailure(onDone, smtp.SMTPTimeoutError)
506
 
            
507
 
        return self.loopback(client, server).addCallback(check)
508
 
 
509
 
 
510
 
    def testSMTPClient(self):
511
 
        onDone = defer.Deferred()
512
 
        clientFactory = smtp.SMTPSenderFactory(
513
 
            'source@address', 'recipient@address',
514
 
            StringIO("Message body"), onDone,
515
 
            retries=0, timeout=0.5)
516
 
        return self._timeoutTest(onDone, clientFactory)
517
 
 
518
 
 
519
 
    def testESMTPClient(self):
520
 
        onDone = defer.Deferred()
521
 
        clientFactory = smtp.ESMTPSenderFactory(
522
 
            'username', 'password',
523
 
            'source@address', 'recipient@address',
524
 
            StringIO("Message body"), onDone,
525
 
            retries=0, timeout=0.5)
526
 
        return self._timeoutTest(onDone, clientFactory)
527
 
 
528
 
 
529
 
 
530
 
class SingletonRealm(object):
531
 
    """
532
 
    Trivial realm implementation which is constructed with an interface and an
533
 
    avatar and returns that avatar when asked for that interface.
534
 
    """
535
 
    implements(IRealm)
536
 
 
537
 
    def __init__(self, interface, avatar):
538
 
        self.interface = interface
539
 
        self.avatar = avatar
540
 
 
541
 
 
542
 
    def requestAvatar(self, avatarId, mind, *interfaces):
543
 
        for iface in interfaces:
544
 
            if iface is self.interface:
545
 
                return iface, self.avatar, lambda: None
546
 
 
547
 
 
548
 
 
549
 
class NotImplementedDelivery(object):
550
 
    """
551
 
    Non-implementation of L{smtp.IMessageDelivery} which only has methods which
552
 
    raise L{NotImplementedError}.  Subclassed by various tests to provide the
553
 
    particular behavior being tested.
554
 
    """
555
 
    def validateFrom(self, helo, origin):
556
 
        raise NotImplementedError("This oughtn't be called in the course of this test.")
557
 
 
558
 
 
559
 
    def validateTo(self, user):
560
 
        raise NotImplementedError("This oughtn't be called in the course of this test.")
561
 
 
562
 
 
563
 
    def receivedHeader(self, helo, origin, recipients):
564
 
        raise NotImplementedError("This oughtn't be called in the course of this test.")
565
 
 
566
 
 
567
 
 
568
 
class SMTPServerTestCase(unittest.TestCase):
569
 
    """
570
 
    Test various behaviors of L{twisted.mail.smtp.SMTP} and
571
 
    L{twisted.mail.smtp.ESMTP}.
572
 
    """
573
 
    def testSMTPGreetingHost(self, serverClass=smtp.SMTP):
574
 
        """
575
 
        Test that the specified hostname shows up in the SMTP server's
576
 
        greeting.
577
 
        """
578
 
        s = serverClass()
579
 
        s.host = "example.com"
580
 
        t = StringTransport()
581
 
        s.makeConnection(t)
582
 
        s.connectionLost(error.ConnectionDone())
583
 
        self.assertIn("example.com", t.value())
584
 
 
585
 
 
586
 
    def testSMTPGreetingNotExtended(self):
587
 
        """
588
 
        Test that the string "ESMTP" does not appear in the SMTP server's
589
 
        greeting since that string strongly suggests the presence of support
590
 
        for various SMTP extensions which are not supported by L{smtp.SMTP}.
591
 
        """
592
 
        s = smtp.SMTP()
593
 
        t = StringTransport()
594
 
        s.makeConnection(t)
595
 
        s.connectionLost(error.ConnectionDone())
596
 
        self.assertNotIn("ESMTP", t.value())
597
 
 
598
 
 
599
 
    def testESMTPGreetingHost(self):
600
 
        """
601
 
        Similar to testSMTPGreetingHost, but for the L{smtp.ESMTP} class.
602
 
        """
603
 
        self.testSMTPGreetingHost(smtp.ESMTP)
604
 
 
605
 
 
606
 
    def testESMTPGreetingExtended(self):
607
 
        """
608
 
        Test that the string "ESMTP" does appear in the ESMTP server's
609
 
        greeting since L{smtp.ESMTP} does support the SMTP extensions which
610
 
        that advertises to the client.
611
 
        """
612
 
        s = smtp.ESMTP()
613
 
        t = StringTransport()
614
 
        s.makeConnection(t)
615
 
        s.connectionLost(error.ConnectionDone())
616
 
        self.assertIn("ESMTP", t.value())
617
 
 
618
 
 
619
 
    def test_acceptSenderAddress(self):
620
 
        """
621
 
        Test that a C{MAIL FROM} command with an acceptable address is
622
 
        responded to with the correct success code.
623
 
        """
624
 
        class AcceptanceDelivery(NotImplementedDelivery):
625
 
            """
626
 
            Delivery object which accepts all senders as valid.
627
 
            """
628
 
            def validateFrom(self, helo, origin):
629
 
                return origin
630
 
 
631
 
        realm = SingletonRealm(smtp.IMessageDelivery, AcceptanceDelivery())
632
 
        portal = Portal(realm, [AllowAnonymousAccess()])
633
 
        proto = smtp.SMTP()
634
 
        proto.portal = portal
635
 
        trans = StringTransport()
636
 
        proto.makeConnection(trans)
637
 
 
638
 
        # Deal with the necessary preliminaries
639
 
        proto.dataReceived('HELO example.com\r\n')
640
 
        trans.clear()
641
 
 
642
 
        # Try to specify our sender address
643
 
        proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
644
 
 
645
 
        # Clean up the protocol before doing anything that might raise an
646
 
        # exception.
647
 
        proto.connectionLost(error.ConnectionLost())
648
 
 
649
 
        # Make sure that we received exactly the correct response
650
 
        self.assertEqual(
651
 
            trans.value(),
652
 
            '250 Sender address accepted\r\n')
653
 
 
654
 
 
655
 
    def test_deliveryRejectedSenderAddress(self):
656
 
        """
657
 
        Test that a C{MAIL FROM} command with an address rejected by a
658
 
        L{smtp.IMessageDelivery} instance is responded to with the correct
659
 
        error code.
660
 
        """
661
 
        class RejectionDelivery(NotImplementedDelivery):
662
 
            """
663
 
            Delivery object which rejects all senders as invalid.
664
 
            """
665
 
            def validateFrom(self, helo, origin):
666
 
                raise smtp.SMTPBadSender(origin)
667
 
 
668
 
        realm = SingletonRealm(smtp.IMessageDelivery, RejectionDelivery())
669
 
        portal = Portal(realm, [AllowAnonymousAccess()])
670
 
        proto = smtp.SMTP()
671
 
        proto.portal = portal
672
 
        trans = StringTransport()
673
 
        proto.makeConnection(trans)
674
 
 
675
 
        # Deal with the necessary preliminaries
676
 
        proto.dataReceived('HELO example.com\r\n')
677
 
        trans.clear()
678
 
 
679
 
        # Try to specify our sender address
680
 
        proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
681
 
 
682
 
        # Clean up the protocol before doing anything that might raise an
683
 
        # exception.
684
 
        proto.connectionLost(error.ConnectionLost())
685
 
 
686
 
        # Make sure that we received exactly the correct response
687
 
        self.assertEqual(
688
 
            trans.value(),
689
 
            '550 Cannot receive from specified address '
690
 
            '<alice@example.com>: Sender not acceptable\r\n')
691
 
 
692
 
 
693
 
    def test_portalRejectedSenderAddress(self):
694
 
        """
695
 
        Test that a C{MAIL FROM} command with an address rejected by an
696
 
        L{smtp.SMTP} instance's portal is responded to with the correct error
697
 
        code.
698
 
        """
699
 
        class DisallowAnonymousAccess(object):
700
 
            """
701
 
            Checker for L{IAnonymous} which rejects authentication attempts.
702
 
            """
703
 
            implements(ICredentialsChecker)
704
 
 
705
 
            credentialInterfaces = (IAnonymous,)
706
 
 
707
 
            def requestAvatarId(self, credentials):
708
 
                return defer.fail(UnauthorizedLogin())
709
 
 
710
 
        realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery())
711
 
        portal = Portal(realm, [DisallowAnonymousAccess()])
712
 
        proto = smtp.SMTP()
713
 
        proto.portal = portal
714
 
        trans = StringTransport()
715
 
        proto.makeConnection(trans)
716
 
 
717
 
        # Deal with the necessary preliminaries
718
 
        proto.dataReceived('HELO example.com\r\n')
719
 
        trans.clear()
720
 
 
721
 
        # Try to specify our sender address
722
 
        proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
723
 
 
724
 
        # Clean up the protocol before doing anything that might raise an
725
 
        # exception.
726
 
        proto.connectionLost(error.ConnectionLost())
727
 
 
728
 
        # Make sure that we received exactly the correct response
729
 
        self.assertEqual(
730
 
            trans.value(),
731
 
            '550 Cannot receive from specified address '
732
 
            '<alice@example.com>: Sender not acceptable\r\n')
733
 
 
734
 
 
735
 
    def test_portalRejectedAnonymousSender(self):
736
 
        """
737
 
        Test that a C{MAIL FROM} command issued without first authenticating
738
 
        when a portal has been configured to disallow anonymous logins is
739
 
        responded to with the correct error code.
740
 
        """
741
 
        realm = SingletonRealm(smtp.IMessageDelivery, NotImplementedDelivery())
742
 
        portal = Portal(realm, [])
743
 
        proto = smtp.SMTP()
744
 
        proto.portal = portal
745
 
        trans = StringTransport()
746
 
        proto.makeConnection(trans)
747
 
 
748
 
        # Deal with the necessary preliminaries
749
 
        proto.dataReceived('HELO example.com\r\n')
750
 
        trans.clear()
751
 
 
752
 
        # Try to specify our sender address
753
 
        proto.dataReceived('MAIL FROM:<alice@example.com>\r\n')
754
 
 
755
 
        # Clean up the protocol before doing anything that might raise an
756
 
        # exception.
757
 
        proto.connectionLost(error.ConnectionLost())
758
 
 
759
 
        # Make sure that we received exactly the correct response
760
 
        self.assertEqual(
761
 
            trans.value(),
762
 
            '550 Cannot receive from specified address '
763
 
            '<alice@example.com>: Unauthenticated senders not allowed\r\n')
764
 
 
765
 
 
766
 
 
767
 
class ESMTPAuthenticationTestCase(unittest.TestCase):
768
 
    def assertServerResponse(self, bytes, response):
769
 
        """
770
 
        Assert that when the given bytes are delivered to the ESMTP server
771
 
        instance, it responds with the indicated lines.
772
 
 
773
 
        @type bytes: str
774
 
        @type response: list of str
775
 
        """
776
 
        self.transport.clear()
777
 
        self.server.dataReceived(bytes)
778
 
        self.assertEqual(
779
 
            response,
780
 
            self.transport.value().splitlines())
781
 
 
782
 
 
783
 
    def assertServerAuthenticated(self, loginArgs):
784
 
        """
785
 
        Assert that a login attempt has been made, that the credentials and
786
 
        interfaces passed to it are correct, and that when the login request
787
 
        is satisfied, a successful response is sent by the ESMTP server
788
 
        instance.
789
 
 
790
 
        @param loginArgs: A C{list} previously passed to L{portalFactory}.
791
 
        """
792
 
        d, credentials, mind, interfaces = loginArgs.pop()
793
 
        self.assertEqual(loginArgs, [])
794
 
        self.failUnless(twisted.cred.credentials.IUsernamePassword.providedBy(credentials))
795
 
        self.assertEqual(credentials.username, 'username')
796
 
        self.failUnless(credentials.checkPassword('password'))
797
 
        self.assertIn(smtp.IMessageDeliveryFactory, interfaces)
798
 
        self.assertIn(smtp.IMessageDelivery, interfaces)
799
 
        d.callback((smtp.IMessageDeliveryFactory, None, lambda: None))
800
 
 
801
 
        self.assertEqual(
802
 
            ["235 Authentication successful."],
803
 
            self.transport.value().splitlines())
804
 
 
805
 
 
806
 
    def setUp(self):
807
 
        """
808
 
        Create an ESMTP instance attached to a StringTransport.
809
 
        """
810
 
        self.server = smtp.ESMTP({
811
 
                'LOGIN': imap4.LOGINCredentials})
812
 
        self.server.host = 'localhost'
813
 
        self.transport = StringTransport(
814
 
            peerAddress=address.IPv4Address('TCP', '127.0.0.1', 12345))
815
 
        self.server.makeConnection(self.transport)
816
 
 
817
 
 
818
 
    def tearDown(self):
819
 
        """
820
 
        Disconnect the ESMTP instance to clean up its timeout DelayedCall.
821
 
        """
822
 
        self.server.connectionLost(error.ConnectionDone())
823
 
 
824
 
 
825
 
    def portalFactory(self, loginList):
826
 
        class DummyPortal:
827
 
            def login(self, credentials, mind, *interfaces):
828
 
                d = defer.Deferred()
829
 
                loginList.append((d, credentials, mind, interfaces))
830
 
                return d
831
 
        return DummyPortal()
832
 
 
833
 
 
834
 
    def test_authenticationCapabilityAdvertised(self):
835
 
        """
836
 
        Test that AUTH is advertised to clients which issue an EHLO command.
837
 
        """
838
 
        self.transport.clear()
839
 
        self.server.dataReceived('EHLO\r\n')
840
 
        responseLines = self.transport.value().splitlines()
841
 
        self.assertEqual(
842
 
            responseLines[0],
843
 
            "250-localhost Hello 127.0.0.1, nice to meet you")
844
 
        self.assertEqual(
845
 
            responseLines[1],
846
 
            "250 AUTH LOGIN")
847
 
        self.assertEqual(len(responseLines), 2)
848
 
 
849
 
 
850
 
    def test_plainAuthentication(self):
851
 
        """
852
 
        Test that the LOGIN authentication mechanism can be used
853
 
        """
854
 
        loginArgs = []
855
 
        self.server.portal = self.portalFactory(loginArgs)
856
 
 
857
 
        self.server.dataReceived('EHLO\r\n')
858
 
        self.transport.clear()
859
 
 
860
 
        self.assertServerResponse(
861
 
            'AUTH LOGIN\r\n',
862
 
            ["334 " + "User Name\0".encode('base64').strip()])
863
 
 
864
 
        self.assertServerResponse(
865
 
            'username'.encode('base64') + '\r\n',
866
 
            ["334 " + "Password\0".encode('base64').strip()])
867
 
 
868
 
        self.assertServerResponse(
869
 
            'password'.encode('base64').strip() + '\r\n',
870
 
            [])
871
 
 
872
 
        self.assertServerAuthenticated(loginArgs)
873
 
 
874
 
 
875
 
    def test_plainAuthenticationInitialResponse(self):
876
 
        """
877
 
        The response to the first challenge may be included on the AUTH command
878
 
        line.  Test that this is also supported.
879
 
        """
880
 
        loginArgs = []
881
 
        self.server.portal = self.portalFactory(loginArgs)
882
 
 
883
 
        self.server.dataReceived('EHLO\r\n')
884
 
        self.transport.clear()
885
 
 
886
 
        self.assertServerResponse(
887
 
            'AUTH LOGIN ' + "username".encode('base64').strip() + '\r\n',
888
 
            ["334 " + "Password\0".encode('base64').strip()])
889
 
 
890
 
        self.assertServerResponse(
891
 
            'password'.encode('base64').strip() + '\r\n',
892
 
            [])
893
 
 
894
 
        self.assertServerAuthenticated(loginArgs)
895
 
 
896
 
 
897
 
    def test_abortAuthentication(self):
898
 
        """
899
 
        Test that a challenge/response sequence can be aborted by the client.
900
 
        """
901
 
        loginArgs = []
902
 
        self.server.portal = self.portalFactory(loginArgs)
903
 
 
904
 
        self.server.dataReceived('EHLO\r\n')
905
 
        self.server.dataReceived('AUTH LOGIN\r\n')
906
 
 
907
 
        self.assertServerResponse(
908
 
            '*\r\n',
909
 
            ['501 Authentication aborted'])
910
 
 
911
 
 
912
 
    def test_invalidBase64EncodedResponse(self):
913
 
        """
914
 
        Test that a response which is not properly Base64 encoded results in
915
 
        the appropriate error code.
916
 
        """
917
 
        loginArgs = []
918
 
        self.server.portal = self.portalFactory(loginArgs)
919
 
 
920
 
        self.server.dataReceived('EHLO\r\n')
921
 
        self.server.dataReceived('AUTH LOGIN\r\n')
922
 
 
923
 
        self.assertServerResponse(
924
 
            'x\r\n',
925
 
            ['501 Syntax error in parameters or arguments'])
926
 
 
927
 
        self.assertEqual(loginArgs, [])
928
 
 
929
 
 
930
 
    def test_invalidBase64EncodedInitialResponse(self):
931
 
        """
932
 
        Like L{test_invalidBase64EncodedResponse} but for the case of an
933
 
        initial response included with the C{AUTH} command.
934
 
        """
935
 
        loginArgs = []
936
 
        self.server.portal = self.portalFactory(loginArgs)
937
 
 
938
 
        self.server.dataReceived('EHLO\r\n')
939
 
        self.assertServerResponse(
940
 
            'AUTH LOGIN x\r\n',
941
 
            ['501 Syntax error in parameters or arguments'])
942
 
 
943
 
        self.assertEqual(loginArgs, [])