1
# Copyright (c) 2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for implementations of L{ITLSTransport}.
10
from twisted.internet.test.reactormixins import ReactorBuilder
11
from twisted.internet.protocol import ServerFactory, ClientFactory, Protocol
12
from twisted.internet.interfaces import ITLSTransport
13
from twisted.internet.defer import Deferred, DeferredList
14
from twisted.internet.error import ConnectionClosed
15
from twisted.trial.unittest import SkipTest
16
from twisted.python.runtime import platform
19
from OpenSSL.crypto import FILETYPE_PEM
23
from twisted.internet.ssl import PrivateCertificate, KeyPair
24
from twisted.internet.ssl import ClientContextFactory
28
class SSLClientTestsMixin(ReactorBuilder):
30
Mixin defining tests relating to L{ITLSTransport}.
32
if FILETYPE_PEM is None:
33
skip = "OpenSSL is unavailable"
35
if platform.isWindows():
37
"For some reason, these reactors don't deal with SSL "
38
"disconnection correctly on Windows. See #3371.")
40
"twisted.internet.glib2reactor.Glib2Reactor": msg,
41
"twisted.internet.gtk2reactor.Gtk2Reactor": msg}
45
"-----BEGIN CERTIFICATE-----\n"
46
"MIIDBjCCAm+gAwIBAgIBATANBgkqhkiG9w0BAQQFADB7MQswCQYDVQQGEwJTRzER\n"
47
"MA8GA1UEChMITTJDcnlwdG8xFDASBgNVBAsTC00yQ3J5cHRvIENBMSQwIgYDVQQD\n"
48
"ExtNMkNyeXB0byBDZXJ0aWZpY2F0ZSBNYXN0ZXIxHTAbBgkqhkiG9w0BCQEWDm5n\n"
49
"cHNAcG9zdDEuY29tMB4XDTAwMDkxMDA5NTEzMFoXDTAyMDkxMDA5NTEzMFowUzEL\n"
50
"MAkGA1UEBhMCU0cxETAPBgNVBAoTCE0yQ3J5cHRvMRIwEAYDVQQDEwlsb2NhbGhv\n"
51
"c3QxHTAbBgkqhkiG9w0BCQEWDm5ncHNAcG9zdDEuY29tMFwwDQYJKoZIhvcNAQEB\n"
52
"BQADSwAwSAJBAKy+e3dulvXzV7zoTZWc5TzgApr8DmeQHTYC8ydfzH7EECe4R1Xh\n"
53
"5kwIzOuuFfn178FBiS84gngaNcrFi0Z5fAkCAwEAAaOCAQQwggEAMAkGA1UdEwQC\n"
54
"MAAwLAYJYIZIAYb4QgENBB8WHU9wZW5TU0wgR2VuZXJhdGVkIENlcnRpZmljYXRl\n"
55
"MB0GA1UdDgQWBBTPhIKSvnsmYsBVNWjj0m3M2z0qVTCBpQYDVR0jBIGdMIGagBT7\n"
56
"hyNp65w6kxXlxb8pUU/+7Sg4AaF/pH0wezELMAkGA1UEBhMCU0cxETAPBgNVBAoT\n"
57
"CE0yQ3J5cHRvMRQwEgYDVQQLEwtNMkNyeXB0byBDQTEkMCIGA1UEAxMbTTJDcnlw\n"
58
"dG8gQ2VydGlmaWNhdGUgTWFzdGVyMR0wGwYJKoZIhvcNAQkBFg5uZ3BzQHBvc3Qx\n"
59
"LmNvbYIBADANBgkqhkiG9w0BAQQFAAOBgQA7/CqT6PoHycTdhEStWNZde7M/2Yc6\n"
60
"BoJuVwnW8YxGO8Sn6UJ4FeffZNcYZddSDKosw8LtPOeWoK3JINjAk5jiPQ2cww++\n"
61
"7QGG/g5NDjxFZNDJP1dGiLAxPW6JXwov4v0FmdzfLOZ01jDcgQQZqEpYlgpuI5JE\n"
63
"-----END CERTIFICATE-----\n")
66
"-----BEGIN RSA PRIVATE KEY-----\n"
67
"MIIBPAIBAAJBAKy+e3dulvXzV7zoTZWc5TzgApr8DmeQHTYC8ydfzH7EECe4R1Xh\n"
68
"5kwIzOuuFfn178FBiS84gngaNcrFi0Z5fAkCAwEAAQJBAIqm/bz4NA1H++Vx5Ewx\n"
69
"OcKp3w19QSaZAwlGRtsUxrP7436QjnREM3Bm8ygU11BjkPVmtrKm6AayQfCHqJoT\n"
70
"ZIECIQDW0BoMoL0HOYM/mrTLhaykYAVqgIeJsPjvkEhTFXWBuQIhAM3deFAvWNu4\n"
71
"nklUQ37XsCT2c9tmNt1LAT+slG2JOTTRAiAuXDtC/m3NYVwyHfFm+zKHRzHkClk2\n"
72
"HjubeEgjpj32AQIhAJqMGTaZVOwevTXvvHwNEH+vRWsAYU/gbx+OQB+7VOcBAiEA\n"
73
"oolb6NMg/R3enNPvS1O4UU1H8wpaF77L4yiSWlE0p4w=\n"
74
"-----END RSA PRIVATE KEY-----\n")
77
def getServerContext(self):
79
Return a new SSL context suitable for use in a test server.
81
cert = PrivateCertificate.load(
82
self._certificateText,
83
KeyPair.load(self._privateKeyText, FILETYPE_PEM),
88
def test_disconnectAfterWriteAfterStartTLS(self):
90
L{ITCPTransport.loseConnection} ends a connection which was set up with
91
L{ITLSTransport.startTLS} and which has recently been written to. This
92
is intended to verify that a socket send error masked by the TLS
93
implementation doesn't prevent the connection from being reported as
96
class ShortProtocol(Protocol):
97
def connectionMade(self):
98
if not ITLSTransport.providedBy(self.transport):
99
# Functionality isn't available to be tested.
100
finished = self.factory.finished
101
self.factory.finished = None
102
finished.errback(SkipTest("No ITLSTransport support"))
105
# Switch the transport to TLS.
106
self.transport.startTLS(self.factory.context)
107
# Force TLS to really get negotiated. If nobody talks, nothing
109
self.transport.write("x")
111
def dataReceived(self, data):
112
# Stuff some bytes into the socket. This mostly has the effect
113
# of causing the next write to fail with ENOTCONN or EPIPE.
114
# With the pyOpenSSL implementation of ITLSTransport, the error
115
# is swallowed outside of the control of Twisted.
116
self.transport.write("y")
117
# Now close the connection, which requires a TLS close alert to
119
self.transport.loseConnection()
121
def connectionLost(self, reason):
122
# This is the success case. The client and the server want to
124
finished = self.factory.finished
125
if finished is not None:
126
self.factory.finished = None
127
finished.callback(reason)
129
serverFactory = ServerFactory()
130
serverFactory.finished = Deferred()
131
serverFactory.protocol = ShortProtocol
132
serverFactory.context = self.getServerContext()
134
clientFactory = ClientFactory()
135
clientFactory.finished = Deferred()
136
clientFactory.protocol = ShortProtocol
137
clientFactory.context = ClientContextFactory()
138
clientFactory.context.method = serverFactory.context.method
140
lostConnectionResults = []
141
finished = DeferredList(
142
[serverFactory.finished, clientFactory.finished],
144
def cbFinished(results):
145
lostConnectionResults.extend([results[0][1], results[1][1]])
146
finished.addCallback(cbFinished)
148
reactor = self.buildReactor()
150
port = reactor.listenTCP(0, serverFactory, interface='127.0.0.1')
151
self.addCleanup(port.stopListening)
153
connector = reactor.connectTCP(
154
port.getHost().host, port.getHost().port, clientFactory)
155
self.addCleanup(connector.disconnect)
157
finished.addCallback(lambda ign: reactor.stop())
158
self.runReactor(reactor)
159
lostConnectionResults[0].trap(ConnectionClosed)
160
lostConnectionResults[1].trap(ConnectionClosed)
163
globals().update(SSLClientTestsMixin.makeTestCaseClasses())