~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/names/test/test_rootresolve.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Test cases for Twisted.names' root resolver.
 
6
"""
 
7
 
 
8
from random import randrange
 
9
 
 
10
from zope.interface import implements
 
11
from zope.interface.verify import verifyClass
 
12
 
 
13
from twisted.python.log import msg
 
14
from twisted.trial import util
 
15
from twisted.trial.unittest import TestCase
 
16
from twisted.internet.defer import Deferred, succeed, gatherResults
 
17
from twisted.internet.task import Clock
 
18
from twisted.internet.address import IPv4Address
 
19
from twisted.internet.interfaces import IReactorUDP, IUDPTransport
 
20
from twisted.names.root import Resolver, lookupNameservers, lookupAddress
 
21
from twisted.names.root import extractAuthority, discoverAuthority, retry
 
22
from twisted.names.dns import IN, HS, A, NS, CNAME, OK, ENAME, Record_CNAME
 
23
from twisted.names.dns import Query, Message, RRHeader, Record_A, Record_NS
 
24
from twisted.names.error import DNSNameError, ResolverError
 
25
 
 
26
 
 
27
class MemoryDatagramTransport(object):
 
28
    """
 
29
    This L{IUDPTransport} implementation enforces the usual connection rules
 
30
    and captures sent traffic in a list for later inspection.
 
31
 
 
32
    @ivar _host: The host address to which this transport is bound.
 
33
    @ivar _protocol: The protocol connected to this transport.
 
34
    @ivar _sentPackets: A C{list} of two-tuples of the datagrams passed to
 
35
        C{write} and the addresses to which they are destined.
 
36
 
 
37
    @ivar _connectedTo: C{None} if this transport is unconnected, otherwise an
 
38
        address to which all traffic is supposedly sent.
 
39
 
 
40
    @ivar _maxPacketSize: An C{int} giving the maximum length of a datagram
 
41
        which will be successfully handled by C{write}.
 
42
    """
 
43
    implements(IUDPTransport)
 
44
 
 
45
    def __init__(self, host, protocol, maxPacketSize):
 
46
        self._host = host
 
47
        self._protocol = protocol
 
48
        self._sentPackets = []
 
49
        self._connectedTo = None
 
50
        self._maxPacketSize = maxPacketSize
 
51
 
 
52
 
 
53
    def getHost(self):
 
54
        """
 
55
        Return the address which this transport is pretending to be bound
 
56
        to.
 
57
        """
 
58
        return IPv4Address('UDP', *self._host)
 
59
 
 
60
 
 
61
    def connect(self, host, port):
 
62
        """
 
63
        Connect this transport to the given address.
 
64
        """
 
65
        if self._connectedTo is not None:
 
66
            raise ValueError("Already connected")
 
67
        self._connectedTo = (host, port)
 
68
 
 
69
 
 
70
    def write(self, datagram, addr=None):
 
71
        """
 
72
        Send the given datagram.
 
73
        """
 
74
        if addr is None:
 
75
            addr = self._connectedTo
 
76
        if addr is None:
 
77
            raise ValueError("Need an address")
 
78
        if len(datagram) > self._maxPacketSize:
 
79
            raise ValueError("Packet too big")
 
80
        self._sentPackets.append((datagram, addr))
 
81
 
 
82
 
 
83
    def stopListening(self):
 
84
        """
 
85
        Shut down this transport.
 
86
        """
 
87
        self._protocol.stopProtocol()
 
88
        return succeed(None)
 
89
 
 
90
verifyClass(IUDPTransport, MemoryDatagramTransport)
 
91
 
 
92
 
 
93
 
 
94
class MemoryReactor(Clock):
 
95
    """
 
96
    An L{IReactorTime} and L{IReactorUDP} provider.
 
97
 
 
98
    Time is controlled deterministically via the base class, L{Clock}.  UDP is
 
99
    handled in-memory by connecting protocols to instances of
 
100
    L{MemoryDatagramTransport}.
 
101
 
 
102
    @ivar udpPorts: A C{dict} mapping port numbers to instances of
 
103
        L{MemoryDatagramTransport}.
 
104
    """
 
105
    implements(IReactorUDP)
 
106
 
 
107
    def __init__(self):
 
108
        Clock.__init__(self)
 
109
        self.udpPorts = {}
 
110
 
 
111
 
 
112
    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
 
113
        """
 
114
        Pretend to bind a UDP port and connect the given protocol to it.
 
115
        """
 
116
        if port == 0:
 
117
            while True:
 
118
                port = randrange(1, 2 ** 16)
 
119
                if port not in self.udpPorts:
 
120
                    break
 
121
        if port in self.udpPorts:
 
122
            raise ValueError("Address in use")
 
123
        transport = MemoryDatagramTransport(
 
124
            (interface, port), protocol, maxPacketSize)
 
125
        self.udpPorts[port] = transport
 
126
        protocol.makeConnection(transport)
 
127
        return transport
 
128
 
 
129
verifyClass(IReactorUDP, MemoryReactor)
 
130
 
 
131
 
 
132
 
 
133
class RootResolverTests(TestCase):
 
134
    """
 
135
    Tests for L{twisted.names.root.Resolver}.
 
136
    """
 
137
    def _queryTest(self, filter):
 
138
        """
 
139
        Invoke L{Resolver._query} and verify that it sends the correct DNS
 
140
        query.  Deliver a canned response to the query and return whatever the
 
141
        L{Deferred} returned by L{Resolver._query} fires with.
 
142
 
 
143
        @param filter: The value to pass for the C{filter} parameter to
 
144
            L{Resolver._query}.
 
145
        """
 
146
        reactor = MemoryReactor()
 
147
        resolver = Resolver([], reactor=reactor)
 
148
        d = resolver._query(
 
149
            Query('foo.example.com', A, IN), [('1.1.2.3', 1053)], (30,),
 
150
            filter)
 
151
 
 
152
        # A UDP port should have been started.
 
153
        portNumber, transport = reactor.udpPorts.popitem()
 
154
 
 
155
        # And a DNS packet sent.
 
156
        [(packet, address)] = transport._sentPackets
 
157
 
 
158
        msg = Message()
 
159
        msg.fromStr(packet)
 
160
 
 
161
        # It should be a query with the parameters used above.
 
162
        self.assertEquals(msg.queries, [Query('foo.example.com', A, IN)])
 
163
        self.assertEquals(msg.answers, [])
 
164
        self.assertEquals(msg.authority, [])
 
165
        self.assertEquals(msg.additional, [])
 
166
 
 
167
        response = []
 
168
        d.addCallback(response.append)
 
169
        self.assertEquals(response, [])
 
170
 
 
171
        # Once a reply is received, the Deferred should fire.
 
172
        del msg.queries[:]
 
173
        msg.answer = 1
 
174
        msg.answers.append(RRHeader('foo.example.com', payload=Record_A('5.8.13.21')))
 
175
        transport._protocol.datagramReceived(msg.toStr(), ('1.1.2.3', 1053))
 
176
        return response[0]
 
177
 
 
178
 
 
179
    def test_filteredQuery(self):
 
180
        """
 
181
        L{Resolver._query} accepts a L{Query} instance and an address, issues
 
182
        the query, and returns a L{Deferred} which fires with the response to
 
183
        the query.  If a true value is passed for the C{filter} parameter, the
 
184
        result is a three-tuple of lists of records.
 
185
        """
 
186
        answer, authority, additional = self._queryTest(True)
 
187
        self.assertEquals(
 
188
            answer,
 
189
            [RRHeader('foo.example.com', payload=Record_A('5.8.13.21', ttl=0))])
 
190
        self.assertEquals(authority, [])
 
191
        self.assertEquals(additional, [])
 
192
 
 
193
 
 
194
    def test_unfilteredQuery(self):
 
195
        """
 
196
        Similar to L{test_filteredQuery}, but for the case where a false value
 
197
        is passed for the C{filter} parameter.  In this case, the result is a
 
198
        L{Message} instance.
 
199
        """
 
200
        message = self._queryTest(False)
 
201
        self.assertIsInstance(message, Message)
 
202
        self.assertEquals(message.queries, [])
 
203
        self.assertEquals(
 
204
            message.answers,
 
205
            [RRHeader('foo.example.com', payload=Record_A('5.8.13.21', ttl=0))])
 
206
        self.assertEquals(message.authority, [])
 
207
        self.assertEquals(message.additional, [])
 
208
 
 
209
 
 
210
    def _respond(self, answers=[], authority=[], additional=[], rCode=OK):
 
211
        """
 
212
        Create a L{Message} suitable for use as a response to a query.
 
213
 
 
214
        @param answers: A C{list} of two-tuples giving data for the answers
 
215
            section of the message.  The first element of each tuple is a name
 
216
            for the L{RRHeader}.  The second element is the payload.
 
217
        @param authority: A C{list} like C{answers}, but for the authority
 
218
            section of the response.
 
219
        @param additional: A C{list} like C{answers}, but for the
 
220
            additional section of the response.
 
221
        @param rCode: The response code the message will be created with.
 
222
 
 
223
        @return: A new L{Message} initialized with the given values.
 
224
        """
 
225
        response = Message(rCode=rCode)
 
226
        for (section, data) in [(response.answers, answers),
 
227
                                (response.authority, authority),
 
228
                                (response.additional, additional)]:
 
229
            section.extend([
 
230
                    RRHeader(name, record.TYPE, getattr(record, 'CLASS', IN),
 
231
                             payload=record)
 
232
                    for (name, record) in data])
 
233
        return response
 
234
 
 
235
 
 
236
    def _getResolver(self, serverResponses, maximumQueries=10):
 
237
        """
 
238
        Create and return a new L{root.Resolver} modified to resolve queries
 
239
        against the record data represented by C{servers}.
 
240
 
 
241
        @param serverResponses: A mapping from dns server addresses to
 
242
            mappings.  The inner mappings are from query two-tuples (name,
 
243
            type) to dictionaries suitable for use as **arguments to
 
244
            L{_respond}.  See that method for details.
 
245
        """
 
246
        roots = ['1.1.2.3']
 
247
        resolver = Resolver(roots, maximumQueries)
 
248
 
 
249
        def query(query, serverAddresses, timeout, filter):
 
250
            msg("Query for QNAME %s at %r" % (query.name, serverAddresses))
 
251
            for addr in serverAddresses:
 
252
                try:
 
253
                    server = serverResponses[addr]
 
254
                except KeyError:
 
255
                    continue
 
256
                records = server[str(query.name), query.type]
 
257
                return succeed(self._respond(**records))
 
258
        resolver._query = query
 
259
        return resolver
 
260
 
 
261
 
 
262
    def test_lookupAddress(self):
 
263
        """
 
264
        L{root.Resolver.lookupAddress} looks up the I{A} records for the
 
265
        specified hostname by first querying one of the root servers the
 
266
        resolver was created with and then following the authority delegations
 
267
        until a result is received.
 
268
        """
 
269
        servers = {
 
270
            ('1.1.2.3', 53): {
 
271
                ('foo.example.com', A): {
 
272
                    'authority': [('foo.example.com', Record_NS('ns1.example.com'))],
 
273
                    'additional': [('ns1.example.com', Record_A('34.55.89.144'))],
 
274
                    },
 
275
                },
 
276
            ('34.55.89.144', 53): {
 
277
                ('foo.example.com', A): {
 
278
                    'answers': [('foo.example.com', Record_A('10.0.0.1'))],
 
279
                    }
 
280
                },
 
281
            }
 
282
        resolver = self._getResolver(servers)
 
283
        d = resolver.lookupAddress('foo.example.com')
 
284
        d.addCallback(lambda (ans, auth, add): ans[0].payload.dottedQuad())
 
285
        d.addCallback(self.assertEquals, '10.0.0.1')
 
286
        return d
 
287
 
 
288
 
 
289
    def test_lookupChecksClass(self):
 
290
        """
 
291
        If a response includes a record with a class different from the one
 
292
        in the query, it is ignored and lookup continues until a record with
 
293
        the right class is found.
 
294
        """
 
295
        badClass = Record_A('10.0.0.1')
 
296
        badClass.CLASS = HS
 
297
        servers = {
 
298
            ('1.1.2.3', 53): {
 
299
                ('foo.example.com', A): {
 
300
                    'answers': [('foo.example.com', badClass)],
 
301
                    'authority': [('foo.example.com', Record_NS('ns1.example.com'))],
 
302
                    'additional': [('ns1.example.com', Record_A('10.0.0.2'))],
 
303
                },
 
304
            },
 
305
            ('10.0.0.2', 53): {
 
306
                ('foo.example.com', A): {
 
307
                    'answers': [('foo.example.com', Record_A('10.0.0.3'))],
 
308
                },
 
309
            },
 
310
        }
 
311
        resolver = self._getResolver(servers)
 
312
        d = resolver.lookupAddress('foo.example.com')
 
313
        d.addCallback(lambda (ans, auth, add): ans[0].payload)
 
314
        d.addCallback(self.assertEquals, Record_A('10.0.0.3'))
 
315
        return d
 
316
 
 
317
 
 
318
    def test_missingGlue(self):
 
319
        """
 
320
        If an intermediate response includes no glue records for the
 
321
        authorities, separate queries are made to find those addresses.
 
322
        """
 
323
        servers = {
 
324
            ('1.1.2.3', 53): {
 
325
                ('foo.example.com', A): {
 
326
                    'authority': [('foo.example.com', Record_NS('ns1.example.org'))],
 
327
                    # Conspicuous lack of an additional section naming ns1.example.com
 
328
                    },
 
329
                ('ns1.example.org', A): {
 
330
                    'answers': [('ns1.example.org', Record_A('10.0.0.1'))],
 
331
                    },
 
332
                },
 
333
            ('10.0.0.1', 53): {
 
334
                ('foo.example.com', A): {
 
335
                    'answers': [('foo.example.com', Record_A('10.0.0.2'))],
 
336
                    },
 
337
                },
 
338
            }
 
339
        resolver = self._getResolver(servers)
 
340
        d = resolver.lookupAddress('foo.example.com')
 
341
        d.addCallback(lambda (ans, auth, add): ans[0].payload.dottedQuad())
 
342
        d.addCallback(self.assertEquals, '10.0.0.2')
 
343
        return d
 
344
 
 
345
 
 
346
    def test_missingName(self):
 
347
        """
 
348
        If a name is missing, L{Resolver.lookupAddress} returns a L{Deferred}
 
349
        which fails with L{DNSNameError}.
 
350
        """
 
351
        servers = {
 
352
            ('1.1.2.3', 53): {
 
353
                ('foo.example.com', A): {
 
354
                    'rCode': ENAME,
 
355
                    },
 
356
                },
 
357
            }
 
358
        resolver = self._getResolver(servers)
 
359
        d = resolver.lookupAddress('foo.example.com')
 
360
        return self.assertFailure(d, DNSNameError)
 
361
 
 
362
 
 
363
    def test_answerless(self):
 
364
        """
 
365
        If a query is responded to with no answers or nameserver records, the
 
366
        L{Deferred} returned by L{Resolver.lookupAddress} fires with
 
367
        L{ResolverError}.
 
368
        """
 
369
        servers = {
 
370
            ('1.1.2.3', 53): {
 
371
                ('example.com', A): {
 
372
                    },
 
373
                },
 
374
            }
 
375
        resolver = self._getResolver(servers)
 
376
        d = resolver.lookupAddress('example.com')
 
377
        return self.assertFailure(d, ResolverError)
 
378
 
 
379
 
 
380
    def test_delegationLookupError(self):
 
381
        """
 
382
        If there is an error resolving the nameserver in a delegation response,
 
383
        the L{Deferred} returned by L{Resolver.lookupAddress} fires with that
 
384
        error.
 
385
        """
 
386
        servers = {
 
387
            ('1.1.2.3', 53): {
 
388
                ('example.com', A): {
 
389
                    'authority': [('example.com', Record_NS('ns1.example.com'))],
 
390
                    },
 
391
                ('ns1.example.com', A): {
 
392
                    'rCode': ENAME,
 
393
                    },
 
394
                },
 
395
            }
 
396
        resolver = self._getResolver(servers)
 
397
        d = resolver.lookupAddress('example.com')
 
398
        return self.assertFailure(d, DNSNameError)
 
399
 
 
400
 
 
401
    def test_delegationLookupEmpty(self):
 
402
        """
 
403
        If there are no records in the response to a lookup of a delegation
 
404
        nameserver, the L{Deferred} returned by L{Resolver.lookupAddress} fires
 
405
        with L{ResolverError}.
 
406
        """
 
407
        servers = {
 
408
            ('1.1.2.3', 53): {
 
409
                ('example.com', A): {
 
410
                    'authority': [('example.com', Record_NS('ns1.example.com'))],
 
411
                    },
 
412
                ('ns1.example.com', A): {
 
413
                    },
 
414
                },
 
415
            }
 
416
        resolver = self._getResolver(servers)
 
417
        d = resolver.lookupAddress('example.com')
 
418
        return self.assertFailure(d, ResolverError)
 
419
 
 
420
 
 
421
    def test_lookupNameservers(self):
 
422
        """
 
423
        L{Resolver.lookupNameservers} is like L{Resolver.lookupAddress}, except
 
424
        it queries for I{NS} records instead of I{A} records.
 
425
        """
 
426
        servers = {
 
427
            ('1.1.2.3', 53): {
 
428
                ('example.com', A): {
 
429
                    'rCode': ENAME,
 
430
                    },
 
431
                ('example.com', NS): {
 
432
                    'answers': [('example.com', Record_NS('ns1.example.com'))],
 
433
                    },
 
434
                },
 
435
            }
 
436
        resolver = self._getResolver(servers)
 
437
        d = resolver.lookupNameservers('example.com')
 
438
        d.addCallback(lambda (ans, auth, add): str(ans[0].payload.name))
 
439
        d.addCallback(self.assertEquals, 'ns1.example.com')
 
440
        return d
 
441
 
 
442
 
 
443
    def test_returnCanonicalName(self):
 
444
        """
 
445
        If a I{CNAME} record is encountered as the answer to a query for
 
446
        another record type, that record is returned as the answer.
 
447
        """
 
448
        servers = {
 
449
            ('1.1.2.3', 53): {
 
450
                ('example.com', A): {
 
451
                    'answers': [('example.com', Record_CNAME('example.net')),
 
452
                                ('example.net', Record_A('10.0.0.7'))],
 
453
                    },
 
454
                },
 
455
            }
 
456
        resolver = self._getResolver(servers)
 
457
        d = resolver.lookupAddress('example.com')
 
458
        d.addCallback(lambda (ans, auth, add): ans)
 
459
        d.addCallback(
 
460
            self.assertEquals,
 
461
            [RRHeader('example.com', CNAME, payload=Record_CNAME('example.net')),
 
462
             RRHeader('example.net', A, payload=Record_A('10.0.0.7'))])
 
463
        return d
 
464
 
 
465
 
 
466
    def test_followCanonicalName(self):
 
467
        """
 
468
        If no record of the requested type is included in a response, but a
 
469
        I{CNAME} record for the query name is included, queries are made to
 
470
        resolve the value of the I{CNAME}.
 
471
        """
 
472
        servers = {
 
473
            ('1.1.2.3', 53): {
 
474
                ('example.com', A): {
 
475
                    'answers': [('example.com', Record_CNAME('example.net'))],
 
476
                },
 
477
                ('example.net', A): {
 
478
                    'answers': [('example.net', Record_A('10.0.0.5'))],
 
479
                },
 
480
            },
 
481
        }
 
482
        resolver = self._getResolver(servers)
 
483
        d = resolver.lookupAddress('example.com')
 
484
        d.addCallback(lambda (ans, auth, add): ans)
 
485
        d.addCallback(
 
486
            self.assertEquals,
 
487
            [RRHeader('example.com', CNAME, payload=Record_CNAME('example.net')),
 
488
             RRHeader('example.net', A, payload=Record_A('10.0.0.5'))])
 
489
        return d
 
490
 
 
491
 
 
492
    def test_detectCanonicalNameLoop(self):
 
493
        """
 
494
        If there is a cycle between I{CNAME} records in a response, this is
 
495
        detected and the L{Deferred} returned by the lookup method fails
 
496
        with L{ResolverError}.
 
497
        """
 
498
        servers = {
 
499
            ('1.1.2.3', 53): {
 
500
                ('example.com', A): {
 
501
                    'answers': [('example.com', Record_CNAME('example.net')),
 
502
                                ('example.net', Record_CNAME('example.com'))],
 
503
                },
 
504
            },
 
505
        }
 
506
        resolver = self._getResolver(servers)
 
507
        d = resolver.lookupAddress('example.com')
 
508
        return self.assertFailure(d, ResolverError)
 
509
 
 
510
 
 
511
    def test_boundedQueries(self):
 
512
        """
 
513
        L{Resolver.lookupAddress} won't issue more queries following
 
514
        delegations than the limit passed to its initializer.
 
515
        """
 
516
        servers = {
 
517
            ('1.1.2.3', 53): {
 
518
                # First query - force it to start over with a name lookup of
 
519
                # ns1.example.com
 
520
                ('example.com', A): {
 
521
                    'authority': [('example.com', Record_NS('ns1.example.com'))],
 
522
                },
 
523
                # Second query - let it resume the original lookup with the
 
524
                # address of the nameserver handling the delegation.
 
525
                ('ns1.example.com', A): {
 
526
                    'answers': [('ns1.example.com', Record_A('10.0.0.2'))],
 
527
                },
 
528
            },
 
529
            ('10.0.0.2', 53): {
 
530
                # Third query - let it jump straight to asking the
 
531
                # delegation server by including its address here (different
 
532
                # case from the first query).
 
533
                ('example.com', A): {
 
534
                    'authority': [('example.com', Record_NS('ns2.example.com'))],
 
535
                    'additional': [('ns2.example.com', Record_A('10.0.0.3'))],
 
536
                },
 
537
            },
 
538
            ('10.0.0.3', 53): {
 
539
                # Fourth query - give it the answer, we're done.
 
540
                ('example.com', A): {
 
541
                    'answers': [('example.com', Record_A('10.0.0.4'))],
 
542
                },
 
543
            },
 
544
        }
 
545
 
 
546
        # Make two resolvers.  One which is allowed to make 3 queries
 
547
        # maximum, and so will fail, and on which may make 4, and so should
 
548
        # succeed.
 
549
        failer = self._getResolver(servers, 3)
 
550
        failD = self.assertFailure(
 
551
            failer.lookupAddress('example.com'), ResolverError)
 
552
 
 
553
        succeeder = self._getResolver(servers, 4)
 
554
        succeedD = succeeder.lookupAddress('example.com')
 
555
        succeedD.addCallback(lambda (ans, auth, add): ans[0].payload)
 
556
        succeedD.addCallback(self.assertEquals, Record_A('10.0.0.4'))
 
557
 
 
558
        return gatherResults([failD, succeedD])
 
559
 
 
560
 
 
561
    def test_discoveredAuthorityDeprecated(self):
 
562
        """
 
563
        Calling L{Resolver.discoveredAuthority} produces a deprecation warning.
 
564
        """
 
565
        resolver = Resolver([])
 
566
        d = resolver.discoveredAuthority('127.0.0.1', 'example.com', IN, A, (0,))
 
567
 
 
568
        warnings = self.flushWarnings([
 
569
                self.test_discoveredAuthorityDeprecated])
 
570
        self.assertEquals(warnings[0]['category'], DeprecationWarning)
 
571
        self.assertEquals(
 
572
            warnings[0]['message'],
 
573
            'twisted.names.root.Resolver.discoveredAuthority is deprecated since '
 
574
            'Twisted 10.0.  Use twisted.names.client.Resolver directly, instead.')
 
575
        self.assertEquals(len(warnings), 1)
 
576
 
 
577
        # This will time out quickly, but we need to wait for it because there
 
578
        # are resources associated with.
 
579
        d.addErrback(lambda ignored: None)
 
580
        return d
 
581
 
 
582
 
 
583
 
 
584
class StubDNSDatagramProtocol:
 
585
    """
 
586
    A do-nothing stand-in for L{DNSDatagramProtocol} which can be used to avoid
 
587
    network traffic in tests where that kind of thing doesn't matter.
 
588
    """
 
589
    def query(self, *a, **kw):
 
590
        return Deferred()
 
591
 
 
592
 
 
593
 
 
594
_retrySuppression = util.suppress(
 
595
    category=DeprecationWarning,
 
596
    message=(
 
597
        'twisted.names.root.retry is deprecated since Twisted 10.0.  Use a '
 
598
        'Resolver object for retry logic.'))
 
599
 
 
600
 
 
601
class DiscoveryToolsTests(TestCase):
 
602
    """
 
603
    Tests for the free functions in L{twisted.names.root} which help out with
 
604
    authority discovery.  Since these are mostly deprecated, these are mostly
 
605
    deprecation tests.
 
606
    """
 
607
    def test_lookupNameserversDeprecated(self):
 
608
        """
 
609
        Calling L{root.lookupNameservers} produces a deprecation warning.
 
610
        """
 
611
        # Don't care about the return value, since it will never have a result,
 
612
        # since StubDNSDatagramProtocol doesn't actually work.
 
613
        lookupNameservers('example.com', '127.0.0.1', StubDNSDatagramProtocol())
 
614
 
 
615
        warnings = self.flushWarnings([
 
616
                self.test_lookupNameserversDeprecated])
 
617
        self.assertEquals(warnings[0]['category'], DeprecationWarning)
 
618
        self.assertEquals(
 
619
            warnings[0]['message'],
 
620
            'twisted.names.root.lookupNameservers is deprecated since Twisted '
 
621
            '10.0.  Use twisted.names.root.Resolver.lookupNameservers '
 
622
            'instead.')
 
623
        self.assertEquals(len(warnings), 1)
 
624
    test_lookupNameserversDeprecated.suppress = [_retrySuppression]
 
625
 
 
626
 
 
627
    def test_lookupAddressDeprecated(self):
 
628
        """
 
629
        Calling L{root.lookupAddress} produces a deprecation warning.
 
630
        """
 
631
        # Don't care about the return value, since it will never have a result,
 
632
        # since StubDNSDatagramProtocol doesn't actually work.
 
633
        lookupAddress('example.com', '127.0.0.1', StubDNSDatagramProtocol())
 
634
 
 
635
        warnings = self.flushWarnings([
 
636
                self.test_lookupAddressDeprecated])
 
637
        self.assertEquals(warnings[0]['category'], DeprecationWarning)
 
638
        self.assertEquals(
 
639
            warnings[0]['message'],
 
640
            'twisted.names.root.lookupAddress is deprecated since Twisted '
 
641
            '10.0.  Use twisted.names.root.Resolver.lookupAddress '
 
642
            'instead.')
 
643
        self.assertEquals(len(warnings), 1)
 
644
    test_lookupAddressDeprecated.suppress = [_retrySuppression]
 
645
 
 
646
 
 
647
    def test_extractAuthorityDeprecated(self):
 
648
        """
 
649
        Calling L{root.extractAuthority} produces a deprecation warning.
 
650
        """
 
651
        extractAuthority(Message(), {})
 
652
 
 
653
        warnings = self.flushWarnings([
 
654
                self.test_extractAuthorityDeprecated])
 
655
        self.assertEquals(warnings[0]['category'], DeprecationWarning)
 
656
        self.assertEquals(
 
657
            warnings[0]['message'],
 
658
            'twisted.names.root.extractAuthority is deprecated since Twisted '
 
659
            '10.0.  Please inspect the Message object directly.')
 
660
        self.assertEquals(len(warnings), 1)
 
661
 
 
662
 
 
663
    def test_discoverAuthorityDeprecated(self):
 
664
        """
 
665
        Calling L{root.discoverAuthority} produces a deprecation warning.
 
666
        """
 
667
        discoverAuthority(
 
668
            'example.com', ['10.0.0.1'], p=StubDNSDatagramProtocol())
 
669
 
 
670
        warnings = self.flushWarnings([
 
671
                self.test_discoverAuthorityDeprecated])
 
672
        self.assertEquals(warnings[0]['category'], DeprecationWarning)
 
673
        self.assertEquals(
 
674
            warnings[0]['message'],
 
675
            'twisted.names.root.discoverAuthority is deprecated since Twisted '
 
676
            '10.0.  Use twisted.names.root.Resolver.lookupNameservers '
 
677
            'instead.')
 
678
        self.assertEquals(len(warnings), 1)
 
679
 
 
680
    # discoverAuthority is implemented in terms of deprecated functions,
 
681
    # too.  Ignore those.
 
682
    test_discoverAuthorityDeprecated.suppress = [
 
683
        util.suppress(
 
684
            category=DeprecationWarning,
 
685
            message=(
 
686
                'twisted.names.root.lookupNameservers is deprecated since '
 
687
                'Twisted 10.0.  Use '
 
688
                'twisted.names.root.Resolver.lookupNameservers instead.')),
 
689
        _retrySuppression]
 
690
 
 
691
 
 
692
    def test_retryDeprecated(self):
 
693
        """
 
694
        Calling L{root.retry} produces a deprecation warning.
 
695
        """
 
696
        retry([0], StubDNSDatagramProtocol())
 
697
 
 
698
        warnings = self.flushWarnings([
 
699
                self.test_retryDeprecated])
 
700
        self.assertEquals(warnings[0]['category'], DeprecationWarning)
 
701
        self.assertEquals(
 
702
            warnings[0]['message'],
 
703
            'twisted.names.root.retry is deprecated since Twisted '
 
704
            '10.0.  Use a Resolver object for retry logic.')
 
705
        self.assertEquals(len(warnings), 1)