~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_sslverify.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
# Copyright 2005 Divmod, Inc.  See LICENSE file for details
# Copyright (c) 2006-2008 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet._sslverify}.
"""
 
8
 
 
9
import itertools
 
10
 
 
11
try:
 
12
    from OpenSSL import SSL
 
13
    from OpenSSL.crypto import PKey, X509, X509Req
 
14
    from OpenSSL.crypto import TYPE_RSA
 
15
    from twisted.internet import _sslverify as sslverify
 
16
except ImportError:
 
17
    pass
 
18
 
 
19
from twisted.trial import unittest
 
20
from twisted.internet import protocol, defer, reactor
 
21
from twisted.python.reflect import objgrep, isSame
 
22
from twisted.python import log
 
23
 
 
24
from twisted.internet.error import CertificateError, ConnectionLost
 
25
from twisted.internet import interfaces
 
26
 
 
27
 
 
28
# A couple of static PEM-format certificates to be used by various tests.

# Certificate handed out as the "host" certificate by the fake SSL transport
# used in this module.  The tests expect it to carry serial number 12345 and
# to name example.twistedmatrix.com (Twisted Matrix Labs) as both subject
# and issuer.
A_HOST_CERTIFICATE_PEM = """
-----BEGIN CERTIFICATE-----
        MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
        A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
        MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
        dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
        ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
        OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
        aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
        IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
        Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
        KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
        8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
        KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
        VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
        JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
        S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
        fXzCWdG0O/3Lk2SRM0I=
-----END CERTIFICATE-----
"""

# Certificate handed out as the "peer" certificate by the fake SSL transport
# used in this module.  The tests expect it to carry serial number 12346.
A_PEER_CERTIFICATE_PEM = """
-----BEGIN CERTIFICATE-----
        MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
        A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
        MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
        dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
        bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
        MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
        dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
        aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
        c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
        MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
        CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
        JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
        e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
        vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
        i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
        yqDtGhklsWW3ZwBzEh5VEOUp
-----END CERTIFICATE-----
"""
 
70
 
 
71
 
 
72
 
 
73
# Each call returns the next integer (0, 1, 2, ...); used to give every
# generated certificate a distinct serial number.
counter = itertools.count().next


def makeCertificate(**kw):
    """
    Generate a new RSA key pair and a self-signed certificate for it.

    The certificate is valid from now for one year, takes its serial number
    from L{counter} and is signed with the freshly generated key.

    @param kw: name attributes (for example C{O} or C{CN}) to set on both
        the issuer and the subject of the certificate.

    @return: a 2-tuple C{(keypair, certificate)}.
    """
    keypair = PKey()
    keypair.generate_key(TYPE_RSA, 512)

    oneYear = 60 * 60 * 24 * 365
    certificate = X509()
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(oneYear)

    # Issuer and subject receive exactly the same attributes.
    names = (certificate.get_issuer(), certificate.get_subject())
    for name in names:
        for attribute, value in kw.items():
            setattr(name, attribute, value)

    serial = counter()
    certificate.set_serial_number(serial)
    certificate.set_pubkey(keypair)
    certificate.sign(keypair, "md5")

    return keypair, certificate
 
90
 
 
91
 
 
92
 
 
93
class DataCallbackProtocol(protocol.Protocol):
    """
    Protocol which reports received data and connection loss through the
    C{onData} and C{onLost} deferreds of its factory.

    Each deferred is detached from the factory (replaced by C{None}) before
    it is fired, so it fires at most once.
    """

    def dataReceived(self, data):
        """
        Call back the factory's C{onData} deferred, if any, with C{data}.
        """
        waiting = self.factory.onData
        self.factory.onData = None
        if waiting is None:
            return
        waiting.callback(data)

    def connectionLost(self, reason):
        """
        Errback the factory's C{onLost} deferred, if any, with C{reason}.
        """
        waiting = self.factory.onLost
        self.factory.onLost = None
        if waiting is None:
            return
        waiting.errback(reason)
 
103
 
 
104
class WritingProtocol(protocol.Protocol):
    """
    Protocol which writes L{byte} to its transport as soon as it is
    connected and errbacks its factory's C{onLost} deferred when the
    connection is lost.
    """
    # Payload sent on connection; tests compare received data against it.
    byte = 'x'

    def connectionMade(self):
        """
        Send the payload to the peer.
        """
        payload = self.byte
        self.transport.write(payload)

    def connectionLost(self, reason):
        """
        Errback the factory's C{onLost} deferred with C{reason}.
        """
        lostDeferred = self.factory.onLost
        lostDeferred.errback(reason)
 
111
 
 
112
 
 
113
class OpenSSLOptions(unittest.TestCase):
    """
    Tests for distinguished names, certificate inspection and
    L{sslverify.OpenSSLCertificateOptions}, including SSL connections made
    over the loopback interface with freshly generated certificates.
    """
    serverPort = clientConn = None
    onServerLost = onClientLost = None

    sKey = None
    sCert = None
    cKey = None
    cCert = None

    # Bit of the OP_NO_TICKET option checked by the session ticket tests.
    _OP_NO_TICKET = 0x00004000

    def setUp(self):
        """
        Generate a key pair and certificate for the server and another for
        the client and store them as instance attributes.
        """
        self.sKey, self.sCert = makeCertificate(
            O="Server Test Certificate",
            CN="server")
        self.cKey, self.cCert = makeCertificate(
            O="Client Test Certificate",
            CN="client")


    def tearDown(self):
        """
        Shut down what L{loopback} started.

        @return: a L{defer.DeferredList} which fires once the listening port
            has stopped and the connection-lost deferreds created by
            L{loopback} have fired.
        """
        pending = []
        if self.serverPort is not None:
            # stopListening may return a Deferred; wait for it so the port
            # is really closed before the test is considered finished.
            pending.append(
                defer.maybeDeferred(self.serverPort.stopListening))
        if self.clientConn is not None:
            self.clientConn.disconnect()

        if self.onServerLost is not None:
            pending.append(self.onServerLost)
        if self.onClientLost is not None:
            pending.append(self.onClientLost)

        return defer.DeferredList(pending, consumeErrors=True)


    def loopback(self, serverCertOpts, clientCertOpts,
                 onServerLost=None, onClientLost=None, onData=None):
        """
        Listen for SSL connections on an OS-assigned port and connect an SSL
        client to it on 127.0.0.1.

        The client uses L{WritingProtocol} and the server uses
        L{DataCallbackProtocol}.  The port and connector are remembered on
        the test case so that L{tearDown} can shut them down.

        @param serverCertOpts: context factory passed to C{listenSSL}.
        @param clientCertOpts: context factory passed to C{connectSSL}.
        @param onServerLost: deferred errbacked when the server side loses
            its connection.  If C{None}, one is created and remembered so
            that L{tearDown} waits for it.
        @param onClientLost: deferred errbacked when the client side loses
            its connection.  If C{None}, one is created and remembered so
            that L{tearDown} waits for it.
        @param onData: deferred called back with the first data received by
            the server.  If C{None}, a new one is created.
        """
        if onServerLost is None:
            self.onServerLost = onServerLost = defer.Deferred()
        if onClientLost is None:
            self.onClientLost = onClientLost = defer.Deferred()
        if onData is None:
            onData = defer.Deferred()

        serverFactory = protocol.ServerFactory()
        serverFactory.protocol = DataCallbackProtocol
        serverFactory.onLost = onServerLost
        serverFactory.onData = onData

        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = WritingProtocol
        clientFactory.onLost = onClientLost

        self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
        self.clientConn = reactor.connectSSL('127.0.0.1',
                self.serverPort.getHost().port, clientFactory, clientCertOpts)


    def test_abbreviatingDistinguishedNames(self):
        """
        Check that abbreviations used in certificates correctly map to
        complete names.
        """
        self.assertEqual(
                sslverify.DN(CN='a', OU='hello'),
                sslverify.DistinguishedName(commonName='a',
                                            organizationalUnitName='hello'))
        self.assertNotEqual(
                sslverify.DN(CN='a', OU='hello'),
                sslverify.DN(CN='a', OU='hello', emailAddress='xxx'))
        dn = sslverify.DN(CN='abcdefg')
        # Attribute names are case sensitive: 'Cn' is not an abbreviation.
        self.assertRaises(AttributeError, setattr, dn, 'Cn', 'x')
        self.assertEqual(dn.CN, dn.commonName)
        dn.CN = 'bcdefga'
        self.assertEqual(dn.CN, dn.commonName)


    def testInspectDistinguishedName(self):
        """
        The C{inspect} output of a fully populated distinguished name
        contains every value, and every value in title case.
        """
        n = sslverify.DN(commonName='common name',
                         organizationName='organization name',
                         organizationalUnitName='organizational unit name',
                         localityName='locality name',
                         stateOrProvinceName='state or province name',
                         countryName='country name',
                         emailAddress='email address')
        s = n.inspect()
        for k in [
            'common name',
            'organization name',
            'organizational unit name',
            'locality name',
            'state or province name',
            'country name',
            'email address']:
            self.assertIn(k, s, "%r was not in inspect output." % (k,))
            self.assertIn(k.title(), s, "%r was not in inspect output." % (k,))


    def testInspectDistinguishedNameWithoutAllFields(self):
        """
        The C{inspect} output of a distinguished name with only a locality
        mentions the locality and none of the other fields.
        """
        n = sslverify.DN(localityName='locality name')
        s = n.inspect()
        for k in [
            'common name',
            'organization name',
            'organizational unit name',
            'state or province name',
            'country name',
            'email address']:
            self.assertNotIn(k, s, "%r was in inspect output." % (k,))
            self.assertNotIn(k.title(), s, "%r was in inspect output." % (k,))
        self.assertIn('locality name', s)
        self.assertIn('Locality Name', s)


    def test_inspectCertificate(self):
        """
        Test that the C{inspect} method of L{sslverify.Certificate} returns
        a human-readable string containing some basic information about the
        certificate.
        """
        c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
        # Subject and issuer are expected to be described identically.
        nameLines = [
            "  Organizational Unit Name: Security",
            "         Organization Name: Twisted Matrix Labs",
            "               Common Name: example.twistedmatrix.com",
            "    State Or Province Name: Massachusetts",
            "              Country Name: US",
            "             Email Address: nobody@twistedmatrix.com",
            "             Locality Name: Boston",
            ""]
        expected = (
            ["Certificate For Subject:"] + nameLines +
            ["Issuer:"] + nameLines +
            ["Serial Number: 12345",
             "Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
             "Public Key with Hash: ff33994c80812aa95a79cdb85362d054"])
        self.assertEqual(c.inspect().split('\n'), expected)


    def test_certificateOptionsSerialization(self):
        """
        Test that __setstate__(__getstate__()) round-trips properly.
        """
        firstOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey,
            certificate=self.sCert,
            method=SSL.SSLv3_METHOD,
            verify=True,
            caCerts=[self.sCert],
            verifyDepth=2,
            requireCertificate=False,
            verifyOnce=False,
            enableSingleUseKeys=False,
            enableSessions=False,
            fixBrokenPeers=True,
            enableSessionTickets=True)
        context = firstOpts.getContext()
        state = firstOpts.__getstate__()

        # The context shouldn't be in the state to serialize
        leaked = objgrep(state, context, isSame)
        self.failIf(leaked, leaked)

        opts = sslverify.OpenSSLCertificateOptions()
        opts.__setstate__(state)
        self.assertEqual(opts.privateKey, self.sKey)
        self.assertEqual(opts.certificate, self.sCert)
        self.assertEqual(opts.method, SSL.SSLv3_METHOD)
        self.assertEqual(opts.verify, True)
        self.assertEqual(opts.caCerts, [self.sCert])
        self.assertEqual(opts.verifyDepth, 2)
        self.assertEqual(opts.requireCertificate, False)
        self.assertEqual(opts.verifyOnce, False)
        self.assertEqual(opts.enableSingleUseKeys, False)
        self.assertEqual(opts.enableSessions, False)
        self.assertEqual(opts.fixBrokenPeers, True)
        self.assertEqual(opts.enableSessionTickets, True)


    def test_certificateOptionsSessionTickets(self):
        """
        Enabling session tickets should not set the OP_NO_TICKET option.
        """
        opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=True)
        ctx = opts.getContext()
        # set_options(0) requests no new option; its return value is treated
        # here as the option mask of the context.
        self.assertEqual(0, ctx.set_options(0) & self._OP_NO_TICKET)


    def test_certificateOptionsSessionTicketsDisabled(self):
        """
        Disabling session tickets should set the OP_NO_TICKET option.
        """
        opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False)
        ctx = opts.getContext()
        self.assertEqual(
            self._OP_NO_TICKET, ctx.set_options(0) & self._OP_NO_TICKET)


    def test_allowedAnonymousClientConnection(self):
        """
        Check that anonymous connections are allowed when certificates aren't
        required on the server.
        """
        onData = defer.Deferred()
        serverOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey, certificate=self.sCert,
            requireCertificate=False)
        clientOpts = sslverify.OpenSSLCertificateOptions(
            requireCertificate=False)
        self.loopback(serverOpts, clientOpts, onData=onData)

        return onData.addCallback(
            lambda result: self.assertEqual(result, WritingProtocol.byte))


    def test_refusedAnonymousClientConnection(self):
        """
        Check that anonymous connections are refused when certificates are
        required on the server.
        """
        onServerLost = defer.Deferred()
        onClientLost = defer.Deferred()
        serverOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey, certificate=self.sCert, verify=True,
            caCerts=[self.sCert], requireCertificate=True)
        clientOpts = sslverify.OpenSSLCertificateOptions(
            requireCertificate=False)
        self.loopback(serverOpts, clientOpts,
                      onServerLost=onServerLost,
                      onClientLost=onClientLost)

        d = defer.DeferredList([onClientLost, onServerLost],
                               consumeErrors=True)

        def afterLost(results):
            # Explicit unpacking instead of tuple parameters, which only
            # Python 2 accepts.
            (cSuccess, cResult), (sSuccess, sResult) = results

            self.failIf(cSuccess)
            self.failIf(sSuccess)
            # Win32 fails to report the SSL Error, and report a connection lost
            # instead: there is a race condition so that's not totally
            # surprising (see ticket #2877 in the tracker)
            self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost))
            self.assertIsInstance(sResult.value, SSL.Error)

        return d.addCallback(afterLost)


    def test_failedCertificateVerification(self):
        """
        Check that the connection fails on both sides when the client
        verifies the server certificate against a CA list which contains
        only the client certificate.
        """
        onServerLost = defer.Deferred()
        onClientLost = defer.Deferred()
        serverOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey, certificate=self.sCert, verify=False,
            requireCertificate=False)
        clientOpts = sslverify.OpenSSLCertificateOptions(
            verify=True, requireCertificate=False, caCerts=[self.cCert])
        self.loopback(serverOpts, clientOpts,
                      onServerLost=onServerLost,
                      onClientLost=onClientLost)

        d = defer.DeferredList([onClientLost, onServerLost],
                               consumeErrors=True)

        def afterLost(results):
            (cSuccess, cResult), (sSuccess, sResult) = results

            self.failIf(cSuccess)
            self.failIf(sSuccess)

        return d.addCallback(afterLost)


    def test_successfulCertificateVerification(self):
        """
        Test a successful connection in which the client verifies the server
        certificate against a CA list containing that certificate.
        """
        onData = defer.Deferred()
        serverOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey, certificate=self.sCert, verify=False,
            requireCertificate=False)
        clientOpts = sslverify.OpenSSLCertificateOptions(
            verify=True, requireCertificate=True, caCerts=[self.sCert])
        self.loopback(serverOpts, clientOpts, onData=onData)

        return onData.addCallback(
                lambda result: self.assertEqual(result, WritingProtocol.byte))


    def test_successfulSymmetricSelfSignedCertificateVerification(self):
        """
        Test a successful connection with validation on both server and client
        sides.
        """
        onData = defer.Deferred()
        serverOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey, certificate=self.sCert, verify=True,
            requireCertificate=True, caCerts=[self.cCert])
        clientOpts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.cKey, certificate=self.cCert, verify=True,
            requireCertificate=True, caCerts=[self.sCert])
        self.loopback(serverOpts, clientOpts, onData=onData)

        return onData.addCallback(
                lambda result: self.assertEqual(result, WritingProtocol.byte))


    def test_verification(self):
        """
        Check certificates verification building custom certificates data.

        Each side gets a self-signed certificate plus a certificate signed
        by the other side's key; the connection is then made with options
        built from the cross-signed certificate and the side's own
        self-signed certificate.
        """
        clientDN = sslverify.DistinguishedName(commonName='client')
        clientKey = sslverify.KeyPair.generate()
        clientCertReq = clientKey.certificateRequest(clientDN)

        serverDN = sslverify.DistinguishedName(commonName='server')
        serverKey = sslverify.KeyPair.generate()
        serverCertReq = serverKey.certificateRequest(serverDN)

        # Self-signed certificates.
        clientSelfCertReq = clientKey.certificateRequest(clientDN)
        clientSelfCertData = clientKey.signCertificateRequest(
                clientDN, clientSelfCertReq, lambda dn: True, 132)
        clientSelfCert = clientKey.newCertificate(clientSelfCertData)

        serverSelfCertReq = serverKey.certificateRequest(serverDN)
        serverSelfCertData = serverKey.signCertificateRequest(
                serverDN, serverSelfCertReq, lambda dn: True, 516)
        serverSelfCert = serverKey.newCertificate(serverSelfCertData)

        # Cross-signed certificates: the server key signs the client's
        # request and the client key signs the server's request.
        clientCertData = serverKey.signCertificateRequest(
                serverDN, clientCertReq, lambda dn: True, 7)
        clientCert = clientKey.newCertificate(clientCertData)

        serverCertData = clientKey.signCertificateRequest(
                clientDN, serverCertReq, lambda dn: True, 42)
        serverCert = serverKey.newCertificate(serverCertData)

        onData = defer.Deferred()

        serverOpts = serverCert.options(serverSelfCert)
        clientOpts = clientCert.options(clientSelfCert)

        self.loopback(serverOpts,
                      clientOpts,
                      onData=onData)

        return onData.addCallback(
                lambda result: self.assertEqual(result, WritingProtocol.byte))



# Skip the whole test case when the reactor cannot be adapted to IReactorSSL.
if interfaces.IReactorSSL(reactor, None) is None:
    OpenSSLOptions.skip = "Reactor does not support SSL, cannot run SSL tests"
 
464
 
 
465
 
 
466
 
 
467
class _NotSSLTransport:
    """
    Fake transport whose handle offers neither C{get_peer_certificate} nor
    C{get_host_certificate}.
    """
    def getHandle(self):
        # The transport acts as its own handle.
        return self
 
470
 
 
471
class _MaybeSSLTransport:
    """
    Fake transport whose handle has the certificate accessors of an SSL
    connection but reports no certificate for either end.
    """
    def getHandle(self):
        """
        Return this object, which acts as its own handle.
        """
        return self

    def get_host_certificate(self):
        """
        Report that there is no host certificate.
        """
        return None

    def get_peer_certificate(self):
        """
        Report that there is no peer certificate.
        """
        return None
 
480
 
 
481
 
 
482
class _ActualSSLTransport:
    """
    Fake transport whose handle returns certificates loaded from the static
    PEM data in this module for both the host and the peer.
    """
    def getHandle(self):
        """
        Return this object, which acts as its own handle.
        """
        return self

    def get_host_certificate(self):
        """
        Return the underlying certificate object loaded from
        L{A_HOST_CERTIFICATE_PEM}.
        """
        hostCertificate = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
        return hostCertificate.original

    def get_peer_certificate(self):
        """
        Return the underlying certificate object loaded from
        L{A_PEER_CERTIFICATE_PEM}.
        """
        peerCertificate = sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM)
        return peerCertificate.original
 
491
 
 
492
 
 
493
class Constructors(unittest.TestCase):
    """
    Tests for C{peerFromTransport} and C{hostFromTransport} of
    L{sslverify.Certificate}.
    """

    def _assertRejected(self, constructor, transport, prefix):
        """
        Assert that C{constructor(transport)} raises L{CertificateError}
        and that the string form of the error starts with C{prefix}.
        """
        error = self.assertRaises(CertificateError, constructor, transport)
        self.failUnless(str(error).startswith(prefix))


    def test_peerFromNonSSLTransport(self):
        """
        Verify that peerFromTransport raises an exception if the transport
        passed is not actually an SSL transport.
        """
        self._assertRejected(
            sslverify.Certificate.peerFromTransport,
            _NotSSLTransport(),
            "non-TLS")


    def test_peerFromBlankSSLTransport(self):
        """
        Verify that peerFromTransport raises an exception if the transport
        passed is an SSL transport, but doesn't have a peer certificate.
        """
        self._assertRejected(
            sslverify.Certificate.peerFromTransport,
            _MaybeSSLTransport(),
            "TLS")


    def test_hostFromNonSSLTransport(self):
        """
        Verify that hostFromTransport raises an exception if the transport
        passed is not actually an SSL transport.
        """
        self._assertRejected(
            sslverify.Certificate.hostFromTransport,
            _NotSSLTransport(),
            "non-TLS")


    def test_hostFromBlankSSLTransport(self):
        """
        Verify that hostFromTransport raises an exception if the transport
        passed is an SSL transport, but doesn't have a host certificate.
        """
        self._assertRejected(
            sslverify.Certificate.hostFromTransport,
            _MaybeSSLTransport(),
            "TLS")


    def test_hostFromSSLTransport(self):
        """
        Verify that hostFromTransport successfully creates the correct
        certificate if passed a valid SSL transport.
        """
        transport = _ActualSSLTransport()
        certificate = sslverify.Certificate.hostFromTransport(transport)
        self.assertEqual(certificate.serialNumber(), 12345)


    def test_peerFromSSLTransport(self):
        """
        Verify that peerFromTransport successfully creates the correct
        certificate if passed a valid SSL transport.
        """
        transport = _ActualSSLTransport()
        certificate = sslverify.Certificate.peerFromTransport(transport)
        self.assertEqual(certificate.serialNumber(), 12346)



# Skip the whole test case when the reactor cannot be adapted to IReactorSSL.
if interfaces.IReactorSSL(reactor, None) is None:
    Constructors.skip = "Reactor does not support SSL, cannot run SSL tests"