# -*- test-case-name: twisted.names.test.test_names -*-
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test cases for twisted.names.
"""
9
import socket, operator, copy
11
from twisted.trial import unittest
13
from twisted.internet import reactor, defer, error
14
from twisted.internet.defer import succeed
15
from twisted.names import client, server, common, authority, hosts, dns
16
from twisted.python import failure
17
from twisted.names.error import DNSFormatError, DNSServerError, DNSNameError
18
from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError
19
from twisted.names.error import DNSUnknownError
20
from twisted.names.dns import EFORMAT, ESERVER, ENAME, ENOTIMP, EREFUSED
21
from twisted.names.dns import Message
22
from twisted.names.client import Resolver
24
from twisted.names.test.test_client import StubPort
25
from twisted.python.compat import reduce
27
def justPayload(results):
    """
    Extract the payloads from the first section of a lookup result.

    @param results: a sequence whose first element is an iterable of
        objects which each have a C{payload} attribute.

    @return: a C{list} of those C{payload} values, in iteration order.
    """
    return [answer.payload for answer in results[0]]
30
class NoFileAuthority(authority.FileAuthority):
    """
    A L{authority.FileAuthority} whose SOA and records are handed straight
    to the initializer.
    """

    def __init__(self, soa, records):
        """
        @param soa: stored unchanged as C{self.soa}.
        @param records: stored unchanged as C{self.records}.
        """
        # Yes, skip FileAuthority: only the ResolverBase initializer is run.
        # NOTE(review): presumably FileAuthority.__init__ would try to load
        # a zone file - confirm against twisted.names.authority.
        common.ResolverBase.__init__(self)
        self.soa = soa
        self.records = records
37
soa_record = dns.Record_SOA(
38
mname = 'test-domain.com',
39
rname = 'root.test-domain.com',
48
reverse_soa = dns.Record_SOA(
49
mname = '93.84.28.in-addr.arpa',
50
rname = '93.84.28.in-addr.arpa',
59
my_soa = dns.Record_SOA(
60
mname = 'my-domain.com',
61
rname = 'postmaster.test-domain.com',
69
test_domain_com = NoFileAuthority(
70
soa = ('test-domain.com', soa_record),
74
dns.Record_A('127.0.0.1'),
75
dns.Record_NS('39.28.189.39'),
76
dns.Record_MX(10, 'host.test-domain.com'),
77
dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'),
78
dns.Record_CNAME('canonical.name.com'),
79
dns.Record_MB('mailbox.test-domain.com'),
80
dns.Record_MG('mail.group.someplace'),
81
dns.Record_TXT('A First piece of Text', 'a SecoNd piece'),
82
dns.Record_A6(0, 'ABCD::4321', ''),
83
dns.Record_A6(12, '0:0069::0', 'some.network.tld'),
84
dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net'),
85
dns.Record_TXT('Some more text, haha! Yes. \0 Still here?'),
86
dns.Record_MR('mail.redirect.or.whatever'),
87
dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'),
88
dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'),
89
dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'),
90
dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP,
91
'\x12\x01\x16\xfe\xc1\x00\x01'),
92
dns.Record_NAPTR(100, 10, "u", "sip+E2U",
93
"!^.*$!sip:information@domain.tld!"),
94
dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')],
95
'http.tcp.test-domain.com': [
96
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool')
98
'host.test-domain.com': [
99
dns.Record_A('123.242.1.5'),
100
dns.Record_A('0.255.0.255'),
102
'host-two.test-domain.com': [
105
# dns.Record_A('255.255.255.255'),
107
dns.Record_A('255.255.255.254'),
108
dns.Record_A('0.0.0.0')
110
'cname.test-domain.com': [
111
dns.Record_CNAME('test-domain.com')
113
'anothertest-domain.com': [
114
dns.Record_A('1.2.3.4')],
118
reverse_domain = NoFileAuthority(
119
soa = ('93.84.28.in-addr.arpa', reverse_soa),
121
'123.93.84.28.in-addr.arpa': [
122
dns.Record_PTR('test.host-reverse.lookup.com'),
129
my_domain_com = NoFileAuthority(
130
soa = ('my-domain.com', my_soa),
134
dns.Record_A('1.2.3.4', ttl='1S'),
135
dns.Record_NS('ns1.domain', ttl='2M'),
136
dns.Record_NS('ns2.domain', ttl='3H'),
137
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')
143
class ServerDNSTestCase(unittest.TestCase):
145
Test cases for DNS server and client.
149
self.factory = server.DNSServerFactory([
150
test_domain_com, reverse_domain, my_domain_com
153
p = dns.DNSDatagramProtocol(self.factory)
156
listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
157
# It's simpler to do the stop listening with addCleanup,
158
# even though we might not end up using this TCP port in
159
# the test (if the listenUDP below fails). Cleaning up
160
# this TCP port sooner than "cleanup time" would mean
161
# adding more code to keep track of the Deferred returned
163
self.addCleanup(listenerTCP.stopListening)
164
port = listenerTCP.getHost().port
167
listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1")
168
except error.CannotListenError:
171
self.addCleanup(listenerUDP.stopListening)
174
self.listenerTCP = listenerTCP
175
self.listenerUDP = listenerUDP
176
self.resolver = client.Resolver(servers=[('127.0.0.1', port)])
181
Clean up any server connections associated with the
182
L{DNSServerFactory} created in L{setUp}
184
# It'd be great if DNSServerFactory had a method that
185
# encapsulated this task. At least the necessary data is
187
for conn in self.factory.connections[:]:
188
conn.transport.loseConnection()
191
def namesTest(self, d, r):
193
def setDone(response):
194
self.response = response
196
def checkResults(ignored):
197
if isinstance(self.response, failure.Failure):
199
results = justPayload(self.response)
200
assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r))
202
assert rec in r, "%s not in %s" % (rec, map(str, r))
205
d.addCallback(checkResults)
208
def testAddressRecord1(self):
    """
    Test simple DNS 'A' record queries: look up C{test-domain.com} and
    hand the lookup and the one expected record to L{namesTest}.
    """
    lookup = self.resolver.lookupAddress('test-domain.com')
    expected = [dns.Record_A('127.0.0.1', ttl=19283784)]
    return self.namesTest(lookup, expected)
216
def testAddressRecord2(self):
    """
    Test DNS 'A' record queries with multiple answers: look up
    C{host.test-domain.com} and hand the lookup and both expected
    records to L{namesTest}.
    """
    lookup = self.resolver.lookupAddress('host.test-domain.com')
    expected = [
        dns.Record_A('123.242.1.5', ttl=19283784),
        dns.Record_A('0.255.0.255', ttl=19283784),
    ]
    return self.namesTest(lookup, expected)
224
def testAdressRecord3(self):
    """
    Test DNS 'A' record queries with edge cases: look up
    C{host-two.test-domain.com} and expect the addresses
    C{255.255.255.254} and C{0.0.0.0}.
    """
    lookup = self.resolver.lookupAddress('host-two.test-domain.com')
    expected = [
        dns.Record_A('255.255.255.254', ttl=19283784),
        dns.Record_A('0.0.0.0', ttl=19283784),
    ]
    return self.namesTest(lookup, expected)
231
def testAuthority(self):
232
"""Test DNS 'SOA' record queries"""
233
return self.namesTest(
234
self.resolver.lookupAuthority('test-domain.com'),
239
def testMailExchangeRecord(self):
    """
    Test DNS 'MX' record queries: look up the mail exchange for
    C{test-domain.com} and hand the lookup and the one expected record
    to L{namesTest}.
    """
    lookup = self.resolver.lookupMailExchange('test-domain.com')
    expected = [dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)]
    return self.namesTest(lookup, expected)
247
def testNameserver(self):
    """
    Test DNS 'NS' record queries: look up the nameservers for
    C{test-domain.com} and hand the lookup and the one expected record
    to L{namesTest}.
    """
    lookup = self.resolver.lookupNameservers('test-domain.com')
    expected = [dns.Record_NS('39.28.189.39', ttl=19283784)]
    return self.namesTest(lookup, expected)
256
"""Test DNS 'HINFO' record queries"""
257
return self.namesTest(
258
self.resolver.lookupHostInfo('test-domain.com'),
259
[dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)]
263
"""Test DNS 'PTR' record queries"""
264
return self.namesTest(
265
self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'),
266
[dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)]
271
"""Test DNS 'CNAME' record queries"""
272
return self.namesTest(
273
self.resolver.lookupCanonicalName('test-domain.com'),
274
[dns.Record_CNAME('canonical.name.com', ttl=19283784)]
277
def testCNAMEAdditional(self):
    """
    Test additional processing for CNAME records: an address lookup of
    C{cname.test-domain.com} is expected to yield both the CNAME record
    and the 'A' record of the name it points at.
    """
    lookup = self.resolver.lookupAddress('cname.test-domain.com')
    expected = [
        dns.Record_CNAME('test-domain.com', ttl=19283784),
        dns.Record_A('127.0.0.1', ttl=19283784),
    ]
    return self.namesTest(lookup, expected)
285
"""Test DNS 'MB' record queries"""
286
return self.namesTest(
287
self.resolver.lookupMailBox('test-domain.com'),
288
[dns.Record_MB('mailbox.test-domain.com', ttl=19283784)]
293
"""Test DNS 'MG' record queries"""
294
return self.namesTest(
295
self.resolver.lookupMailGroup('test-domain.com'),
296
[dns.Record_MG('mail.group.someplace', ttl=19283784)]
301
"""Test DNS 'MR' record queries"""
302
return self.namesTest(
303
self.resolver.lookupMailRename('test-domain.com'),
304
[dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)]
309
"""Test DNS 'MINFO' record queries"""
310
return self.namesTest(
311
self.resolver.lookupMailboxInfo('test-domain.com'),
312
[dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)]
317
"""Test DNS 'SRV' record queries"""
318
return self.namesTest(
319
self.resolver.lookupService('http.tcp.test-domain.com'),
320
[dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)]
324
"""Test DNS 'AFSDB' record queries"""
325
return self.namesTest(
326
self.resolver.lookupAFSDatabase('test-domain.com'),
327
[dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)]
332
"""Test DNS 'RP' record queries"""
333
return self.namesTest(
334
self.resolver.lookupResponsibility('test-domain.com'),
335
[dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)]
340
"""Test DNS 'TXT' record queries"""
341
return self.namesTest(
342
self.resolver.lookupText('test-domain.com'),
343
[dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784),
344
dns.Record_TXT('Some more text, haha! Yes. \0 Still here?', ttl=19283784)]
349
"""Test DNS 'WKS' record queries"""
350
return self.namesTest(
351
self.resolver.lookupWellKnownServices('test-domain.com'),
352
[dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)]
356
def testSomeRecordsWithTTLs(self):
357
result_soa = copy.copy(my_soa)
358
result_soa.ttl = my_soa.expire
359
return self.namesTest(
360
self.resolver.lookupAllRecords('my-domain.com'),
362
dns.Record_A('1.2.3.4', ttl='1S'),
363
dns.Record_NS('ns1.domain', ttl='2M'),
364
dns.Record_NS('ns2.domain', ttl='3H'),
365
dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')]
370
"""Test DNS 'AAAA' record queries (IPv6)"""
371
return self.namesTest(
372
self.resolver.lookupIPV6Address('test-domain.com'),
373
[dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)]
377
"""Test DNS 'A6' record queries (IPv6)"""
378
return self.namesTest(
379
self.resolver.lookupAddress6('test-domain.com'),
380
[dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784),
381
dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784),
382
dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)]
386
def test_zoneTransfer(self):
388
Test DNS 'AXFR' queries (Zone transfer)
390
default_ttl = soa_record.expire
391
results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())]
395
return self.namesTest(
396
self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)),
401
def testSimilarZonesDontInterfere(self):
    """
    Tests that unrelated zones don't mess with each other: an address
    lookup of C{anothertest-domain.com} is expected to yield only that
    name's own 'A' record.
    """
    lookup = self.resolver.lookupAddress("anothertest-domain.com")
    expected = [dns.Record_A('1.2.3.4', ttl=19283784)]
    return self.namesTest(lookup, expected)
409
def test_NAPTR(self):
411
Test DNS 'NAPTR' record queries.
413
return self.namesTest(
414
self.resolver.lookupNamingAuthorityPointer('test-domain.com'),
415
[dns.Record_NAPTR(100, 10, "u", "sip+E2U",
416
"!^.*$!sip:information@domain.tld!",
421
class DNSServerFactoryTests(unittest.TestCase):
423
Tests for L{server.DNSServerFactory}.
425
def _messageReceivedTest(self, methodName, message):
427
Assert that the named method is called with the given message when
428
it is passed to L{DNSServerFactory.messageReceived}.
430
# Make it appear to have some queries so that
431
# DNSServerFactory.allowQuery allows it.
432
message.queries = [None]
434
receivedMessages = []
435
def fakeHandler(message, protocol, address):
436
receivedMessages.append((message, protocol, address))
438
class FakeProtocol(object):
439
def writeMessage(self, message):
442
protocol = FakeProtocol()
443
factory = server.DNSServerFactory(None)
444
setattr(factory, methodName, fakeHandler)
445
factory.messageReceived(message, protocol)
446
self.assertEqual(receivedMessages, [(message, protocol, None)])
449
def test_notifyMessageReceived(self):
451
L{DNSServerFactory.messageReceived} passes messages with an opcode
452
of C{OP_NOTIFY} on to L{DNSServerFactory.handleNotify}.
454
# RFC 1996, section 4.5
456
self._messageReceivedTest('handleNotify', Message(opCode=opCode))
459
def test_updateMessageReceived(self):
461
L{DNSServerFactory.messageReceived} passes messages with an opcode
462
of C{OP_UPDATE} on to L{DNSServerFactory.handleOther}.
464
This may change if the implementation ever covers update messages.
466
# RFC 2136, section 1.3
468
self._messageReceivedTest('handleOther', Message(opCode=opCode))
471
def test_connectionTracking(self):
    """
    The C{connectionMade} and C{connectionLost} methods of
    L{DNSServerFactory} cooperate to maintain the factory's
    C{connections} list: each protocol passed to C{connectionMade} is
    appended, and each one passed to C{connectionLost} is removed.
    """
    # Plain objects stand in for protocols; only their identity matters.
    protoA, protoB = object(), object()
    factory = server.DNSServerFactory()

    factory.connectionMade(protoA)
    self.assertEqual(factory.connections, [protoA])

    factory.connectionMade(protoB)
    self.assertEqual(factory.connections, [protoA, protoB])

    factory.connectionLost(protoA)
    self.assertEqual(factory.connections, [protoB])

    factory.connectionLost(protoB)
    self.assertEqual(factory.connections, [])
490
class HelperTestCase(unittest.TestCase):
491
def testSerialGenerator(self):
493
a = authority.getSerial(f)
495
b = authority.getSerial(f)
496
self.failUnless(a < b)
500
class AXFRTest(unittest.TestCase):
503
self.d = defer.Deferred()
504
self.d.addCallback(self._gotResults)
505
self.controller = client.AXFRController('fooby.com', self.d)
507
self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False,
508
payload=dns.Record_SOA(mname='fooby.com',
509
rname='hooj.fooby.com',
519
dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False,
520
payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)),
522
dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False,
523
payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)),
525
dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False,
526
payload=dns.Record_A(address='64.123.27.105', ttl=700)),
530
def _makeMessage(self):
    """
    Build the L{dns.Message} that the transfer tests fill with queries
    and answers.

    @return: a new L{dns.Message} with C{id=999}, C{answer=1},
        C{recAv=1}, C{auth=1} and every other field shown set to zero.
    """
    # hooray they all have the same message format
    return dns.Message(
        id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0,
        trunc=0, maxSize=0)
534
def testBindAndTNamesStyle(self):
    """
    Deliver all of C{self.records} to the controller in one message
    carrying an AXFR query for C{fooby.com}, then check that
    C{self.results} equals C{self.records}.
    """
    # Bind style = One big single message
    message = self._makeMessage()
    message.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)]
    message.answers = self.records
    self.controller.messageReceived(message, None)
    self.assertEqual(self.results, self.records)
542
def _gotResults(self, result):
543
self.results = result
545
def testDJBStyle(self):
546
# DJB style = message per record
547
records = self.records[:]
549
m = self._makeMessage()
550
m.queries = [] # DJB *doesn't* specify any queries.. hmm..
551
m.answers = [records.pop(0)]
552
self.controller.messageReceived(m, None)
553
self.assertEquals(self.results, self.records)
555
class HostsTestCase(unittest.TestCase):
557
f = open('EtcHosts', 'w')
559
1.1.1.1 EXAMPLE EXAMPLE.EXAMPLETHING
564
self.resolver = hosts.Resolver('EtcHosts')
566
def testGetHostByName(self):
567
data = [('EXAMPLE', '1.1.1.1'),
568
('EXAMPLE.EXAMPLETHING', '1.1.1.1'),
569
('HOOJY', '1.1.1.2'),
571
ds = [self.resolver.getHostByName(n).addCallback(self.assertEqual, ip)
573
return defer.gatherResults(ds)
575
def testLookupAddress(self):
576
d = self.resolver.lookupAddress('HOOJY')
577
d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(),
582
d = self.resolver.lookupIPV6Address('ip6thingy')
583
d.addCallback(self.assertEqual, '::1')
586
testIPv6.skip = 'IPv6 support is not in our hosts resolver yet'
588
def testNotImplemented(self):
589
return self.assertFailure(self.resolver.lookupMailExchange('EXAMPLE'),
593
d = self.resolver.query(dns.Query('EXAMPLE'))
594
d.addCallback(lambda x: self.assertEqual(x[0][0].payload.dottedQuad(),
598
def testNotFound(self):
599
return self.assertFailure(self.resolver.lookupAddress('foueoa'),
603
class FakeDNSDatagramProtocol(object):
606
self.transport = StubPort()
608
def query(self, address, queries, timeout=10, id=None):
    """
    Record the arguments of this call and report a timeout.

    The arguments are appended to C{self.queries} as the tuple
    C{(address, queries, timeout, id)}.

    @return: the result of L{defer.fail} called with a
        L{dns.DNSQueryTimeoutError} constructed from C{queries}.
    """
    attempt = (address, queries, timeout, id)
    self.queries.append(attempt)
    return defer.fail(dns.DNSQueryTimeoutError(queries))
612
def removeResend(self, id):
    """
    Accept and discard C{id}; this fake does nothing here.
    """
    # Ignore this for the time being.
    pass
616
class RetryLogic(unittest.TestCase):
623
def testRoundRobinBackoff(self):
    """
    Look up C{foo.example.com} through a resolver configured with every
    address in C{self.testServers} on port 53, with its protocol
    replaced by a L{FakeDNSDatagramProtocol}.

    @return: the lookup L{Deferred}, with L{_cbRoundRobinBackoff}
        attached as callback and L{_ebRoundRobinBackoff} (given the fake
        protocol) attached as errback.
    """
    servers = [(host, 53) for host in self.testServers]
    resolver = client.Resolver(resolv=None, servers=servers)
    fakeProto = FakeDNSDatagramProtocol()
    resolver.protocol = fakeProto

    d = resolver.lookupAddress("foo.example.com")
    d.addCallback(self._cbRoundRobinBackoff)
    d.addErrback(self._ebRoundRobinBackoff, fakeProto)
    return d
632
def _cbRoundRobinBackoff(self, result):
    """
    Fail the test unconditionally: the lookup was expected to time out,
    so reaching this callback at all is an error.

    @param result: the unexpected lookup result; it is not examined.

    @raise unittest.FailTest: always.
    """
    raise unittest.FailTest("Lookup address succeeded, should have timed out")
635
def _ebRoundRobinBackoff(self, failure, fakeProto):
636
failure.trap(defer.TimeoutError)
638
# Assert that each server is tried with a particular timeout
639
# before the timeout is increased and the attempts are repeated.
641
for t in (1, 3, 11, 45):
642
tries = fakeProto.queries[:len(self.testServers)]
643
del fakeProto.queries[:len(self.testServers)]
646
expected = list(self.testServers)
649
for ((addr, query, timeout, id), expectedAddr) in zip(tries, expected):
650
self.assertEquals(addr, (expectedAddr, 53))
651
self.assertEquals(timeout, t)
653
self.failIf(fakeProto.queries)
655
class ResolvConfHandling(unittest.TestCase):
656
def testMissing(self):
    """
    A L{client.Resolver} given the path from C{self.mktemp()} as its
    C{resolv} argument, with no file created there by this test, has
    C{[('127.0.0.1', 53)]} as its C{dynServers}.
    """
    resolvConf = self.mktemp()
    r = client.Resolver(resolv=resolvConf)
    # Register the cancel as a cleanup rather than calling it after the
    # assertion, so that the pending call is cancelled even when the
    # assertion fails.
    # NOTE(review): _parseCall is presumably the resolver's scheduled
    # re-parse of the resolv file - confirm against twisted.names.client.
    self.addCleanup(r._parseCall.cancel)
    self.assertEqual(r.dynServers, [('127.0.0.1', 53)])
663
resolvConf = self.mktemp()
664
fObj = file(resolvConf, 'w')
666
r = client.Resolver(resolv=resolvConf)
667
self.assertEquals(r.dynServers, [('127.0.0.1', 53)])
668
r._parseCall.cancel()
672
class FilterAnswersTests(unittest.TestCase):
674
Test L{twisted.names.client.Resolver.filterAnswers}'s handling of various
675
error conditions it might encounter.
678
# Create a resolver pointed at an invalid server - we won't be hitting
679
# the network in any of these tests.
680
self.resolver = Resolver(servers=[('0.0.0.0', 0)])
683
def test_truncatedMessage(self):
685
Test that a truncated message results in an equivalent request made via
688
m = Message(trunc=True)
689
m.addQuery('example.com')
691
def queryTCP(queries):
692
self.assertEqual(queries, m.queries)
694
response.answers = ['answer']
695
response.authority = ['authority']
696
response.additional = ['additional']
697
return succeed(response)
698
self.resolver.queryTCP = queryTCP
699
d = self.resolver.filterAnswers(m)
701
self.assertEqual, (['answer'], ['authority'], ['additional']))
705
def _rcodeTest(self, rcode, exc):
706
m = Message(rCode=rcode)
707
err = self.resolver.filterAnswers(m)
711
def test_formatError(self):
    """
    Test that a message with a result code of C{EFORMAT} results in a
    failure wrapped around L{DNSFormatError}.
    """
    return self._rcodeTest(EFORMAT, DNSFormatError)
719
def test_serverError(self):
    """
    Like L{test_formatError} but for C{ESERVER}/L{DNSServerError}.
    """
    return self._rcodeTest(ESERVER, DNSServerError)
726
def test_nameError(self):
    """
    Like L{test_formatError} but for C{ENAME}/L{DNSNameError}.
    """
    return self._rcodeTest(ENAME, DNSNameError)
733
def test_notImplementedError(self):
    """
    Like L{test_formatError} but for C{ENOTIMP}/L{DNSNotImplementedError}.
    """
    return self._rcodeTest(ENOTIMP, DNSNotImplementedError)
740
def test_refusedError(self):
    """
    Like L{test_formatError} but for C{EREFUSED}/L{DNSQueryRefusedError}.
    """
    return self._rcodeTest(EREFUSED, DNSQueryRefusedError)
747
def test_refusedErrorUnknown(self):
    """
    Like L{test_formatError} but for an unrecognized error code, here
    C{EREFUSED + 1}, which is paired with L{DNSUnknownError}.
    """
    return self._rcodeTest(EREFUSED + 1, DNSUnknownError)