51
51
BADCERT = data_file("badcert.pem")
52
52
WRONGCERT = data_file("XXXnonexisting.pem")
53
53
BADKEY = data_file("badkey.pem")
54
NOKIACERT = data_file("nokia.pem")
56
57
def handle_error(prefix):
117
118
p = ssl._ssl._test_decode_cert(CERTFILE)
118
119
if support.verbose:
119
120
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
121
self.assertEqual(p['issuer'],
122
((('countryName', 'XY'),),
123
(('localityName', 'Castle Anthrax'),),
124
(('organizationName', 'Python Software Foundation'),),
125
(('commonName', 'localhost'),))
127
self.assertEqual(p['notAfter'], 'Oct 5 23:01:56 2020 GMT')
128
self.assertEqual(p['notBefore'], 'Oct 8 23:01:56 2010 GMT')
129
self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
130
self.assertEqual(p['subject'],
131
((('countryName', 'XY'),),
132
(('localityName', 'Castle Anthrax'),),
133
(('organizationName', 'Python Software Foundation'),),
134
(('commonName', 'localhost'),))
136
self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
137
# Issue #13034: the subjectAltName in some certificates
138
# (notably projects.developer.nokia.com:443) wasn't parsed
139
p = ssl._ssl._test_decode_cert(NOKIACERT)
141
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
142
self.assertEqual(p['subjectAltName'],
143
(('DNS', 'projects.developer.nokia.com'),
144
('DNS', 'projects.forum.nokia.com'))
121
147
def test_DER_to_PEM(self):
122
148
with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
737
763
self.sslconn = self.server.context.wrap_socket(
738
764
self.sock, server_side=True)
765
except ssl.SSLError as e:
740
766
# XXX Various errors can have happened here, for example
741
767
# a mismatching protocol version, an invalid certificate,
742
768
# or a low-level bug. This should be made more discriminating.
769
self.server.conn_errors.append(e)
743
770
if self.server.chatty:
744
771
handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
745
772
self.running = False
852
879
self.port = support.bind_port(self.sock)
854
881
self.active = False
882
self.conn_errors = []
855
883
threading.Thread.__init__(self)
856
884
self.daemon = True
887
self.start(threading.Event())
891
def __exit__(self, *args):
858
895
def start(self, flag=None):
860
897
threading.Thread.start(self)
967
1005
def __str__(self):
968
1006
return "<%s %s>" % (self.__class__.__name__, self.server)
1008
def __enter__(self):
1009
self.start(threading.Event())
1013
def __exit__(self, *args):
1015
sys.stdout.write(" cleanup: stopping server.\n")
1018
sys.stdout.write(" cleanup: joining server thread.\n")
1021
sys.stdout.write(" cleanup: successfully joined.\n")
970
1023
def start (self, flag=None):
971
1024
self.flag = flag
972
1025
threading.Thread.start(self)
994
1047
certreqs=ssl.CERT_REQUIRED,
995
1048
cacerts=CERTFILE, chatty=False,
996
1049
connectionchatty=False)
997
flag = threading.Event()
999
# wait for it to start
1004
1052
with socket.socket() as sock:
1005
1053
s = ssl.wrap_socket(sock,
1019
1067
sys.stdout.write("\IOError is %s\n" % str(x))
1021
1069
raise AssertionError("Use of invalid cert should have failed!")
1026
1071
def server_params_test(client_context, server_context, indata=b"FOO\n",
1027
1072
chatty=True, connectionchatty=False):
1032
1077
server = ThreadedEchoServer(context=server_context,
1034
1079
connectionchatty=False)
1035
flag = threading.Event()
1037
# wait for it to start
1041
s = client_context.wrap_socket(socket.socket())
1042
s.connect((HOST, server.port))
1043
for arg in [indata, bytearray(indata), memoryview(indata)]:
1044
if connectionchatty:
1047
" client: sending %r...\n" % indata)
1050
if connectionchatty:
1052
sys.stdout.write(" client: read %r\n" % outdata)
1053
if outdata != indata.lower():
1054
raise AssertionError(
1055
"bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1056
% (outdata[:20], len(outdata),
1057
indata[:20].lower(), len(indata)))
1059
if connectionchatty:
1061
sys.stdout.write(" client: closing connection.\n")
1081
with client_context.wrap_socket(socket.socket()) as s:
1082
s.connect((HOST, server.port))
1083
for arg in [indata, bytearray(indata), memoryview(indata)]:
1084
if connectionchatty:
1087
" client: sending %r...\n" % indata)
1090
if connectionchatty:
1092
sys.stdout.write(" client: read %r\n" % outdata)
1093
if outdata != indata.lower():
1094
raise AssertionError(
1095
"bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1096
% (outdata[:20], len(outdata),
1097
indata[:20].lower(), len(indata)))
1099
if connectionchatty:
1101
sys.stdout.write(" client: closing connection.\n")
1067
1104
def try_protocol_combo(server_protocol, client_protocol, expect_success,
1068
1105
certsreqs=None, server_options=0, client_options=0):
1131
1168
context.load_verify_locations(CERTFILE)
1132
1169
context.load_cert_chain(CERTFILE)
1133
1170
server = ThreadedEchoServer(context=context, chatty=False)
1134
flag = threading.Event()
1136
# wait for it to start
1140
1172
s = context.wrap_socket(socket.socket())
1141
1173
s.connect((HOST, server.port))
1142
1174
cert = s.getpeercert()
1289
1318
try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1290
1319
if hasattr(ssl, 'PROTOCOL_SSLv2'):
1291
1320
try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
1292
try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
1321
try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
1322
client_options=ssl.OP_NO_SSLv3)
1293
1323
try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
1294
1324
if no_sslv2_implies_sslv3_hello():
1295
1325
# No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
1307
1337
if hasattr(ssl, 'PROTOCOL_SSLv2'):
1308
1338
try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
1309
1339
try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1310
try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
1340
try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
1341
client_options=ssl.OP_NO_TLSv1)
1312
1343
def test_starttls(self):
1313
1344
"""Switching from clear text to encrypted and back again."""
1436
1454
if support.verbose:
1437
1455
sys.stdout.write(" client: connection closed.\n")
1440
sys.stdout.write(" cleanup: stopping server.\n")
1443
sys.stdout.write(" cleanup: joining server thread.\n")
1446
sys.stdout.write(" cleanup: successfully joined.\n")
1448
1457
def test_recv_send(self):
1449
1458
"""Test recv(), send() and friends."""
1456
1465
cacerts=CERTFILE,
1458
1467
connectionchatty=False)
1459
flag = threading.Event()
1461
# wait for it to start
1464
s = ssl.wrap_socket(socket.socket(),
1468
cert_reqs=ssl.CERT_NONE,
1469
ssl_version=ssl.PROTOCOL_TLSv1)
1470
s.connect((HOST, server.port))
1469
s = ssl.wrap_socket(socket.socket(),
1473
cert_reqs=ssl.CERT_NONE,
1474
ssl_version=ssl.PROTOCOL_TLSv1)
1475
s.connect((HOST, server.port))
1472
1476
# helper methods for standardising recv* method signatures
1473
1477
def _recv_into():
1474
1478
b = bytearray(b"\0"*100)
1612
def test_default_ciphers(self):
1613
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1615
# Force a set of weak ciphers on our client context
1616
context.set_ciphers("DES")
1617
except ssl.SSLError:
1618
self.skipTest("no DES cipher available")
1619
with ThreadedEchoServer(CERTFILE,
1620
ssl_version=ssl.PROTOCOL_SSLv23,
1621
chatty=False) as server:
1622
with socket.socket() as sock:
1623
s = context.wrap_socket(sock)
1624
with self.assertRaises((OSError, ssl.SSLError)):
1625
s.connect((HOST, server.port))
1626
self.assertIn("no shared cipher", str(server.conn_errors[0]))
1613
1629
def test_main(verbose=False):
1614
1630
if support.verbose: