1
# -*- test-case-name: twisted.names.test.test_names -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
"""
Test cases for twisted.names.
"""
8
from __future__ import nested_scopes
10
import socket, operator, copy
12
from twisted.trial import unittest
14
from twisted.internet import reactor, defer, error
15
from twisted.internet.defer import succeed
16
from twisted.names import client, server, common, authority, hosts, dns
17
from twisted.python import failure
18
from twisted.names.error import DNSFormatError, DNSServerError, DNSNameError
19
from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError
20
from twisted.names.error import DNSUnknownError
21
from twisted.names.dns import EFORMAT, ESERVER, ENAME, ENOTIMP, EREFUSED
22
from twisted.names.dns import Message
23
from twisted.names.client import Resolver
26
def justPayload(results):
    """
    Collect the C{payload} attribute of every item in C{results[0]}.

    @param results: an indexable whose first element is an iterable of
        objects that each expose a C{payload} attribute.
    @return: a C{list} of the C{payload} values, in iteration order.
    """
    return [r.payload for r in results[0]]
29
class NoFileAuthority(authority.FileAuthority):
    """
    A L{authority.FileAuthority} subclass whose C{soa} and C{records}
    attributes are supplied directly by the caller.
    """

    def __init__(self, soa, records):
        """
        @param soa: stored unchanged as C{self.soa}.
        @param records: stored unchanged as C{self.records}.
        """
        # Yes, skip FileAuthority: its initializer is intentionally not
        # called, only the ResolverBase one is.
        common.ResolverBase.__init__(self)
        self.soa, self.records = soa, records
36
soa_record = dns.Record_SOA(
37
mname = 'test-domain.com',
38
rname = 'root.test-domain.com',
47
reverse_soa = dns.Record_SOA(
48
mname = '93.84.28.in-addr.arpa',
49
rname = '93.84.28.in-addr.arpa',
58
my_soa = dns.Record_SOA(
59
mname = 'my-domain.com',
60
rname = 'postmaster.test-domain.com',
68
test_domain_com = NoFileAuthority(
69
soa = ('test-domain.com', soa_record),
73
dns.Record_A('127.0.0.1'),
74
dns.Record_NS('39.28.189.39'),
75
dns.Record_MX(10, 'host.test-domain.com'),
76
dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'),
77
dns.Record_CNAME('canonical.name.com'),
78
dns.Record_MB('mailbox.test-domain.com'),
79
dns.Record_MG('mail.group.someplace'),
80
dns.Record_TXT('A First piece of Text', 'a SecoNd piece'),
81
dns.Record_A6(0, 'ABCD::4321', ''),
82
dns.Record_A6(12, '0:0069::0', 'some.network.tld'),
83
dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net'),
84
dns.Record_TXT('Some more text, haha! Yes. \0 Still here?'),
85
dns.Record_MR('mail.redirect.or.whatever'),
86
dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'),
87
dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'),
88
dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'),
89
dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01'),
90
dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')],
91
'http.tcp.test-domain.com': [
92
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool')
94
'host.test-domain.com': [
95
dns.Record_A('123.242.1.5'),
96
dns.Record_A('0.255.0.255'),
98
'host-two.test-domain.com': [
101
# dns.Record_A('255.255.255.255'),
103
dns.Record_A('255.255.255.254'),
104
dns.Record_A('0.0.0.0')
106
'cname.test-domain.com': [
107
dns.Record_CNAME('test-domain.com')
109
'anothertest-domain.com': [
110
dns.Record_A('1.2.3.4')],
114
reverse_domain = NoFileAuthority(
115
soa = ('93.84.28.in-addr.arpa', reverse_soa),
117
'123.93.84.28.in-addr.arpa': [
118
dns.Record_PTR('test.host-reverse.lookup.com'),
125
my_domain_com = NoFileAuthority(
126
soa = ('my-domain.com', my_soa),
130
dns.Record_A('1.2.3.4', ttl='1S'),
131
dns.Record_NS('ns1.domain', ttl='2M'),
132
dns.Record_NS('ns2.domain', ttl='3H'),
133
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')
139
class ServerDNSTestCase(unittest.TestCase):
140
"""Test cases for DNS server and client."""
143
self.factory = server.DNSServerFactory([
144
test_domain_com, reverse_domain, my_domain_com
147
p = dns.DNSDatagramProtocol(self.factory)
150
self.listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
151
port = self.listenerTCP.getHost().port
154
self.listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1")
155
except error.CannotListenError:
156
self.listenerTCP.stopListening()
160
self.resolver = client.Resolver(servers=[('127.0.0.1', port)])
163
"""Asynchronously disconnect listenerTCP, listenerUDP and resolver"""
164
d1 = self.listenerTCP.loseConnection()
165
d2 = defer.maybeDeferred(self.listenerUDP.stopListening)
166
d = defer.gatherResults([d1, d2])
167
def disconnectTransport(ignored):
168
if getattr(self.resolver.protocol, 'transport', None) is not None:
169
return self.resolver.protocol.transport.stopListening()
170
d.addCallback(disconnectTransport)
171
d.addCallback(lambda x : self.failUnless(
172
self.listenerUDP.disconnected
173
and self.listenerTCP.disconnected))
176
def namesTest(self, d, r):
178
def setDone(response):
179
self.response = response
181
def checkResults(ignored):
182
if isinstance(self.response, failure.Failure):
184
results = justPayload(self.response)
185
assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r))
187
assert rec in r, "%s not in %s" % (rec, map(str, r))
190
d.addCallback(checkResults)
193
def testAddressRecord1(self):
194
"""Test simple DNS 'A' record queries"""
195
return self.namesTest(
196
self.resolver.lookupAddress('test-domain.com'),
197
[dns.Record_A('127.0.0.1', ttl=19283784)]
201
def testAddressRecord2(self):
202
"""Test DNS 'A' record queries with multiple answers"""
203
return self.namesTest(
204
self.resolver.lookupAddress('host.test-domain.com'),
205
[dns.Record_A('123.242.1.5', ttl=19283784), dns.Record_A('0.255.0.255', ttl=19283784)]
209
def testAdressRecord3(self):
210
"""Test DNS 'A' record queries with edge cases"""
211
return self.namesTest(
212
self.resolver.lookupAddress('host-two.test-domain.com'),
213
[dns.Record_A('255.255.255.254', ttl=19283784), dns.Record_A('0.0.0.0', ttl=19283784)]
216
def testAuthority(self):
217
"""Test DNS 'SOA' record queries"""
218
return self.namesTest(
219
self.resolver.lookupAuthority('test-domain.com'),
224
def testMailExchangeRecord(self):
225
"""Test DNS 'MX' record queries"""
226
return self.namesTest(
227
self.resolver.lookupMailExchange('test-domain.com'),
228
[dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)]
232
def testNameserver(self):
233
"""Test DNS 'NS' record queries"""
234
return self.namesTest(
235
self.resolver.lookupNameservers('test-domain.com'),
236
[dns.Record_NS('39.28.189.39', ttl=19283784)]
241
"""Test DNS 'HINFO' record queries"""
242
return self.namesTest(
243
self.resolver.lookupHostInfo('test-domain.com'),
244
[dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)]
248
"""Test DNS 'PTR' record queries"""
249
return self.namesTest(
250
self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'),
251
[dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)]
256
"""Test DNS 'CNAME' record queries"""
257
return self.namesTest(
258
self.resolver.lookupCanonicalName('test-domain.com'),
259
[dns.Record_CNAME('canonical.name.com', ttl=19283784)]
262
def testCNAMEAdditional(self):
263
"""Test additional processing for CNAME records"""
264
return self.namesTest(
265
self.resolver.lookupAddress('cname.test-domain.com'),
266
[dns.Record_CNAME('test-domain.com', ttl=19283784), dns.Record_A('127.0.0.1', ttl=19283784)]
270
"""Test DNS 'MB' record queries"""
271
return self.namesTest(
272
self.resolver.lookupMailBox('test-domain.com'),
273
[dns.Record_MB('mailbox.test-domain.com', ttl=19283784)]
278
"""Test DNS 'MG' record queries"""
279
return self.namesTest(
280
self.resolver.lookupMailGroup('test-domain.com'),
281
[dns.Record_MG('mail.group.someplace', ttl=19283784)]
286
"""Test DNS 'MR' record queries"""
287
return self.namesTest(
288
self.resolver.lookupMailRename('test-domain.com'),
289
[dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)]
294
"""Test DNS 'MINFO' record queries"""
295
return self.namesTest(
296
self.resolver.lookupMailboxInfo('test-domain.com'),
297
[dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)]
302
"""Test DNS 'SRV' record queries"""
303
return self.namesTest(
304
self.resolver.lookupService('http.tcp.test-domain.com'),
305
[dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)]
309
"""Test DNS 'AFSDB' record queries"""
310
return self.namesTest(
311
self.resolver.lookupAFSDatabase('test-domain.com'),
312
[dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)]
317
"""Test DNS 'RP' record queries"""
318
return self.namesTest(
319
self.resolver.lookupResponsibility('test-domain.com'),
320
[dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)]
325
"""Test DNS 'TXT' record queries"""
326
return self.namesTest(
327
self.resolver.lookupText('test-domain.com'),
328
[dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784),
329
dns.Record_TXT('Some more text, haha! Yes. \0 Still here?', ttl=19283784)]
334
"""Test DNS 'WKS' record queries"""
335
return self.namesTest(
336
self.resolver.lookupWellKnownServices('test-domain.com'),
337
[dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)]
341
def testSomeRecordsWithTTLs(self):
342
result_soa = copy.copy(my_soa)
343
result_soa.ttl = my_soa.expire
344
return self.namesTest(
345
self.resolver.lookupAllRecords('my-domain.com'),
347
dns.Record_A('1.2.3.4', ttl='1S'),
348
dns.Record_NS('ns1.domain', ttl='2M'),
349
dns.Record_NS('ns2.domain', ttl='3H'),
350
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')]
355
"""Test DNS 'AAAA' record queries (IPv6)"""
356
return self.namesTest(
357
self.resolver.lookupIPV6Address('test-domain.com'),
358
[dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)]
362
"""Test DNS 'A6' record queries (IPv6)"""
363
return self.namesTest(
364
self.resolver.lookupAddress6('test-domain.com'),
365
[dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784),
366
dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784),
367
dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)]
371
def testZoneTransfer(self):
372
"""Test DNS 'AXFR' queries (Zone transfer)"""
373
default_ttl = soa_record.expire
374
results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())]
378
return self.namesTest(
379
self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)),
383
def testSimilarZonesDontInterfere(self):
384
"""Tests that unrelated zones don't mess with each other."""
385
return self.namesTest(
386
self.resolver.lookupAddress("anothertest-domain.com"),
387
[dns.Record_A('1.2.3.4', ttl=19283784)]
390
class HelperTestCase(unittest.TestCase):
391
def testSerialGenerator(self):
393
a = authority.getSerial(f)
395
b = authority.getSerial(f)
396
self.failUnless(a < b)
400
class AXFRTest(unittest.TestCase):
403
self.d = defer.Deferred()
404
self.d.addCallback(self._gotResults)
405
self.controller = client.AXFRController('fooby.com', self.d)
407
self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False,
408
payload=dns.Record_SOA(mname='fooby.com',
409
rname='hooj.fooby.com',
419
dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False,
420
payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)),
422
dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False,
423
payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)),
425
dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False,
426
payload=dns.Record_A(address='64.123.27.105', ttl=700)),
430
def _makeMessage(self):
    """
    Build the response message used by the AXFR tests in this class.

    @return: a new L{dns.Message} with id 999, C{answer}, C{recAv} and
        C{auth} set to 1 and every other field passed here set to 0.
    """
    # hooray they all have the same message format
    return dns.Message(
        id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1,
        rCode=0, trunc=0, maxSize=0)
434
def testBindAndTNamesStyle(self):
    """
    Hand the controller every record in a single message and check that
    the results it delivers equal C{self.records}.
    """
    # Bind style = One big single message
    m = self._makeMessage()
    m.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)]
    m.answers = self.records
    self.controller.messageReceived(m, None)
    # assertEqual is the non-deprecated spelling already used elsewhere
    # in this file.
    self.assertEqual(self.results, self.records)
442
def _gotResults(self, result):
443
self.results = result
445
def testDJBStyle(self):
446
# DJB style = message per record
447
records = self.records[:]
449
m = self._makeMessage()
450
m.queries = [] # DJB *doesn't* specify any queries.. hmm..
451
m.answers = [records.pop(0)]
452
self.controller.messageReceived(m, None)
453
self.assertEquals(self.results, self.records)
455
class HostsTestCase(unittest.TestCase):
457
f = open('EtcHosts', 'w')
459
1.1.1.1 EXAMPLE EXAMPLE.EXAMPLETHING
464
self.resolver = hosts.Resolver('EtcHosts')
466
def testGetHostByName(self):
467
data = [('EXAMPLE', '1.1.1.1'),
468
('EXAMPLE.EXAMPLETHING', '1.1.1.1'),
469
('HOOJY', '1.1.1.2'),
471
ds = [self.resolver.getHostByName(n).addCallback(self.assertEqual, ip)
473
return defer.gatherResults(ds)
475
def testLookupAddress(self):
476
d = self.resolver.lookupAddress('HOOJY')
477
d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(),
482
d = self.resolver.lookupIPV6Address('ip6thingy')
483
d.addCallback(self.assertEqual, '::1')
486
testIPv6.skip = 'IPv6 support is not in our hosts resolver yet'
488
def testNotImplemented(self):
489
return self.assertFailure(self.resolver.lookupMailExchange('EXAMPLE'),
493
d = self.resolver.query(dns.Query('EXAMPLE'))
494
d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(),
498
def testNotFound(self):
499
return self.assertFailure(self.resolver.lookupAddress('foueoa'),
503
class FakeDNSDatagramProtocol(object):
509
def query(self, address, queries, timeout=10, id=None):
    """
    Record the arguments of the call and report a timeout.

    @return: a Deferred that has already failed with
        L{dns.DNSQueryTimeoutError} constructed from C{queries}.
    """
    # Keep the full argument tuple so tests can inspect which address
    # and timeout each attempt used.
    self.queries.append((address, queries, timeout, id))
    return defer.fail(dns.DNSQueryTimeoutError(queries))
513
def removeResend(self, id):
514
# Ignore this for the time being.
517
class RetryLogic(unittest.TestCase):
524
def testRoundRobinBackoff(self):
525
addrs = [(x, 53) for x in self.testServers]
526
r = client.Resolver(resolv=None, servers=addrs)
527
r.protocol = proto = FakeDNSDatagramProtocol()
528
return r.lookupAddress("foo.example.com"
529
).addCallback(self._cbRoundRobinBackoff
530
).addErrback(self._ebRoundRobinBackoff, proto
533
def _cbRoundRobinBackoff(self, result):
    """
    Success callback for the lookup; reaching it means the lookup did
    not time out, so the test is failed unconditionally.

    @raise unittest.FailTest: always.
    """
    raise unittest.FailTest("Lookup address succeeded, should have timed out")
536
def _ebRoundRobinBackoff(self, failure, fakeProto):
537
failure.trap(defer.TimeoutError)
539
# Assert that each server is tried with a particular timeout
540
# before the timeout is increased and the attempts are repeated.
542
for t in (1, 3, 11, 45):
543
tries = fakeProto.queries[:len(self.testServers)]
544
del fakeProto.queries[:len(self.testServers)]
547
expected = list(self.testServers)
550
for ((addr, query, timeout, id), expectedAddr) in zip(tries, expected):
551
self.assertEquals(addr, (expectedAddr, 53))
552
self.assertEquals(timeout, t)
554
self.failIf(fakeProto.queries)
556
class ResolvConfHandling(unittest.TestCase):
557
def testMissing(self):
    """
    Build a resolver from a C{mktemp()} path that this test never
    creates and check that C{dynServers} is C{[('127.0.0.1', 53)]}.
    """
    resolvConf = self.mktemp()
    r = client.Resolver(resolv=resolvConf)
    try:
        self.assertEqual(r.dynServers, [('127.0.0.1', 53)])
    finally:
        # Cancel the pending call even when the assertion fails, so it
        # is not left scheduled after the test.
        r._parseCall.cancel()
564
resolvConf = self.mktemp()
565
fObj = file(resolvConf, 'w')
567
r = client.Resolver(resolv=resolvConf)
568
self.assertEquals(r.dynServers, [('127.0.0.1', 53)])
569
r._parseCall.cancel()
573
class FilterAnswersTests(unittest.TestCase):
575
Test L{twisted.names.client.Resolver.filterAnswers}'s handling of various
576
error conditions it might encounter.
579
# Create a resolver pointed at an invalid server - we won't be hitting
580
# the network in any of these tests.
581
self.resolver = Resolver(servers=[('0.0.0.0', 0)])
584
def test_truncatedMessage(self):
586
Test that a truncated message results in an equivalent request made via
589
m = Message(trunc=True)
590
m.addQuery('example.com')
592
def queryTCP(queries):
593
self.assertEqual(queries, m.queries)
595
response.answers = ['answer']
596
response.authority = ['authority']
597
response.additional = ['additional']
598
return succeed(response)
599
self.resolver.queryTCP = queryTCP
600
d = self.resolver.filterAnswers(m)
602
self.assertEqual, (['answer'], ['authority'], ['additional']))
606
def _rcodeTest(self, rcode, exc):
607
m = Message(rCode=rcode)
608
err = self.resolver.filterAnswers(m)
612
def test_formatError(self):
    """
    Test that a message with a result code of C{EFORMAT} results in a
    failure wrapped around L{DNSFormatError}.
    """
    return self._rcodeTest(EFORMAT, DNSFormatError)
620
def test_serverError(self):
    """
    Like L{test_formatError} but for C{ESERVER}/L{DNSServerError}.
    """
    return self._rcodeTest(ESERVER, DNSServerError)
627
def test_nameError(self):
    """
    Like L{test_formatError} but for C{ENAME}/L{DNSNameError}.
    """
    return self._rcodeTest(ENAME, DNSNameError)
634
def test_notImplementedError(self):
    """
    Like L{test_formatError} but for C{ENOTIMP}/L{DNSNotImplementedError}.
    """
    return self._rcodeTest(ENOTIMP, DNSNotImplementedError)
641
def test_refusedError(self):
    """
    Like L{test_formatError} but for C{EREFUSED}/L{DNSQueryRefusedError}.
    """
    return self._rcodeTest(EREFUSED, DNSQueryRefusedError)
648
def test_unknownError(self):
    """
    Like L{test_formatError} but for an unrecognized error code and
    L{DNSUnknownError}.
    """
    # This method was previously also named test_refusedError, which
    # rebound that name in the class body and kept the EREFUSED test
    # from ever running.  EREFUSED + 1 serves as the unrecognized code.
    return self._rcodeTest(EREFUSED + 1, DNSUnknownError)