~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/mail/test/test_mail.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2006-01-16 19:56:10 UTC
  • mfrom: (1.1.3 upstream)
  • Revision ID: james.westby@ubuntu.com-20060116195610-ykmxbia4mnnod9o2
Tags: 2.1.0-0ubuntu2
debian/copyright: Include copyright for python 2.3; some 2.3 files
are included in the upstream tarball, but not in the binary packages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
 
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
 
# See LICENSE for details.
4
 
 
5
 
 
6
 
import os
7
 
import errno
8
 
import md5
9
 
import shutil
10
 
import smtplib
11
 
import pickle
12
 
import StringIO
13
 
import rfc822
14
 
 
15
 
from twisted.trial import unittest
16
 
import tempfile
17
 
 
18
 
from zope.interface import providedBy
19
 
 
20
 
from twisted.trial import unittest, util as tutil
21
 
from twisted.mail import smtp
22
 
from twisted.mail import pop3
23
 
from twisted.names import dns
24
 
from twisted.protocols import basic
25
 
from twisted.internet import protocol
26
 
from twisted.internet import defer
27
 
from twisted.internet import reactor
28
 
from twisted.internet import interfaces
29
 
from twisted.internet.error import DNSLookupError, CannotListenError
30
 
from twisted.internet import address
31
 
from twisted.python import components
32
 
from twisted.python import failure
33
 
from twisted.python import util
34
 
 
35
 
from twisted import mail
36
 
import twisted.mail.mail
37
 
import twisted.mail.maildir
38
 
import twisted.mail.relay
39
 
import twisted.mail.relaymanager
40
 
import twisted.mail.protocols
41
 
import twisted.mail.alias
42
 
 
43
 
from twisted import cred
44
 
import twisted.cred.credentials
45
 
import twisted.cred.checkers
46
 
import twisted.cred.portal
47
 
 
48
 
# Since we run a couple processes, we need SignalMixin from test_process
49
 
from twisted.test import test_process
50
 
 
51
 
from twisted.test.proto_helpers import LineSendingProtocol
52
 
 
53
 
class DomainWithDefaultsTestCase(unittest.TestCase):
54
 
    def testMethods(self):
55
 
        d = dict([(x, x + 10) for x in range(10)])
56
 
        d = mail.mail.DomainWithDefaultDict(d, 'Default')
57
 
 
58
 
        self.assertEquals(len(d), 10)
59
 
        self.assertEquals(list(iter(d)), range(10))
60
 
        self.assertEquals(list(d.iterkeys()), list(iter(d)))
61
 
 
62
 
        items = list(d.iteritems())
63
 
        items.sort()
64
 
        self.assertEquals(items, [(x, x + 10) for x in range(10)])
65
 
 
66
 
        values = list(d.itervalues())
67
 
        values.sort()
68
 
        self.assertEquals(values, range(10, 20))
69
 
 
70
 
        items = d.items()
71
 
        items.sort()
72
 
        self.assertEquals(items, [(x, x + 10) for x in range(10)])
73
 
 
74
 
        values = d.values()
75
 
        values.sort()
76
 
        self.assertEquals(values, range(10, 20))
77
 
 
78
 
        for x in range(10):
79
 
            self.assertEquals(d[x], x + 10)
80
 
            self.assertEquals(d.get(x), x + 10)
81
 
            self.failUnless(x in d)
82
 
            self.failUnless(d.has_key(x))
83
 
 
84
 
        del d[2], d[4], d[6]
85
 
 
86
 
        self.assertEquals(len(d), 7)
87
 
        self.assertEquals(d[2], 'Default')
88
 
        self.assertEquals(d[4], 'Default')
89
 
        self.assertEquals(d[6], 'Default')
90
 
 
91
 
        d.update({'a': None, 'b': (), 'c': '*'})
92
 
        self.assertEquals(len(d), 10)
93
 
        self.assertEquals(d['a'], None)
94
 
        self.assertEquals(d['b'], ())
95
 
        self.assertEquals(d['c'], '*')
96
 
 
97
 
        d.clear()
98
 
        self.assertEquals(len(d), 0)
99
 
 
100
 
        self.assertEquals(d.setdefault('key', 'value'), 'value')
101
 
        self.assertEquals(d['key'], 'value')
102
 
 
103
 
        self.assertEquals(d.popitem(), ('key', 'value'))
104
 
        self.assertEquals(len(d), 0)
105
 
 
106
 
class BounceTestCase(unittest.TestCase):
107
 
    def setUp(self):
108
 
        self.domain = mail.mail.BounceDomain()
109
 
 
110
 
    def testExists(self):
111
 
        self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
112
 
 
113
 
    def testRelay(self):
114
 
        self.assertEquals(
115
 
            self.domain.willRelay("random q emailer", "protocol"),
116
 
            False
117
 
        )
118
 
 
119
 
    def testMessage(self):
120
 
        self.assertRaises(AssertionError, self.domain.startMessage, "whomever")
121
 
 
122
 
    def testAddUser(self):
123
 
        self.domain.addUser("bob", "password")
124
 
        self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
125
 
 
126
 
class FileMessageTestCase(unittest.TestCase):
127
 
    def setUp(self):
128
 
        self.name = "fileMessage.testFile"
129
 
        self.final = "final.fileMessage.testFile"
130
 
        self.f = file(self.name, 'w')
131
 
        self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
132
 
 
133
 
    def tearDown(self):
134
 
        try:
135
 
            self.f.close()
136
 
        except:
137
 
            pass
138
 
        try:
139
 
            os.remove(self.name)
140
 
        except:
141
 
            pass
142
 
        try:
143
 
            os.remove(self.final)
144
 
        except:
145
 
            pass
146
 
 
147
 
    def testFinalName(self):
148
 
        return self.fp.eomReceived().addCallback(self._cbFinalName)
149
 
    
150
 
    def _cbFinalName(self, result):
151
 
        self.assertEquals(result, self.final)
152
 
        self.failUnless(self.f.closed)
153
 
        self.failIf(os.path.exists(self.name))
154
 
 
155
 
    def testContents(self):
156
 
        contents = "first line\nsecond line\nthird line\n"
157
 
        for line in contents.splitlines():
158
 
            self.fp.lineReceived(line)
159
 
        self.fp.eomReceived()
160
 
        self.assertEquals(file(self.final).read(), contents)
161
 
 
162
 
    def testInterrupted(self):
163
 
        contents = "first line\nsecond line\n"
164
 
        for line in contents.splitlines():
165
 
            self.fp.lineReceived(line)
166
 
        self.fp.connectionLost()
167
 
        self.failIf(os.path.exists(self.name))
168
 
        self.failIf(os.path.exists(self.final))
169
 
 
170
 
class MailServiceTestCase(unittest.TestCase):
171
 
    def setUp(self):
172
 
        self.service = mail.mail.MailService()
173
 
 
174
 
    def testFactories(self):
175
 
        f = self.service.getPOP3Factory()
176
 
        self.failUnless(isinstance(f, protocol.ServerFactory))
177
 
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
178
 
 
179
 
        f = self.service.getSMTPFactory()
180
 
        self.failUnless(isinstance(f, protocol.ServerFactory))
181
 
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
182
 
 
183
 
        f = self.service.getESMTPFactory()
184
 
        self.failUnless(isinstance(f, protocol.ServerFactory))
185
 
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
186
 
 
187
 
    def testPortals(self):
188
 
        o1 = object()
189
 
        o2 = object()
190
 
        self.service.portals['domain'] = o1
191
 
        self.service.portals[''] = o2
192
 
 
193
 
        self.failUnless(self.service.lookupPortal('domain') is o1)
194
 
        self.failUnless(self.service.defaultPortal() is o2)
195
 
 
196
 
class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
197
 
    _openstate = True
198
 
    _writestate = True
199
 
    _renamestate = True
200
 
    def osopen(self, fn, attr, mode):
201
 
        if self._openstate:
202
 
            return os.open(fn, attr, mode)
203
 
        else:
204
 
            raise OSError(errno.EPERM, "Faked Permission Problem")
205
 
    def oswrite(self, fh, data):
206
 
        if self._writestate:
207
 
            return os.write(fh, data)
208
 
        else:
209
 
            raise OSError(errno.ENOSPC, "Faked Space problem")
210
 
    def osrename(self, oldname, newname):
211
 
        if self._renamestate:
212
 
            return os.rename(oldname, newname)
213
 
        else:
214
 
            raise OSError(errno.EPERM, "Faked Permission Problem")
215
 
 
216
 
class MaildirAppendStringTestCase(unittest.TestCase):
217
 
    def setUp(self):
218
 
        self.d = self.mktemp()
219
 
        mail.maildir.initializeMaildir(self.d)
220
 
 
221
 
    def tearDown(self):
222
 
        shutil.rmtree(self.d)
223
 
 
224
 
    def testAppend(self):
225
 
        mbox = mail.maildir.MaildirMailbox(self.d)
226
 
        mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
227
 
        for i in xrange(1, 11):
228
 
            self.assertEquals(
229
 
                unittest.wait(mbox.appendMessage("X" * i)),
230
 
                None)
231
 
        self.assertEquals(len(mbox.listMessages()),
232
 
                          10)
233
 
        self.assertEquals(len(mbox.getMessage(5).read()), 6)
234
 
        # test in the right order: last to first error location.
235
 
        mbox.AppendFactory._renamestate = False
236
 
        self.failUnless(isinstance(unittest.deferredError(mbox.appendMessage("TEST")),
237
 
                                                        failure.Failure))
238
 
        mbox.AppendFactory._renamestate = True
239
 
        mbox.AppendFactory._writestate = False
240
 
        self.failUnless(isinstance(unittest.deferredError(mbox.appendMessage("TEST")),
241
 
                                                        failure.Failure))
242
 
        mbox.AppendFactory._writestate = True
243
 
        mbox.AppendFactory._openstate = False
244
 
        self.failUnless(isinstance(unittest.deferredError(mbox.appendMessage("TEST")),
245
 
                                                        failure.Failure))
246
 
        mbox.AppendFactory._openstate = True
247
 
 
248
 
 
249
 
class MaildirAppendFileTestCase(unittest.TestCase):
250
 
    def setUp(self):
251
 
        self.d = self.mktemp()
252
 
        mail.maildir.initializeMaildir(self.d)
253
 
 
254
 
    def tearDown(self):
255
 
        shutil.rmtree(self.d)
256
 
 
257
 
    def testAppend(self):
258
 
        mbox = mail.maildir.MaildirMailbox(self.d)
259
 
        for i in xrange(1, 11):
260
 
            temp = tempfile.TemporaryFile()
261
 
            temp.write("X" * i)
262
 
            temp.seek(0,0)
263
 
            self.assertEquals(
264
 
                unittest.wait(mbox.appendMessage(temp)),
265
 
                None)
266
 
            temp.close()
267
 
        self.assertEquals(len(mbox.listMessages()),
268
 
                          10)
269
 
        self.assertEquals(len(mbox.getMessage(5).read()), 6)
270
 
 
271
 
 
272
 
class MaildirTestCase(unittest.TestCase):
273
 
    def setUp(self):
274
 
        self.d = self.mktemp()
275
 
        mail.maildir.initializeMaildir(self.d)
276
 
 
277
 
    def tearDown(self):
278
 
        shutil.rmtree(self.d)
279
 
 
280
 
    def testInitializer(self):
281
 
        d = self.d
282
 
        trash = os.path.join(d, '.Trash')
283
 
 
284
 
        self.failUnless(os.path.exists(d) and os.path.isdir(d))
285
 
        self.failUnless(os.path.exists(os.path.join(d, 'new')))
286
 
        self.failUnless(os.path.exists(os.path.join(d, 'cur')))
287
 
        self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
288
 
        self.failUnless(os.path.isdir(os.path.join(d, 'new')))
289
 
        self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
290
 
        self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
291
 
 
292
 
        self.failUnless(os.path.exists(os.path.join(trash, 'new')))
293
 
        self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
294
 
        self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
295
 
        self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
296
 
        self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
297
 
        self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
298
 
 
299
 
    def testMailbox(self):
300
 
        j = os.path.join
301
 
        n = mail.maildir._generateMaildirName
302
 
        msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
303
 
 
304
 
        # Toss a few files into the mailbox
305
 
        i = 1
306
 
        for f in msgs:
307
 
            f = file(j(self.d, f), 'w')
308
 
            f.write('x' * i)
309
 
            f.close()
310
 
            i = i + 1
311
 
 
312
 
        mb = mail.maildir.MaildirMailbox(self.d)
313
 
        self.assertEquals(mb.listMessages(), range(1, 11))
314
 
        self.assertEquals(mb.listMessages(1), 2)
315
 
        self.assertEquals(mb.listMessages(5), 6)
316
 
 
317
 
        self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
318
 
        self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
319
 
 
320
 
        d = {}
321
 
        for i in range(10):
322
 
            u = mb.getUidl(i)
323
 
            self.failIf(u in d)
324
 
            d[u] = None
325
 
 
326
 
        p, f = os.path.split(msgs[5])
327
 
 
328
 
        mb.deleteMessage(5)
329
 
        self.assertEquals(mb.listMessages(5), 0)
330
 
        self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
331
 
        self.failIf(os.path.exists(j(self.d, msgs[5])))
332
 
 
333
 
        mb.undeleteMessages()
334
 
        self.assertEquals(mb.listMessages(5), 6)
335
 
        self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
336
 
        self.failUnless(os.path.exists(j(self.d, msgs[5])))
337
 
 
338
 
class MaildirDirdbmDomainTestCase(unittest.TestCase):
339
 
    def setUp(self):
340
 
        self.P = self.mktemp()
341
 
        self.S = mail.mail.MailService()
342
 
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
343
 
 
344
 
    def tearDown(self):
345
 
        shutil.rmtree(self.P)
346
 
 
347
 
    def testAddUser(self):
348
 
        toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
349
 
        for (u, p) in toAdd:
350
 
            self.D.addUser(u, p)
351
 
 
352
 
        for (u, p) in toAdd:
353
 
            self.failUnless(u in self.D.dbm)
354
 
            self.assertEquals(self.D.dbm[u], p)
355
 
            self.failUnless(os.path.exists(os.path.join(self.P, u)))
356
 
 
357
 
    def testCredentials(self):
358
 
        creds = self.D.getCredentialsCheckers()
359
 
 
360
 
        self.assertEquals(len(creds), 1)
361
 
        self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
362
 
        self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
363
 
 
364
 
    def testRequestAvatar(self):
365
 
        class ISomething(components.Interface):
366
 
            pass
367
 
 
368
 
        self.D.addUser('user', 'password')
369
 
        self.assertRaises(
370
 
            NotImplementedError,
371
 
            self.D.requestAvatar, 'user', None, ISomething
372
 
        )
373
 
 
374
 
        t = self.D.requestAvatar('user', None, pop3.IMailbox)
375
 
        self.assertEquals(len(t), 3)
376
 
        self.failUnless(t[0] is pop3.IMailbox)
377
 
        self.failUnless(pop3.IMailbox.providedBy(t[1]))
378
 
 
379
 
        t[2]()
380
 
 
381
 
    def testRequestAvatarId(self):
382
 
        self.D.addUser('user', 'password')
383
 
        database = self.D.getCredentialsCheckers()[0]
384
 
 
385
 
        creds = cred.credentials.UsernamePassword('user', 'wrong password')
386
 
        self.assertRaises(
387
 
            cred.error.UnauthorizedLogin,
388
 
            database.requestAvatarId, creds
389
 
        )
390
 
 
391
 
        creds = cred.credentials.UsernamePassword('user', 'password')
392
 
        self.assertEquals(database.requestAvatarId(creds), 'user')
393
 
 
394
 
class ServiceDomainTestCase(unittest.TestCase):
395
 
    def setUp(self):
396
 
        self.S = mail.mail.MailService()
397
 
        self.D = mail.protocols.DomainDeliveryBase(self.S, None)
398
 
        self.D.service = self.S
399
 
        self.D.protocolName = 'TEST'
400
 
        self.D.host = 'hostname'
401
 
 
402
 
        self.tmpdir = self.mktemp()
403
 
        domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
404
 
        domain.addUser('user', 'password')
405
 
        self.S.domains['test.domain'] = domain
406
 
 
407
 
    def tearDown(self):
408
 
        shutil.rmtree(self.tmpdir)
409
 
 
410
 
    def testReceivedHeader(self):
411
 
         hdr = self.D.receivedHeader(
412
 
             ('remotehost', '123.232.101.234'),
413
 
             smtp.Address('<someguy@somplace>'),
414
 
             ['user@host.name']
415
 
         )
416
 
         fp = StringIO.StringIO(hdr)
417
 
         m = rfc822.Message(fp)
418
 
         self.assertEquals(len(m.items()), 1)
419
 
         self.failUnless(m.has_key('Received'))
420
 
 
421
 
    def testValidateTo(self):
422
 
        user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
423
 
        return defer.maybeDeferred(self.D.validateTo, user
424
 
            ).addCallback(self._cbValidateTo
425
 
            )
426
 
    
427
 
    def _cbValidateTo(self, result):
428
 
        self.failUnless(callable(result))
429
 
 
430
 
    def testValidateToBadUsername(self):
431
 
        user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
432
 
        return unittest.assertFailure(
433
 
            defer.maybeDeferred(self.D.validateTo, user),
434
 
            smtp.SMTPBadRcpt)
435
 
 
436
 
    def testValidateToBadDomain(self):
437
 
        user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
438
 
        return unittest.assertFailure(
439
 
            defer.maybeDeferred(self.D.validateTo, user),
440
 
            smtp.SMTPBadRcpt)
441
 
 
442
 
    def testValidateFrom(self):
443
 
        helo = ('hostname', '127.0.0.1')
444
 
        origin = smtp.Address('<user@hostname>')
445
 
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
446
 
 
447
 
        helo = ('hostname', '1.2.3.4')
448
 
        origin = smtp.Address('<user@hostname>')
449
 
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
450
 
 
451
 
        helo = ('hostname', '1.2.3.4')
452
 
        origin = smtp.Address('<>')
453
 
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
454
 
 
455
 
        self.assertRaises(
456
 
            smtp.SMTPBadSender,
457
 
            self.D.validateFrom, None, origin
458
 
        )
459
 
 
460
 
class VirtualPOP3TestCase(unittest.TestCase):
461
 
    def setUp(self):
462
 
        self.tmpdir = self.mktemp()
463
 
        self.S = mail.mail.MailService()
464
 
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
465
 
        self.D.addUser('user', 'password')
466
 
        self.S.domains['test.domain'] = self.D
467
 
 
468
 
        portal = cred.portal.Portal(self.D)
469
 
        map(portal.registerChecker, self.D.getCredentialsCheckers())
470
 
        self.S.portals[''] = self.S.portals['test.domain'] = portal
471
 
 
472
 
        self.P = mail.protocols.VirtualPOP3()
473
 
        self.P.service = self.S
474
 
        self.P.magic = '<unit test magic>'
475
 
 
476
 
    def tearDown(self):
477
 
        shutil.rmtree(self.tmpdir)
478
 
 
479
 
    def testAuthenticateAPOP(self):
480
 
        resp = md5.new(self.P.magic + 'password').hexdigest()
481
 
        return self.P.authenticateUserAPOP('user', resp
482
 
            ).addCallback(self._cbAuthenticateAPOP
483
 
            )
484
 
 
485
 
    def _cbAuthenticateAPOP(self, result):
486
 
        self.assertEquals(len(result), 3)
487
 
        self.assertEquals(result[0], pop3.IMailbox)
488
 
        self.failUnless(pop3.IMailbox.providedBy(result[1]))
489
 
        result[2]()
490
 
 
491
 
    def testAuthenticateIncorrectUserAPOP(self):
492
 
        resp = md5.new(self.P.magic + 'password').hexdigest()
493
 
        return unittest.assertFailure(
494
 
            self.P.authenticateUserAPOP('resu', resp),
495
 
            cred.error.UnauthorizedLogin)
496
 
 
497
 
    def testAuthenticateIncorrectResponseAPOP(self):
498
 
        resp = md5.new('wrong digest').hexdigest()
499
 
        return unittest.assertFailure(
500
 
            self.P.authenticateUserAPOP('user', resp),
501
 
            cred.error.UnauthorizedLogin)
502
 
 
503
 
    def testAuthenticatePASS(self):
504
 
        return self.P.authenticateUserPASS('user', 'password'
505
 
            ).addCallback(self._cbAuthenticatePASS
506
 
            )
507
 
    
508
 
    def _cbAuthenticatePASS(self, result):
509
 
        self.assertEquals(len(result), 3)
510
 
        self.assertEquals(result[0], pop3.IMailbox)
511
 
        self.failUnless(pop3.IMailbox.providedBy(result[1]))
512
 
        result[2]()
513
 
 
514
 
    def testAuthenticateBadUserPASS(self):
515
 
        return unittest.assertFailure(
516
 
            self.P.authenticateUserPASS('resu', 'password'),
517
 
            cred.error.UnauthorizedLogin)
518
 
 
519
 
    def testAuthenticateBadPasswordPASS(self):
520
 
        return unittest.assertFailure(
521
 
            self.P.authenticateUserPASS('user', 'wrong password'),
522
 
            cred.error.UnauthorizedLogin)
523
 
 
524
 
class empty(smtp.User):
525
 
    def __init__(self):
526
 
        pass
527
 
 
528
 
class RelayTestCase(unittest.TestCase):
529
 
    def testExists(self):
530
 
        service = mail.mail.MailService()
531
 
        domain = mail.relay.DomainQueuer(service)
532
 
 
533
 
        doRelay = [
534
 
            address.UNIXAddress('/var/run/mail-relay'),
535
 
            address.IPv4Address('TCP', '127.0.0.1', 12345),
536
 
        ]
537
 
 
538
 
        dontRelay = [
539
 
            address.IPv4Address('TCP', '192.168.2.1', 62),
540
 
            address.IPv4Address('TCP', '1.2.3.4', 1943),
541
 
        ]
542
 
 
543
 
        for peer in doRelay:
544
 
            user = empty()
545
 
            user.orig = 'user@host'
546
 
            user.dest = 'tsoh@resu'
547
 
            user.protocol = empty()
548
 
            user.protocol.transport = empty()
549
 
            user.protocol.transport.getPeer = lambda: peer
550
 
 
551
 
            self.failUnless(callable(domain.exists(user)))
552
 
 
553
 
        for peer in dontRelay:
554
 
            user = empty()
555
 
            user.orig = 'some@place'
556
 
            user.protocol = empty()
557
 
            user.protocol.transport = empty()
558
 
            user.protocol.transport.getPeer = lambda: peer
559
 
            user.dest = 'who@cares'
560
 
 
561
 
            self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
562
 
 
563
 
class RelayerTestCase(unittest.TestCase):
564
 
    def setUp(self):
565
 
        self.tmpdir = self.mktemp()
566
 
        os.mkdir(self.tmpdir)
567
 
        self.messageFiles = []
568
 
        for i in range(10):
569
 
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
570
 
            f = file(name + '-H', 'w')
571
 
            pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
572
 
            f.close()
573
 
 
574
 
            f = file(name + '-D', 'w')
575
 
            f.write(name)
576
 
            f.seek(0, 0)
577
 
            self.messageFiles.append(name)
578
 
 
579
 
        self.R = mail.relay.RelayerMixin()
580
 
        self.R.loadMessages(self.messageFiles)
581
 
 
582
 
    def tearDown(self):
583
 
        shutil.rmtree(self.tmpdir)
584
 
 
585
 
    def testMailFrom(self):
586
 
        for i in range(10):
587
 
            self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
588
 
            self.R.sentMail(250, None, None, None, None)
589
 
        self.assertEquals(self.R.getMailFrom(), None)
590
 
 
591
 
    def testMailTo(self):
592
 
        for i in range(10):
593
 
            self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
594
 
            self.R.sentMail(250, None, None, None, None)
595
 
        self.assertEquals(self.R.getMailTo(), None)
596
 
 
597
 
    def testMailData(self):
598
 
        for i in range(10):
599
 
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
600
 
            self.assertEquals(self.R.getMailData().read(), name)
601
 
            self.R.sentMail(250, None, None, None, None)
602
 
        self.assertEquals(self.R.getMailData(), None)
603
 
 
604
 
class Manager:
605
 
    def __init__(self):
606
 
        self.success = []
607
 
        self.failure = []
608
 
        self.done = []
609
 
 
610
 
    def notifySuccess(self, factory, message):
611
 
        self.success.append((factory, message))
612
 
 
613
 
    def notifyFailure(self, factory, message):
614
 
        self.failure.append((factory, message))
615
 
 
616
 
    def notifyDone(self, factory):
617
 
        self.done.append(factory)
618
 
 
619
 
class ManagedRelayerTestCase(unittest.TestCase):
620
 
    def setUp(self):
621
 
        self.manager = Manager()
622
 
        self.messages = range(0, 20, 2)
623
 
        self.factory = object()
624
 
        self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
625
 
        self.relay.messages = self.messages[:]
626
 
        self.relay.names = self.messages[:]
627
 
        self.relay.factory = self.factory
628
 
 
629
 
    def testSuccessfulSentMail(self):
630
 
        for i in self.messages:
631
 
            self.relay.sentMail(250, None, None, None, None)
632
 
 
633
 
        self.assertEquals(
634
 
            self.manager.success,
635
 
            [(self.factory, m) for m in self.messages]
636
 
        )
637
 
 
638
 
    def testFailedSentMail(self):
639
 
        for i in self.messages:
640
 
            self.relay.sentMail(550, None, None, None, None)
641
 
 
642
 
        self.assertEquals(
643
 
            self.manager.failure,
644
 
            [(self.factory, m) for m in self.messages]
645
 
        )
646
 
 
647
 
    def testConnectionLost(self):
648
 
        self.relay.connectionLost(failure.Failure(Exception()))
649
 
        self.assertEquals(self.manager.done, [self.factory])
650
 
 
651
 
class DirectoryQueueTestCase(unittest.TestCase):
652
 
    def setUp(self):
653
 
        # This is almost a test case itself.
654
 
        self.tmpdir = self.mktemp()
655
 
        os.mkdir(self.tmpdir)
656
 
        self.queue = mail.relaymanager.Queue(self.tmpdir)
657
 
        self.queue.noisy = False
658
 
        for m in range(25):
659
 
            hdrF, msgF = self.queue.createNewMessage()
660
 
            pickle.dump(['header', m], hdrF)
661
 
            hdrF.close()
662
 
            msgF.lineReceived('body: %d' % (m,))
663
 
            msgF.eomReceived()
664
 
        self.queue.readDirectory()
665
 
 
666
 
    def tearDown(self):
667
 
        shutil.rmtree(self.tmpdir)
668
 
 
669
 
    def testWaiting(self):
670
 
        self.failUnless(self.queue.hasWaiting())
671
 
        self.assertEquals(len(self.queue.getWaiting()), 25)
672
 
 
673
 
        waiting = self.queue.getWaiting()
674
 
        self.queue.setRelaying(waiting[0])
675
 
        self.assertEquals(len(self.queue.getWaiting()), 24)
676
 
 
677
 
        self.queue.setWaiting(waiting[0])
678
 
        self.assertEquals(len(self.queue.getWaiting()), 25)
679
 
 
680
 
    def testRelaying(self):
681
 
        for m in self.queue.getWaiting():
682
 
            self.queue.setRelaying(m)
683
 
            self.assertEquals(
684
 
                len(self.queue.getRelayed()),
685
 
                25 - len(self.queue.getWaiting())
686
 
            )
687
 
 
688
 
        self.failIf(self.queue.hasWaiting())
689
 
 
690
 
        relayed = self.queue.getRelayed()
691
 
        self.queue.setWaiting(relayed[0])
692
 
        self.assertEquals(len(self.queue.getWaiting()), 1)
693
 
        self.assertEquals(len(self.queue.getRelayed()), 24)
694
 
 
695
 
    def testDone(self):
696
 
        msg = self.queue.getWaiting()[0]
697
 
        self.queue.setRelaying(msg)
698
 
        self.queue.done(msg)
699
 
 
700
 
        self.assertEquals(len(self.queue.getWaiting()), 24)
701
 
        self.assertEquals(len(self.queue.getRelayed()), 0)
702
 
 
703
 
        self.failIf(msg in self.queue.getWaiting())
704
 
        self.failIf(msg in self.queue.getRelayed())
705
 
 
706
 
    def testEnvelope(self):
707
 
        envelopes = []
708
 
 
709
 
        for msg in self.queue.getWaiting():
710
 
            envelopes.append(self.queue.getEnvelope(msg))
711
 
 
712
 
        envelopes.sort()
713
 
        for i in range(25):
714
 
            self.assertEquals(
715
 
                envelopes.pop(0),
716
 
                ['header', i]
717
 
            )
718
 
 
719
 
from twisted.names import server
720
 
from twisted.names import client
721
 
from twisted.names import common
722
 
 
723
 
class TestAuthority(common.ResolverBase):
724
 
    def __init__(self):
725
 
        common.ResolverBase.__init__(self)
726
 
        self.addresses = {}
727
 
 
728
 
    def _lookup(self, name, cls, type, timeout = None):
729
 
        if name in self.addresses and type == dns.MX:
730
 
            results = []
731
 
            for a in self.addresses[name]:
732
 
                hdr = dns.RRHeader(
733
 
                    name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
734
 
                )
735
 
                results.append(hdr)
736
 
            return defer.succeed((results, [], []))
737
 
        return defer.fail(failure.Failure(dns.DomainError(name)))
738
 
 
739
 
def setUpDNS(self):
740
 
    self.auth = TestAuthority()
741
 
    factory = server.DNSServerFactory([self.auth])
742
 
    protocol = dns.DNSDatagramProtocol(factory)
743
 
    while 1:
744
 
        self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
745
 
        portNumber = self.port.getHost().port
746
 
 
747
 
        try:
748
 
            self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
749
 
        except CannotListenError:
750
 
            self.port.stopListening()
751
 
        else:
752
 
            break
753
 
    self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
754
 
 
755
 
def tearDownDNS(self):
756
 
    self.port.stopListening()
757
 
    self.udpPort.stopListening()
758
 
    try:
759
 
        self.resolver._parseCall.cancel()
760
 
    except:
761
 
        pass
762
 
 
763
 
class MXTestCase(unittest.TestCase):
764
 
    def setUp(self):
765
 
        setUpDNS(self)
766
 
        self.mx = mail.relaymanager.MXCalculator(self.resolver)
767
 
 
768
 
    def tearDown(self):
769
 
        tearDownDNS(self)
770
 
 
771
 
    def testSimpleSuccess(self):
772
 
        self.auth.addresses['test.domain'] = ['the.email.test.domain']
773
 
        return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
774
 
 
775
 
    def _cbSimpleSuccess(self, mx):
776
 
        self.assertEquals(mx.preference, 0)
777
 
        self.assertEquals(str(mx.exchange), 'the.email.test.domain')
778
 
 
779
 
    def testSimpleFailure(self):
780
 
        self.mx.fallbackToDomain = False
781
 
        return unittest.assertFailure(self.mx.getMX('test.domain'), IOError)
782
 
 
783
 
    def testSimpleFailureWithFallback(self):
784
 
        return unittest.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
785
 
 
786
 
    def testManyRecords(self):
787
 
        self.auth.addresses['test.domain'] = [
788
 
            'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
789
 
        ]
790
 
        return self.mx.getMX('test.domain'
791
 
            ).addCallback(self._cbManyRecordsSuccessfulLookup
792
 
            )
793
 
    
794
 
    def _cbManyRecordsSuccessfulLookup(self, mx):
795
 
        self.failUnless(str(mx.exchange).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
796
 
        self.mx.markBad(str(mx.exchange))
797
 
        return self.mx.getMX('test.domain'
798
 
            ).addCallback(self._cbManyRecordsDifferentResult, mx
799
 
            )
800
 
    
801
 
    def _cbManyRecordsDifferentResult(self, nextMX, mx):
802
 
        self.assertNotEqual(str(mx.exchange), str(nextMX.exchange))
803
 
        self.mx.markBad(str(nextMX.exchange))
804
 
 
805
 
        return self.mx.getMX('test.domain'
806
 
            ).addCallback(self._cbManyRecordsLastResult, mx, nextMX
807
 
            )
808
 
    
809
 
    def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
810
 
        self.assertNotEqual(str(mx.exchange), str(lastMX.exchange))
811
 
        self.assertNotEqual(str(nextMX.exchange), str(lastMX.exchange))
812
 
 
813
 
        self.mx.markBad(str(lastMX.exchange))
814
 
        self.mx.markGood(str(nextMX.exchange))
815
 
        
816
 
        return self.mx.getMX('test.domain'
817
 
            ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
818
 
            )
819
 
    
820
 
    def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
821
 
        self.assertEqual(str(againMX.exchange), str(nextMX.exchange))
822
 
 
823
 
class LiveFireExercise(unittest.TestCase):
824
 
    if interfaces.IReactorUDP(reactor, default=None) is None:
825
 
        skip = "UDP support is required to determining MX records"
826
 
 
827
 
    def setUp(self):
828
 
        setUpDNS(self)
829
 
        self.tmpdirs = [
830
 
            'domainDir', 'insertionDomain', 'insertionQueue',
831
 
            'destinationDomain', 'destinationQueue'
832
 
        ]
833
 
 
834
 
    def tearDown(self):
835
 
        tearDownDNS(self)
836
 
        for d in self.tmpdirs:
837
 
            if os.path.exists(d):
838
 
                shutil.rmtree(d)
839
 
 
840
 
    def testLocalDelivery(self):
841
 
        service = mail.mail.MailService()
842
 
        service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
843
 
        domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
844
 
        domain.addUser('user', 'password')
845
 
        service.domains['test.domain'] = domain
846
 
        service.portals['test.domain'] = cred.portal.Portal(domain)
847
 
        service.portals[''] = service.portals['test.domain']
848
 
        map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
849
 
 
850
 
        service.setQueue(mail.relay.DomainQueuer(service))
851
 
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
852
 
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
853
 
 
854
 
        f = service.getSMTPFactory()
855
 
 
856
 
        self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
857
 
 
858
 
        client = LineSendingProtocol([
859
 
            'HELO meson',
860
 
            'MAIL FROM: <user@hostname>',
861
 
            'RCPT TO: <user@test.domain>',
862
 
            'DATA',
863
 
            'This is the message',
864
 
            '.',
865
 
            'QUIT'
866
 
        ])
867
 
 
868
 
        done = []
869
 
        f = protocol.ClientFactory()
870
 
        f.protocol = lambda: client
871
 
        f.clientConnectionLost = lambda *args: done.append(None)
872
 
        reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
873
 
 
874
 
        i = 0
875
 
        while len(done) == 0 and i < 1000:
876
 
            reactor.iterate(0.01)
877
 
            i += 1
878
 
 
879
 
        self.failUnless(done)
880
 
 
881
 
        mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
882
 
        msg = mbox.getMessage(0).read()
883
 
        self.failIfEqual(msg.find('This is the message'), -1)
884
 
 
885
 
        self.smtpServer.stopListening()
886
 
 
887
 
    def testRelayDelivery(self):
888
 
        # Here is the service we will connect to and send mail from
889
 
        insServ = mail.mail.MailService()
890
 
        insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
891
 
        domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
892
 
        insServ.domains['insertion.domain'] = domain
893
 
        insServ.portals['insertion.domain'] = cred.portal.Portal(domain)
894
 
        os.mkdir('insertionQueue')
895
 
        insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
896
 
        insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
897
 
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
898
 
        manager.fArgs += ('test.identity.hostname',)
899
 
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
900
 
        # Yoink!  Now the internet obeys OUR every whim!
901
 
        manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
902
 
        # And this is our whim.
903
 
        self.auth.addresses['destination.domain'] = ['localhost']
904
 
 
905
 
        f = insServ.getSMTPFactory()
906
 
        self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
907
 
 
908
 
        # Here is the service the previous one will connect to for final
909
 
        # delivery
910
 
        destServ = mail.mail.MailService()
911
 
        destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
912
 
        domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
913
 
        domain.addUser('user', 'password')
914
 
        destServ.domains['destination.domain'] = domain
915
 
        destServ.portals['destination.domain'] = cred.portal.Portal(domain)
916
 
        os.mkdir('destinationQueue')
917
 
        destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
918
 
        manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
919
 
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
920
 
        helper.startService()
921
 
 
922
 
        f = destServ.getSMTPFactory()
923
 
        self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
924
 
 
925
 
        # Update the port number the *first* relay will connect to, because we can't use
926
 
        # port 25
927
 
        manager.PORT = self.destServer.getHost().port
928
 
 
929
 
        client = LineSendingProtocol([
930
 
            'HELO meson',
931
 
            'MAIL FROM: <user@wherever>',
932
 
            'RCPT TO: <user@destination.domain>',
933
 
            'DATA',
934
 
            'This is the message',
935
 
            '.',
936
 
            'QUIT'
937
 
        ])
938
 
 
939
 
        done = []
940
 
        f = protocol.ClientFactory()
941
 
        f.protocol = lambda: client
942
 
        f.clientConnectionLost = lambda *args: done.append(None)
943
 
        reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
944
 
 
945
 
        i = 0
946
 
        while len(done) == 0 and i < 1000:
947
 
            reactor.iterate(0.01)
948
 
            i += 1
949
 
 
950
 
        self.failUnless(done)
951
 
 
952
 
        # First part of the delivery is done.  Poke the queue manually now
953
 
        # so we don't have to wait for the queue to be flushed.
954
 
        manager.checkState()
955
 
 
956
 
        for i in range(1000):
957
 
            reactor.iterate(0.01)
958
 
 
959
 
        mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
960
 
        msg = mbox.getMessage(0).read()
961
 
        self.failIfEqual(msg.find('This is the message'), -1)
962
 
 
963
 
        self.insServer.stopListening()
964
 
        self.destServer.stopListening()
965
 
        helper.stopService()
966
 
 
967
 
aliasFile = StringIO.StringIO("""\
968
 
# Here's a comment
969
 
   # woop another one
970
 
testuser:                   address1,address2, address3,
971
 
    continuation@address, |/bin/process/this
972
 
 
973
 
usertwo:thisaddress,thataddress, lastaddress
974
 
lastuser:       :/includable, /filename, |/program, address
975
 
""")
976
 
 
977
 
class LineBufferMessage:
978
 
    def __init__(self):
979
 
        self.lines = []
980
 
        self.eom = False
981
 
        self.lost = False
982
 
 
983
 
    def lineReceived(self, line):
984
 
        self.lines.append(line)
985
 
 
986
 
    def eomReceived(self):
987
 
        self.eom = True
988
 
        return defer.succeed('<Whatever>')
989
 
 
990
 
    def connectionLost(self):
991
 
        self.lost = True
992
 
 
993
 
class AliasTestCase(unittest.TestCase):
994
 
    lines = [
995
 
        'First line',
996
 
        'Next line',
997
 
        '',
998
 
        'After a blank line',
999
 
        'Last line'
1000
 
    ]
1001
 
 
1002
 
    def setUp(self):
1003
 
        aliasFile.seek(0)
1004
 
 
1005
 
    def testHandle(self):
1006
 
        result = {}
1007
 
        lines = [
1008
 
            'user:  another@host\n',
1009
 
            'nextuser:  |/bin/program\n',
1010
 
            'user:  me@again\n',
1011
 
            'moreusers: :/etc/include/filename\n',
1012
 
            'multiuser: first@host, second@host,last@anotherhost',
1013
 
        ]
1014
 
 
1015
 
        for l in lines:
1016
 
            mail.alias.handle(result, l, 'TestCase', None)
1017
 
 
1018
 
        self.assertEquals(result['user'], ['another@host', 'me@again'])
1019
 
        self.assertEquals(result['nextuser'], ['|/bin/program'])
1020
 
        self.assertEquals(result['moreusers'], [':/etc/include/filename'])
1021
 
        self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
1022
 
 
1023
 
    def testFileLoader(self):
1024
 
        domains = {'': object()}
1025
 
        result = mail.alias.loadAliasFile(domains, fp=aliasFile)
1026
 
 
1027
 
        self.assertEquals(len(result), 3)
1028
 
 
1029
 
        group = result['testuser']
1030
 
        s = str(group)
1031
 
        for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
1032
 
            self.failIfEqual(s.find(a), -1)
1033
 
        self.assertEquals(len(group), 5)
1034
 
 
1035
 
        group = result['usertwo']
1036
 
        s = str(group)
1037
 
        for a in ('thisaddress', 'thataddress', 'lastaddress'):
1038
 
            self.failIfEqual(s.find(a), -1)
1039
 
        self.assertEquals(len(group), 3)
1040
 
 
1041
 
        group = result['lastuser']
1042
 
        s = str(group)
1043
 
        self.failUnlessEqual(s.find('/includable'), -1)
1044
 
        for a in ('/filename', 'program', 'address'):
1045
 
            self.failIfEqual(s.find(a), -1, '%s not found' % a)
1046
 
        self.assertEquals(len(group), 3)
1047
 
 
1048
 
    def testMultiWrapper(self):
1049
 
        msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
1050
 
        msg = mail.alias.MultiWrapper(msgs)
1051
 
 
1052
 
        for L in self.lines:
1053
 
            msg.lineReceived(L)
1054
 
        return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
1055
 
 
1056
 
    def _cbMultiWrapper(self, ignored, msgs):
1057
 
        for m in msgs:
1058
 
            self.failUnless(m.eom)
1059
 
            self.failIf(m.lost)
1060
 
            self.assertEquals(self.lines, m.lines)
1061
 
 
1062
 
    def testFileAlias(self):
1063
 
        tmpfile = self.mktemp()
1064
 
        a = mail.alias.FileAlias(tmpfile, None, None)
1065
 
        m = a.createMessageReceiver()
1066
 
 
1067
 
        for l in self.lines:
1068
 
            m.lineReceived(l)
1069
 
        return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
1070
 
 
1071
 
    def _cbTestFileAlias(self, ignored, tmpfile):
1072
 
        lines = file(tmpfile).readlines()
1073
 
        self.assertEquals([L[:-1] for L in lines], self.lines)
1074
 
 
1075
 
 
1076
 
class DummyProcess(object):
1077
 
    __slots__ = ['onEnd']
1078
 
 
1079
 
class ProcessAliasTestCase(test_process.SignalMixin, unittest.TestCase):
1080
 
    lines = [
1081
 
        'First line',
1082
 
        'Next line',
1083
 
        '',
1084
 
        'After a blank line',
1085
 
        'Last line'
1086
 
    ]
1087
 
 
1088
 
    def setUpClass(self):
1089
 
        self.DNSNAME = smtp.DNSNAME
1090
 
        smtp.DNSNAME = ''
1091
 
 
1092
 
    def tearDownClass(self):
1093
 
        smtp.DNSNAME = self.DNSNAME
1094
 
 
1095
 
    def tearDown(self):
1096
 
        reactor.iterate()
1097
 
        reactor.iterate()
1098
 
        reactor.iterate()
1099
 
 
1100
 
    def testProcessAlias(self):
1101
 
        path = util.sibpath(__file__, 'process.alias.sh')
1102
 
        a = mail.alias.ProcessAlias(path, None, None)
1103
 
        m = a.createMessageReceiver()
1104
 
 
1105
 
        for l in self.lines:
1106
 
            m.lineReceived(l)
1107
 
        return m.eomReceived().addCallback(self._cbProcessAlias)
1108
 
 
1109
 
    def _cbProcessAlias(self, ignored):
1110
 
        lines = file('process.alias.out').readlines()
1111
 
        self.assertEquals([L[:-1] for L in lines], self.lines)
1112
 
 
1113
 
    def testAliasResolution(self):
1114
 
        aliases = {}
1115
 
        domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
1116
 
        A1 = mail.alias.AliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
1117
 
        A2 = mail.alias.AliasGroup(['user2', 'user3'], domain, 'alias2')
1118
 
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1119
 
        aliases.update({
1120
 
            'alias1': A1,
1121
 
            'alias2': A2,
1122
 
            'alias3': A3,
1123
 
        })
1124
 
 
1125
 
        r1 = map(str, A1.resolve(aliases).objs)
1126
 
        r1.sort()
1127
 
        expected = map(str, [
1128
 
            mail.alias.AddressAlias('user1', None, None),
1129
 
            mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1130
 
            mail.alias.FileWrapper('/file'),
1131
 
        ])
1132
 
        expected.sort()
1133
 
        self.assertEquals(r1, expected)
1134
 
 
1135
 
        r2 = map(str, A2.resolve(aliases).objs)
1136
 
        r2.sort()
1137
 
        expected = map(str, [
1138
 
            mail.alias.AddressAlias('user2', None, None),
1139
 
            mail.alias.AddressAlias('user3', None, None)
1140
 
        ])
1141
 
        expected.sort()
1142
 
        self.assertEquals(r2, expected)
1143
 
 
1144
 
        r3 = map(str, A3.resolve(aliases).objs)
1145
 
        r3.sort()
1146
 
        expected = map(str, [
1147
 
            mail.alias.AddressAlias('user1', None, None),
1148
 
            mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1149
 
            mail.alias.FileWrapper('/file'),
1150
 
        ])
1151
 
        expected.sort()
1152
 
        self.assertEquals(r3, expected)
1153
 
 
1154
 
    def testCyclicAlias(self):
1155
 
        aliases = {}
1156
 
        domain = {'': TestDomain(aliases, [])}
1157
 
        A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
1158
 
        A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
1159
 
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1160
 
        aliases.update({
1161
 
            'alias1': A1,
1162
 
            'alias2': A2,
1163
 
            'alias3': A3
1164
 
        })
1165
 
 
1166
 
        self.assertEquals(aliases['alias1'].resolve(aliases), None)
1167
 
        self.assertEquals(aliases['alias2'].resolve(aliases), None)
1168
 
        self.assertEquals(aliases['alias3'].resolve(aliases), None)
1169
 
 
1170
 
        A4 = mail.alias.AliasGroup(['|echo', 'alias1'], domain, 'alias4')
1171
 
        aliases['alias4'] = A4
1172
 
        
1173
 
        r = map(str, A4.resolve(aliases).objs)
1174
 
        r.sort()
1175
 
        expected = map(str, [
1176
 
            mail.alias.MessageWrapper(DummyProcess(), 'echo')
1177
 
        ])
1178
 
 
1179
 
if interfaces.IReactorProcess(reactor, default=None) is None:
1180
 
    ProcessAliasTestCase = "IReactorProcess not supported"
1181
 
 
1182
 
class TestDomain:
1183
 
    def __init__(self, aliases, users):
1184
 
        self.aliases = aliases
1185
 
        self.users = users
1186
 
 
1187
 
    def exists(self, user, memo=None):
1188
 
        user = user.dest.local
1189
 
        if user in self.users:
1190
 
            return lambda: mail.alias.AddressAlias(user, None, None)
1191
 
        try:
1192
 
            a = self.aliases[user]
1193
 
        except:
1194
 
            raise smtp.SMTPBadRcpt(user)
1195
 
        else:
1196
 
            aliases = a.resolve(self.aliases, memo)
1197
 
            if aliases:
1198
 
                return lambda: aliases
1199
 
            raise smtp.SMTPBadRcpt(user)
1200
 
 
1201
 
 
1202
 
from twisted.python.runtime import platformType
1203
 
import types
1204
 
if platformType != "posix":
1205
 
    for o in locals().values():
1206
 
        if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
1207
 
            o.skip = "twisted.mail only works on posix"