~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_ssl.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for twisted SSL support.
 
6
"""
 
7
 
 
8
from twisted.trial import unittest
 
9
from twisted.internet import protocol, reactor, interfaces, defer
 
10
from twisted.protocols import basic
 
11
from twisted.python import util
 
12
from twisted.python.reflect import getClass, fullyQualifiedName
 
13
from twisted.python.runtime import platform
 
14
from twisted.test.test_tcp import WriteDataTestCase, ProperlyCloseFilesMixin
 
15
 
 
16
import os, errno
 
17
 
 
18
try:
 
19
    from OpenSSL import SSL, crypto
 
20
    from twisted.internet import ssl
 
21
    from twisted.test.ssl_helpers import ClientTLSContext
 
22
except ImportError:
 
23
    def _noSSL():
 
24
        # ugh, make pyflakes happy.
 
25
        global SSL
 
26
        global ssl
 
27
        SSL = ssl = None
 
28
    _noSSL()
 
29
 
 
30
certPath = util.sibpath(__file__, "server.pem")
 
31
 
 
32
 
 
33
 
 
34
class UnintelligentProtocol(basic.LineReceiver):
 
35
    """
 
36
    @ivar deferred: a deferred that will fire at connection lost.
 
37
    @type deferred: L{defer.Deferred}
 
38
 
 
39
    @cvar pretext: text sent before TLS is set up.
 
40
    @type pretext: C{str}
 
41
 
 
42
    @cvar posttext: text sent after TLS is set up.
 
43
    @type posttext: C{str}
 
44
    """
 
45
    pretext = [
 
46
        "first line",
 
47
        "last thing before tls starts",
 
48
        "STARTTLS"]
 
49
 
 
50
    posttext = [
 
51
        "first thing after tls started",
 
52
        "last thing ever"]
 
53
 
 
54
    def __init__(self):
 
55
        self.deferred = defer.Deferred()
 
56
 
 
57
 
 
58
    def connectionMade(self):
 
59
        for l in self.pretext:
 
60
            self.sendLine(l)
 
61
 
 
62
 
 
63
    def lineReceived(self, line):
 
64
        if line == "READY":
 
65
            self.transport.startTLS(ClientTLSContext(), self.factory.client)
 
66
            for l in self.posttext:
 
67
                self.sendLine(l)
 
68
            self.transport.loseConnection()
 
69
 
 
70
 
 
71
    def connectionLost(self, reason):
 
72
        self.deferred.callback(None)
 
73
 
 
74
 
 
75
 
 
76
class LineCollector(basic.LineReceiver):
 
77
    """
 
78
    @ivar deferred: a deferred that will fire at connection lost.
 
79
    @type deferred: L{defer.Deferred}
 
80
 
 
81
    @ivar doTLS: whether the protocol is initiate TLS or not.
 
82
    @type doTLS: C{bool}
 
83
 
 
84
    @ivar fillBuffer: if set to True, it will send lots of data once
 
85
        C{STARTTLS} is received.
 
86
    @type fillBuffer: C{bool}
 
87
    """
 
88
 
 
89
    def __init__(self, doTLS, fillBuffer=False):
 
90
        self.doTLS = doTLS
 
91
        self.fillBuffer = fillBuffer
 
92
        self.deferred = defer.Deferred()
 
93
 
 
94
 
 
95
    def connectionMade(self):
 
96
        self.factory.rawdata = ''
 
97
        self.factory.lines = []
 
98
 
 
99
 
 
100
    def lineReceived(self, line):
 
101
        self.factory.lines.append(line)
 
102
        if line == 'STARTTLS':
 
103
            if self.fillBuffer:
 
104
                for x in range(500):
 
105
                    self.sendLine('X' * 1000)
 
106
            self.sendLine('READY')
 
107
            if self.doTLS:
 
108
                ctx = ServerTLSContext(
 
109
                    privateKeyFileName=certPath,
 
110
                    certificateFileName=certPath,
 
111
                )
 
112
                self.transport.startTLS(ctx, self.factory.server)
 
113
            else:
 
114
                self.setRawMode()
 
115
 
 
116
 
 
117
    def rawDataReceived(self, data):
 
118
        self.factory.rawdata += data
 
119
        self.transport.loseConnection()
 
120
 
 
121
 
 
122
    def connectionLost(self, reason):
 
123
        self.deferred.callback(None)
 
124
 
 
125
 
 
126
 
 
127
class SingleLineServerProtocol(protocol.Protocol):
 
128
    """
 
129
    A protocol that sends a single line of data at C{connectionMade}.
 
130
    """
 
131
 
 
132
    def connectionMade(self):
 
133
        self.transport.write("+OK <some crap>\r\n")
 
134
        self.transport.getPeerCertificate()
 
135
 
 
136
 
 
137
 
 
138
class RecordingClientProtocol(protocol.Protocol):
 
139
    """
 
140
    @ivar deferred: a deferred that will fire with first received content.
 
141
    @type deferred: L{defer.Deferred}
 
142
    """
 
143
 
 
144
    def __init__(self):
 
145
        self.deferred = defer.Deferred()
 
146
 
 
147
 
 
148
    def connectionMade(self):
 
149
        self.transport.getPeerCertificate()
 
150
 
 
151
 
 
152
    def dataReceived(self, data):
 
153
        self.deferred.callback(data)
 
154
 
 
155
 
 
156
 
 
157
class ImmediatelyDisconnectingProtocol(protocol.Protocol):
 
158
    """
 
159
    A protocol that disconnect immediately on connection. It fires the
 
160
    C{connectionDisconnected} deferred of its factory on connetion lost.
 
161
    """
 
162
 
 
163
    def connectionMade(self):
 
164
        self.transport.loseConnection()
 
165
 
 
166
 
 
167
    def connectionLost(self, reason):
 
168
        self.factory.connectionDisconnected.callback(None)
 
169
 
 
170
 
 
171
 
 
172
def generateCertificateObjects(organization, organizationalUnit):
 
173
    """
 
174
    Create a certificate for given C{organization} and C{organizationalUnit}.
 
175
 
 
176
    @return: a tuple of (key, request, certificate) objects.
 
177
    """
 
178
    pkey = crypto.PKey()
 
179
    pkey.generate_key(crypto.TYPE_RSA, 512)
 
180
    req = crypto.X509Req()
 
181
    subject = req.get_subject()
 
182
    subject.O = organization
 
183
    subject.OU = organizationalUnit
 
184
    req.set_pubkey(pkey)
 
185
    req.sign(pkey, "md5")
 
186
 
 
187
    # Here comes the actual certificate
 
188
    cert = crypto.X509()
 
189
    cert.set_serial_number(1)
 
190
    cert.gmtime_adj_notBefore(0)
 
191
    cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived
 
192
    cert.set_issuer(req.get_subject())
 
193
    cert.set_subject(req.get_subject())
 
194
    cert.set_pubkey(req.get_pubkey())
 
195
    cert.sign(pkey, "md5")
 
196
 
 
197
    return pkey, req, cert
 
198
 
 
199
 
 
200
 
 
201
def generateCertificateFiles(basename, organization, organizationalUnit):
 
202
    """
 
203
    Create certificate files key, req and cert prefixed by C{basename} for
 
204
    given C{organization} and C{organizationalUnit}.
 
205
    """
 
206
    pkey, req, cert = generateCertificateObjects(organization, organizationalUnit)
 
207
 
 
208
    for ext, obj, dumpFunc in [
 
209
        ('key', pkey, crypto.dump_privatekey),
 
210
        ('req', req, crypto.dump_certificate_request),
 
211
        ('cert', cert, crypto.dump_certificate)]:
 
212
        fName = os.extsep.join((basename, ext))
 
213
        fObj = file(fName, 'w')
 
214
        fObj.write(dumpFunc(crypto.FILETYPE_PEM, obj))
 
215
        fObj.close()
 
216
 
 
217
 
 
218
 
 
219
class ContextGeneratingMixin:
 
220
    """
 
221
    Offer methods to create L{ssl.DefaultOpenSSLContextFactory} for both client
 
222
    and server.
 
223
 
 
224
    @ivar clientBase: prefix of client certificate files.
 
225
    @type clientBase: C{str}
 
226
 
 
227
    @ivar serverBase: prefix of server certificate files.
 
228
    @type serverBase: C{str}
 
229
 
 
230
    @ivar clientCtxFactory: a generated context factory to be used in
 
231
        C{reactor.connectSSL}.
 
232
    @type clientCtxFactory: L{ssl.DefaultOpenSSLContextFactory}
 
233
 
 
234
    @ivar serverCtxFactory: a generated context factory to be used in
 
235
        C{reactor.listenSSL}.
 
236
    @type serverCtxFactory: L{ssl.DefaultOpenSSLContextFactory}
 
237
    """
 
238
 
 
239
    def makeContextFactory(self, org, orgUnit, *args, **kwArgs):
 
240
        base = self.mktemp()
 
241
        generateCertificateFiles(base, org, orgUnit)
 
242
        serverCtxFactory = ssl.DefaultOpenSSLContextFactory(
 
243
            os.extsep.join((base, 'key')),
 
244
            os.extsep.join((base, 'cert')),
 
245
            *args, **kwArgs)
 
246
 
 
247
        return base, serverCtxFactory
 
248
 
 
249
 
 
250
    def setupServerAndClient(self, clientArgs, clientKwArgs, serverArgs,
 
251
                             serverKwArgs):
 
252
        self.clientBase, self.clientCtxFactory = self.makeContextFactory(
 
253
            *clientArgs, **clientKwArgs)
 
254
        self.serverBase, self.serverCtxFactory = self.makeContextFactory(
 
255
            *serverArgs, **serverKwArgs)
 
256
 
 
257
 
 
258
 
 
259
if SSL is not None:
 
260
    class ServerTLSContext(ssl.DefaultOpenSSLContextFactory):
 
261
        """
 
262
        A context factory with a default method set to L{SSL.TLSv1_METHOD}.
 
263
        """
 
264
        isClient = False
 
265
 
 
266
        def __init__(self, *args, **kw):
 
267
            kw['sslmethod'] = SSL.TLSv1_METHOD
 
268
            ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw)
 
269
 
 
270
 
 
271
 
 
272
class StolenTCPTestCase(ProperlyCloseFilesMixin, unittest.TestCase):
 
273
    """
 
274
    For SSL transports, test many of the same things which are tested for
 
275
    TCP transports.
 
276
    """
 
277
 
 
278
    def createServer(self, address, portNumber, factory):
 
279
        """
 
280
        Create an SSL server with a certificate using L{IReactorSSL.listenSSL}.
 
281
        """
 
282
        cert = ssl.PrivateCertificate.loadPEM(file(certPath).read())
 
283
        contextFactory = cert.options()
 
284
        return reactor.listenSSL(
 
285
            portNumber, factory, contextFactory, interface=address)
 
286
 
 
287
 
 
288
    def connectClient(self, address, portNumber, clientCreator):
 
289
        """
 
290
        Create an SSL client using L{IReactorSSL.connectSSL}.
 
291
        """
 
292
        contextFactory = ssl.CertificateOptions()
 
293
        return clientCreator.connectSSL(address, portNumber, contextFactory)
 
294
 
 
295
 
 
296
    def getHandleExceptionType(self):
 
297
        """
 
298
        Return L{SSL.Error} as the expected error type which will be raised by
 
299
        a write to the L{OpenSSL.SSL.Connection} object after it has been
 
300
        closed.
 
301
        """
 
302
        return SSL.Error
 
303
 
 
304
 
 
305
    _iocp = 'twisted.internet.iocpreactor.reactor.IOCPReactor'
 
306
 
 
307
    def getHandleErrorCode(self):
 
308
        """
 
309
        Return the argument L{SSL.Error} will be constructed with for this
 
310
        case.  This is basically just a random OpenSSL implementation detail.
 
311
        It would be better if this test worked in a way which did not require
 
312
        this.
 
313
        """
 
314
        # Windows 2000 SP 4 and Windows XP SP 2 give back WSAENOTSOCK for
 
315
        # SSL.Connection.write for some reason.  The twisted.protocols.tls
 
316
        # implementation of IReactorSSL doesn't suffer from this imprecation,
 
317
        # though, since it is isolated from the Windows I/O layer (I suppose?).
 
318
 
 
319
        # If test_properlyCloseFiles waited for the SSL handshake to complete
 
320
        # and performed an orderly shutdown, then this would probably be a
 
321
        # little less weird: writing to a shutdown SSL connection has a more
 
322
        # well-defined failure mode (or at least it should).
 
323
        name = fullyQualifiedName(getClass(reactor))
 
324
        if platform.getType() == 'win32' and name != self._iocp:
 
325
            return errno.WSAENOTSOCK
 
326
        # This is terribly implementation-specific.
 
327
        return [('SSL routines', 'SSL_write', 'protocol is shutdown')]
 
328
 
 
329
 
 
330
 
 
331
class TLSTestCase(unittest.TestCase):
 
332
    """
 
333
    Tests for startTLS support.
 
334
 
 
335
    @ivar fillBuffer: forwarded to L{LineCollector.fillBuffer}
 
336
    @type fillBuffer: C{bool}
 
337
    """
 
338
    fillBuffer = False
 
339
 
 
340
    clientProto = None
 
341
    serverProto = None
 
342
 
 
343
 
 
344
    def tearDown(self):
 
345
        if self.clientProto.transport is not None:
 
346
            self.clientProto.transport.loseConnection()
 
347
        if self.serverProto.transport is not None:
 
348
            self.serverProto.transport.loseConnection()
 
349
 
 
350
 
 
351
    def _runTest(self, clientProto, serverProto, clientIsServer=False):
 
352
        """
 
353
        Helper method to run TLS tests.
 
354
 
 
355
        @param clientProto: protocol instance attached to the client
 
356
            connection.
 
357
        @param serverProto: protocol instance attached to the server
 
358
            connection.
 
359
        @param clientIsServer: flag indicated if client should initiate
 
360
            startTLS instead of server.
 
361
 
 
362
        @return: a L{defer.Deferred} that will fire when both connections are
 
363
            lost.
 
364
        """
 
365
        self.clientProto = clientProto
 
366
        cf = self.clientFactory = protocol.ClientFactory()
 
367
        cf.protocol = lambda: clientProto
 
368
        if clientIsServer:
 
369
            cf.server = False
 
370
        else:
 
371
            cf.client = True
 
372
 
 
373
        self.serverProto = serverProto
 
374
        sf = self.serverFactory = protocol.ServerFactory()
 
375
        sf.protocol = lambda: serverProto
 
376
        if clientIsServer:
 
377
            sf.client = False
 
378
        else:
 
379
            sf.server = True
 
380
 
 
381
        port = reactor.listenTCP(0, sf, interface="127.0.0.1")
 
382
        self.addCleanup(port.stopListening)
 
383
 
 
384
        reactor.connectTCP('127.0.0.1', port.getHost().port, cf)
 
385
 
 
386
        return defer.gatherResults([clientProto.deferred, serverProto.deferred])
 
387
 
 
388
 
 
389
    def test_TLS(self):
 
390
        """
 
391
        Test for server and client startTLS: client should received data both
 
392
        before and after the startTLS.
 
393
        """
 
394
        def check(ignore):
 
395
            self.assertEquals(
 
396
                self.serverFactory.lines,
 
397
                UnintelligentProtocol.pretext + UnintelligentProtocol.posttext
 
398
            )
 
399
        d = self._runTest(UnintelligentProtocol(),
 
400
                          LineCollector(True, self.fillBuffer))
 
401
        return d.addCallback(check)
 
402
 
 
403
 
 
404
    def test_unTLS(self):
 
405
        """
 
406
        Test for server startTLS not followed by a startTLS in client: the data
 
407
        received after server startTLS should be received as raw.
 
408
        """
 
409
        def check(ignored):
 
410
            self.assertEquals(
 
411
                self.serverFactory.lines,
 
412
                UnintelligentProtocol.pretext
 
413
            )
 
414
            self.failUnless(self.serverFactory.rawdata,
 
415
                            "No encrypted bytes received")
 
416
        d = self._runTest(UnintelligentProtocol(),
 
417
                          LineCollector(False, self.fillBuffer))
 
418
        return d.addCallback(check)
 
419
 
 
420
 
 
421
    def test_backwardsTLS(self):
 
422
        """
 
423
        Test startTLS first initiated by client.
 
424
        """
 
425
        def check(ignored):
 
426
            self.assertEquals(
 
427
                self.clientFactory.lines,
 
428
                UnintelligentProtocol.pretext + UnintelligentProtocol.posttext
 
429
            )
 
430
        d = self._runTest(LineCollector(True, self.fillBuffer),
 
431
                          UnintelligentProtocol(), True)
 
432
        return d.addCallback(check)
 
433
 
 
434
 
 
435
 
 
436
class SpammyTLSTestCase(TLSTestCase):
 
437
    """
 
438
    Test TLS features with bytes sitting in the out buffer.
 
439
    """
 
440
    fillBuffer = True
 
441
 
 
442
 
 
443
 
 
444
class BufferingTestCase(unittest.TestCase):
 
445
    serverProto = None
 
446
    clientProto = None
 
447
 
 
448
 
 
449
    def tearDown(self):
 
450
        if self.serverProto.transport is not None:
 
451
            self.serverProto.transport.loseConnection()
 
452
        if self.clientProto.transport is not None:
 
453
            self.clientProto.transport.loseConnection()
 
454
 
 
455
 
 
456
    def test_openSSLBuffering(self):
 
457
        serverProto = self.serverProto = SingleLineServerProtocol()
 
458
        clientProto = self.clientProto = RecordingClientProtocol()
 
459
 
 
460
        server = protocol.ServerFactory()
 
461
        client = self.client = protocol.ClientFactory()
 
462
 
 
463
        server.protocol = lambda: serverProto
 
464
        client.protocol = lambda: clientProto
 
465
 
 
466
        sCTX = ssl.DefaultOpenSSLContextFactory(certPath, certPath)
 
467
        cCTX = ssl.ClientContextFactory()
 
468
 
 
469
        port = reactor.listenSSL(0, server, sCTX, interface='127.0.0.1')
 
470
        self.addCleanup(port.stopListening)
 
471
 
 
472
        reactor.connectSSL('127.0.0.1', port.getHost().port, client, cCTX)
 
473
 
 
474
        return clientProto.deferred.addCallback(
 
475
            self.assertEquals, "+OK <some crap>\r\n")
 
476
 
 
477
 
 
478
 
 
479
class ConnectionLostTestCase(unittest.TestCase, ContextGeneratingMixin):
 
480
 
 
481
    def testImmediateDisconnect(self):
 
482
        org = "twisted.test.test_ssl"
 
483
        self.setupServerAndClient(
 
484
            (org, org + ", client"), {},
 
485
            (org, org + ", server"), {})
 
486
 
 
487
        # Set up a server, connect to it with a client, which should work since our verifiers
 
488
        # allow anything, then disconnect.
 
489
        serverProtocolFactory = protocol.ServerFactory()
 
490
        serverProtocolFactory.protocol = protocol.Protocol
 
491
        self.serverPort = serverPort = reactor.listenSSL(0,
 
492
            serverProtocolFactory, self.serverCtxFactory)
 
493
 
 
494
        clientProtocolFactory = protocol.ClientFactory()
 
495
        clientProtocolFactory.protocol = ImmediatelyDisconnectingProtocol
 
496
        clientProtocolFactory.connectionDisconnected = defer.Deferred()
 
497
        clientConnector = reactor.connectSSL('127.0.0.1',
 
498
            serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
 
499
 
 
500
        return clientProtocolFactory.connectionDisconnected.addCallback(
 
501
            lambda ignoredResult: self.serverPort.stopListening())
 
502
 
 
503
 
 
504
    def testFailedVerify(self):
 
505
        org = "twisted.test.test_ssl"
 
506
        self.setupServerAndClient(
 
507
            (org, org + ", client"), {},
 
508
            (org, org + ", server"), {})
 
509
 
 
510
        def verify(*a):
 
511
            return False
 
512
        self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify)
 
513
 
 
514
        serverConnLost = defer.Deferred()
 
515
        serverProtocol = protocol.Protocol()
 
516
        serverProtocol.connectionLost = serverConnLost.callback
 
517
        serverProtocolFactory = protocol.ServerFactory()
 
518
        serverProtocolFactory.protocol = lambda: serverProtocol
 
519
        self.serverPort = serverPort = reactor.listenSSL(0,
 
520
            serverProtocolFactory, self.serverCtxFactory)
 
521
 
 
522
        clientConnLost = defer.Deferred()
 
523
        clientProtocol = protocol.Protocol()
 
524
        clientProtocol.connectionLost = clientConnLost.callback
 
525
        clientProtocolFactory = protocol.ClientFactory()
 
526
        clientProtocolFactory.protocol = lambda: clientProtocol
 
527
        clientConnector = reactor.connectSSL('127.0.0.1',
 
528
            serverPort.getHost().port, clientProtocolFactory, self.clientCtxFactory)
 
529
 
 
530
        dl = defer.DeferredList([serverConnLost, clientConnLost], consumeErrors=True)
 
531
        return dl.addCallback(self._cbLostConns)
 
532
 
 
533
 
 
534
    def _cbLostConns(self, results):
 
535
        (sSuccess, sResult), (cSuccess, cResult) = results
 
536
 
 
537
        self.failIf(sSuccess)
 
538
        self.failIf(cSuccess)
 
539
 
 
540
        acceptableErrors = [SSL.Error]
 
541
 
 
542
        # Rather than getting a verification failure on Windows, we are getting
 
543
        # a connection failure.  Without something like sslverify proxying
 
544
        # in-between we can't fix up the platform's errors, so let's just
 
545
        # specifically say it is only OK in this one case to keep the tests
 
546
        # passing.  Normally we'd like to be as strict as possible here, so
 
547
        # we're not going to allow this to report errors incorrectly on any
 
548
        # other platforms.
 
549
 
 
550
        if platform.isWindows():
 
551
            from twisted.internet.error import ConnectionLost
 
552
            acceptableErrors.append(ConnectionLost)
 
553
 
 
554
        sResult.trap(*acceptableErrors)
 
555
        cResult.trap(*acceptableErrors)
 
556
 
 
557
        return self.serverPort.stopListening()
 
558
 
 
559
 
 
560
 
 
561
class FakeContext:
 
562
    """
 
563
    L{OpenSSL.SSL.Context} double which can more easily be inspected.
 
564
    """
 
565
    def __init__(self, method):
 
566
        self._method = method
 
567
        self._options = 0
 
568
 
 
569
 
 
570
    def set_options(self, options):
 
571
        self._options |= options
 
572
 
 
573
 
 
574
    def use_certificate_file(self, fileName):
 
575
        pass
 
576
 
 
577
 
 
578
    def use_privatekey_file(self, fileName):
 
579
        pass
 
580
 
 
581
 
 
582
 
 
583
class DefaultOpenSSLContextFactoryTests(unittest.TestCase):
 
584
    """
 
585
    Tests for L{ssl.DefaultOpenSSLContextFactory}.
 
586
    """
 
587
    def setUp(self):
 
588
        # pyOpenSSL Context objects aren't introspectable enough.  Pass in
 
589
        # an alternate context factory so we can inspect what is done to it.
 
590
        self.contextFactory = ssl.DefaultOpenSSLContextFactory(
 
591
            certPath, certPath, _contextFactory=FakeContext)
 
592
        self.context = self.contextFactory.getContext()
 
593
 
 
594
 
 
595
    def test_method(self):
 
596
        """
 
597
        L{ssl.DefaultOpenSSLContextFactory.getContext} returns an SSL context
 
598
        which can use SSLv3 or TLSv1 but not SSLv2.
 
599
        """
 
600
        # SSLv23_METHOD allows SSLv2, SSLv3, or TLSv1
 
601
        self.assertEqual(self.context._method, SSL.SSLv23_METHOD)
 
602
 
 
603
        # And OP_NO_SSLv2 disables the SSLv2 support.
 
604
        self.assertTrue(self.context._options & SSL.OP_NO_SSLv2)
 
605
 
 
606
        # Make sure SSLv3 and TLSv1 aren't disabled though.
 
607
        self.assertFalse(self.context._options & SSL.OP_NO_SSLv3)
 
608
        self.assertFalse(self.context._options & SSL.OP_NO_TLSv1)
 
609
 
 
610
 
 
611
    def test_missingCertificateFile(self):
 
612
        """
 
613
        Instantiating L{ssl.DefaultOpenSSLContextFactory} with a certificate
 
614
        filename which does not identify an existing file results in the
 
615
        initializer raising L{OpenSSL.SSL.Error}.
 
616
        """
 
617
        self.assertRaises(
 
618
            SSL.Error,
 
619
            ssl.DefaultOpenSSLContextFactory, certPath, self.mktemp())
 
620
 
 
621
 
 
622
    def test_missingPrivateKeyFile(self):
 
623
        """
 
624
        Instantiating L{ssl.DefaultOpenSSLContextFactory} with a private key
 
625
        filename which does not identify an existing file results in the
 
626
        initializer raising L{OpenSSL.SSL.Error}.
 
627
        """
 
628
        self.assertRaises(
 
629
            SSL.Error,
 
630
            ssl.DefaultOpenSSLContextFactory, self.mktemp(), certPath)
 
631
 
 
632
 
 
633
 
 
634
class ClientContextFactoryTests(unittest.TestCase):
 
635
    """
 
636
    Tests for L{ssl.ClientContextFactory}.
 
637
    """
 
638
    def setUp(self):
 
639
        self.contextFactory = ssl.ClientContextFactory()
 
640
        self.contextFactory._contextFactory = FakeContext
 
641
        self.context = self.contextFactory.getContext()
 
642
 
 
643
 
 
644
    def test_method(self):
 
645
        """
 
646
        L{ssl.ClientContextFactory.getContext} returns a context which can use
 
647
        SSLv3 or TLSv1 but not SSLv2.
 
648
        """
 
649
        self.assertEqual(self.context._method, SSL.SSLv23_METHOD)
 
650
        self.assertTrue(self.context._options & SSL.OP_NO_SSLv2)
 
651
        self.assertFalse(self.context._options & SSL.OP_NO_SSLv3)
 
652
        self.assertFalse(self.context._options & SSL.OP_NO_TLSv1)
 
653
 
 
654
 
 
655
 
 
656
if interfaces.IReactorSSL(reactor, None) is None:
 
657
    for tCase in [StolenTCPTestCase, TLSTestCase, SpammyTLSTestCase,
 
658
                  BufferingTestCase, ConnectionLostTestCase,
 
659
                  DefaultOpenSSLContextFactoryTests,
 
660
                  ClientContextFactoryTests]:
 
661
        tCase.skip = "Reactor does not support SSL, cannot run SSL tests"
 
662
 
 
663
# Otherwise trial will run this test here
 
664
del WriteDataTestCase