~certify-web-dev/twisted/certify-staging

« back to all changes in this revision

Viewing changes to twisted/test/test_amp.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2010-01-02 19:38:17 UTC
  • mfrom: (2.2.4 sid)
  • Revision ID: james.westby@ubuntu.com-20100102193817-jphp464ppwh7dulg
Tags: 9.0.0-1
* python-twisted: Depend on the python-twisted-* 9.0 packages.
* python-twisted: Depend on python-zope.interface only. Closes: #557781.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright (c) 2005 Divmod, Inc.
2
 
# Copyright (c) 2007-2008 Twisted Matrix Laboratories.
 
2
# Copyright (c) 2007-2009 Twisted Matrix Laboratories.
3
3
# See LICENSE for details.
4
4
 
5
5
"""
6
6
Tests for L{twisted.protocols.amp}.
7
7
"""
8
8
 
 
9
from zope.interface.verify import verifyObject
 
10
 
 
11
from twisted.python.util import setIDFunction
9
12
from twisted.python import filepath
10
13
from twisted.python.failure import Failure
11
14
from twisted.protocols import amp
14
17
from twisted.test import iosim
15
18
from twisted.test.proto_helpers import StringTransport
16
19
 
 
20
try:
 
21
    from twisted.internet import ssl
 
22
except ImportError:
 
23
    ssl = None
 
24
if ssl and not ssl.supported:
 
25
    ssl = None
 
26
 
 
27
if ssl is None:
 
28
    skipSSL = "SSL not available"
 
29
else:
 
30
    skipSSL = None
 
31
 
17
32
 
18
33
class TestProto(protocol.Protocol):
 
34
    """
 
35
    A trivial protocol for use in testing where a L{Protocol} is expected.
 
36
 
 
37
    @ivar instanceId: the id of this instance
 
38
    @ivar onConnLost: deferred that will fired when the connection is lost
 
39
    @ivar dataToSend: data to send on the protocol
 
40
    """
 
41
 
 
42
    instanceCount = 0
 
43
 
19
44
    def __init__(self, onConnLost, dataToSend):
20
45
        self.onConnLost = onConnLost
21
46
        self.dataToSend = dataToSend
 
47
        self.instanceId = TestProto.instanceCount
 
48
        TestProto.instanceCount = TestProto.instanceCount + 1
22
49
 
23
50
    def connectionMade(self):
24
51
        self.data = []
32
59
        self.onConnLost.callback(self.data)
33
60
 
34
61
 
 
62
    def __repr__(self):
 
63
        """
 
64
        Custom repr for testing to avoid coupling amp tests with repr from 
 
65
        L{Protocol}
 
66
        
 
67
        Returns a string which contains a unique identifier that can be looked
 
68
        up using the instanceId property::
 
69
 
 
70
            <TestProto #3>
 
71
        """
 
72
        return "<TestProto #%d>" % (self.instanceId,)
 
73
 
 
74
 
35
75
 
36
76
class SimpleSymmetricProtocol(amp.AMP):
37
77
 
146
186
    arguments = [('length', amp.Integer())]
147
187
    response = [('body', amp.AmpList([('x', amp.Integer())]))]
148
188
 
 
189
class DontRejectMe(amp.Command):
 
190
    commandName = 'dontrejectme'
 
191
    arguments = [
 
192
            ('magicWord', amp.Unicode()),
 
193
            ('list', amp.AmpList([('name', amp.Unicode())], optional=True)),
 
194
            ]
 
195
    response = [('response', amp.Unicode())]
 
196
 
149
197
class SecuredPing(amp.Command):
150
198
    # XXX TODO: actually make this refuse to send over an insecure connection
151
199
    response = [('pinged', amp.Boolean())]
227
275
        return {'body': [dict(x=1)] * length}
228
276
    GetList.responder(cmdGetlist)
229
277
 
 
278
    def okiwont(self, magicWord, list):
 
279
        return dict(response=u'%s accepted' % (list[0]['name']))
 
280
    DontRejectMe.responder(okiwont)
 
281
 
230
282
    def waitforit(self):
231
283
        self.waiting = defer.Deferred()
232
284
        return self.waiting
654
706
SWITCH_SERVER_DATA = 'No, really.  Success.'
655
707
 
656
708
 
657
 
from twisted.test.proto_helpers import StringTransport
658
 
 
659
 
 
660
709
class BinaryProtocolTests(unittest.TestCase):
661
710
    """
662
711
    Tests for L{amp.BinaryBoxProtocol}.
1025
1074
        self.assertEquals(L[0]['Print'], HELLO_UNICODE)
1026
1075
 
1027
1076
 
 
1077
    def test_callRemoteStringRequiresAnswerFalse(self):
 
1078
        """
 
1079
        L{BoxDispatcher.callRemoteString} returns C{None} if C{requiresAnswer}
 
1080
        is C{False}.
 
1081
        """
 
1082
        c, s, p = connectedServerAndClient()
 
1083
        ret = c.callRemoteString("WTF", requiresAnswer=False)
 
1084
        self.assertIdentical(ret, None)
 
1085
 
 
1086
 
1028
1087
    def test_unknownCommandLow(self):
1029
1088
        """
1030
1089
        Verify that unknown commands using low-level APIs will be rejected with an
1084
1143
        self.failUnlessIn('None', repr(L[0].value))
1085
1144
 
1086
1145
 
1087
 
 
1088
1146
    def test_unknownArgument(self):
1089
1147
        """
1090
1148
        Verify that unknown arguments are ignored, and not passed to a Python
1108
1166
        """
1109
1167
        Verify that the various Box objects repr properly, for debugging.
1110
1168
        """
1111
 
        self.assertEquals(type(repr(amp._TLSBox())), str)
1112
1169
        self.assertEquals(type(repr(amp._SwitchBox('a'))), str)
1113
1170
        self.assertEquals(type(repr(amp.QuitBox())), str)
1114
1171
        self.assertEquals(type(repr(amp.AmpBox())), str)
1115
1172
        self.failUnless("AmpBox" in repr(amp.AmpBox()))
1116
1173
 
 
1174
 
 
1175
    def test_innerProtocolInRepr(self):
 
1176
        """
 
1177
        Verify that L{AMP} objects output their innerProtocol when set.
 
1178
        """
 
1179
        otherProto = TestProto(None, "outgoing data")
 
1180
        a = amp.AMP()
 
1181
        a.innerProtocol = otherProto
 
1182
        def fakeID(obj):
 
1183
            return {a: 0x1234}.get(obj, id(obj))
 
1184
        self.addCleanup(setIDFunction, setIDFunction(fakeID))
 
1185
 
 
1186
        self.assertEquals(
 
1187
            repr(a), "<AMP inner <TestProto #%d> at 0x1234>" % (
 
1188
                otherProto.instanceId,))
 
1189
 
 
1190
 
 
1191
    def test_innerProtocolNotInRepr(self):
 
1192
        """
 
1193
        Verify that L{AMP} objects do not output 'inner' when no innerProtocol
 
1194
        is set.
 
1195
        """
 
1196
        a = amp.AMP()
 
1197
        def fakeID(obj):
 
1198
            return {a: 0x4321}.get(obj, id(obj))
 
1199
        self.addCleanup(setIDFunction, setIDFunction(fakeID))
 
1200
        self.assertEquals(repr(a), "<AMP at 0x4321>")
 
1201
 
 
1202
 
 
1203
    def test_simpleSSLRepr(self):
 
1204
        """
 
1205
        L{amp._TLSBox.__repr__} returns a string.
 
1206
        """
 
1207
        self.assertEquals(type(repr(amp._TLSBox())), str)
 
1208
 
 
1209
    test_simpleSSLRepr.skip = skipSSL
 
1210
 
 
1211
 
1117
1212
    def test_keyTooLong(self):
1118
1213
        """
1119
1214
        Verify that a key that is too long will immediately raise a synchronous
1145
1240
        p.flush()
1146
1241
        self.failIf(tl.isKey)
1147
1242
        self.failUnless(tl.isLocal)
1148
 
        self.failUnlessIdentical(tl.keyName, 'hello')
 
1243
        self.assertEquals(tl.keyName, 'hello')
1149
1244
        self.failUnlessIdentical(tl.value, x)
1150
1245
        self.failUnless(str(len(x)) in repr(tl))
1151
1246
        self.failUnless("value" in repr(tl))
1332
1427
        self.assertEquals(values, [{'x': 1}] * 10)
1333
1428
 
1334
1429
 
 
1430
    def test_optionalAmpListOmitted(self):
 
1431
        """
 
1432
        Test that sending a command with an omitted AmpList argument that is
 
1433
        designated as optional does not raise an InvalidSignature error.
 
1434
        """
 
1435
        dontRejectMeCommand = DontRejectMe(magicWord=u'please')
 
1436
   
 
1437
 
 
1438
    def test_optionalAmpListPresent(self):
 
1439
        """
 
1440
        Sanity check that optional AmpList arguments are processed normally.
 
1441
        """
 
1442
        dontRejectMeCommand = DontRejectMe(magicWord=u'please',
 
1443
                list=[{'name': 'foo'}])
 
1444
        c, s, p = connectedServerAndClient(
 
1445
            ServerClass=SimpleSymmetricCommandProtocol,
 
1446
            ClientClass=SimpleSymmetricCommandProtocol)
 
1447
        L = []
 
1448
        c.callRemote(DontRejectMe, magicWord=u'please',
 
1449
                list=[{'name': 'foo'}]).addCallback(L.append)
 
1450
        p.flush()
 
1451
        response = L.pop().get('response')
 
1452
        self.assertEquals(response, 'foo accepted')
 
1453
 
 
1454
 
1335
1455
    def test_failEarlyOnArgSending(self):
1336
1456
        """
1337
1457
        Verify that if we pass an invalid argument list (omitting an argument), an
1704
1824
        # reasonable.
1705
1825
        self.assertFailure(d, error.PeerVerifyError)
1706
1826
 
 
1827
    skip = skipSSL
 
1828
 
 
1829
 
 
1830
 
 
1831
class TLSNotAvailableTest(unittest.TestCase):
 
1832
    """
 
1833
    Tests what happened when ssl is not available in current installation.
 
1834
    """
 
1835
 
 
1836
    def setUp(self):
 
1837
        """
 
1838
        Disable ssl in amp.
 
1839
        """
 
1840
        self.ssl = amp.ssl
 
1841
        amp.ssl = None
 
1842
 
 
1843
 
 
1844
    def tearDown(self):
 
1845
        """
 
1846
        Restore ssl module.
 
1847
        """
 
1848
        amp.ssl = self.ssl
 
1849
 
 
1850
 
 
1851
    def test_callRemoteError(self):
 
1852
        """
 
1853
        Check that callRemote raises an exception when called with a
 
1854
        L{amp.StartTLS}.
 
1855
        """
 
1856
        cli, svr, p = connectedServerAndClient(
 
1857
            ServerClass=SecurableProto,
 
1858
            ClientClass=SecurableProto)
 
1859
 
 
1860
        okc = OKCert()
 
1861
        svr.certFactory = lambda : okc
 
1862
 
 
1863
        return self.assertFailure(cli.callRemote(
 
1864
            amp.StartTLS, tls_localCertificate=okc,
 
1865
            tls_verifyAuthorities=[PretendRemoteCertificateAuthority()]),
 
1866
            RuntimeError)
 
1867
 
 
1868
 
 
1869
    def test_messageReceivedError(self):
 
1870
        """
 
1871
        When a client with SSL enabled talks to a server without SSL, it
 
1872
        should return a meaningful error.
 
1873
        """
 
1874
        svr = SecurableProto()
 
1875
        okc = OKCert()
 
1876
        svr.certFactory = lambda : okc
 
1877
        box = amp.Box()
 
1878
        box['_command'] = 'StartTLS'
 
1879
        box['_ask'] = '1'
 
1880
        boxes = []
 
1881
        svr.sendBox = boxes.append
 
1882
        svr.makeConnection(StringTransport())
 
1883
        svr.ampBoxReceived(box)
 
1884
        self.assertEquals(boxes,
 
1885
            [{'_error_code': 'TLS_ERROR',
 
1886
              '_error': '1',
 
1887
              '_error_description': 'TLS not available'}])
 
1888
 
1707
1889
 
1708
1890
 
1709
1891
class InheritedError(Exception):
1908
2090
    cert = key.newCertificate(sscrd)
1909
2091
    return cert
1910
2092
 
1911
 
tempcert = tempSelfSigned()
 
2093
if ssl is not None:
 
2094
    tempcert = tempSelfSigned()
1912
2095
 
1913
2096
 
1914
2097
class LiveFireTLSTestCase(LiveFireBase, unittest.TestCase):
1946
2129
                                   tls_localCertificate=cert,
1947
2130
                                   tls_verifyAuthorities=[cert]).addCallback(secured)
1948
2131
 
 
2132
    skip = skipSSL
 
2133
 
 
2134
 
1949
2135
 
1950
2136
class SlightlySmartTLS(SimpleSymmetricCommandProtocol):
1951
2137
    """
1974
2160
            return self.cli.callRemote(SecuredPing)
1975
2161
        return self.cli.callRemote(amp.StartTLS).addCallback(secured)
1976
2162
 
 
2163
    skip = skipSSL
 
2164
 
 
2165
 
1977
2166
 
1978
2167
class WithServerTLSVerification(LiveFireBase, unittest.TestCase):
1979
2168
    clientProto = SimpleSymmetricCommandProtocol
1989
2178
                                   tls_verifyAuthorities=[tempcert]
1990
2179
            ).addCallback(secured)
1991
2180
 
 
2181
    skip = skipSSL
 
2182
 
1992
2183
 
1993
2184
 
1994
2185
class ProtocolIncludingArgument(amp.Argument):
2114
2305
 
2115
2306
class CommandTestCase(unittest.TestCase):
2116
2307
    """
2117
 
    Tests for L{amp.Command}.
 
2308
    Tests for L{amp.Argument} and L{amp.Command}.
2118
2309
    """
 
2310
    def test_argumentInterface(self):
 
2311
        """
 
2312
        L{Argument} instances provide L{amp.IArgumentType}.
 
2313
        """
 
2314
        self.assertTrue(verifyObject(amp.IArgumentType, amp.Argument()))
 
2315
 
2119
2316
 
2120
2317
    def test_parseResponse(self):
2121
2318
        """
2222
2419
        response.addCallback(gotResponse)
2223
2420
        return response
2224
2421
 
 
2422
 
 
2423
    def test_extraArgumentsDisallowed(self):
 
2424
        """
 
2425
        L{Command.makeArguments} raises L{amp.InvalidSignature} if the objects
 
2426
        dictionary passed to it includes a key which does not correspond to the
 
2427
        Python identifier for a defined argument.
 
2428
        """
 
2429
        self.assertRaises(
 
2430
            amp.InvalidSignature,
 
2431
            Hello.makeArguments,
 
2432
            dict(hello="hello", bogusArgument=object()), None)
 
2433
 
 
2434
 
 
2435
    def test_wireSpellingDisallowed(self):
 
2436
        """
 
2437
        If a command argument conflicts with a Python keyword, the
 
2438
        untransformed argument name is not allowed as a key in the dictionary
 
2439
        passed to L{Command.makeArguments}.  If it is supplied,
 
2440
        L{amp.InvalidSignature} is raised.
 
2441
 
 
2442
        This may be a pointless implementation restriction which may be lifted.
 
2443
        The current behavior is tested to verify that such arguments are not
 
2444
        silently dropped on the floor (the previous behavior).
 
2445
        """
 
2446
        self.assertRaises(
 
2447
            amp.InvalidSignature,
 
2448
            Hello.makeArguments,
 
2449
            dict(hello="required", **{"print": "print value"}),
 
2450
            None)
 
2451
 
 
2452
 
 
2453
 
2225
2454
if not interfaces.IReactorSSL.providedBy(reactor):
2226
2455
    skipMsg = 'This test case requires SSL support in the reactor'
2227
2456
    TLSTest.skip = skipMsg