~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_sip.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_sip -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
"""Session Initialization Protocol tests."""
 
7
 
 
8
from twisted.trial import unittest, util
 
9
from twisted.protocols import sip
 
10
from twisted.internet import defer, reactor, utils
 
11
from twisted.python.versions import Version
 
12
 
 
13
from twisted.test import proto_helpers
 
14
 
 
15
from twisted import cred
 
16
import twisted.cred.portal
 
17
import twisted.cred.checkers
 
18
 
 
19
from zope.interface import implements
 
20
 
 
21
 
 
22
# request, prefixed by random CRLFs
 
23
request1 = "\n\r\n\n\r" + """\
 
24
INVITE sip:foo SIP/2.0
 
25
From: mo
 
26
To: joe
 
27
Content-Length: 4
 
28
 
 
29
abcd""".replace("\n", "\r\n")
 
30
 
 
31
# request, no content-length
 
32
request2 = """INVITE sip:foo SIP/2.0
 
33
From: mo
 
34
To: joe
 
35
 
 
36
1234""".replace("\n", "\r\n")
 
37
 
 
38
# request, with garbage after
 
39
request3 = """INVITE sip:foo SIP/2.0
 
40
From: mo
 
41
To: joe
 
42
Content-Length: 4
 
43
 
 
44
1234
 
45
 
 
46
lalalal""".replace("\n", "\r\n")
 
47
 
 
48
# three requests
 
49
request4 = """INVITE sip:foo SIP/2.0
 
50
From: mo
 
51
To: joe
 
52
Content-Length: 0
 
53
 
 
54
INVITE sip:loop SIP/2.0
 
55
From: foo
 
56
To: bar
 
57
Content-Length: 4
 
58
 
 
59
abcdINVITE sip:loop SIP/2.0
 
60
From: foo
 
61
To: bar
 
62
Content-Length: 4
 
63
 
 
64
1234""".replace("\n", "\r\n")
 
65
 
 
66
# response, no content
 
67
response1 = """SIP/2.0 200 OK
 
68
From:  foo
 
69
To:bar
 
70
Content-Length: 0
 
71
 
 
72
""".replace("\n", "\r\n")
 
73
 
 
74
# short header version
 
75
request_short = """\
 
76
INVITE sip:foo SIP/2.0
 
77
f: mo
 
78
t: joe
 
79
l: 4
 
80
 
 
81
abcd""".replace("\n", "\r\n")
 
82
 
 
83
request_natted = """\
 
84
INVITE sip:foo SIP/2.0
 
85
Via: SIP/2.0/UDP 10.0.0.1:5060;rport
 
86
 
 
87
""".replace("\n", "\r\n")
 
88
 
 
89
class TestRealm:
 
90
    def requestAvatar(self, avatarId, mind, *interfaces):
 
91
        return sip.IContact, None, lambda: None
 
92
 
 
93
class MessageParsingTestCase(unittest.TestCase):
 
94
    def setUp(self):
 
95
        self.l = []
 
96
        self.parser = sip.MessagesParser(self.l.append)
 
97
 
 
98
    def feedMessage(self, message):
 
99
        self.parser.dataReceived(message)
 
100
        self.parser.dataDone()
 
101
 
 
102
    def validateMessage(self, m, method, uri, headers, body):
 
103
        """Validate Requests."""
 
104
        self.assertEquals(m.method, method)
 
105
        self.assertEquals(m.uri.toString(), uri)
 
106
        self.assertEquals(m.headers, headers)
 
107
        self.assertEquals(m.body, body)
 
108
        self.assertEquals(m.finished, 1)
 
109
 
 
110
    def testSimple(self):
 
111
        l = self.l
 
112
        self.feedMessage(request1)
 
113
        self.assertEquals(len(l), 1)
 
114
        self.validateMessage(
 
115
            l[0], "INVITE", "sip:foo",
 
116
            {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
 
117
            "abcd")
 
118
 
 
119
    def testTwoMessages(self):
 
120
        l = self.l
 
121
        self.feedMessage(request1)
 
122
        self.feedMessage(request2)
 
123
        self.assertEquals(len(l), 2)
 
124
        self.validateMessage(
 
125
            l[0], "INVITE", "sip:foo",
 
126
            {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
 
127
            "abcd")
 
128
        self.validateMessage(l[1], "INVITE", "sip:foo",
 
129
                             {"from": ["mo"], "to": ["joe"]},
 
130
                             "1234")
 
131
 
 
132
    def testGarbage(self):
 
133
        l = self.l
 
134
        self.feedMessage(request3)
 
135
        self.assertEquals(len(l), 1)
 
136
        self.validateMessage(
 
137
            l[0], "INVITE", "sip:foo",
 
138
            {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
 
139
            "1234")
 
140
 
 
141
    def testThreeInOne(self):
 
142
        l = self.l
 
143
        self.feedMessage(request4)
 
144
        self.assertEquals(len(l), 3)
 
145
        self.validateMessage(
 
146
            l[0], "INVITE", "sip:foo",
 
147
            {"from": ["mo"], "to": ["joe"], "content-length": ["0"]},
 
148
            "")
 
149
        self.validateMessage(
 
150
            l[1], "INVITE", "sip:loop",
 
151
            {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
 
152
            "abcd")
 
153
        self.validateMessage(
 
154
            l[2], "INVITE", "sip:loop",
 
155
            {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
 
156
            "1234")
 
157
 
 
158
    def testShort(self):
 
159
        l = self.l
 
160
        self.feedMessage(request_short)
 
161
        self.assertEquals(len(l), 1)
 
162
        self.validateMessage(
 
163
            l[0], "INVITE", "sip:foo",
 
164
            {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
 
165
            "abcd")
 
166
 
 
167
    def testSimpleResponse(self):
 
168
        l = self.l
 
169
        self.feedMessage(response1)
 
170
        self.assertEquals(len(l), 1)
 
171
        m = l[0]
 
172
        self.assertEquals(m.code, 200)
 
173
        self.assertEquals(m.phrase, "OK")
 
174
        self.assertEquals(
 
175
            m.headers,
 
176
            {"from": ["foo"], "to": ["bar"], "content-length": ["0"]})
 
177
        self.assertEquals(m.body, "")
 
178
        self.assertEquals(m.finished, 1)
 
179
 
 
180
 
 
181
class MessageParsingTestCase2(MessageParsingTestCase):
 
182
    """Same as base class, but feed data char by char."""
 
183
 
 
184
    def feedMessage(self, message):
 
185
        for c in message:
 
186
            self.parser.dataReceived(c)
 
187
        self.parser.dataDone()
 
188
 
 
189
 
 
190
class MakeMessageTestCase(unittest.TestCase):
 
191
 
 
192
    def testRequest(self):
 
193
        r = sip.Request("INVITE", "sip:foo")
 
194
        r.addHeader("foo", "bar")
 
195
        self.assertEquals(
 
196
            r.toString(),
 
197
            "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\r\n")
 
198
 
 
199
    def testResponse(self):
 
200
        r = sip.Response(200, "OK")
 
201
        r.addHeader("foo", "bar")
 
202
        r.addHeader("Content-Length", "4")
 
203
        r.bodyDataReceived("1234")
 
204
        self.assertEquals(
 
205
            r.toString(),
 
206
            "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-Length: 4\r\n\r\n1234")
 
207
 
 
208
    def testStatusCode(self):
 
209
        r = sip.Response(200)
 
210
        self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\n\r\n")
 
211
 
 
212
 
 
213
class ViaTestCase(unittest.TestCase):
 
214
 
 
215
    def checkRoundtrip(self, v):
 
216
        s = v.toString()
 
217
        self.assertEquals(s, sip.parseViaHeader(s).toString())
 
218
 
 
219
    def testExtraWhitespace(self):
 
220
        v1 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060')
 
221
        v2 = sip.parseViaHeader('SIP/2.0/UDP     192.168.1.1:5060')
 
222
        self.assertEquals(v1.transport, v2.transport)
 
223
        self.assertEquals(v1.host, v2.host)
 
224
        self.assertEquals(v1.port, v2.port)
 
225
 
 
226
    def test_complex(self):
 
227
        """
 
228
        Test parsing a Via header with one of everything.
 
229
        """
 
230
        s = ("SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1"
 
231
             " ;branch=a7c6a8dlze (Example)")
 
232
        v = sip.parseViaHeader(s)
 
233
        self.assertEquals(v.transport, "UDP")
 
234
        self.assertEquals(v.host, "first.example.com")
 
235
        self.assertEquals(v.port, 4000)
 
236
        self.assertEquals(v.rport, None)
 
237
        self.assertEquals(v.rportValue, None)
 
238
        self.assertEquals(v.rportRequested, False)
 
239
        self.assertEquals(v.ttl, 16)
 
240
        self.assertEquals(v.maddr, "224.2.0.1")
 
241
        self.assertEquals(v.branch, "a7c6a8dlze")
 
242
        self.assertEquals(v.hidden, 0)
 
243
        self.assertEquals(v.toString(),
 
244
                          "SIP/2.0/UDP first.example.com:4000"
 
245
                          ";ttl=16;branch=a7c6a8dlze;maddr=224.2.0.1")
 
246
        self.checkRoundtrip(v)
 
247
 
 
248
    def test_simple(self):
 
249
        """
 
250
        Test parsing a simple Via header.
 
251
        """
 
252
        s = "SIP/2.0/UDP example.com;hidden"
 
253
        v = sip.parseViaHeader(s)
 
254
        self.assertEquals(v.transport, "UDP")
 
255
        self.assertEquals(v.host, "example.com")
 
256
        self.assertEquals(v.port, 5060)
 
257
        self.assertEquals(v.rport, None)
 
258
        self.assertEquals(v.rportValue, None)
 
259
        self.assertEquals(v.rportRequested, False)
 
260
        self.assertEquals(v.ttl, None)
 
261
        self.assertEquals(v.maddr, None)
 
262
        self.assertEquals(v.branch, None)
 
263
        self.assertEquals(v.hidden, True)
 
264
        self.assertEquals(v.toString(),
 
265
                          "SIP/2.0/UDP example.com:5060;hidden")
 
266
        self.checkRoundtrip(v)
 
267
 
 
268
    def testSimpler(self):
 
269
        v = sip.Via("example.com")
 
270
        self.checkRoundtrip(v)
 
271
 
 
272
 
 
273
    def test_deprecatedRPort(self):
 
274
        """
 
275
        Setting rport to True is deprecated, but still produces a Via header
 
276
        with the expected properties.
 
277
        """
 
278
        v = sip.Via("foo.bar", rport=True)
 
279
 
 
280
        warnings = self.flushWarnings(
 
281
            offendingFunctions=[self.test_deprecatedRPort])
 
282
        self.assertEqual(len(warnings), 1)
 
283
        self.assertEqual(
 
284
            warnings[0]['message'],
 
285
            'rport=True is deprecated since Twisted 9.0.')
 
286
        self.assertEqual(
 
287
            warnings[0]['category'],
 
288
            DeprecationWarning)
 
289
 
 
290
        self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
 
291
        self.assertEqual(v.rport, True)
 
292
        self.assertEqual(v.rportRequested, True)
 
293
        self.assertEqual(v.rportValue, None)
 
294
 
 
295
 
 
296
    def test_rport(self):
 
297
        """
 
298
        An rport setting of None should insert the parameter with no value.
 
299
        """
 
300
        v = sip.Via("foo.bar", rport=None)
 
301
        self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
 
302
        self.assertEqual(v.rportRequested, True)
 
303
        self.assertEqual(v.rportValue, None)
 
304
 
 
305
 
 
306
    def test_rportValue(self):
 
307
        """
 
308
        An rport numeric setting should insert the parameter with the number
 
309
        value given.
 
310
        """
 
311
        v = sip.Via("foo.bar", rport=1)
 
312
        self.assertEqual(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport=1")
 
313
        self.assertEqual(v.rportRequested, False)
 
314
        self.assertEqual(v.rportValue, 1)
 
315
        self.assertEqual(v.rport, 1)
 
316
 
 
317
 
 
318
    def testNAT(self):
 
319
        s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345"
 
320
        v = sip.parseViaHeader(s)
 
321
        self.assertEquals(v.transport, "UDP")
 
322
        self.assertEquals(v.host, "10.0.0.1")
 
323
        self.assertEquals(v.port, 5060)
 
324
        self.assertEquals(v.received, "22.13.1.5")
 
325
        self.assertEquals(v.rport, 12345)
 
326
 
 
327
        self.assertNotEquals(v.toString().find("rport=12345"), -1)
 
328
 
 
329
 
 
330
    def test_unknownParams(self):
 
331
       """
 
332
       Parsing and serializing Via headers with unknown parameters should work.
 
333
       """
 
334
       s = "SIP/2.0/UDP example.com:5060;branch=a12345b;bogus;pie=delicious"
 
335
       v = sip.parseViaHeader(s)
 
336
       self.assertEqual(v.toString(), s)
 
337
 
 
338
 
 
339
 
 
340
class URLTestCase(unittest.TestCase):
 
341
 
 
342
    def testRoundtrip(self):
 
343
        for url in [
 
344
            "sip:j.doe@big.com",
 
345
            "sip:j.doe:secret@big.com;transport=tcp",
 
346
            "sip:j.doe@big.com?subject=project",
 
347
            "sip:example.com",
 
348
            ]:
 
349
            self.assertEquals(sip.parseURL(url).toString(), url)
 
350
 
 
351
    def testComplex(self):
 
352
        s = ("sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;"
 
353
             "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d")
 
354
        url = sip.parseURL(s)
 
355
        for k, v in [("username", "user"), ("password", "pass"),
 
356
                     ("host", "hosta"), ("port", 123),
 
357
                     ("transport", "udp"), ("usertype", "phone"),
 
358
                     ("method", "foo"), ("ttl", 12),
 
359
                     ("maddr", "1.2.3.4"), ("other", ["blah", "goo=bar"]),
 
360
                     ("headers", {"a": "b", "c": "d"})]:
 
361
            self.assertEquals(getattr(url, k), v)
 
362
 
 
363
 
 
364
class ParseTestCase(unittest.TestCase):
 
365
 
 
366
    def testParseAddress(self):
 
367
        for address, name, urls, params in [
 
368
            ('"A. G. Bell" <sip:foo@example.com>',
 
369
             "A. G. Bell", "sip:foo@example.com", {}),
 
370
            ("Anon <sip:foo@example.com>", "Anon", "sip:foo@example.com", {}),
 
371
            ("sip:foo@example.com", "", "sip:foo@example.com", {}),
 
372
            ("<sip:foo@example.com>", "", "sip:foo@example.com", {}),
 
373
            ("foo <sip:foo@example.com>;tag=bar;foo=baz", "foo",
 
374
             "sip:foo@example.com", {"tag": "bar", "foo": "baz"}),
 
375
            ]:
 
376
            gname, gurl, gparams = sip.parseAddress(address)
 
377
            self.assertEquals(name, gname)
 
378
            self.assertEquals(gurl.toString(), urls)
 
379
            self.assertEquals(gparams, params)
 
380
 
 
381
 
 
382
class DummyLocator:
 
383
    implements(sip.ILocator)
 
384
    def getAddress(self, logicalURL):
 
385
        return defer.succeed(sip.URL("server.com", port=5060))
 
386
 
 
387
class FailingLocator:
 
388
    implements(sip.ILocator)
 
389
    def getAddress(self, logicalURL):
 
390
        return defer.fail(LookupError())
 
391
 
 
392
 
 
393
class ProxyTestCase(unittest.TestCase):
 
394
 
 
395
    def setUp(self):
 
396
        self.proxy = sip.Proxy("127.0.0.1")
 
397
        self.proxy.locator = DummyLocator()
 
398
        self.sent = []
 
399
        self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
 
400
 
 
401
    def testRequestForward(self):
 
402
        r = sip.Request("INVITE", "sip:foo")
 
403
        r.addHeader("via", sip.Via("1.2.3.4").toString())
 
404
        r.addHeader("via", sip.Via("1.2.3.5").toString())
 
405
        r.addHeader("foo", "bar")
 
406
        r.addHeader("to", "<sip:joe@server.com>")
 
407
        r.addHeader("contact", "<sip:joe@1.2.3.5>")
 
408
        self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
 
409
        self.assertEquals(len(self.sent), 1)
 
410
        dest, m = self.sent[0]
 
411
        self.assertEquals(dest.port, 5060)
 
412
        self.assertEquals(dest.host, "server.com")
 
413
        self.assertEquals(m.uri.toString(), "sip:foo")
 
414
        self.assertEquals(m.method, "INVITE")
 
415
        self.assertEquals(m.headers["via"],
 
416
                          ["SIP/2.0/UDP 127.0.0.1:5060",
 
417
                           "SIP/2.0/UDP 1.2.3.4:5060",
 
418
                           "SIP/2.0/UDP 1.2.3.5:5060"])
 
419
 
 
420
 
 
421
    def testReceivedRequestForward(self):
 
422
        r = sip.Request("INVITE", "sip:foo")
 
423
        r.addHeader("via", sip.Via("1.2.3.4").toString())
 
424
        r.addHeader("foo", "bar")
 
425
        r.addHeader("to", "<sip:joe@server.com>")
 
426
        r.addHeader("contact", "<sip:joe@1.2.3.4>")
 
427
        self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
 
428
        dest, m = self.sent[0]
 
429
        self.assertEquals(m.headers["via"],
 
430
                          ["SIP/2.0/UDP 127.0.0.1:5060",
 
431
                           "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1"])
 
432
 
 
433
 
 
434
    def testResponseWrongVia(self):
 
435
        # first via must match proxy's address
 
436
        r = sip.Response(200)
 
437
        r.addHeader("via", sip.Via("foo.com").toString())
 
438
        self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
 
439
        self.assertEquals(len(self.sent), 0)
 
440
 
 
441
    def testResponseForward(self):
 
442
        r = sip.Response(200)
 
443
        r.addHeader("via", sip.Via("127.0.0.1").toString())
 
444
        r.addHeader("via", sip.Via("client.com", port=1234).toString())
 
445
        self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
 
446
        self.assertEquals(len(self.sent), 1)
 
447
        dest, m = self.sent[0]
 
448
        self.assertEquals((dest.host, dest.port), ("client.com", 1234))
 
449
        self.assertEquals(m.code, 200)
 
450
        self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:1234"])
 
451
 
 
452
    def testReceivedResponseForward(self):
 
453
        r = sip.Response(200)
 
454
        r.addHeader("via", sip.Via("127.0.0.1").toString())
 
455
        r.addHeader(
 
456
            "via",
 
457
            sip.Via("10.0.0.1", received="client.com").toString())
 
458
        self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
 
459
        self.assertEquals(len(self.sent), 1)
 
460
        dest, m = self.sent[0]
 
461
        self.assertEquals((dest.host, dest.port), ("client.com", 5060))
 
462
 
 
463
    def testResponseToUs(self):
 
464
        r = sip.Response(200)
 
465
        r.addHeader("via", sip.Via("127.0.0.1").toString())
 
466
        l = []
 
467
        self.proxy.gotResponse = lambda *a: l.append(a)
 
468
        self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
 
469
        self.assertEquals(len(l), 1)
 
470
        m, addr = l[0]
 
471
        self.assertEquals(len(m.headers.get("via", [])), 0)
 
472
        self.assertEquals(m.code, 200)
 
473
 
 
474
    def testLoop(self):
 
475
        r = sip.Request("INVITE", "sip:foo")
 
476
        r.addHeader("via", sip.Via("1.2.3.4").toString())
 
477
        r.addHeader("via", sip.Via("127.0.0.1").toString())
 
478
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
479
        self.assertEquals(self.sent, [])
 
480
 
 
481
    def testCantForwardRequest(self):
 
482
        r = sip.Request("INVITE", "sip:foo")
 
483
        r.addHeader("via", sip.Via("1.2.3.4").toString())
 
484
        r.addHeader("to", "<sip:joe@server.com>")
 
485
        self.proxy.locator = FailingLocator()
 
486
        self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
 
487
        self.assertEquals(len(self.sent), 1)
 
488
        dest, m = self.sent[0]
 
489
        self.assertEquals((dest.host, dest.port), ("1.2.3.4", 5060))
 
490
        self.assertEquals(m.code, 404)
 
491
        self.assertEquals(m.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"])
 
492
 
 
493
    def testCantForwardResponse(self):
 
494
        pass
 
495
 
 
496
    #testCantForwardResponse.skip = "not implemented yet"
 
497
 
 
498
 
 
499
class RegistrationTestCase(unittest.TestCase):
 
500
 
 
501
    def setUp(self):
 
502
        self.proxy = sip.RegisterProxy(host="127.0.0.1")
 
503
        self.registry = sip.InMemoryRegistry("bell.example.com")
 
504
        self.proxy.registry = self.proxy.locator = self.registry
 
505
        self.sent = []
 
506
        self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
 
507
    setUp = utils.suppressWarnings(setUp,
 
508
        util.suppress(category=DeprecationWarning,
 
509
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'))
 
510
 
 
511
    def tearDown(self):
 
512
        for d, uri in self.registry.users.values():
 
513
            d.cancel()
 
514
        del self.proxy
 
515
 
 
516
    def register(self):
 
517
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
518
        r.addHeader("to", "sip:joe@bell.example.com")
 
519
        r.addHeader("contact", "sip:joe@client.com:1234")
 
520
        r.addHeader("via", sip.Via("client.com").toString())
 
521
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
522
 
 
523
    def unregister(self):
 
524
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
525
        r.addHeader("to", "sip:joe@bell.example.com")
 
526
        r.addHeader("contact", "*")
 
527
        r.addHeader("via", sip.Via("client.com").toString())
 
528
        r.addHeader("expires", "0")
 
529
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
530
 
 
531
    def testRegister(self):
 
532
        self.register()
 
533
        dest, m = self.sent[0]
 
534
        self.assertEquals((dest.host, dest.port), ("client.com", 5060))
 
535
        self.assertEquals(m.code, 200)
 
536
        self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
 
537
        self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"])
 
538
        self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"])
 
539
        self.failUnless(
 
540
            int(m.headers["expires"][0]) in (3600, 3601, 3599, 3598))
 
541
        self.assertEquals(len(self.registry.users), 1)
 
542
        dc, uri = self.registry.users["joe"]
 
543
        self.assertEquals(uri.toString(), "sip:joe@client.com:5060")
 
544
        d = self.proxy.locator.getAddress(sip.URL(username="joe",
 
545
                                                  host="bell.example.com"))
 
546
        d.addCallback(lambda desturl : (desturl.host, desturl.port))
 
547
        d.addCallback(self.assertEquals, ('client.com', 5060))
 
548
        return d
 
549
 
 
550
    def testUnregister(self):
 
551
        self.register()
 
552
        self.unregister()
 
553
        dest, m = self.sent[1]
 
554
        self.assertEquals((dest.host, dest.port), ("client.com", 5060))
 
555
        self.assertEquals(m.code, 200)
 
556
        self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
 
557
        self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"])
 
558
        self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"])
 
559
        self.assertEquals(m.headers["expires"], ["0"])
 
560
        self.assertEquals(self.registry.users, {})
 
561
 
 
562
 
 
563
    def addPortal(self):
 
564
        r = TestRealm()
 
565
        p = cred.portal.Portal(r)
 
566
        c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
 
567
        c.addUser('userXname@127.0.0.1', 'passXword')
 
568
        p.registerChecker(c)
 
569
        self.proxy.portal = p
 
570
 
 
571
    def testFailedAuthentication(self):
 
572
        self.addPortal()
 
573
        self.register()
 
574
 
 
575
        self.assertEquals(len(self.registry.users), 0)
 
576
        self.assertEquals(len(self.sent), 1)
 
577
        dest, m = self.sent[0]
 
578
        self.assertEquals(m.code, 401)
 
579
 
 
580
 
 
581
    def test_basicAuthentication(self):
 
582
        """
 
583
        Test that registration with basic authentication suceeds.
 
584
        """
 
585
        self.addPortal()
 
586
        self.proxy.authorizers = self.proxy.authorizers.copy()
 
587
        self.proxy.authorizers['basic'] = sip.BasicAuthorizer()
 
588
        warnings = self.flushWarnings(
 
589
            offendingFunctions=[self.test_basicAuthentication])
 
590
        self.assertEqual(len(warnings), 1)
 
591
        self.assertEqual(
 
592
            warnings[0]['message'],
 
593
            "twisted.protocols.sip.BasicAuthorizer was deprecated in "
 
594
            "Twisted 9.0.0")
 
595
        self.assertEqual(
 
596
            warnings[0]['category'],
 
597
            DeprecationWarning)
 
598
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
599
        r.addHeader("to", "sip:joe@bell.example.com")
 
600
        r.addHeader("contact", "sip:joe@client.com:1234")
 
601
        r.addHeader("via", sip.Via("client.com").toString())
 
602
        r.addHeader("authorization",
 
603
                    "Basic " + "userXname:passXword".encode('base64'))
 
604
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
605
 
 
606
        self.assertEquals(len(self.registry.users), 1)
 
607
        self.assertEquals(len(self.sent), 1)
 
608
        dest, m = self.sent[0]
 
609
        self.assertEquals(m.code, 200)
 
610
 
 
611
 
 
612
    def test_failedBasicAuthentication(self):
 
613
        """
 
614
        Failed registration with basic authentication results in an
 
615
        unauthorized error response.
 
616
        """
 
617
        self.addPortal()
 
618
        self.proxy.authorizers = self.proxy.authorizers.copy()
 
619
        self.proxy.authorizers['basic'] = sip.BasicAuthorizer()
 
620
        warnings = self.flushWarnings(
 
621
            offendingFunctions=[self.test_failedBasicAuthentication])
 
622
        self.assertEqual(len(warnings), 1)
 
623
        self.assertEqual(
 
624
            warnings[0]['message'],
 
625
            "twisted.protocols.sip.BasicAuthorizer was deprecated in "
 
626
            "Twisted 9.0.0")
 
627
        self.assertEqual(
 
628
            warnings[0]['category'],
 
629
            DeprecationWarning)
 
630
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
631
        r.addHeader("to", "sip:joe@bell.example.com")
 
632
        r.addHeader("contact", "sip:joe@client.com:1234")
 
633
        r.addHeader("via", sip.Via("client.com").toString())
 
634
        r.addHeader(
 
635
            "authorization", "Basic " + "userXname:password".encode('base64'))
 
636
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
637
 
 
638
        self.assertEquals(len(self.registry.users), 0)
 
639
        self.assertEquals(len(self.sent), 1)
 
640
        dest, m = self.sent[0]
 
641
        self.assertEquals(m.code, 401)
 
642
 
 
643
 
 
644
    def testWrongDomainRegister(self):
 
645
        r = sip.Request("REGISTER", "sip:wrong.com")
 
646
        r.addHeader("to", "sip:joe@bell.example.com")
 
647
        r.addHeader("contact", "sip:joe@client.com:1234")
 
648
        r.addHeader("via", sip.Via("client.com").toString())
 
649
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
650
        self.assertEquals(len(self.sent), 0)
 
651
 
 
652
    def testWrongToDomainRegister(self):
 
653
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
654
        r.addHeader("to", "sip:joe@foo.com")
 
655
        r.addHeader("contact", "sip:joe@client.com:1234")
 
656
        r.addHeader("via", sip.Via("client.com").toString())
 
657
        self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
 
658
        self.assertEquals(len(self.sent), 0)
 
659
 
 
660
    def testWrongDomainLookup(self):
 
661
        self.register()
 
662
        url = sip.URL(username="joe", host="foo.com")
 
663
        d = self.proxy.locator.getAddress(url)
 
664
        self.assertFailure(d, LookupError)
 
665
        return d
 
666
 
 
667
    def testNoContactLookup(self):
 
668
        self.register()
 
669
        url = sip.URL(username="jane", host="bell.example.com")
 
670
        d = self.proxy.locator.getAddress(url)
 
671
        self.assertFailure(d, LookupError)
 
672
        return d
 
673
 
 
674
 
 
675
class Client(sip.Base):
 
676
 
 
677
    def __init__(self):
 
678
        sip.Base.__init__(self)
 
679
        self.received = []
 
680
        self.deferred = defer.Deferred()
 
681
 
 
682
    def handle_response(self, response, addr):
 
683
        self.received.append(response)
 
684
        self.deferred.callback(self.received)
 
685
 
 
686
 
 
687
class LiveTest(unittest.TestCase):
 
688
 
 
689
    def setUp(self):
 
690
        self.proxy = sip.RegisterProxy(host="127.0.0.1")
 
691
        self.registry = sip.InMemoryRegistry("bell.example.com")
 
692
        self.proxy.registry = self.proxy.locator = self.registry
 
693
        self.serverPort = reactor.listenUDP(
 
694
            0, self.proxy, interface="127.0.0.1")
 
695
        self.client = Client()
 
696
        self.clientPort = reactor.listenUDP(
 
697
            0, self.client, interface="127.0.0.1")
 
698
        self.serverAddress = (self.serverPort.getHost().host,
 
699
                              self.serverPort.getHost().port)
 
700
    setUp = utils.suppressWarnings(setUp,
 
701
        util.suppress(category=DeprecationWarning,
 
702
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'))
 
703
 
 
704
    def tearDown(self):
 
705
        for d, uri in self.registry.users.values():
 
706
            d.cancel()
 
707
        d1 = defer.maybeDeferred(self.clientPort.stopListening)
 
708
        d2 = defer.maybeDeferred(self.serverPort.stopListening)
 
709
        return defer.gatherResults([d1, d2])
 
710
 
 
711
    def testRegister(self):
 
712
        p = self.clientPort.getHost().port
 
713
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
714
        r.addHeader("to", "sip:joe@bell.example.com")
 
715
        r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
 
716
        r.addHeader("via", sip.Via("127.0.0.1", port=p).toString())
 
717
        self.client.sendMessage(
 
718
            sip.URL(host="127.0.0.1", port=self.serverAddress[1]), r)
 
719
        d = self.client.deferred
 
720
        def check(received):
 
721
            self.assertEquals(len(received), 1)
 
722
            r = received[0]
 
723
            self.assertEquals(r.code, 200)
 
724
        d.addCallback(check)
 
725
        return d
 
726
 
 
727
    def test_amoralRPort(self):
 
728
        """
 
729
        rport is allowed without a value, apparently because server
 
730
        implementors might be too stupid to check the received port
 
731
        against 5060 and see if they're equal, and because client
 
732
        implementors might be too stupid to bind to port 5060, or set a
 
733
        value on the rport parameter they send if they bind to another
 
734
        port.
 
735
        """
 
736
        p = self.clientPort.getHost().port
 
737
        r = sip.Request("REGISTER", "sip:bell.example.com")
 
738
        r.addHeader("to", "sip:joe@bell.example.com")
 
739
        r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
 
740
        r.addHeader("via", sip.Via("127.0.0.1", port=p, rport=True).toString())
 
741
        warnings = self.flushWarnings(
 
742
            offendingFunctions=[self.test_amoralRPort])
 
743
        self.assertEqual(len(warnings), 1)
 
744
        self.assertEqual(
 
745
            warnings[0]['message'],
 
746
            'rport=True is deprecated since Twisted 9.0.')
 
747
        self.assertEqual(
 
748
            warnings[0]['category'],
 
749
            DeprecationWarning)
 
750
        self.client.sendMessage(sip.URL(host="127.0.0.1",
 
751
                                        port=self.serverAddress[1]),
 
752
                                r)
 
753
        d = self.client.deferred
 
754
        def check(received):
 
755
            self.assertEquals(len(received), 1)
 
756
            r = received[0]
 
757
            self.assertEquals(r.code, 200)
 
758
        d.addCallback(check)
 
759
        return d
 
760
 
 
761
 
 
762
 
 
763
registerRequest = """
 
764
REGISTER sip:intarweb.us SIP/2.0\r
 
765
Via: SIP/2.0/UDP 192.168.1.100:50609\r
 
766
From: <sip:exarkun@intarweb.us:50609>\r
 
767
To: <sip:exarkun@intarweb.us:50609>\r
 
768
Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r
 
769
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
 
770
CSeq: 9898 REGISTER\r
 
771
Expires: 500\r
 
772
User-Agent: X-Lite build 1061\r
 
773
Content-Length: 0\r
 
774
\r
 
775
"""
 
776
 
 
777
challengeResponse = """\
 
778
SIP/2.0 401 Unauthorized\r
 
779
Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r
 
780
To: <sip:exarkun@intarweb.us:50609>\r
 
781
From: <sip:exarkun@intarweb.us:50609>\r
 
782
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
 
783
CSeq: 9898 REGISTER\r
 
784
WWW-Authenticate: Digest nonce="92956076410767313901322208775",opaque="1674186428",qop-options="auth",algorithm="MD5",realm="intarweb.us"\r
 
785
\r
 
786
"""
 
787
 
 
788
authRequest = """\
 
789
REGISTER sip:intarweb.us SIP/2.0\r
 
790
Via: SIP/2.0/UDP 192.168.1.100:50609\r
 
791
From: <sip:exarkun@intarweb.us:50609>\r
 
792
To: <sip:exarkun@intarweb.us:50609>\r
 
793
Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r
 
794
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
 
795
CSeq: 9899 REGISTER\r
 
796
Expires: 500\r
 
797
Authorization: Digest username="exarkun",realm="intarweb.us",nonce="92956076410767313901322208775",response="4a47980eea31694f997369214292374b",uri="sip:intarweb.us",algorithm=MD5,opaque="1674186428"\r
 
798
User-Agent: X-Lite build 1061\r
 
799
Content-Length: 0\r
 
800
\r
 
801
"""
 
802
 
 
803
okResponse = """\
 
804
SIP/2.0 200 OK\r
 
805
Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r
 
806
To: <sip:exarkun@intarweb.us:50609>\r
 
807
From: <sip:exarkun@intarweb.us:50609>\r
 
808
Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
 
809
CSeq: 9899 REGISTER\r
 
810
Contact: sip:exarkun@127.0.0.1:5632\r
 
811
Expires: 3600\r
 
812
Content-Length: 0\r
 
813
\r
 
814
"""
 
815
 
 
816
class FakeDigestAuthorizer(sip.DigestAuthorizer):
 
817
    def generateNonce(self):
 
818
        return '92956076410767313901322208775'
 
819
    def generateOpaque(self):
 
820
        return '1674186428'
 
821
 
 
822
 
 
823
class FakeRegistry(sip.InMemoryRegistry):
 
824
    """Make sure expiration is always seen to be 3600.
 
825
 
 
826
    Otherwise slow reactors fail tests incorrectly.
 
827
    """
 
828
 
 
829
    def _cbReg(self, reg):
 
830
        if 3600 < reg.secondsToExpiry or reg.secondsToExpiry < 3598:
 
831
            raise RuntimeError(
 
832
                "bad seconds to expire: %s" % reg.secondsToExpiry)
 
833
        reg.secondsToExpiry = 3600
 
834
        return reg
 
835
 
 
836
    def getRegistrationInfo(self, uri):
 
837
        d = sip.InMemoryRegistry.getRegistrationInfo(self, uri)
 
838
        return d.addCallback(self._cbReg)
 
839
 
 
840
    def registerAddress(self, domainURL, logicalURL, physicalURL):
 
841
        d = sip.InMemoryRegistry.registerAddress(
 
842
            self, domainURL, logicalURL, physicalURL)
 
843
        return d.addCallback(self._cbReg)
 
844
 
 
845
class AuthorizationTestCase(unittest.TestCase):
 
846
    def setUp(self):
 
847
        self.proxy = sip.RegisterProxy(host="intarweb.us")
 
848
        self.proxy.authorizers = self.proxy.authorizers.copy()
 
849
        self.proxy.authorizers['digest'] = FakeDigestAuthorizer()
 
850
 
 
851
        self.registry = FakeRegistry("intarweb.us")
 
852
        self.proxy.registry = self.proxy.locator = self.registry
 
853
        self.transport = proto_helpers.FakeDatagramTransport()
 
854
        self.proxy.transport = self.transport
 
855
 
 
856
        r = TestRealm()
 
857
        p = cred.portal.Portal(r)
 
858
        c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
 
859
        c.addUser('exarkun@intarweb.us', 'password')
 
860
        p.registerChecker(c)
 
861
        self.proxy.portal = p
 
862
    setUp = utils.suppressWarnings(setUp,
 
863
        util.suppress(category=DeprecationWarning,
 
864
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'))
 
865
 
 
866
    def tearDown(self):
 
867
        for d, uri in self.registry.users.values():
 
868
            d.cancel()
 
869
        del self.proxy
 
870
 
 
871
    def testChallenge(self):
 
872
        self.proxy.datagramReceived(registerRequest, ("127.0.0.1", 5632))
 
873
 
 
874
        self.assertEquals(
 
875
            self.transport.written[-1],
 
876
            ((challengeResponse, ("127.0.0.1", 5632)))
 
877
        )
 
878
        self.transport.written = []
 
879
 
 
880
        self.proxy.datagramReceived(authRequest, ("127.0.0.1", 5632))
 
881
 
 
882
        self.assertEquals(
 
883
            self.transport.written[-1],
 
884
            ((okResponse, ("127.0.0.1", 5632)))
 
885
        )
 
886
    testChallenge.suppress = [
 
887
        util.suppress(
 
888
            category=DeprecationWarning,
 
889
            message=r'twisted.protocols.sip.DigestAuthorizer was deprecated'),
 
890
        util.suppress(
 
891
            category=DeprecationWarning,
 
892
            message=r'twisted.protocols.sip.DigestedCredentials was deprecated'),
 
893
        util.suppress(
 
894
            category=DeprecationWarning,
 
895
            message=r'twisted.protocols.sip.DigestCalcHA1 was deprecated'),
 
896
        util.suppress(
 
897
            category=DeprecationWarning,
 
898
            message=r'twisted.protocols.sip.DigestCalcResponse was deprecated')]
 
899
 
 
900
 
 
901
 
 
902
class DeprecationTests(unittest.TestCase):
 
903
    """
 
904
    Tests for deprecation of obsolete components of L{twisted.protocols.sip}.
 
905
    """
 
906
 
 
907
    def test_deprecatedDigestCalcHA1(self):
 
908
        """
 
909
        L{sip.DigestCalcHA1} is deprecated.
 
910
        """
 
911
        self.callDeprecated(Version("Twisted", 9, 0, 0),
 
912
                            sip.DigestCalcHA1, '', '', '', '', '', '')
 
913
 
 
914
 
 
915
    def test_deprecatedDigestCalcResponse(self):
 
916
        """
 
917
        L{sip.DigestCalcResponse} is deprecated.
 
918
        """
 
919
        self.callDeprecated(Version("Twisted", 9, 0, 0),
 
920
                            sip.DigestCalcResponse, '', '', '', '', '', '', '',
 
921
                            '')
 
922
 
 
923
    def test_deprecatedBasicAuthorizer(self):
 
924
        """
 
925
        L{sip.BasicAuthorizer} is deprecated.
 
926
        """
 
927
        self.callDeprecated(Version("Twisted", 9, 0, 0), sip.BasicAuthorizer)
 
928
 
 
929
 
 
930
    def test_deprecatedDigestAuthorizer(self):
 
931
        """
 
932
        L{sip.DigestAuthorizer} is deprecated.
 
933
        """
 
934
        self.callDeprecated(Version("Twisted", 9, 0, 0), sip.DigestAuthorizer)
 
935
 
 
936
 
 
937
    def test_deprecatedDigestedCredentials(self):
 
938
        """
 
939
        L{sip.DigestedCredentials} is deprecated.
 
940
        """
 
941
        self.callDeprecated(Version("Twisted", 9, 0, 0),
 
942
                            sip.DigestedCredentials, '', {}, {})