1
# Copyright 2005 Divmod, Inc. See LICENSE file for details
2
# Copyright (c) 2006-2008 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Tests for L{twisted.internet._sslverify}.
12
from OpenSSL import SSL
13
from OpenSSL.crypto import PKey, X509, X509Req
14
from OpenSSL.crypto import TYPE_RSA
15
from twisted.internet import _sslverify as sslverify
19
from twisted.trial import unittest
20
from twisted.internet import protocol, defer, reactor
21
from twisted.python.reflect import objgrep, isSame
22
from twisted.python import log
24
from twisted.internet.error import CertificateError, ConnectionLost
25
from twisted.internet import interfaces
28
# A couple of static PEM-format certificates to be used by various tests.
29
A_HOST_CERTIFICATE_PEM = """
30
-----BEGIN CERTIFICATE-----
31
MIIC2jCCAkMCAjA5MA0GCSqGSIb3DQEBBAUAMIG0MQswCQYDVQQGEwJVUzEiMCAG
32
A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
33
MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
34
dXNldHRzMScwJQYJKoZIhvcNAQkBFhhub2JvZHlAdHdpc3RlZG1hdHJpeC5jb20x
35
ETAPBgNVBAsTCFNlY3VyaXR5MB4XDTA2MDgxNjAxMDEwOFoXDTA3MDgxNjAxMDEw
36
OFowgbQxCzAJBgNVBAYTAlVTMSIwIAYDVQQDExlleGFtcGxlLnR3aXN0ZWRtYXRy
37
aXguY29tMQ8wDQYDVQQHEwZCb3N0b24xHDAaBgNVBAoTE1R3aXN0ZWQgTWF0cml4
38
IExhYnMxFjAUBgNVBAgTDU1hc3NhY2h1c2V0dHMxJzAlBgkqhkiG9w0BCQEWGG5v
39
Ym9keUB0d2lzdGVkbWF0cml4LmNvbTERMA8GA1UECxMIU2VjdXJpdHkwgZ8wDQYJ
40
KoZIhvcNAQEBBQADgY0AMIGJAoGBAMzH8CDF/U91y/bdbdbJKnLgnyvQ9Ig9ZNZp
41
8hpsu4huil60zF03+Lexg2l1FIfURScjBuaJMR6HiMYTMjhzLuByRZ17KW4wYkGi
42
KXstz03VIKy4Tjc+v4aXFI4XdRw10gGMGQlGGscXF/RSoN84VoDKBfOMWdXeConJ
43
VyC4w3iJAgMBAAEwDQYJKoZIhvcNAQEEBQADgYEAviMT4lBoxOgQy32LIgZ4lVCj
44
JNOiZYg8GMQ6y0ugp86X80UjOvkGtNf/R7YgED/giKRN/q/XJiLJDEhzknkocwmO
45
S+4b2XpiaZYxRyKWwL221O7CGmtWYyZl2+92YYmmCiNzWQPfP6BOMlfax0AGLHls
47
-----END CERTIFICATE-----
50
A_PEER_CERTIFICATE_PEM = """
51
-----BEGIN CERTIFICATE-----
52
MIIC3jCCAkcCAjA6MA0GCSqGSIb3DQEBBAUAMIG2MQswCQYDVQQGEwJVUzEiMCAG
53
A1UEAxMZZXhhbXBsZS50d2lzdGVkbWF0cml4LmNvbTEPMA0GA1UEBxMGQm9zdG9u
54
MRwwGgYDVQQKExNUd2lzdGVkIE1hdHJpeCBMYWJzMRYwFAYDVQQIEw1NYXNzYWNo
55
dXNldHRzMSkwJwYJKoZIhvcNAQkBFhpzb21lYm9keUB0d2lzdGVkbWF0cml4LmNv
56
bTERMA8GA1UECxMIU2VjdXJpdHkwHhcNMDYwODE2MDEwMTU2WhcNMDcwODE2MDEw
57
MTU2WjCBtjELMAkGA1UEBhMCVVMxIjAgBgNVBAMTGWV4YW1wbGUudHdpc3RlZG1h
58
dHJpeC5jb20xDzANBgNVBAcTBkJvc3RvbjEcMBoGA1UEChMTVHdpc3RlZCBNYXRy
59
aXggTGFiczEWMBQGA1UECBMNTWFzc2FjaHVzZXR0czEpMCcGCSqGSIb3DQEJARYa
60
c29tZWJvZHlAdHdpc3RlZG1hdHJpeC5jb20xETAPBgNVBAsTCFNlY3VyaXR5MIGf
61
MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCnm+WBlgFNbMlHehib9ePGGDXF+Nz4
62
CjGuUmVBaXCRCiVjg3kSDecwqfb0fqTksBZ+oQ1UBjMcSh7OcvFXJZnUesBikGWE
63
JE4V8Bjh+RmbJ1ZAlUPZ40bAkww0OpyIRAGMvKG+4yLFTO4WDxKmfDcrOb6ID8WJ
64
e1u+i3XGkIf/5QIDAQABMA0GCSqGSIb3DQEBBAUAA4GBAD4Oukm3YYkhedUepBEA
65
vvXIQhVDqL7mk6OqYdXmNj6R7ZMC8WWvGZxrzDI1bZuB+4aIxxd1FXC3UOHiR/xg
66
i9cDl1y8P/qRp4aEBNF6rI0D4AxTbfnHQx4ERDAOShJdYZs/2zifPJ6va6YvrEyr
67
yqDtGhklsWW3ZwBzEh5VEOUp
68
-----END CERTIFICATE-----
73
# Source of certificate serial numbers: every call returns the next integer
# from one shared itertools.count() (Python 2 bound ``.next`` method).
counter = itertools.count().next
74
# Build a key pair and a certificate for use by the tests and return them as
# a ``(keypair, certificate)`` tuple.  The keyword arguments are iterated
# once for the issuer name and once for the subject name.
# NOTE(review): the statements creating ``keypair`` and ``certificate`` and
# the body of the inner loop are not visible in this extraction --
# presumably PKey()/X509() instances and a setattr(xname, k, v); confirm
# against the full file.
def makeCertificate(**kw):
76
# NOTE(review): 512-bit RSA and MD5 are weak; presumably acceptable only
# because these are throwaway test fixtures -- confirm before reuse.
keypair.generate_key(TYPE_RSA, 512)
79
# notBefore is adjusted by 0 seconds, notAfter by one year's worth.
certificate.gmtime_adj_notBefore(0)
80
certificate.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year
81
for xname in certificate.get_issuer(), certificate.get_subject():
82
for (k, v) in kw.items():
85
# Serial number comes from the module-level counter above.
certificate.set_serial_number(counter())
86
certificate.set_pubkey(keypair)
87
# Signed with the same key pair that was just set as the public key.
certificate.sign(keypair, "md5")
89
return keypair, certificate
93
# Protocol that works with two attributes stored on its factory:
# ``factory.onData`` and ``factory.onLost``.
class DataCallbackProtocol(protocol.Protocol):
94
def dataReceived(self, data):
95
# Take the factory's onData value and replace it with None in one
# assignment, so a later call finds None instead of the same object.
d, self.factory.onData = self.factory.onData, None
# NOTE(review): the statements that use ``d`` are not visible in this
# extraction -- presumably it is called back with ``data`` when not None.
99
def connectionLost(self, reason):
100
# Same take-and-clear step for the factory's onLost value.
d, self.factory.onLost = self.factory.onLost, None
# NOTE(review): use of ``d`` not visible here either -- presumably it is
# errbacked with ``reason`` when not None; confirm against the full file.
104
# Protocol that writes ``self.byte`` to its transport on connect and reports
# connection loss through ``factory.onLost``.
# NOTE(review): the definition of the ``byte`` attribute is not visible in
# this extraction; the tests below compare received data against
# ``WritingProtocol.byte``, so it is presumably a class attribute.
class WritingProtocol(protocol.Protocol):
106
def connectionMade(self):
107
self.transport.write(self.byte)
109
def connectionLost(self, reason):
110
# Always errbacks: onLost receives ``reason`` as a failure.
self.factory.onLost.errback(reason)
113
# Test case whose tests drive sslverify.OpenSSLCertificateOptions through
# the ``loopback`` helper defined further down.
class OpenSSLOptions(unittest.TestCase):
114
# Class-level None defaults: the ``is not None`` checks in the cleanup code
# work even for tests that never assign these attributes.
serverPort = clientConn = None
115
onServerLost = onClientLost = None
124
Create instance attributes holding the client and server keys and certificates.
126
self.sKey, self.sCert = makeCertificate(
127
O="Server Test Certificate",
129
self.cKey, self.cCert = makeCertificate(
130
O="Client Test Certificate",
134
if self.serverPort is not None:
135
self.serverPort.stopListening()
136
if self.clientConn is not None:
137
self.clientConn.disconnect()
140
if self.onServerLost is not None:
141
L.append(self.onServerLost)
142
if self.onClientLost is not None:
143
L.append(self.onClientLost)
145
return defer.DeferredList(L, consumeErrors=True)
147
# Start an SSL server on an ephemeral local port using ``serverCertOpts`` and
# connect an SSL client to it using ``clientCertOpts``.  The server side runs
# DataCallbackProtocol, the client side runs WritingProtocol.  The listening
# port and the connector are stored on ``self.serverPort`` and
# ``self.clientConn``.
def loopback(self, serverCertOpts, clientCertOpts,
148
onServerLost=None, onClientLost=None, onData=None):
149
# Deferreds created here (rather than supplied by the caller) are also
# stored on self.
if onServerLost is None:
150
self.onServerLost = onServerLost = defer.Deferred()
151
if onClientLost is None:
152
self.onClientLost = onClientLost = defer.Deferred()
154
# NOTE(review): the line preceding this assignment is not visible in this
# extraction -- presumably an ``if onData is None:`` guard; confirm.
onData = defer.Deferred()
156
serverFactory = protocol.ServerFactory()
157
serverFactory.protocol = DataCallbackProtocol
158
serverFactory.onLost = onServerLost
159
serverFactory.onData = onData
161
clientFactory = protocol.ClientFactory()
162
clientFactory.protocol = WritingProtocol
163
clientFactory.onLost = onClientLost
165
# Port 0 is passed to listenSSL; the actual port is read back below via
# getHost().port.
self.serverPort = reactor.listenSSL(0, serverFactory, serverCertOpts)
166
self.clientConn = reactor.connectSSL('127.0.0.1',
167
self.serverPort.getHost().port, clientFactory, clientCertOpts)
169
# Exercises the short (CN, OU) and long (commonName,
# organizationalUnitName) attribute spellings of sslverify.DN /
# sslverify.DistinguishedName.
# NOTE(review): several lines are missing from this extraction, including
# the rest of the docstring, the opening of the first assertion, and a
# statement between the two identical final assertions (presumably one that
# changes ``dn``); confirm against the full file.
def test_abbreviatingDistinguishedNames(self):
171
Check that abbreviations used in certificates correctly map to
175
sslverify.DN(CN='a', OU='hello'),
176
sslverify.DistinguishedName(commonName='a',
177
organizationalUnitName='hello'))
178
self.assertNotEquals(
179
sslverify.DN(CN='a', OU='hello'),
180
sslverify.DN(CN='a', OU='hello', emailAddress='xxx'))
181
dn = sslverify.DN(CN='abcdefg')
182
# 'Cn' (wrong capitalisation) is expected to be rejected.
self.assertRaises(AttributeError, setattr, dn, 'Cn', 'x')
183
self.assertEquals(dn.CN, dn.commonName)
185
self.assertEquals(dn.CN, dn.commonName)
188
# Builds a DN with every field set and checks that each value, and its
# title-cased form, appears in ``s``.
# NOTE(review): the lines that compute ``s`` and the loop header binding
# ``k`` are missing from this extraction, as are some entries of the list
# of expected values -- presumably ``s = n.inspect()``; confirm.
def testInspectDistinguishedName(self):
189
n = sslverify.DN(commonName='common name',
190
organizationName='organization name',
191
organizationalUnitName='organizational unit name',
192
localityName='locality name',
193
stateOrProvinceName='state or province name',
194
countryName='country name',
195
emailAddress='email address')
200
'organizational unit name',
202
'state or province name',
205
self.assertIn(k, s, "%r was not in inspect output." % (k,))
206
self.assertIn(k.title(), s, "%r was not in inspect output." % (k,))
209
# Builds a DN with only localityName set and checks that the other values
# are absent from ``s`` while the locality (plain and title-cased) is present.
# NOTE(review): the lines that compute ``s`` and the loop header binding
# ``k`` are missing from this extraction, as are some entries of the list
# of absent values; confirm against the full file.
def testInspectDistinguishedNameWithoutAllFields(self):
210
n = sslverify.DN(localityName='locality name')
215
'organizational unit name',
216
'state or province name',
219
self.assertNotIn(k, s, "%r was in inspect output." % (k,))
220
self.assertNotIn(k.title(), s, "%r was in inspect output." % (k,))
221
self.assertIn('locality name', s)
222
self.assertIn('Locality Name', s)
225
# Loads A_HOST_CERTIFICATE_PEM and compares the lines of its inspect()
# output with a literal list of expected lines.
# NOTE(review): the opening of the assertion call and some expected lines
# (between the two repeated groups of name fields) are missing from this
# extraction; confirm against the full file before editing the list.
def test_inspectCertificate(self):
227
Test that the C{inspect} method of L{sslverify.Certificate} returns
228
a human-readable string containing some basic information about the
231
c = sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM)
233
c.inspect().split('\n'),
234
["Certificate For Subject:",
235
" Organizational Unit Name: Security",
236
" Organization Name: Twisted Matrix Labs",
237
" Common Name: example.twistedmatrix.com",
238
" State Or Province Name: Massachusetts",
240
" Email Address: nobody@twistedmatrix.com",
241
" Locality Name: Boston",
244
" Organizational Unit Name: Security",
245
" Organization Name: Twisted Matrix Labs",
246
" Common Name: example.twistedmatrix.com",
247
" State Or Province Name: Massachusetts",
249
" Email Address: nobody@twistedmatrix.com",
250
" Locality Name: Boston",
252
"Serial Number: 12345",
253
"Digest: C4:96:11:00:30:C3:EC:EE:A3:55:AA:ED:8C:84:85:18",
254
"Public Key with Hash: ff33994c80812aa95a79cdb85362d054"])
257
# Builds an OpenSSLCertificateOptions, takes its __getstate__() result,
# feeds it to __setstate__() on a fresh instance, and compares attributes.
# NOTE(review): the assertions below check ``verify``, ``verifyDepth``,
# ``verifyOnce`` and ``fixBrokenPeers``, but no matching constructor keywords
# are visible -- presumably those lines are missing from this extraction.
def test_certificateOptionsSerialization(self):
259
Test that __setstate__(__getstate__()) round-trips properly.
261
firstOpts = sslverify.OpenSSLCertificateOptions(
262
privateKey=self.sKey,
263
certificate=self.sCert,
264
method=SSL.SSLv3_METHOD,
266
caCerts=[self.sCert],
268
requireCertificate=False,
270
enableSingleUseKeys=False,
271
enableSessions=False,
273
enableSessionTickets=True)
274
# getContext() is called before __getstate__() so that a context exists to
# search for in the state.
context = firstOpts.getContext()
275
state = firstOpts.__getstate__()
277
# The context shouldn't be in the state to serialize
278
# The second objgrep() call is the failure message: it shows where the
# context was found.
self.failIf(objgrep(state, context, isSame),
279
objgrep(state, context, isSame))
281
opts = sslverify.OpenSSLCertificateOptions()
282
opts.__setstate__(state)
283
self.assertEqual(opts.privateKey, self.sKey)
284
self.assertEqual(opts.certificate, self.sCert)
285
self.assertEqual(opts.method, SSL.SSLv3_METHOD)
286
self.assertEqual(opts.verify, True)
287
self.assertEqual(opts.caCerts, [self.sCert])
288
self.assertEqual(opts.verifyDepth, 2)
289
self.assertEqual(opts.requireCertificate, False)
290
self.assertEqual(opts.verifyOnce, False)
291
self.assertEqual(opts.enableSingleUseKeys, False)
292
self.assertEqual(opts.enableSessions, False)
293
self.assertEqual(opts.fixBrokenPeers, True)
294
self.assertEqual(opts.enableSessionTickets, True)
297
def test_certificateOptionsSessionTickets(self):
    """
    With C{enableSessionTickets=True}, the context built by
    L{sslverify.OpenSSLCertificateOptions} leaves the OP_NO_TICKET bit
    (C{0x00004000}) cleared.
    """
    noTicketBit = 0x00004000
    context = sslverify.OpenSSLCertificateOptions(
        enableSessionTickets=True).getContext()
    # set_options(0) requests no additional options; its return value is
    # read as the context's resulting option bitmask.
    currentOptions = context.set_options(0)
    self.assertEquals(0, currentOptions & noTicketBit)
306
def test_certificateOptionsSessionTicketsDisabled(self):
    """
    Disabling session tickets should set the OP_NO_TICKET option.

    With C{enableSessionTickets=False}, the context returned by
    C{getContext} must have the C{0x00004000} bit set in its options.
    """
    # Literal mask this file uses for OP_NO_TICKET.
    noTicketBit = 0x00004000
    opts = sslverify.OpenSSLCertificateOptions(enableSessionTickets=False)
    ctx = opts.getContext()
    # set_options(0) requests no additional options; its return value is
    # read as the context's resulting option bitmask.
    self.assertEquals(noTicketBit, ctx.set_options(0) & noTicketBit)
315
# Server: key + certificate, requireCertificate=False.  Client: no
# certificate, requireCertificate=False.  The test passes when the value
# delivered to ``onData`` equals WritingProtocol.byte.
# NOTE(review): the end of the loopback(...) call is missing from this
# extraction -- presumably ``onData=onData)``, since the returned Deferred
# can only fire if loopback receives it; confirm.
def test_allowedAnonymousClientConnection(self):
317
Check that anonymous connections are allowed when certificates aren't
318
required on the server.
320
onData = defer.Deferred()
321
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
322
certificate=self.sCert, requireCertificate=False),
323
sslverify.OpenSSLCertificateOptions(
324
requireCertificate=False),
327
return onData.addCallback(
328
lambda result: self.assertEquals(result, WritingProtocol.byte))
330
# Server: verify=True, requireCertificate=True, trusting only its own
# certificate.  Client: presents no certificate.  Both connection-lost
# Deferreds are expected to fail.
# NOTE(review): the remaining arguments of the DeferredList(...) call are
# missing from this extraction -- presumably ``consumeErrors=True)``.
# ``afterLost`` relies on Python 2 tuple-parameter unpacking, which Python 3
# removed (PEP 3113); it unpacks two (success, result) pairs, client first.
def test_refusedAnonymousClientConnection(self):
332
Check that anonymous connections are refused when certificates are
333
required on the server.
335
onServerLost = defer.Deferred()
336
onClientLost = defer.Deferred()
337
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
338
certificate=self.sCert, verify=True,
339
caCerts=[self.sCert], requireCertificate=True),
340
sslverify.OpenSSLCertificateOptions(
341
requireCertificate=False),
342
onServerLost=onServerLost,
343
onClientLost=onClientLost)
345
d = defer.DeferredList([onClientLost, onServerLost],
349
def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
351
self.failIf(cSuccess)
352
self.failIf(sSuccess)
353
# Win32 fails to report the SSL Error, and report a connection lost
354
# instead: there is a race condition so that's not totally
355
# surprising (see ticket #2877 in the tracker)
356
self.assertIsInstance(cResult.value, (SSL.Error, ConnectionLost))
357
self.assertIsInstance(sResult.value, SSL.Error)
359
return d.addCallback(afterLost)
361
# Both connection-lost Deferreds are expected to fail.
# NOTE(review): the docstring speaks of the "server CA", but as configured
# below the server does not verify (verify=False) and it is the client that
# verifies (verify=True) against caCerts=[self.cCert] while the server
# presents self.sCert -- confirm the docstring wording.
# NOTE(review): the end of the docstring and the remaining arguments of the
# DeferredList(...) call are missing from this extraction.
# ``afterLost`` relies on Python 2 tuple-parameter unpacking (removed in
# Python 3 by PEP 3113).
def test_failedCertificateVerification(self):
363
Check that connecting with a certificate not accepted by the server CA
366
onServerLost = defer.Deferred()
367
onClientLost = defer.Deferred()
368
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
369
certificate=self.sCert, verify=False,
370
requireCertificate=False),
371
sslverify.OpenSSLCertificateOptions(verify=True,
372
requireCertificate=False, caCerts=[self.cCert]),
373
onServerLost=onServerLost,
374
onClientLost=onClientLost)
376
d = defer.DeferredList([onClientLost, onServerLost],
378
def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
380
self.failIf(cSuccess)
381
self.failIf(sSuccess)
383
return d.addCallback(afterLost)
385
# Server: presents self.sCert and does not verify.  Client: verify=True,
# requireCertificate=True, trusting self.sCert.  The test passes when the
# value delivered to ``onData`` equals WritingProtocol.byte.
# NOTE(review): the end of the docstring and the end of the loopback(...)
# call are missing from this extraction -- presumably ``onData=onData)``.
def test_successfulCertificateVerification(self):
387
Test a successful connection with client certificate validation on
390
onData = defer.Deferred()
391
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
392
certificate=self.sCert, verify=False,
393
requireCertificate=False),
394
sslverify.OpenSSLCertificateOptions(verify=True,
395
requireCertificate=True, caCerts=[self.sCert]),
398
return onData.addCallback(
399
lambda result: self.assertEquals(result, WritingProtocol.byte))
401
# Each side presents its own certificate and verifies the other's: the
# server trusts self.cCert, the client trusts self.sCert, and both set
# verify=True and requireCertificate=True.
# NOTE(review): the end of the docstring and the end of the loopback(...)
# call are missing from this extraction -- presumably ``onData=onData)``.
def test_successfulSymmetricSelfSignedCertificateVerification(self):
403
Test a successful connection with validation on both server and client
406
onData = defer.Deferred()
407
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey,
408
certificate=self.sCert, verify=True,
409
requireCertificate=True, caCerts=[self.cCert]),
410
sslverify.OpenSSLCertificateOptions(privateKey=self.cKey,
411
certificate=self.cCert, verify=True,
412
requireCertificate=True, caCerts=[self.sCert]),
415
return onData.addCallback(
416
lambda result: self.assertEquals(result, WritingProtocol.byte))
418
# Builds certificates through the sslverify KeyPair API instead of
# makeCertificate, then runs a loopback connection with them.
# NOTE(review): the trailing integer passed to signCertificateRequest (132,
# 516, 7, 42) is presumably the serial number, and ``lambda dn: True``
# presumably a callback accepting any requested name -- confirm against the
# sslverify API.  The end of the loopback(...) call is missing from this
# extraction.
def test_verification(self):
420
Check certificates verification building custom certificates data.
422
clientDN = sslverify.DistinguishedName(commonName='client')
423
clientKey = sslverify.KeyPair.generate()
424
clientCertReq = clientKey.certificateRequest(clientDN)
426
serverDN = sslverify.DistinguishedName(commonName='server')
427
serverKey = sslverify.KeyPair.generate()
428
serverCertReq = serverKey.certificateRequest(serverDN)
430
# Each key signs a request made with that same key.
clientSelfCertReq = clientKey.certificateRequest(clientDN)
431
clientSelfCertData = clientKey.signCertificateRequest(
432
clientDN, clientSelfCertReq, lambda dn: True, 132)
433
clientSelfCert = clientKey.newCertificate(clientSelfCertData)
435
serverSelfCertReq = serverKey.certificateRequest(serverDN)
436
serverSelfCertData = serverKey.signCertificateRequest(
437
serverDN, serverSelfCertReq, lambda dn: True, 516)
438
serverSelfCert = serverKey.newCertificate(serverSelfCertData)
440
# Cross-signing: the server key signs the client's request and the client
# key signs the server's request.
clientCertData = serverKey.signCertificateRequest(
441
serverDN, clientCertReq, lambda dn: True, 7)
442
clientCert = clientKey.newCertificate(clientCertData)
444
serverCertData = clientKey.signCertificateRequest(
445
clientDN, serverCertReq, lambda dn: True, 42)
446
serverCert = serverKey.newCertificate(serverCertData)
448
onData = defer.Deferred()
450
# Each side's options are built from its cross-signed certificate, with its
# own self-signed certificate passed as the argument.
serverOpts = serverCert.options(serverSelfCert)
451
clientOpts = clientCert.options(clientSelfCert)
453
self.loopback(serverOpts,
457
return onData.addCallback(
458
lambda result: self.assertEquals(result, WritingProtocol.byte))
462
# Set the ``skip`` attribute on the whole test case when adapting the reactor
# to IReactorSSL yields the supplied default, None.
if interfaces.IReactorSSL(reactor, None) is None:
463
OpenSSLOptions.skip = "Reactor does not support SSL, cannot run SSL tests"
467
# Stand-in transport for the "not an SSL transport" tests in Constructors.
# NOTE(review): the class body is not visible in this extraction.
class _NotSSLTransport:
471
# Stand-in transport exposing the two certificate accessors; the Constructors
# tests expect a CertificateError whose message starts with "TLS" for it.
# NOTE(review): the method bodies (and any other methods) are not visible in
# this extraction -- presumably both accessors return None; confirm.
class _MaybeSSLTransport:
475
def get_peer_certificate(self):
478
def get_host_certificate(self):
482
# Stand-in transport whose accessors return certificates loaded from the
# module-level PEM constants.
# NOTE(review): original lines between the class header and the first
# visible method are missing from this extraction; confirm against the
# full file.
class _ActualSSLTransport:
486
def get_host_certificate(self):
487
# ``.original`` is the attribute of the loaded sslverify.Certificate that is
# handed back to the caller.
return sslverify.Certificate.loadPEM(A_HOST_CERTIFICATE_PEM).original
489
def get_peer_certificate(self):
490
return sslverify.Certificate.loadPEM(A_PEER_CERTIFICATE_PEM).original
493
class Constructors(unittest.TestCase):
494
# Expects peerFromTransport to raise CertificateError with a message that
# starts with "non-TLS".
# NOTE(review): the final argument of the assertRaises(...) call is missing
# from this extraction -- presumably ``_NotSSLTransport())``; confirm.
def test_peerFromNonSSLTransport(self):
496
Verify that peerFromTransport raises an exception if the transport
497
passed is not actually an SSL transport.
499
x = self.assertRaises(CertificateError,
500
sslverify.Certificate.peerFromTransport,
502
self.failUnless(str(x).startswith("non-TLS"))
504
def test_peerFromBlankSSLTransport(self):
    """
    L{sslverify.Certificate.peerFromTransport} raises L{CertificateError},
    with a message beginning C{"TLS"}, when given an SSL transport that
    has no peer certificate.
    """
    transport = _MaybeSSLTransport()
    error = self.assertRaises(
        CertificateError,
        sslverify.Certificate.peerFromTransport, transport)
    message = str(error)
    self.failUnless(message.startswith("TLS"))
514
# Expects hostFromTransport to raise CertificateError with a message that
# starts with "non-TLS".
# NOTE(review): the final argument of the assertRaises(...) call is missing
# from this extraction -- presumably ``_NotSSLTransport())``; confirm.
def test_hostFromNonSSLTransport(self):
516
Verify that hostFromTransport raises an exception if the transport
517
passed is not actually an SSL transport.
519
x = self.assertRaises(CertificateError,
520
sslverify.Certificate.hostFromTransport,
522
self.failUnless(str(x).startswith("non-TLS"))
524
def test_hostFromBlankSSLTransport(self):
    """
    L{sslverify.Certificate.hostFromTransport} raises L{CertificateError},
    with a message beginning C{"TLS"}, when given an SSL transport that
    has no host certificate.
    """
    transport = _MaybeSSLTransport()
    error = self.assertRaises(
        CertificateError,
        sslverify.Certificate.hostFromTransport, transport)
    message = str(error)
    self.failUnless(message.startswith("TLS"))
535
# Reads the serial number of the certificate hostFromTransport builds from
# an _ActualSSLTransport (which serves A_HOST_CERTIFICATE_PEM).
# NOTE(review): the opening of the assertion and the expected value are
# missing from this extraction; confirm against the full file.
def test_hostFromSSLTransport(self):
537
Verify that hostFromTransport successfully creates the correct
538
certificate if passed a valid SSL transport.
541
sslverify.Certificate.hostFromTransport(
542
_ActualSSLTransport()).serialNumber(),
545
# Reads the serial number of the certificate peerFromTransport builds from
# an _ActualSSLTransport (which serves A_PEER_CERTIFICATE_PEM).
# NOTE(review): the opening of the assertion and the expected value are
# missing from this extraction; confirm against the full file.
def test_peerFromSSLTransport(self):
547
Verify that peerFromTransport successfully creates the correct
548
certificate if passed a valid SSL transport.
551
sslverify.Certificate.peerFromTransport(
552
_ActualSSLTransport()).serialNumber(),
557
# Set the ``skip`` attribute on the whole test case when adapting the reactor
# to IReactorSSL yields the supplied default, None.
if interfaces.IReactorSSL(reactor, None) is None:
558
Constructors.skip = "Reactor does not support SSL, cannot run SSL tests"