~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/names/test/test_names.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.names.test.test_names -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Test cases for twisted.names.
 
7
"""
 
8
 
 
9
import socket, operator, copy
 
10
 
 
11
from twisted.trial import unittest
 
12
 
 
13
from twisted.internet import reactor, defer, error
 
14
from twisted.internet.defer import succeed
 
15
from twisted.names import client, server, common, authority, hosts, dns
 
16
from twisted.python import failure
 
17
from twisted.names.error import DNSFormatError, DNSServerError, DNSNameError
 
18
from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError
 
19
from twisted.names.error import DNSUnknownError
 
20
from twisted.names.dns import EFORMAT, ESERVER, ENAME, ENOTIMP, EREFUSED
 
21
from twisted.names.dns import Message
 
22
from twisted.names.client import Resolver
 
23
 
 
24
from twisted.names.test.test_client import StubPort
 
25
from twisted.python.compat import reduce
 
26
 
 
27
def justPayload(results):
 
28
    return [r.payload for r in results[0]]
 
29
 
 
30
class NoFileAuthority(authority.FileAuthority):
 
31
    def __init__(self, soa, records):
 
32
        # Yes, skip FileAuthority
 
33
        common.ResolverBase.__init__(self)
 
34
        self.soa, self.records = soa, records
 
35
 
 
36
 
 
37
soa_record = dns.Record_SOA(
 
38
                    mname = 'test-domain.com',
 
39
                    rname = 'root.test-domain.com',
 
40
                    serial = 100,
 
41
                    refresh = 1234,
 
42
                    minimum = 7654,
 
43
                    expire = 19283784,
 
44
                    retry = 15,
 
45
                    ttl=1
 
46
                )
 
47
 
 
48
reverse_soa = dns.Record_SOA(
 
49
                     mname = '93.84.28.in-addr.arpa',
 
50
                     rname = '93.84.28.in-addr.arpa',
 
51
                     serial = 120,
 
52
                     refresh = 54321,
 
53
                     minimum = 382,
 
54
                     expire = 11193983,
 
55
                     retry = 30,
 
56
                     ttl=3
 
57
                )
 
58
 
 
59
my_soa = dns.Record_SOA(
 
60
    mname = 'my-domain.com',
 
61
    rname = 'postmaster.test-domain.com',
 
62
    serial = 130,
 
63
    refresh = 12345,
 
64
    minimum = 1,
 
65
    expire = 999999,
 
66
    retry = 100,
 
67
    )
 
68
 
 
69
test_domain_com = NoFileAuthority(
 
70
    soa = ('test-domain.com', soa_record),
 
71
    records = {
 
72
        'test-domain.com': [
 
73
            soa_record,
 
74
            dns.Record_A('127.0.0.1'),
 
75
            dns.Record_NS('39.28.189.39'),
 
76
            dns.Record_MX(10, 'host.test-domain.com'),
 
77
            dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'),
 
78
            dns.Record_CNAME('canonical.name.com'),
 
79
            dns.Record_MB('mailbox.test-domain.com'),
 
80
            dns.Record_MG('mail.group.someplace'),
 
81
            dns.Record_TXT('A First piece of Text', 'a SecoNd piece'),
 
82
            dns.Record_A6(0, 'ABCD::4321', ''),
 
83
            dns.Record_A6(12, '0:0069::0', 'some.network.tld'),
 
84
            dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net'),
 
85
            dns.Record_TXT('Some more text, haha!  Yes.  \0  Still here?'),
 
86
            dns.Record_MR('mail.redirect.or.whatever'),
 
87
            dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'),
 
88
            dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'),
 
89
            dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'),
 
90
            dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP,
 
91
                           '\x12\x01\x16\xfe\xc1\x00\x01'),
 
92
            dns.Record_NAPTR(100, 10, "u", "sip+E2U",
 
93
                             "!^.*$!sip:information@domain.tld!"),
 
94
            dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')],
 
95
        'http.tcp.test-domain.com': [
 
96
            dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool')
 
97
        ],
 
98
        'host.test-domain.com': [
 
99
            dns.Record_A('123.242.1.5'),
 
100
            dns.Record_A('0.255.0.255'),
 
101
        ],
 
102
        'host-two.test-domain.com': [
 
103
#
 
104
#  Python bug
 
105
#           dns.Record_A('255.255.255.255'),
 
106
#
 
107
            dns.Record_A('255.255.255.254'),
 
108
            dns.Record_A('0.0.0.0')
 
109
        ],
 
110
        'cname.test-domain.com': [
 
111
            dns.Record_CNAME('test-domain.com')
 
112
        ],
 
113
        'anothertest-domain.com': [
 
114
            dns.Record_A('1.2.3.4')],
 
115
    }
 
116
)
 
117
 
 
118
reverse_domain = NoFileAuthority(
 
119
    soa = ('93.84.28.in-addr.arpa', reverse_soa),
 
120
    records = {
 
121
        '123.93.84.28.in-addr.arpa': [
 
122
             dns.Record_PTR('test.host-reverse.lookup.com'),
 
123
             reverse_soa
 
124
        ]
 
125
    }
 
126
)
 
127
 
 
128
 
 
129
my_domain_com = NoFileAuthority(
 
130
    soa = ('my-domain.com', my_soa),
 
131
    records = {
 
132
        'my-domain.com': [
 
133
            my_soa,
 
134
            dns.Record_A('1.2.3.4', ttl='1S'),
 
135
            dns.Record_NS('ns1.domain', ttl='2M'),
 
136
            dns.Record_NS('ns2.domain', ttl='3H'),
 
137
            dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')
 
138
            ]
 
139
        }
 
140
    )
 
141
 
 
142
 
 
143
class ServerDNSTestCase(unittest.TestCase):
 
144
    """
 
145
    Test cases for DNS server and client.
 
146
    """
 
147
 
 
148
    def setUp(self):
 
149
        self.factory = server.DNSServerFactory([
 
150
            test_domain_com, reverse_domain, my_domain_com
 
151
        ], verbose=2)
 
152
 
 
153
        p = dns.DNSDatagramProtocol(self.factory)
 
154
 
 
155
        while 1:
 
156
            listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
 
157
            # It's simpler to do the stop listening with addCleanup,
 
158
            # even though we might not end up using this TCP port in
 
159
            # the test (if the listenUDP below fails).  Cleaning up
 
160
            # this TCP port sooner than "cleanup time" would mean
 
161
            # adding more code to keep track of the Deferred returned
 
162
            # by stopListening.
 
163
            self.addCleanup(listenerTCP.stopListening)
 
164
            port = listenerTCP.getHost().port
 
165
 
 
166
            try:
 
167
                listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1")
 
168
            except error.CannotListenError:
 
169
                pass
 
170
            else:
 
171
                self.addCleanup(listenerUDP.stopListening)
 
172
                break
 
173
 
 
174
        self.listenerTCP = listenerTCP
 
175
        self.listenerUDP = listenerUDP
 
176
        self.resolver = client.Resolver(servers=[('127.0.0.1', port)])
 
177
 
 
178
 
 
179
    def tearDown(self):
 
180
        """
 
181
        Clean up any server connections associated with the
 
182
        L{DNSServerFactory} created in L{setUp}
 
183
        """
 
184
        # It'd be great if DNSServerFactory had a method that
 
185
        # encapsulated this task.  At least the necessary data is
 
186
        # available, though.
 
187
        for conn in self.factory.connections[:]:
 
188
            conn.transport.loseConnection()
 
189
 
 
190
 
 
191
    def namesTest(self, d, r):
 
192
        self.response = None
 
193
        def setDone(response):
 
194
            self.response = response
 
195
 
 
196
        def checkResults(ignored):
 
197
            if isinstance(self.response, failure.Failure):
 
198
                raise self.response
 
199
            results = justPayload(self.response)
 
200
            assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r))
 
201
            for rec in results:
 
202
                assert rec in r, "%s not in %s" % (rec, map(str, r))
 
203
 
 
204
        d.addBoth(setDone)
 
205
        d.addCallback(checkResults)
 
206
        return d
 
207
 
 
208
    def testAddressRecord1(self):
 
209
        """Test simple DNS 'A' record queries"""
 
210
        return self.namesTest(
 
211
            self.resolver.lookupAddress('test-domain.com'),
 
212
            [dns.Record_A('127.0.0.1', ttl=19283784)]
 
213
        )
 
214
 
 
215
 
 
216
    def testAddressRecord2(self):
 
217
        """Test DNS 'A' record queries with multiple answers"""
 
218
        return self.namesTest(
 
219
            self.resolver.lookupAddress('host.test-domain.com'),
 
220
            [dns.Record_A('123.242.1.5', ttl=19283784), dns.Record_A('0.255.0.255', ttl=19283784)]
 
221
        )
 
222
 
 
223
 
 
224
    def testAdressRecord3(self):
 
225
        """Test DNS 'A' record queries with edge cases"""
 
226
        return self.namesTest(
 
227
            self.resolver.lookupAddress('host-two.test-domain.com'),
 
228
            [dns.Record_A('255.255.255.254', ttl=19283784), dns.Record_A('0.0.0.0', ttl=19283784)]
 
229
        )
 
230
 
 
231
    def testAuthority(self):
 
232
        """Test DNS 'SOA' record queries"""
 
233
        return self.namesTest(
 
234
            self.resolver.lookupAuthority('test-domain.com'),
 
235
            [soa_record]
 
236
        )
 
237
 
 
238
 
 
239
    def testMailExchangeRecord(self):
 
240
        """Test DNS 'MX' record queries"""
 
241
        return self.namesTest(
 
242
            self.resolver.lookupMailExchange('test-domain.com'),
 
243
            [dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)]
 
244
        )
 
245
 
 
246
 
 
247
    def testNameserver(self):
 
248
        """Test DNS 'NS' record queries"""
 
249
        return self.namesTest(
 
250
            self.resolver.lookupNameservers('test-domain.com'),
 
251
            [dns.Record_NS('39.28.189.39', ttl=19283784)]
 
252
        )
 
253
 
 
254
 
 
255
    def testHINFO(self):
 
256
        """Test DNS 'HINFO' record queries"""
 
257
        return self.namesTest(
 
258
            self.resolver.lookupHostInfo('test-domain.com'),
 
259
            [dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)]
 
260
        )
 
261
 
 
262
    def testPTR(self):
 
263
        """Test DNS 'PTR' record queries"""
 
264
        return self.namesTest(
 
265
            self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'),
 
266
            [dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)]
 
267
        )
 
268
 
 
269
 
 
270
    def testCNAME(self):
 
271
        """Test DNS 'CNAME' record queries"""
 
272
        return self.namesTest(
 
273
            self.resolver.lookupCanonicalName('test-domain.com'),
 
274
            [dns.Record_CNAME('canonical.name.com', ttl=19283784)]
 
275
        )
 
276
 
 
277
    def testCNAMEAdditional(self):
 
278
        """Test additional processing for CNAME records"""
 
279
        return self.namesTest(
 
280
        self.resolver.lookupAddress('cname.test-domain.com'),
 
281
        [dns.Record_CNAME('test-domain.com', ttl=19283784), dns.Record_A('127.0.0.1', ttl=19283784)]
 
282
    )
 
283
 
 
284
    def testMB(self):
 
285
        """Test DNS 'MB' record queries"""
 
286
        return self.namesTest(
 
287
            self.resolver.lookupMailBox('test-domain.com'),
 
288
            [dns.Record_MB('mailbox.test-domain.com', ttl=19283784)]
 
289
        )
 
290
 
 
291
 
 
292
    def testMG(self):
 
293
        """Test DNS 'MG' record queries"""
 
294
        return self.namesTest(
 
295
            self.resolver.lookupMailGroup('test-domain.com'),
 
296
            [dns.Record_MG('mail.group.someplace', ttl=19283784)]
 
297
        )
 
298
 
 
299
 
 
300
    def testMR(self):
 
301
        """Test DNS 'MR' record queries"""
 
302
        return self.namesTest(
 
303
            self.resolver.lookupMailRename('test-domain.com'),
 
304
            [dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)]
 
305
        )
 
306
 
 
307
 
 
308
    def testMINFO(self):
 
309
        """Test DNS 'MINFO' record queries"""
 
310
        return self.namesTest(
 
311
            self.resolver.lookupMailboxInfo('test-domain.com'),
 
312
            [dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)]
 
313
        )
 
314
 
 
315
 
 
316
    def testSRV(self):
 
317
        """Test DNS 'SRV' record queries"""
 
318
        return self.namesTest(
 
319
            self.resolver.lookupService('http.tcp.test-domain.com'),
 
320
            [dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)]
 
321
        )
 
322
 
 
323
    def testAFSDB(self):
 
324
        """Test DNS 'AFSDB' record queries"""
 
325
        return self.namesTest(
 
326
            self.resolver.lookupAFSDatabase('test-domain.com'),
 
327
            [dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)]
 
328
        )
 
329
 
 
330
 
 
331
    def testRP(self):
 
332
        """Test DNS 'RP' record queries"""
 
333
        return self.namesTest(
 
334
            self.resolver.lookupResponsibility('test-domain.com'),
 
335
            [dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)]
 
336
        )
 
337
 
 
338
 
 
339
    def testTXT(self):
 
340
        """Test DNS 'TXT' record queries"""
 
341
        return self.namesTest(
 
342
            self.resolver.lookupText('test-domain.com'),
 
343
            [dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784),
 
344
             dns.Record_TXT('Some more text, haha!  Yes.  \0  Still here?', ttl=19283784)]
 
345
        )
 
346
 
 
347
 
 
348
    def testWKS(self):
 
349
        """Test DNS 'WKS' record queries"""
 
350
        return self.namesTest(
 
351
            self.resolver.lookupWellKnownServices('test-domain.com'),
 
352
            [dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)]
 
353
        )
 
354
 
 
355
 
 
356
    def testSomeRecordsWithTTLs(self):
 
357
        result_soa = copy.copy(my_soa)
 
358
        result_soa.ttl = my_soa.expire
 
359
        return self.namesTest(
 
360
            self.resolver.lookupAllRecords('my-domain.com'),
 
361
            [result_soa,
 
362
             dns.Record_A('1.2.3.4', ttl='1S'),
 
363
             dns.Record_NS('ns1.domain', ttl='2M'),
 
364
             dns.Record_NS('ns2.domain', ttl='3H'),
 
365
             dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')]
 
366
            )
 
367
 
 
368
 
 
369
    def testAAAA(self):
 
370
        """Test DNS 'AAAA' record queries (IPv6)"""
 
371
        return self.namesTest(
 
372
            self.resolver.lookupIPV6Address('test-domain.com'),
 
373
            [dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)]
 
374
        )
 
375
 
 
376
    def testA6(self):
 
377
        """Test DNS 'A6' record queries (IPv6)"""
 
378
        return self.namesTest(
 
379
            self.resolver.lookupAddress6('test-domain.com'),
 
380
            [dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784),
 
381
             dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784),
 
382
             dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)]
 
383
         )
 
384
 
 
385
 
 
386
    def test_zoneTransfer(self):
 
387
        """
 
388
        Test DNS 'AXFR' queries (Zone transfer)
 
389
        """
 
390
        default_ttl = soa_record.expire
 
391
        results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())]
 
392
        for r in results:
 
393
            if r.ttl is None:
 
394
                r.ttl = default_ttl
 
395
        return self.namesTest(
 
396
            self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)),
 
397
            results
 
398
        )
 
399
 
 
400
 
 
401
    def testSimilarZonesDontInterfere(self):
 
402
        """Tests that unrelated zones don't mess with each other."""
 
403
        return self.namesTest(
 
404
            self.resolver.lookupAddress("anothertest-domain.com"),
 
405
            [dns.Record_A('1.2.3.4', ttl=19283784)]
 
406
        )
 
407
 
 
408
 
 
409
    def test_NAPTR(self):
 
410
        """
 
411
        Test DNS 'NAPTR' record queries.
 
412
        """
 
413
        return self.namesTest(
 
414
            self.resolver.lookupNamingAuthorityPointer('test-domain.com'),
 
415
            [dns.Record_NAPTR(100, 10, "u", "sip+E2U",
 
416
                              "!^.*$!sip:information@domain.tld!",
 
417
                              ttl=19283784)])
 
418
 
 
419
 
 
420
 
 
421
class DNSServerFactoryTests(unittest.TestCase):
 
422
    """
 
423
    Tests for L{server.DNSServerFactory}.
 
424
    """
 
425
    def _messageReceivedTest(self, methodName, message):
 
426
        """
 
427
        Assert that the named method is called with the given message when
 
428
        it is passed to L{DNSServerFactory.messageReceived}.
 
429
        """
 
430
        # Make it appear to have some queries so that
 
431
        # DNSServerFactory.allowQuery allows it.
 
432
        message.queries = [None]
 
433
 
 
434
        receivedMessages = []
 
435
        def fakeHandler(message, protocol, address):
 
436
            receivedMessages.append((message, protocol, address))
 
437
 
 
438
        class FakeProtocol(object):
 
439
            def writeMessage(self, message):
 
440
                pass
 
441
 
 
442
        protocol = FakeProtocol()
 
443
        factory = server.DNSServerFactory(None)
 
444
        setattr(factory, methodName, fakeHandler)
 
445
        factory.messageReceived(message, protocol)
 
446
        self.assertEqual(receivedMessages, [(message, protocol, None)])
 
447
 
 
448
 
 
449
    def test_notifyMessageReceived(self):
 
450
        """
 
451
        L{DNSServerFactory.messageReceived} passes messages with an opcode
 
452
        of C{OP_NOTIFY} on to L{DNSServerFactory.handleNotify}.
 
453
        """
 
454
        # RFC 1996, section 4.5
 
455
        opCode = 4
 
456
        self._messageReceivedTest('handleNotify', Message(opCode=opCode))
 
457
 
 
458
 
 
459
    def test_updateMessageReceived(self):
 
460
        """
 
461
        L{DNSServerFactory.messageReceived} passes messages with an opcode
 
462
        of C{OP_UPDATE} on to L{DNSServerFactory.handleOther}.
 
463
 
 
464
        This may change if the implementation ever covers update messages.
 
465
        """
 
466
        # RFC 2136, section 1.3
 
467
        opCode = 5
 
468
        self._messageReceivedTest('handleOther', Message(opCode=opCode))
 
469
 
 
470
 
 
471
    def test_connectionTracking(self):
 
472
        """
 
473
        The C{connectionMade} and C{connectionLost} methods of
 
474
        L{DNSServerFactory} cooperate to keep track of all
 
475
        L{DNSProtocol} objects created by a factory which are
 
476
        connected.
 
477
        """
 
478
        protoA, protoB = object(), object()
 
479
        factory = server.DNSServerFactory()
 
480
        factory.connectionMade(protoA)
 
481
        self.assertEqual(factory.connections, [protoA])
 
482
        factory.connectionMade(protoB)
 
483
        self.assertEqual(factory.connections, [protoA, protoB])
 
484
        factory.connectionLost(protoA)
 
485
        self.assertEqual(factory.connections, [protoB])
 
486
        factory.connectionLost(protoB)
 
487
        self.assertEqual(factory.connections, [])
 
488
 
 
489
 
 
490
class HelperTestCase(unittest.TestCase):
 
491
    def testSerialGenerator(self):
 
492
        f = self.mktemp()
 
493
        a = authority.getSerial(f)
 
494
        for i in range(20):
 
495
            b = authority.getSerial(f)
 
496
            self.failUnless(a < b)
 
497
            a = b
 
498
 
 
499
 
 
500
class AXFRTest(unittest.TestCase):
 
501
    def setUp(self):
 
502
        self.results = None
 
503
        self.d = defer.Deferred()
 
504
        self.d.addCallback(self._gotResults)
 
505
        self.controller = client.AXFRController('fooby.com', self.d)
 
506
 
 
507
        self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False,
 
508
                                payload=dns.Record_SOA(mname='fooby.com',
 
509
                                                       rname='hooj.fooby.com',
 
510
                                                       serial=100,
 
511
                                                       refresh=200,
 
512
                                                       retry=300,
 
513
                                                       expire=400,
 
514
                                                       minimum=500,
 
515
                                                       ttl=600))
 
516
 
 
517
        self.records = [
 
518
            self.soa,
 
519
            dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False,
 
520
                         payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)),
 
521
 
 
522
            dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False,
 
523
                         payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)),
 
524
 
 
525
            dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False,
 
526
                         payload=dns.Record_A(address='64.123.27.105', ttl=700)),
 
527
            self.soa
 
528
            ]
 
529
 
 
530
    def _makeMessage(self):
 
531
        # hooray they all have the same message format
 
532
        return dns.Message(id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0, trunc=0, maxSize=0)
 
533
 
 
534
    def testBindAndTNamesStyle(self):
 
535
        # Bind style = One big single message
 
536
        m = self._makeMessage()
 
537
        m.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)]
 
538
        m.answers = self.records
 
539
        self.controller.messageReceived(m, None)
 
540
        self.assertEquals(self.results, self.records)
 
541
 
 
542
    def _gotResults(self, result):
 
543
        self.results = result
 
544
 
 
545
    def testDJBStyle(self):
 
546
        # DJB style = message per record
 
547
        records = self.records[:]
 
548
        while records:
 
549
            m = self._makeMessage()
 
550
            m.queries = [] # DJB *doesn't* specify any queries.. hmm..
 
551
            m.answers = [records.pop(0)]
 
552
            self.controller.messageReceived(m, None)
 
553
        self.assertEquals(self.results, self.records)
 
554
 
 
555
class HostsTestCase(unittest.TestCase):
 
556
    def setUp(self):
 
557
        f = open('EtcHosts', 'w')
 
558
        f.write('''
 
559
1.1.1.1    EXAMPLE EXAMPLE.EXAMPLETHING
 
560
1.1.1.2    HOOJY
 
561
::1        ip6thingy
 
562
''')
 
563
        f.close()
 
564
        self.resolver = hosts.Resolver('EtcHosts')
 
565
 
 
566
    def testGetHostByName(self):
 
567
        data = [('EXAMPLE', '1.1.1.1'),
 
568
                ('EXAMPLE.EXAMPLETHING', '1.1.1.1'),
 
569
                ('HOOJY', '1.1.1.2'),
 
570
                ]
 
571
        ds = [self.resolver.getHostByName(n).addCallback(self.assertEqual, ip)
 
572
              for n, ip in data]
 
573
        return defer.gatherResults(ds)
 
574
 
 
575
    def testLookupAddress(self):
 
576
        d = self.resolver.lookupAddress('HOOJY')
 
577
        d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(),
 
578
                                                 '1.1.1.2'))
 
579
        return d
 
580
 
 
581
    def testIPv6(self):
 
582
        d = self.resolver.lookupIPV6Address('ip6thingy')
 
583
        d.addCallback(self.assertEqual, '::1')
 
584
        return d
 
585
 
 
586
    testIPv6.skip = 'IPv6 support is not in our hosts resolver yet'
 
587
 
 
588
    def testNotImplemented(self):
 
589
        return self.assertFailure(self.resolver.lookupMailExchange('EXAMPLE'),
 
590
                                  NotImplementedError)
 
591
 
 
592
    def testQuery(self):
 
593
        d = self.resolver.query(dns.Query('EXAMPLE'))
 
594
        d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(),
 
595
                                                 '1.1.1.1'))
 
596
        return d
 
597
 
 
598
    def testNotFound(self):
 
599
        return self.assertFailure(self.resolver.lookupAddress('foueoa'),
 
600
                                  dns.DomainError)
 
601
 
 
602
 
 
603
class FakeDNSDatagramProtocol(object):
 
604
    def __init__(self):
 
605
        self.queries = []
 
606
        self.transport = StubPort()
 
607
 
 
608
    def query(self, address, queries, timeout=10, id=None):
 
609
        self.queries.append((address, queries, timeout, id))
 
610
        return defer.fail(dns.DNSQueryTimeoutError(queries))
 
611
 
 
612
    def removeResend(self, id):
 
613
        # Ignore this for the time being.
 
614
        pass
 
615
 
 
616
class RetryLogic(unittest.TestCase):
 
617
    testServers = [
 
618
        '1.2.3.4',
 
619
        '4.3.2.1',
 
620
        'a.b.c.d',
 
621
        'z.y.x.w']
 
622
 
 
623
    def testRoundRobinBackoff(self):
 
624
        addrs = [(x, 53) for x in self.testServers]
 
625
        r = client.Resolver(resolv=None, servers=addrs)
 
626
        r.protocol = proto = FakeDNSDatagramProtocol()
 
627
        return r.lookupAddress("foo.example.com"
 
628
            ).addCallback(self._cbRoundRobinBackoff
 
629
            ).addErrback(self._ebRoundRobinBackoff, proto
 
630
            )
 
631
 
 
632
    def _cbRoundRobinBackoff(self, result):
 
633
        raise unittest.FailTest("Lookup address succeeded, should have timed out")
 
634
 
 
635
    def _ebRoundRobinBackoff(self, failure, fakeProto):
 
636
        failure.trap(defer.TimeoutError)
 
637
 
 
638
        # Assert that each server is tried with a particular timeout
 
639
        # before the timeout is increased and the attempts are repeated.
 
640
 
 
641
        for t in (1, 3, 11, 45):
 
642
            tries = fakeProto.queries[:len(self.testServers)]
 
643
            del fakeProto.queries[:len(self.testServers)]
 
644
 
 
645
            tries.sort()
 
646
            expected = list(self.testServers)
 
647
            expected.sort()
 
648
 
 
649
            for ((addr, query, timeout, id), expectedAddr) in zip(tries, expected):
 
650
                self.assertEquals(addr, (expectedAddr, 53))
 
651
                self.assertEquals(timeout, t)
 
652
 
 
653
        self.failIf(fakeProto.queries)
 
654
 
 
655
class ResolvConfHandling(unittest.TestCase):
 
656
    def testMissing(self):
 
657
        resolvConf = self.mktemp()
 
658
        r = client.Resolver(resolv=resolvConf)
 
659
        self.assertEquals(r.dynServers, [('127.0.0.1', 53)])
 
660
        r._parseCall.cancel()
 
661
 
 
662
    def testEmpty(self):
 
663
        resolvConf = self.mktemp()
 
664
        fObj = file(resolvConf, 'w')
 
665
        fObj.close()
 
666
        r = client.Resolver(resolv=resolvConf)
 
667
        self.assertEquals(r.dynServers, [('127.0.0.1', 53)])
 
668
        r._parseCall.cancel()
 
669
 
 
670
 
 
671
 
 
672
class FilterAnswersTests(unittest.TestCase):
 
673
    """
 
674
    Test L{twisted.names.client.Resolver.filterAnswers}'s handling of various
 
675
    error conditions it might encounter.
 
676
    """
 
677
    def setUp(self):
 
678
        # Create a resolver pointed at an invalid server - we won't be hitting
 
679
        # the network in any of these tests.
 
680
        self.resolver = Resolver(servers=[('0.0.0.0', 0)])
 
681
 
 
682
 
 
683
    def test_truncatedMessage(self):
 
684
        """
 
685
        Test that a truncated message results in an equivalent request made via
 
686
        TCP.
 
687
        """
 
688
        m = Message(trunc=True)
 
689
        m.addQuery('example.com')
 
690
 
 
691
        def queryTCP(queries):
 
692
            self.assertEqual(queries, m.queries)
 
693
            response = Message()
 
694
            response.answers = ['answer']
 
695
            response.authority = ['authority']
 
696
            response.additional = ['additional']
 
697
            return succeed(response)
 
698
        self.resolver.queryTCP = queryTCP
 
699
        d = self.resolver.filterAnswers(m)
 
700
        d.addCallback(
 
701
            self.assertEqual, (['answer'], ['authority'], ['additional']))
 
702
        return d
 
703
 
 
704
 
 
705
    def _rcodeTest(self, rcode, exc):
 
706
        m = Message(rCode=rcode)
 
707
        err = self.resolver.filterAnswers(m)
 
708
        err.trap(exc)
 
709
 
 
710
 
 
711
    def test_formatError(self):
 
712
        """
 
713
        Test that a message with a result code of C{EFORMAT} results in a
 
714
        failure wrapped around L{DNSFormatError}.
 
715
        """
 
716
        return self._rcodeTest(EFORMAT, DNSFormatError)
 
717
 
 
718
 
 
719
    def test_serverError(self):
 
720
        """
 
721
        Like L{test_formatError} but for C{ESERVER}/L{DNSServerError}.
 
722
        """
 
723
        return self._rcodeTest(ESERVER, DNSServerError)
 
724
 
 
725
 
 
726
    def test_nameError(self):
 
727
        """
 
728
        Like L{test_formatError} but for C{ENAME}/L{DNSNameError}.
 
729
        """
 
730
        return self._rcodeTest(ENAME, DNSNameError)
 
731
 
 
732
 
 
733
    def test_notImplementedError(self):
 
734
        """
 
735
        Like L{test_formatError} but for C{ENOTIMP}/L{DNSNotImplementedError}.
 
736
        """
 
737
        return self._rcodeTest(ENOTIMP, DNSNotImplementedError)
 
738
 
 
739
 
 
740
    def test_refusedError(self):
 
741
        """
 
742
        Like L{test_formatError} but for C{EREFUSED}/L{DNSQueryRefusedError}.
 
743
        """
 
744
        return self._rcodeTest(EREFUSED, DNSQueryRefusedError)
 
745
 
 
746
 
 
747
    def test_refusedErrorUnknown(self):
 
748
        """
 
749
        Like L{test_formatError} but for an unrecognized error code and
 
750
        L{DNSUnknownError}.
 
751
        """
 
752
        return self._rcodeTest(EREFUSED + 1, DNSUnknownError)