~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/mail/test/test_mail.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
 
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
 
# See LICENSE for details.
4
 
 
5
 
 
6
 
import os
7
 
import errno
8
 
import md5
9
 
import shutil
10
 
import smtplib
11
 
import pickle
12
 
import StringIO
13
 
import rfc822
14
 
 
15
 
from twisted.trial import unittest
16
 
import tempfile
17
 
 
18
 
from zope.interface import providedBy, Interface, implements
19
 
 
20
 
from twisted.trial import unittest
21
 
from twisted.mail import smtp
22
 
from twisted.mail import pop3
23
 
from twisted.names import dns
24
 
from twisted.protocols import basic
25
 
from twisted.internet import protocol
26
 
from twisted.internet import defer
27
 
from twisted.internet.defer import Deferred
28
 
from twisted.internet import reactor
29
 
from twisted.internet import interfaces
30
 
from twisted.internet.error import DNSLookupError, CannotListenError
31
 
from twisted.internet import address
32
 
from twisted.python import failure
33
 
from twisted.python import util
34
 
 
35
 
from twisted import mail
36
 
import twisted.mail.mail
37
 
import twisted.mail.maildir
38
 
import twisted.mail.relay
39
 
import twisted.mail.relaymanager
40
 
import twisted.mail.protocols
41
 
import twisted.mail.alias
42
 
 
43
 
from twisted.names.error import DNSNameError
44
 
from twisted.names.dns import Record_MX
45
 
 
46
 
from twisted import cred
47
 
import twisted.cred.credentials
48
 
import twisted.cred.checkers
49
 
import twisted.cred.portal
50
 
 
51
 
# Since we run a couple processes, we need SignalMixin from test_process
52
 
from twisted.test import test_process
53
 
 
54
 
from twisted.test.proto_helpers import LineSendingProtocol
55
 
 
56
 
class DomainWithDefaultsTestCase(unittest.TestCase):
57
 
    def testMethods(self):
58
 
        d = dict([(x, x + 10) for x in range(10)])
59
 
        d = mail.mail.DomainWithDefaultDict(d, 'Default')
60
 
 
61
 
        self.assertEquals(len(d), 10)
62
 
        self.assertEquals(list(iter(d)), range(10))
63
 
        self.assertEquals(list(d.iterkeys()), list(iter(d)))
64
 
 
65
 
        items = list(d.iteritems())
66
 
        items.sort()
67
 
        self.assertEquals(items, [(x, x + 10) for x in range(10)])
68
 
 
69
 
        values = list(d.itervalues())
70
 
        values.sort()
71
 
        self.assertEquals(values, range(10, 20))
72
 
 
73
 
        items = d.items()
74
 
        items.sort()
75
 
        self.assertEquals(items, [(x, x + 10) for x in range(10)])
76
 
 
77
 
        values = d.values()
78
 
        values.sort()
79
 
        self.assertEquals(values, range(10, 20))
80
 
 
81
 
        for x in range(10):
82
 
            self.assertEquals(d[x], x + 10)
83
 
            self.assertEquals(d.get(x), x + 10)
84
 
            self.failUnless(x in d)
85
 
            self.failUnless(d.has_key(x))
86
 
 
87
 
        del d[2], d[4], d[6]
88
 
 
89
 
        self.assertEquals(len(d), 7)
90
 
        self.assertEquals(d[2], 'Default')
91
 
        self.assertEquals(d[4], 'Default')
92
 
        self.assertEquals(d[6], 'Default')
93
 
 
94
 
        d.update({'a': None, 'b': (), 'c': '*'})
95
 
        self.assertEquals(len(d), 10)
96
 
        self.assertEquals(d['a'], None)
97
 
        self.assertEquals(d['b'], ())
98
 
        self.assertEquals(d['c'], '*')
99
 
 
100
 
        d.clear()
101
 
        self.assertEquals(len(d), 0)
102
 
 
103
 
        self.assertEquals(d.setdefault('key', 'value'), 'value')
104
 
        self.assertEquals(d['key'], 'value')
105
 
 
106
 
        self.assertEquals(d.popitem(), ('key', 'value'))
107
 
        self.assertEquals(len(d), 0)
108
 
 
109
 
class BounceTestCase(unittest.TestCase):
110
 
    def setUp(self):
111
 
        self.domain = mail.mail.BounceDomain()
112
 
 
113
 
    def testExists(self):
114
 
        self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
115
 
 
116
 
    def testRelay(self):
117
 
        self.assertEquals(
118
 
            self.domain.willRelay("random q emailer", "protocol"),
119
 
            False
120
 
        )
121
 
 
122
 
    def testMessage(self):
123
 
        self.assertRaises(AssertionError, self.domain.startMessage, "whomever")
124
 
 
125
 
    def testAddUser(self):
126
 
        self.domain.addUser("bob", "password")
127
 
        self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
128
 
 
129
 
class FileMessageTestCase(unittest.TestCase):
130
 
    def setUp(self):
131
 
        self.name = "fileMessage.testFile"
132
 
        self.final = "final.fileMessage.testFile"
133
 
        self.f = file(self.name, 'w')
134
 
        self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
135
 
 
136
 
    def tearDown(self):
137
 
        try:
138
 
            self.f.close()
139
 
        except:
140
 
            pass
141
 
        try:
142
 
            os.remove(self.name)
143
 
        except:
144
 
            pass
145
 
        try:
146
 
            os.remove(self.final)
147
 
        except:
148
 
            pass
149
 
 
150
 
    def testFinalName(self):
151
 
        return self.fp.eomReceived().addCallback(self._cbFinalName)
152
 
    
153
 
    def _cbFinalName(self, result):
154
 
        self.assertEquals(result, self.final)
155
 
        self.failUnless(self.f.closed)
156
 
        self.failIf(os.path.exists(self.name))
157
 
 
158
 
    def testContents(self):
159
 
        contents = "first line\nsecond line\nthird line\n"
160
 
        for line in contents.splitlines():
161
 
            self.fp.lineReceived(line)
162
 
        self.fp.eomReceived()
163
 
        self.assertEquals(file(self.final).read(), contents)
164
 
 
165
 
    def testInterrupted(self):
166
 
        contents = "first line\nsecond line\n"
167
 
        for line in contents.splitlines():
168
 
            self.fp.lineReceived(line)
169
 
        self.fp.connectionLost()
170
 
        self.failIf(os.path.exists(self.name))
171
 
        self.failIf(os.path.exists(self.final))
172
 
 
173
 
class MailServiceTestCase(unittest.TestCase):
174
 
    def setUp(self):
175
 
        self.service = mail.mail.MailService()
176
 
 
177
 
    def testFactories(self):
178
 
        f = self.service.getPOP3Factory()
179
 
        self.failUnless(isinstance(f, protocol.ServerFactory))
180
 
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
181
 
 
182
 
        f = self.service.getSMTPFactory()
183
 
        self.failUnless(isinstance(f, protocol.ServerFactory))
184
 
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
185
 
 
186
 
        f = self.service.getESMTPFactory()
187
 
        self.failUnless(isinstance(f, protocol.ServerFactory))
188
 
        self.failUnless(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
189
 
 
190
 
    def testPortals(self):
191
 
        o1 = object()
192
 
        o2 = object()
193
 
        self.service.portals['domain'] = o1
194
 
        self.service.portals[''] = o2
195
 
 
196
 
        self.failUnless(self.service.lookupPortal('domain') is o1)
197
 
        self.failUnless(self.service.defaultPortal() is o2)
198
 
 
199
 
class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
200
 
    _openstate = True
201
 
    _writestate = True
202
 
    _renamestate = True
203
 
    def osopen(self, fn, attr, mode):
204
 
        if self._openstate:
205
 
            return os.open(fn, attr, mode)
206
 
        else:
207
 
            raise OSError(errno.EPERM, "Faked Permission Problem")
208
 
    def oswrite(self, fh, data):
209
 
        if self._writestate:
210
 
            return os.write(fh, data)
211
 
        else:
212
 
            raise OSError(errno.ENOSPC, "Faked Space problem")
213
 
    def osrename(self, oldname, newname):
214
 
        if self._renamestate:
215
 
            return os.rename(oldname, newname)
216
 
        else:
217
 
            raise OSError(errno.EPERM, "Faked Permission Problem")
218
 
 
219
 
class MaildirAppendStringTestCase(unittest.TestCase):
220
 
    def setUp(self):
221
 
        self.d = self.mktemp()
222
 
        mail.maildir.initializeMaildir(self.d)
223
 
 
224
 
    def tearDown(self):
225
 
        shutil.rmtree(self.d)
226
 
 
227
 
    def _append(self, ignored, mbox):
228
 
        d = mbox.appendMessage('TEST')
229
 
        return self.assertFailure(d, Exception)
230
 
 
231
 
    def _setState(self, ignored, mbox, rename=None, write=None, open=None):
232
 
        if rename is not None:
233
 
            mbox.AppendFactory._renameState = rename
234
 
        if write is not None:
235
 
            mbox.AppendFactory._writeState = write
236
 
        if open is not None:
237
 
            mbox.AppendFactory._openstate = open
238
 
 
239
 
    def testAppend(self):
240
 
        mbox = mail.maildir.MaildirMailbox(self.d)
241
 
        mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
242
 
        ds = []
243
 
        for i in xrange(1, 11):
244
 
            ds.append(mbox.appendMessage("X" * i))
245
 
            ds[-1].addCallback(self.assertEqual, None)
246
 
        d = defer.gatherResults(ds)
247
 
        d.addCallback(self._cbTestAppend, mbox)
248
 
        return d
249
 
 
250
 
    def _cbTestAppend(self, result, mbox):
251
 
        self.assertEquals(len(mbox.listMessages()),
252
 
                          10)
253
 
        self.assertEquals(len(mbox.getMessage(5).read()), 6)
254
 
        # test in the right order: last to first error location.
255
 
        mbox.AppendFactory._renamestate = False
256
 
        d = self._append(None, mbox)
257
 
        d.addCallback(self._setState, mbox, rename=True, write=False)
258
 
        d.addCallback(self._append, mbox)
259
 
        d.addCallback(self._setState, mbox, write=True, open=False)
260
 
        d.addCallback(self._append, mbox)
261
 
        d.addCallback(self._setState, mbox, open=True)
262
 
        return d
263
 
 
264
 
 
265
 
class MaildirAppendFileTestCase(unittest.TestCase):
266
 
    def setUp(self):
267
 
        self.d = self.mktemp()
268
 
        mail.maildir.initializeMaildir(self.d)
269
 
 
270
 
    def tearDown(self):
271
 
        shutil.rmtree(self.d)
272
 
 
273
 
    def testAppend(self):
274
 
        mbox = mail.maildir.MaildirMailbox(self.d)
275
 
        ds = []
276
 
        def _check(res, t):
277
 
            t.close()
278
 
            self.assertEqual(res, None)
279
 
        for i in xrange(1, 11):
280
 
            temp = tempfile.TemporaryFile()
281
 
            temp.write("X" * i)
282
 
            temp.seek(0,0)
283
 
            ds.append(mbox.appendMessage(temp))
284
 
            ds[-1].addCallback(_check, temp)
285
 
        return defer.gatherResults(ds).addCallback(self._cbTestAppend, mbox)
286
 
 
287
 
    def _cbTestAppend(self, result, mbox):
288
 
        self.assertEquals(len(mbox.listMessages()),
289
 
                          10)
290
 
        self.assertEquals(len(mbox.getMessage(5).read()), 6)
291
 
 
292
 
 
293
 
class MaildirTestCase(unittest.TestCase):
294
 
    def setUp(self):
295
 
        self.d = self.mktemp()
296
 
        mail.maildir.initializeMaildir(self.d)
297
 
 
298
 
    def tearDown(self):
299
 
        shutil.rmtree(self.d)
300
 
 
301
 
    def testInitializer(self):
302
 
        d = self.d
303
 
        trash = os.path.join(d, '.Trash')
304
 
 
305
 
        self.failUnless(os.path.exists(d) and os.path.isdir(d))
306
 
        self.failUnless(os.path.exists(os.path.join(d, 'new')))
307
 
        self.failUnless(os.path.exists(os.path.join(d, 'cur')))
308
 
        self.failUnless(os.path.exists(os.path.join(d, 'tmp')))
309
 
        self.failUnless(os.path.isdir(os.path.join(d, 'new')))
310
 
        self.failUnless(os.path.isdir(os.path.join(d, 'cur')))
311
 
        self.failUnless(os.path.isdir(os.path.join(d, 'tmp')))
312
 
 
313
 
        self.failUnless(os.path.exists(os.path.join(trash, 'new')))
314
 
        self.failUnless(os.path.exists(os.path.join(trash, 'cur')))
315
 
        self.failUnless(os.path.exists(os.path.join(trash, 'tmp')))
316
 
        self.failUnless(os.path.isdir(os.path.join(trash, 'new')))
317
 
        self.failUnless(os.path.isdir(os.path.join(trash, 'cur')))
318
 
        self.failUnless(os.path.isdir(os.path.join(trash, 'tmp')))
319
 
 
320
 
    def testMailbox(self):
321
 
        j = os.path.join
322
 
        n = mail.maildir._generateMaildirName
323
 
        msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
324
 
 
325
 
        # Toss a few files into the mailbox
326
 
        i = 1
327
 
        for f in msgs:
328
 
            f = file(j(self.d, f), 'w')
329
 
            f.write('x' * i)
330
 
            f.close()
331
 
            i = i + 1
332
 
 
333
 
        mb = mail.maildir.MaildirMailbox(self.d)
334
 
        self.assertEquals(mb.listMessages(), range(1, 11))
335
 
        self.assertEquals(mb.listMessages(1), 2)
336
 
        self.assertEquals(mb.listMessages(5), 6)
337
 
 
338
 
        self.assertEquals(mb.getMessage(6).read(), 'x' * 7)
339
 
        self.assertEquals(mb.getMessage(1).read(), 'x' * 2)
340
 
 
341
 
        d = {}
342
 
        for i in range(10):
343
 
            u = mb.getUidl(i)
344
 
            self.failIf(u in d)
345
 
            d[u] = None
346
 
 
347
 
        p, f = os.path.split(msgs[5])
348
 
 
349
 
        mb.deleteMessage(5)
350
 
        self.assertEquals(mb.listMessages(5), 0)
351
 
        self.failUnless(os.path.exists(j(self.d, '.Trash', 'cur', f)))
352
 
        self.failIf(os.path.exists(j(self.d, msgs[5])))
353
 
 
354
 
        mb.undeleteMessages()
355
 
        self.assertEquals(mb.listMessages(5), 6)
356
 
        self.failIf(os.path.exists(j(self.d, '.Trash', 'cur', f)))
357
 
        self.failUnless(os.path.exists(j(self.d, msgs[5])))
358
 
 
359
 
class MaildirDirdbmDomainTestCase(unittest.TestCase):
360
 
    def setUp(self):
361
 
        self.P = self.mktemp()
362
 
        self.S = mail.mail.MailService()
363
 
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
364
 
 
365
 
    def tearDown(self):
366
 
        shutil.rmtree(self.P)
367
 
 
368
 
    def testAddUser(self):
369
 
        toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
370
 
        for (u, p) in toAdd:
371
 
            self.D.addUser(u, p)
372
 
 
373
 
        for (u, p) in toAdd:
374
 
            self.failUnless(u in self.D.dbm)
375
 
            self.assertEquals(self.D.dbm[u], p)
376
 
            self.failUnless(os.path.exists(os.path.join(self.P, u)))
377
 
 
378
 
    def testCredentials(self):
379
 
        creds = self.D.getCredentialsCheckers()
380
 
 
381
 
        self.assertEquals(len(creds), 1)
382
 
        self.failUnless(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
383
 
        self.failUnless(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
384
 
 
385
 
    def testRequestAvatar(self):
386
 
        class ISomething(Interface):
387
 
            pass
388
 
 
389
 
        self.D.addUser('user', 'password')
390
 
        self.assertRaises(
391
 
            NotImplementedError,
392
 
            self.D.requestAvatar, 'user', None, ISomething
393
 
        )
394
 
 
395
 
        t = self.D.requestAvatar('user', None, pop3.IMailbox)
396
 
        self.assertEquals(len(t), 3)
397
 
        self.failUnless(t[0] is pop3.IMailbox)
398
 
        self.failUnless(pop3.IMailbox.providedBy(t[1]))
399
 
 
400
 
        t[2]()
401
 
 
402
 
    def testRequestAvatarId(self):
403
 
        self.D.addUser('user', 'password')
404
 
        database = self.D.getCredentialsCheckers()[0]
405
 
 
406
 
        creds = cred.credentials.UsernamePassword('user', 'wrong password')
407
 
        self.assertRaises(
408
 
            cred.error.UnauthorizedLogin,
409
 
            database.requestAvatarId, creds
410
 
        )
411
 
 
412
 
        creds = cred.credentials.UsernamePassword('user', 'password')
413
 
        self.assertEquals(database.requestAvatarId(creds), 'user')
414
 
 
415
 
 
416
 
class StubAliasableDomain(object):
417
 
    """
418
 
    Minimal testable implementation of IAliasableDomain.
419
 
    """
420
 
    implements(mail.mail.IAliasableDomain)
421
 
 
422
 
    def exists(self, user):
423
 
        """
424
 
        No test coverage for invocations of this method on domain objects,
425
 
        so we just won't implement it.
426
 
        """
427
 
        raise NotImplementedError()
428
 
 
429
 
 
430
 
    def addUser(self, user, password):
431
 
        """
432
 
        No test coverage for invocations of this method on domain objects,
433
 
        so we just won't implement it.
434
 
        """
435
 
        raise NotImplementedError()
436
 
 
437
 
 
438
 
    def getCredentialsCheckers(self):
439
 
        """
440
 
        This needs to succeed in order for other tests to complete
441
 
        successfully, but we don't actually assert anything about its
442
 
        behavior.  Return an empty list.  Sometime later we should return
443
 
        something else and assert that a portal got set up properly.
444
 
        """
445
 
        return []
446
 
 
447
 
 
448
 
    def setAliasGroup(self, aliases):
449
 
        """
450
 
        Just record the value so the test can check it later.
451
 
        """
452
 
        self.aliasGroup = aliases
453
 
 
454
 
 
455
 
class ServiceDomainTestCase(unittest.TestCase):
456
 
    def setUp(self):
457
 
        self.S = mail.mail.MailService()
458
 
        self.D = mail.protocols.DomainDeliveryBase(self.S, None)
459
 
        self.D.service = self.S
460
 
        self.D.protocolName = 'TEST'
461
 
        self.D.host = 'hostname'
462
 
 
463
 
        self.tmpdir = self.mktemp()
464
 
        domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
465
 
        domain.addUser('user', 'password')
466
 
        self.S.addDomain('test.domain', domain)
467
 
 
468
 
    def tearDown(self):
469
 
        shutil.rmtree(self.tmpdir)
470
 
 
471
 
 
472
 
    def testAddAliasableDomain(self):
473
 
        """
474
 
        Test that adding an IAliasableDomain to a mail service properly sets
475
 
        up alias group references and such.
476
 
        """
477
 
        aliases = object()
478
 
        domain = StubAliasableDomain()
479
 
        self.S.aliases = aliases
480
 
        self.S.addDomain('example.com', domain)
481
 
        self.assertIdentical(domain.aliasGroup, aliases)
482
 
 
483
 
 
484
 
    def testReceivedHeader(self):
485
 
         hdr = self.D.receivedHeader(
486
 
             ('remotehost', '123.232.101.234'),
487
 
             smtp.Address('<someguy@somplace>'),
488
 
             ['user@host.name']
489
 
         )
490
 
         fp = StringIO.StringIO(hdr)
491
 
         m = rfc822.Message(fp)
492
 
         self.assertEquals(len(m.items()), 1)
493
 
         self.failUnless(m.has_key('Received'))
494
 
 
495
 
    def testValidateTo(self):
496
 
        user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
497
 
        return defer.maybeDeferred(self.D.validateTo, user
498
 
            ).addCallback(self._cbValidateTo
499
 
            )
500
 
    
501
 
    def _cbValidateTo(self, result):
502
 
        self.failUnless(callable(result))
503
 
 
504
 
    def testValidateToBadUsername(self):
505
 
        user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
506
 
        return self.assertFailure(
507
 
            defer.maybeDeferred(self.D.validateTo, user),
508
 
            smtp.SMTPBadRcpt)
509
 
 
510
 
    def testValidateToBadDomain(self):
511
 
        user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
512
 
        return self.assertFailure(
513
 
            defer.maybeDeferred(self.D.validateTo, user),
514
 
            smtp.SMTPBadRcpt)
515
 
 
516
 
    def testValidateFrom(self):
517
 
        helo = ('hostname', '127.0.0.1')
518
 
        origin = smtp.Address('<user@hostname>')
519
 
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
520
 
 
521
 
        helo = ('hostname', '1.2.3.4')
522
 
        origin = smtp.Address('<user@hostname>')
523
 
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
524
 
 
525
 
        helo = ('hostname', '1.2.3.4')
526
 
        origin = smtp.Address('<>')
527
 
        self.failUnless(self.D.validateFrom(helo, origin) is origin)
528
 
 
529
 
        self.assertRaises(
530
 
            smtp.SMTPBadSender,
531
 
            self.D.validateFrom, None, origin
532
 
        )
533
 
 
534
 
class VirtualPOP3TestCase(unittest.TestCase):
535
 
    def setUp(self):
536
 
        self.tmpdir = self.mktemp()
537
 
        self.S = mail.mail.MailService()
538
 
        self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
539
 
        self.D.addUser('user', 'password')
540
 
        self.S.addDomain('test.domain', self.D)
541
 
 
542
 
        portal = cred.portal.Portal(self.D)
543
 
        map(portal.registerChecker, self.D.getCredentialsCheckers())
544
 
        self.S.portals[''] = self.S.portals['test.domain'] = portal
545
 
 
546
 
        self.P = mail.protocols.VirtualPOP3()
547
 
        self.P.service = self.S
548
 
        self.P.magic = '<unit test magic>'
549
 
 
550
 
    def tearDown(self):
551
 
        shutil.rmtree(self.tmpdir)
552
 
 
553
 
    def testAuthenticateAPOP(self):
554
 
        resp = md5.new(self.P.magic + 'password').hexdigest()
555
 
        return self.P.authenticateUserAPOP('user', resp
556
 
            ).addCallback(self._cbAuthenticateAPOP
557
 
            )
558
 
 
559
 
    def _cbAuthenticateAPOP(self, result):
560
 
        self.assertEquals(len(result), 3)
561
 
        self.assertEquals(result[0], pop3.IMailbox)
562
 
        self.failUnless(pop3.IMailbox.providedBy(result[1]))
563
 
        result[2]()
564
 
 
565
 
    def testAuthenticateIncorrectUserAPOP(self):
566
 
        resp = md5.new(self.P.magic + 'password').hexdigest()
567
 
        return self.assertFailure(
568
 
            self.P.authenticateUserAPOP('resu', resp),
569
 
            cred.error.UnauthorizedLogin)
570
 
 
571
 
    def testAuthenticateIncorrectResponseAPOP(self):
572
 
        resp = md5.new('wrong digest').hexdigest()
573
 
        return self.assertFailure(
574
 
            self.P.authenticateUserAPOP('user', resp),
575
 
            cred.error.UnauthorizedLogin)
576
 
 
577
 
    def testAuthenticatePASS(self):
578
 
        return self.P.authenticateUserPASS('user', 'password'
579
 
            ).addCallback(self._cbAuthenticatePASS
580
 
            )
581
 
    
582
 
    def _cbAuthenticatePASS(self, result):
583
 
        self.assertEquals(len(result), 3)
584
 
        self.assertEquals(result[0], pop3.IMailbox)
585
 
        self.failUnless(pop3.IMailbox.providedBy(result[1]))
586
 
        result[2]()
587
 
 
588
 
    def testAuthenticateBadUserPASS(self):
589
 
        return self.assertFailure(
590
 
            self.P.authenticateUserPASS('resu', 'password'),
591
 
            cred.error.UnauthorizedLogin)
592
 
 
593
 
    def testAuthenticateBadPasswordPASS(self):
594
 
        return self.assertFailure(
595
 
            self.P.authenticateUserPASS('user', 'wrong password'),
596
 
            cred.error.UnauthorizedLogin)
597
 
 
598
 
class empty(smtp.User):
599
 
    def __init__(self):
600
 
        pass
601
 
 
602
 
class RelayTestCase(unittest.TestCase):
603
 
    def testExists(self):
604
 
        service = mail.mail.MailService()
605
 
        domain = mail.relay.DomainQueuer(service)
606
 
 
607
 
        doRelay = [
608
 
            address.UNIXAddress('/var/run/mail-relay'),
609
 
            address.IPv4Address('TCP', '127.0.0.1', 12345),
610
 
        ]
611
 
 
612
 
        dontRelay = [
613
 
            address.IPv4Address('TCP', '192.168.2.1', 62),
614
 
            address.IPv4Address('TCP', '1.2.3.4', 1943),
615
 
        ]
616
 
 
617
 
        for peer in doRelay:
618
 
            user = empty()
619
 
            user.orig = 'user@host'
620
 
            user.dest = 'tsoh@resu'
621
 
            user.protocol = empty()
622
 
            user.protocol.transport = empty()
623
 
            user.protocol.transport.getPeer = lambda: peer
624
 
 
625
 
            self.failUnless(callable(domain.exists(user)))
626
 
 
627
 
        for peer in dontRelay:
628
 
            user = empty()
629
 
            user.orig = 'some@place'
630
 
            user.protocol = empty()
631
 
            user.protocol.transport = empty()
632
 
            user.protocol.transport.getPeer = lambda: peer
633
 
            user.dest = 'who@cares'
634
 
 
635
 
            self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
636
 
 
637
 
class RelayerTestCase(unittest.TestCase):
638
 
    def setUp(self):
639
 
        self.tmpdir = self.mktemp()
640
 
        os.mkdir(self.tmpdir)
641
 
        self.messageFiles = []
642
 
        for i in range(10):
643
 
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
644
 
            f = file(name + '-H', 'w')
645
 
            pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
646
 
            f.close()
647
 
 
648
 
            f = file(name + '-D', 'w')
649
 
            f.write(name)
650
 
            f.seek(0, 0)
651
 
            self.messageFiles.append(name)
652
 
 
653
 
        self.R = mail.relay.RelayerMixin()
654
 
        self.R.loadMessages(self.messageFiles)
655
 
 
656
 
    def tearDown(self):
657
 
        shutil.rmtree(self.tmpdir)
658
 
 
659
 
    def testMailFrom(self):
660
 
        for i in range(10):
661
 
            self.assertEquals(self.R.getMailFrom(), 'from-%d' % (i,))
662
 
            self.R.sentMail(250, None, None, None, None)
663
 
        self.assertEquals(self.R.getMailFrom(), None)
664
 
 
665
 
    def testMailTo(self):
666
 
        for i in range(10):
667
 
            self.assertEquals(self.R.getMailTo(), ['to-%d' % (i,)])
668
 
            self.R.sentMail(250, None, None, None, None)
669
 
        self.assertEquals(self.R.getMailTo(), None)
670
 
 
671
 
    def testMailData(self):
672
 
        for i in range(10):
673
 
            name = os.path.join(self.tmpdir, 'body-%d' % (i,))
674
 
            self.assertEquals(self.R.getMailData().read(), name)
675
 
            self.R.sentMail(250, None, None, None, None)
676
 
        self.assertEquals(self.R.getMailData(), None)
677
 
 
678
 
class Manager:
679
 
    def __init__(self):
680
 
        self.success = []
681
 
        self.failure = []
682
 
        self.done = []
683
 
 
684
 
    def notifySuccess(self, factory, message):
685
 
        self.success.append((factory, message))
686
 
 
687
 
    def notifyFailure(self, factory, message):
688
 
        self.failure.append((factory, message))
689
 
 
690
 
    def notifyDone(self, factory):
691
 
        self.done.append(factory)
692
 
 
693
 
class ManagedRelayerTestCase(unittest.TestCase):
694
 
    def setUp(self):
695
 
        self.manager = Manager()
696
 
        self.messages = range(0, 20, 2)
697
 
        self.factory = object()
698
 
        self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
699
 
        self.relay.messages = self.messages[:]
700
 
        self.relay.names = self.messages[:]
701
 
        self.relay.factory = self.factory
702
 
 
703
 
    def testSuccessfulSentMail(self):
704
 
        for i in self.messages:
705
 
            self.relay.sentMail(250, None, None, None, None)
706
 
 
707
 
        self.assertEquals(
708
 
            self.manager.success,
709
 
            [(self.factory, m) for m in self.messages]
710
 
        )
711
 
 
712
 
    def testFailedSentMail(self):
713
 
        for i in self.messages:
714
 
            self.relay.sentMail(550, None, None, None, None)
715
 
 
716
 
        self.assertEquals(
717
 
            self.manager.failure,
718
 
            [(self.factory, m) for m in self.messages]
719
 
        )
720
 
 
721
 
    def testConnectionLost(self):
722
 
        self.relay.connectionLost(failure.Failure(Exception()))
723
 
        self.assertEquals(self.manager.done, [self.factory])
724
 
 
725
 
class DirectoryQueueTestCase(unittest.TestCase):
726
 
    def setUp(self):
727
 
        # This is almost a test case itself.
728
 
        self.tmpdir = self.mktemp()
729
 
        os.mkdir(self.tmpdir)
730
 
        self.queue = mail.relaymanager.Queue(self.tmpdir)
731
 
        self.queue.noisy = False
732
 
        for m in range(25):
733
 
            hdrF, msgF = self.queue.createNewMessage()
734
 
            pickle.dump(['header', m], hdrF)
735
 
            hdrF.close()
736
 
            msgF.lineReceived('body: %d' % (m,))
737
 
            msgF.eomReceived()
738
 
        self.queue.readDirectory()
739
 
 
740
 
    def tearDown(self):
741
 
        shutil.rmtree(self.tmpdir)
742
 
 
743
 
    def testWaiting(self):
744
 
        self.failUnless(self.queue.hasWaiting())
745
 
        self.assertEquals(len(self.queue.getWaiting()), 25)
746
 
 
747
 
        waiting = self.queue.getWaiting()
748
 
        self.queue.setRelaying(waiting[0])
749
 
        self.assertEquals(len(self.queue.getWaiting()), 24)
750
 
 
751
 
        self.queue.setWaiting(waiting[0])
752
 
        self.assertEquals(len(self.queue.getWaiting()), 25)
753
 
 
754
 
    def testRelaying(self):
755
 
        for m in self.queue.getWaiting():
756
 
            self.queue.setRelaying(m)
757
 
            self.assertEquals(
758
 
                len(self.queue.getRelayed()),
759
 
                25 - len(self.queue.getWaiting())
760
 
            )
761
 
 
762
 
        self.failIf(self.queue.hasWaiting())
763
 
 
764
 
        relayed = self.queue.getRelayed()
765
 
        self.queue.setWaiting(relayed[0])
766
 
        self.assertEquals(len(self.queue.getWaiting()), 1)
767
 
        self.assertEquals(len(self.queue.getRelayed()), 24)
768
 
 
769
 
    def testDone(self):
770
 
        msg = self.queue.getWaiting()[0]
771
 
        self.queue.setRelaying(msg)
772
 
        self.queue.done(msg)
773
 
 
774
 
        self.assertEquals(len(self.queue.getWaiting()), 24)
775
 
        self.assertEquals(len(self.queue.getRelayed()), 0)
776
 
 
777
 
        self.failIf(msg in self.queue.getWaiting())
778
 
        self.failIf(msg in self.queue.getRelayed())
779
 
 
780
 
    def testEnvelope(self):
781
 
        envelopes = []
782
 
 
783
 
        for msg in self.queue.getWaiting():
784
 
            envelopes.append(self.queue.getEnvelope(msg))
785
 
 
786
 
        envelopes.sort()
787
 
        for i in range(25):
788
 
            self.assertEquals(
789
 
                envelopes.pop(0),
790
 
                ['header', i]
791
 
            )
792
 
 
793
 
from twisted.names import server
794
 
from twisted.names import client
795
 
from twisted.names import common
796
 
 
797
 
class TestAuthority(common.ResolverBase):
798
 
    def __init__(self):
799
 
        common.ResolverBase.__init__(self)
800
 
        self.addresses = {}
801
 
 
802
 
    def _lookup(self, name, cls, type, timeout = None):
803
 
        if name in self.addresses and type == dns.MX:
804
 
            results = []
805
 
            for a in self.addresses[name]:
806
 
                hdr = dns.RRHeader(
807
 
                    name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
808
 
                )
809
 
                results.append(hdr)
810
 
            return defer.succeed((results, [], []))
811
 
        return defer.fail(failure.Failure(dns.DomainError(name)))
812
 
 
813
 
def setUpDNS(self):
814
 
    self.auth = TestAuthority()
815
 
    factory = server.DNSServerFactory([self.auth])
816
 
    protocol = dns.DNSDatagramProtocol(factory)
817
 
    while 1:
818
 
        self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
819
 
        portNumber = self.port.getHost().port
820
 
 
821
 
        try:
822
 
            self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
823
 
        except CannotListenError:
824
 
            self.port.stopListening()
825
 
        else:
826
 
            break
827
 
    self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
828
 
 
829
 
 
830
 
def tearDownDNS(self):
831
 
    dl = []
832
 
    dl.append(defer.maybeDeferred(self.port.stopListening))
833
 
    dl.append(defer.maybeDeferred(self.udpPort.stopListening))
834
 
    if self.resolver.protocol.transport is not None:
835
 
        dl.append(defer.maybeDeferred(self.resolver.protocol.transport.stopListening))
836
 
    try:
837
 
        self.resolver._parseCall.cancel()
838
 
    except:
839
 
        pass
840
 
    return defer.DeferredList(dl)
841
 
 
842
 
class MXTestCase(unittest.TestCase):
843
 
    def setUp(self):
844
 
        setUpDNS(self)
845
 
        self.mx = mail.relaymanager.MXCalculator(self.resolver)
846
 
 
847
 
    def tearDown(self):
848
 
        return tearDownDNS(self)
849
 
 
850
 
    def testSimpleSuccess(self):
851
 
        self.auth.addresses['test.domain'] = ['the.email.test.domain']
852
 
        return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
853
 
 
854
 
    def _cbSimpleSuccess(self, mx):
855
 
        self.assertEquals(mx.preference, 0)
856
 
        self.assertEquals(str(mx.name), 'the.email.test.domain')
857
 
 
858
 
    def testSimpleFailure(self):
859
 
        self.mx.fallbackToDomain = False
860
 
        return self.assertFailure(self.mx.getMX('test.domain'), IOError)
861
 
 
862
 
    def testSimpleFailureWithFallback(self):
863
 
        return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
864
 
 
865
 
 
866
 
    def test_failureWithSuccessfulFallback(self):
867
 
        """
868
 
        Test that if the MX record lookup fails, fallback is enabled, and an A
869
 
        record is available for the name, then the Deferred returned by
870
 
        L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
871
 
        gives the address in the A record for the name.
872
 
        """
873
 
        class DummyResolver(object):
874
 
            """
875
 
            Fake resolver which will fail an MX lookup but then succeed a
876
 
            getHostByName call.
877
 
            """
878
 
            def lookupMailExchange(self, domain):
879
 
                return defer.fail(DNSNameError())
880
 
 
881
 
            def getHostByName(self, domain):
882
 
                return defer.succeed("1.2.3.4")
883
 
 
884
 
        self.mx.resolver = DummyResolver()
885
 
        d = self.mx.getMX("domain")
886
 
        d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
887
 
        return d
888
 
 
889
 
 
890
 
    def testManyRecords(self):
891
 
        self.auth.addresses['test.domain'] = [
892
 
            'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
893
 
        ]
894
 
        return self.mx.getMX('test.domain'
895
 
            ).addCallback(self._cbManyRecordsSuccessfulLookup
896
 
            )
897
 
    
898
 
    def _cbManyRecordsSuccessfulLookup(self, mx):
899
 
        self.failUnless(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
900
 
        self.mx.markBad(str(mx.name))
901
 
        return self.mx.getMX('test.domain'
902
 
            ).addCallback(self._cbManyRecordsDifferentResult, mx
903
 
            )
904
 
    
905
 
    def _cbManyRecordsDifferentResult(self, nextMX, mx):
906
 
        self.assertNotEqual(str(mx.name), str(nextMX.name))
907
 
        self.mx.markBad(str(nextMX.name))
908
 
 
909
 
        return self.mx.getMX('test.domain'
910
 
            ).addCallback(self._cbManyRecordsLastResult, mx, nextMX
911
 
            )
912
 
    
913
 
    def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
914
 
        self.assertNotEqual(str(mx.name), str(lastMX.name))
915
 
        self.assertNotEqual(str(nextMX.name), str(lastMX.name))
916
 
 
917
 
        self.mx.markBad(str(lastMX.name))
918
 
        self.mx.markGood(str(nextMX.name))
919
 
        
920
 
        return self.mx.getMX('test.domain'
921
 
            ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
922
 
            )
923
 
    
924
 
    def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
925
 
        self.assertEqual(str(againMX.name), str(nextMX.name))
926
 
 
927
 
class LiveFireExercise(unittest.TestCase):
928
 
    if interfaces.IReactorUDP(reactor, None) is None:
929
 
        skip = "UDP support is required to determining MX records"
930
 
 
931
 
    def setUp(self):
932
 
        setUpDNS(self)
933
 
        self.tmpdirs = [
934
 
            'domainDir', 'insertionDomain', 'insertionQueue',
935
 
            'destinationDomain', 'destinationQueue'
936
 
        ]
937
 
 
938
 
    def tearDown(self):
939
 
        for d in self.tmpdirs:
940
 
            if os.path.exists(d):
941
 
                shutil.rmtree(d)
942
 
        return tearDownDNS(self)
943
 
 
944
 
    def testLocalDelivery(self):
945
 
        service = mail.mail.MailService()
946
 
        service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
947
 
        domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
948
 
        domain.addUser('user', 'password')
949
 
        service.addDomain('test.domain', domain)
950
 
        service.portals[''] = service.portals['test.domain']
951
 
        map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
952
 
 
953
 
        service.setQueue(mail.relay.DomainQueuer(service))
954
 
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(service.queue, None)
955
 
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
956
 
 
957
 
        f = service.getSMTPFactory()
958
 
 
959
 
        self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
960
 
 
961
 
        client = LineSendingProtocol([
962
 
            'HELO meson',
963
 
            'MAIL FROM: <user@hostname>',
964
 
            'RCPT TO: <user@test.domain>',
965
 
            'DATA',
966
 
            'This is the message',
967
 
            '.',
968
 
            'QUIT'
969
 
        ])
970
 
 
971
 
        done = Deferred()
972
 
        f = protocol.ClientFactory()
973
 
        f.protocol = lambda: client
974
 
        f.clientConnectionLost = lambda *args: done.callback(None)
975
 
        reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
976
 
 
977
 
        def finished(ign):
978
 
            mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
979
 
            msg = mbox.getMessage(0).read()
980
 
            self.failIfEqual(msg.find('This is the message'), -1)
981
 
 
982
 
            return self.smtpServer.stopListening()
983
 
        done.addCallback(finished)
984
 
        return done
985
 
 
986
 
 
987
 
    def testRelayDelivery(self):
988
 
        # Here is the service we will connect to and send mail from
989
 
        insServ = mail.mail.MailService()
990
 
        insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
991
 
        domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
992
 
        insServ.addDomain('insertion.domain', domain)
993
 
        os.mkdir('insertionQueue')
994
 
        insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
995
 
        insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
996
 
        manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
997
 
        manager.fArgs += ('test.identity.hostname',)
998
 
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
999
 
        # Yoink!  Now the internet obeys OUR every whim!
1000
 
        manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
1001
 
        # And this is our whim.
1002
 
        self.auth.addresses['destination.domain'] = ['127.0.0.1']
1003
 
 
1004
 
        f = insServ.getSMTPFactory()
1005
 
        self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1006
 
 
1007
 
        # Here is the service the previous one will connect to for final
1008
 
        # delivery
1009
 
        destServ = mail.mail.MailService()
1010
 
        destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
1011
 
        domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
1012
 
        domain.addUser('user', 'password')
1013
 
        destServ.addDomain('destination.domain', domain)
1014
 
        os.mkdir('destinationQueue')
1015
 
        destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
1016
 
        manager2 = mail.relaymanager.SmartHostSMTPRelayingManager(destServ.queue)
1017
 
        helper = mail.relaymanager.RelayStateHelper(manager, 1)
1018
 
        helper.startService()
1019
 
 
1020
 
        f = destServ.getSMTPFactory()
1021
 
        self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
1022
 
 
1023
 
        # Update the port number the *first* relay will connect to, because we can't use
1024
 
        # port 25
1025
 
        manager.PORT = self.destServer.getHost().port
1026
 
 
1027
 
        client = LineSendingProtocol([
1028
 
            'HELO meson',
1029
 
            'MAIL FROM: <user@wherever>',
1030
 
            'RCPT TO: <user@destination.domain>',
1031
 
            'DATA',
1032
 
            'This is the message',
1033
 
            '.',
1034
 
            'QUIT'
1035
 
        ])
1036
 
 
1037
 
        done = Deferred()
1038
 
        f = protocol.ClientFactory()
1039
 
        f.protocol = lambda: client
1040
 
        f.clientConnectionLost = lambda *args: done.callback(None)
1041
 
        reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
1042
 
 
1043
 
        def finished(ign):
1044
 
            # First part of the delivery is done.  Poke the queue manually now
1045
 
            # so we don't have to wait for the queue to be flushed.
1046
 
            delivery = manager.checkState()
1047
 
            def delivered(ign):
1048
 
                mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
1049
 
                msg = mbox.getMessage(0).read()
1050
 
                self.failIfEqual(msg.find('This is the message'), -1)
1051
 
 
1052
 
                self.insServer.stopListening()
1053
 
                self.destServer.stopListening()
1054
 
                helper.stopService()
1055
 
            delivery.addCallback(delivered)
1056
 
            return delivery
1057
 
        done.addCallback(finished)
1058
 
        return done
1059
 
 
1060
 
 
1061
 
aliasFile = StringIO.StringIO("""\
1062
 
# Here's a comment
1063
 
   # woop another one
1064
 
testuser:                   address1,address2, address3,
1065
 
    continuation@address, |/bin/process/this
1066
 
 
1067
 
usertwo:thisaddress,thataddress, lastaddress
1068
 
lastuser:       :/includable, /filename, |/program, address
1069
 
""")
1070
 
 
1071
 
class LineBufferMessage:
1072
 
    def __init__(self):
1073
 
        self.lines = []
1074
 
        self.eom = False
1075
 
        self.lost = False
1076
 
 
1077
 
    def lineReceived(self, line):
1078
 
        self.lines.append(line)
1079
 
 
1080
 
    def eomReceived(self):
1081
 
        self.eom = True
1082
 
        return defer.succeed('<Whatever>')
1083
 
 
1084
 
    def connectionLost(self):
1085
 
        self.lost = True
1086
 
 
1087
 
class AliasTestCase(unittest.TestCase):
1088
 
    lines = [
1089
 
        'First line',
1090
 
        'Next line',
1091
 
        '',
1092
 
        'After a blank line',
1093
 
        'Last line'
1094
 
    ]
1095
 
 
1096
 
    def setUp(self):
1097
 
        aliasFile.seek(0)
1098
 
 
1099
 
    def testHandle(self):
1100
 
        result = {}
1101
 
        lines = [
1102
 
            'user:  another@host\n',
1103
 
            'nextuser:  |/bin/program\n',
1104
 
            'user:  me@again\n',
1105
 
            'moreusers: :/etc/include/filename\n',
1106
 
            'multiuser: first@host, second@host,last@anotherhost',
1107
 
        ]
1108
 
 
1109
 
        for l in lines:
1110
 
            mail.alias.handle(result, l, 'TestCase', None)
1111
 
 
1112
 
        self.assertEquals(result['user'], ['another@host', 'me@again'])
1113
 
        self.assertEquals(result['nextuser'], ['|/bin/program'])
1114
 
        self.assertEquals(result['moreusers'], [':/etc/include/filename'])
1115
 
        self.assertEquals(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
1116
 
 
1117
 
    def testFileLoader(self):
1118
 
        domains = {'': object()}
1119
 
        result = mail.alias.loadAliasFile(domains, fp=aliasFile)
1120
 
 
1121
 
        self.assertEquals(len(result), 3)
1122
 
 
1123
 
        group = result['testuser']
1124
 
        s = str(group)
1125
 
        for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
1126
 
            self.failIfEqual(s.find(a), -1)
1127
 
        self.assertEquals(len(group), 5)
1128
 
 
1129
 
        group = result['usertwo']
1130
 
        s = str(group)
1131
 
        for a in ('thisaddress', 'thataddress', 'lastaddress'):
1132
 
            self.failIfEqual(s.find(a), -1)
1133
 
        self.assertEquals(len(group), 3)
1134
 
 
1135
 
        group = result['lastuser']
1136
 
        s = str(group)
1137
 
        self.failUnlessEqual(s.find('/includable'), -1)
1138
 
        for a in ('/filename', 'program', 'address'):
1139
 
            self.failIfEqual(s.find(a), -1, '%s not found' % a)
1140
 
        self.assertEquals(len(group), 3)
1141
 
 
1142
 
    def testMultiWrapper(self):
1143
 
        msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
1144
 
        msg = mail.alias.MultiWrapper(msgs)
1145
 
 
1146
 
        for L in self.lines:
1147
 
            msg.lineReceived(L)
1148
 
        return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
1149
 
 
1150
 
    def _cbMultiWrapper(self, ignored, msgs):
1151
 
        for m in msgs:
1152
 
            self.failUnless(m.eom)
1153
 
            self.failIf(m.lost)
1154
 
            self.assertEquals(self.lines, m.lines)
1155
 
 
1156
 
    def testFileAlias(self):
1157
 
        tmpfile = self.mktemp()
1158
 
        a = mail.alias.FileAlias(tmpfile, None, None)
1159
 
        m = a.createMessageReceiver()
1160
 
 
1161
 
        for l in self.lines:
1162
 
            m.lineReceived(l)
1163
 
        return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
1164
 
 
1165
 
    def _cbTestFileAlias(self, ignored, tmpfile):
1166
 
        lines = file(tmpfile).readlines()
1167
 
        self.assertEquals([L[:-1] for L in lines], self.lines)
1168
 
 
1169
 
 
1170
 
class DummyProcess(object):
1171
 
    __slots__ = ['onEnd']
1172
 
 
1173
 
class ProcessAliasTestCase(test_process.SignalMixin, unittest.TestCase):
1174
 
    lines = [
1175
 
        'First line',
1176
 
        'Next line',
1177
 
        '',
1178
 
        'After a blank line',
1179
 
        'Last line'
1180
 
    ]
1181
 
 
1182
 
    def setUpClass(self):
1183
 
        self.DNSNAME = smtp.DNSNAME
1184
 
        smtp.DNSNAME = ''
1185
 
 
1186
 
    def tearDownClass(self):
1187
 
        smtp.DNSNAME = self.DNSNAME
1188
 
 
1189
 
    def testProcessAlias(self):
1190
 
        path = util.sibpath(__file__, 'process.alias.sh')
1191
 
        a = mail.alias.ProcessAlias(path, None, None)
1192
 
        m = a.createMessageReceiver()
1193
 
 
1194
 
        for l in self.lines:
1195
 
            m.lineReceived(l)
1196
 
        return m.eomReceived().addCallback(self._cbProcessAlias)
1197
 
 
1198
 
    def _cbProcessAlias(self, ignored):
1199
 
        lines = file('process.alias.out').readlines()
1200
 
        self.assertEquals([L[:-1] for L in lines], self.lines)
1201
 
 
1202
 
    def testAliasResolution(self):
1203
 
        aliases = {}
1204
 
        domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
1205
 
        A1 = mail.alias.AliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
1206
 
        A2 = mail.alias.AliasGroup(['user2', 'user3'], domain, 'alias2')
1207
 
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1208
 
        aliases.update({
1209
 
            'alias1': A1,
1210
 
            'alias2': A2,
1211
 
            'alias3': A3,
1212
 
        })
1213
 
 
1214
 
        r1 = map(str, A1.resolve(aliases).objs)
1215
 
        r1.sort()
1216
 
        expected = map(str, [
1217
 
            mail.alias.AddressAlias('user1', None, None),
1218
 
            mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1219
 
            mail.alias.FileWrapper('/file'),
1220
 
        ])
1221
 
        expected.sort()
1222
 
        self.assertEquals(r1, expected)
1223
 
 
1224
 
        r2 = map(str, A2.resolve(aliases).objs)
1225
 
        r2.sort()
1226
 
        expected = map(str, [
1227
 
            mail.alias.AddressAlias('user2', None, None),
1228
 
            mail.alias.AddressAlias('user3', None, None)
1229
 
        ])
1230
 
        expected.sort()
1231
 
        self.assertEquals(r2, expected)
1232
 
 
1233
 
        r3 = map(str, A3.resolve(aliases).objs)
1234
 
        r3.sort()
1235
 
        expected = map(str, [
1236
 
            mail.alias.AddressAlias('user1', None, None),
1237
 
            mail.alias.MessageWrapper(DummyProcess(), 'echo'),
1238
 
            mail.alias.FileWrapper('/file'),
1239
 
        ])
1240
 
        expected.sort()
1241
 
        self.assertEquals(r3, expected)
1242
 
 
1243
 
    def testCyclicAlias(self):
1244
 
        aliases = {}
1245
 
        domain = {'': TestDomain(aliases, [])}
1246
 
        A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
1247
 
        A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
1248
 
        A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
1249
 
        aliases.update({
1250
 
            'alias1': A1,
1251
 
            'alias2': A2,
1252
 
            'alias3': A3
1253
 
        })
1254
 
 
1255
 
        self.assertEquals(aliases['alias1'].resolve(aliases), None)
1256
 
        self.assertEquals(aliases['alias2'].resolve(aliases), None)
1257
 
        self.assertEquals(aliases['alias3'].resolve(aliases), None)
1258
 
 
1259
 
        A4 = mail.alias.AliasGroup(['|echo', 'alias1'], domain, 'alias4')
1260
 
        aliases['alias4'] = A4
1261
 
        
1262
 
        r = map(str, A4.resolve(aliases).objs)
1263
 
        r.sort()
1264
 
        expected = map(str, [
1265
 
            mail.alias.MessageWrapper(DummyProcess(), 'echo')
1266
 
        ])
1267
 
 
1268
 
if interfaces.IReactorProcess(reactor, None) is None:
1269
 
    ProcessAliasTestCase = "IReactorProcess not supported"
1270
 
 
1271
 
class TestDomain:
1272
 
    def __init__(self, aliases, users):
1273
 
        self.aliases = aliases
1274
 
        self.users = users
1275
 
 
1276
 
    def exists(self, user, memo=None):
1277
 
        user = user.dest.local
1278
 
        if user in self.users:
1279
 
            return lambda: mail.alias.AddressAlias(user, None, None)
1280
 
        try:
1281
 
            a = self.aliases[user]
1282
 
        except:
1283
 
            raise smtp.SMTPBadRcpt(user)
1284
 
        else:
1285
 
            aliases = a.resolve(self.aliases, memo)
1286
 
            if aliases:
1287
 
                return lambda: aliases
1288
 
            raise smtp.SMTPBadRcpt(user)
1289
 
 
1290
 
 
1291
 
from twisted.python.runtime import platformType
1292
 
import types
1293
 
if platformType != "posix":
1294
 
    for o in locals().values():
1295
 
        if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
1296
 
            o.skip = "twisted.mail only works on posix"