~ubuntu-branches/ubuntu/precise/python3.2/precise-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_ssl.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2012-03-09 18:40:39 UTC
  • mfrom: (30.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20120309184039-j3yk2emxr1plyo21
Tags: 3.2.3~rc1-1
* Python 3.2.3 release candidate 1.
* Update to 20120309 from the 3.2 branch.
* Fix libpython.a symlink. Closes: #660146.
* Build-depend on xauth.
* Run the gdb tests for the debug build only.

Show diffs side-by-side

added added

removed removed

Lines of Context:
51
51
BADCERT = data_file("badcert.pem")
52
52
WRONGCERT = data_file("XXXnonexisting.pem")
53
53
BADKEY = data_file("badkey.pem")
 
54
NOKIACERT = data_file("nokia.pem")
54
55
 
55
56
 
56
57
def handle_error(prefix):
117
118
        p = ssl._ssl._test_decode_cert(CERTFILE)
118
119
        if support.verbose:
119
120
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
 
121
        self.assertEqual(p['issuer'],
 
122
                         ((('countryName', 'XY'),),
 
123
                          (('localityName', 'Castle Anthrax'),),
 
124
                          (('organizationName', 'Python Software Foundation'),),
 
125
                          (('commonName', 'localhost'),))
 
126
                        )
 
127
        self.assertEqual(p['notAfter'], 'Oct  5 23:01:56 2020 GMT')
 
128
        self.assertEqual(p['notBefore'], 'Oct  8 23:01:56 2010 GMT')
 
129
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
 
130
        self.assertEqual(p['subject'],
 
131
                         ((('countryName', 'XY'),),
 
132
                          (('localityName', 'Castle Anthrax'),),
 
133
                          (('organizationName', 'Python Software Foundation'),),
 
134
                          (('commonName', 'localhost'),))
 
135
                        )
 
136
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
 
137
        # Issue #13034: the subjectAltName in some certificates
 
138
        # (notably projects.developer.nokia.com:443) wasn't parsed
 
139
        p = ssl._ssl._test_decode_cert(NOKIACERT)
 
140
        if support.verbose:
 
141
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
 
142
        self.assertEqual(p['subjectAltName'],
 
143
                         (('DNS', 'projects.developer.nokia.com'),
 
144
                          ('DNS', 'projects.forum.nokia.com'))
 
145
                        )
120
146
 
121
147
    def test_DER_to_PEM(self):
122
148
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
736
762
                try:
737
763
                    self.sslconn = self.server.context.wrap_socket(
738
764
                        self.sock, server_side=True)
739
 
                except ssl.SSLError:
 
765
                except ssl.SSLError as e:
740
766
                    # XXX Various errors can have happened here, for example
741
767
                    # a mismatching protocol version, an invalid certificate,
742
768
                    # or a low-level bug. This should be made more discriminating.
 
769
                    self.server.conn_errors.append(e)
743
770
                    if self.server.chatty:
744
771
                        handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
745
772
                    self.running = False
852
879
            self.port = support.bind_port(self.sock)
853
880
            self.flag = None
854
881
            self.active = False
 
882
            self.conn_errors = []
855
883
            threading.Thread.__init__(self)
856
884
            self.daemon = True
857
885
 
 
886
        def __enter__(self):
 
887
            self.start(threading.Event())
 
888
            self.flag.wait()
 
889
            return self
 
890
 
 
891
        def __exit__(self, *args):
 
892
            self.stop()
 
893
            self.join()
 
894
 
858
895
        def start(self, flag=None):
859
896
            self.flag = flag
860
897
            threading.Thread.start(self)
874
911
                                         + repr(connaddr) + '\n')
875
912
                    handler = self.ConnectionHandler(self, newconn, connaddr)
876
913
                    handler.start()
 
914
                    handler.join()
877
915
                except socket.timeout:
878
916
                    pass
879
917
                except KeyboardInterrupt:
967
1005
        def __str__(self):
968
1006
            return "<%s %s>" % (self.__class__.__name__, self.server)
969
1007
 
 
1008
        def __enter__(self):
 
1009
            self.start(threading.Event())
 
1010
            self.flag.wait()
 
1011
            return self
 
1012
 
 
1013
        def __exit__(self, *args):
 
1014
            if support.verbose:
 
1015
                sys.stdout.write(" cleanup: stopping server.\n")
 
1016
            self.stop()
 
1017
            if support.verbose:
 
1018
                sys.stdout.write(" cleanup: joining server thread.\n")
 
1019
            self.join()
 
1020
            if support.verbose:
 
1021
                sys.stdout.write(" cleanup: successfully joined.\n")
 
1022
 
970
1023
        def start (self, flag=None):
971
1024
            self.flag = flag
972
1025
            threading.Thread.start(self)
994
1047
                                    certreqs=ssl.CERT_REQUIRED,
995
1048
                                    cacerts=CERTFILE, chatty=False,
996
1049
                                    connectionchatty=False)
997
 
        flag = threading.Event()
998
 
        server.start(flag)
999
 
        # wait for it to start
1000
 
        flag.wait()
1001
 
        # try to connect
1002
 
        try:
 
1050
        with server:
1003
1051
            try:
1004
1052
                with socket.socket() as sock:
1005
1053
                    s = ssl.wrap_socket(sock,
1019
1067
                    sys.stdout.write("\IOError is %s\n" % str(x))
1020
1068
            else:
1021
1069
                raise AssertionError("Use of invalid cert should have failed!")
1022
 
        finally:
1023
 
            server.stop()
1024
 
            server.join()
1025
1070
 
1026
1071
    def server_params_test(client_context, server_context, indata=b"FOO\n",
1027
1072
                           chatty=True, connectionchatty=False):
1032
1077
        server = ThreadedEchoServer(context=server_context,
1033
1078
                                    chatty=chatty,
1034
1079
                                    connectionchatty=False)
1035
 
        flag = threading.Event()
1036
 
        server.start(flag)
1037
 
        # wait for it to start
1038
 
        flag.wait()
1039
 
        # try to connect
1040
 
        try:
1041
 
            s = client_context.wrap_socket(socket.socket())
1042
 
            s.connect((HOST, server.port))
1043
 
            for arg in [indata, bytearray(indata), memoryview(indata)]:
1044
 
                if connectionchatty:
1045
 
                    if support.verbose:
1046
 
                        sys.stdout.write(
1047
 
                            " client:  sending %r...\n" % indata)
1048
 
                s.write(arg)
1049
 
                outdata = s.read()
1050
 
                if connectionchatty:
1051
 
                    if support.verbose:
1052
 
                        sys.stdout.write(" client:  read %r\n" % outdata)
1053
 
                if outdata != indata.lower():
1054
 
                    raise AssertionError(
1055
 
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1056
 
                        % (outdata[:20], len(outdata),
1057
 
                           indata[:20].lower(), len(indata)))
1058
 
            s.write(b"over\n")
1059
 
            if connectionchatty:
1060
 
                if support.verbose:
1061
 
                    sys.stdout.write(" client:  closing connection.\n")
1062
 
            s.close()
1063
 
        finally:
1064
 
            server.stop()
1065
 
            server.join()
 
1080
        with server:
 
1081
            with client_context.wrap_socket(socket.socket()) as s:
 
1082
                s.connect((HOST, server.port))
 
1083
                for arg in [indata, bytearray(indata), memoryview(indata)]:
 
1084
                    if connectionchatty:
 
1085
                        if support.verbose:
 
1086
                            sys.stdout.write(
 
1087
                                " client:  sending %r...\n" % indata)
 
1088
                    s.write(arg)
 
1089
                    outdata = s.read()
 
1090
                    if connectionchatty:
 
1091
                        if support.verbose:
 
1092
                            sys.stdout.write(" client:  read %r\n" % outdata)
 
1093
                    if outdata != indata.lower():
 
1094
                        raise AssertionError(
 
1095
                            "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
 
1096
                            % (outdata[:20], len(outdata),
 
1097
                               indata[:20].lower(), len(indata)))
 
1098
                s.write(b"over\n")
 
1099
                if connectionchatty:
 
1100
                    if support.verbose:
 
1101
                        sys.stdout.write(" client:  closing connection.\n")
 
1102
                s.close()
1066
1103
 
1067
1104
    def try_protocol_combo(server_protocol, client_protocol, expect_success,
1068
1105
                           certsreqs=None, server_options=0, client_options=0):
1131
1168
            context.load_verify_locations(CERTFILE)
1132
1169
            context.load_cert_chain(CERTFILE)
1133
1170
            server = ThreadedEchoServer(context=context, chatty=False)
1134
 
            flag = threading.Event()
1135
 
            server.start(flag)
1136
 
            # wait for it to start
1137
 
            flag.wait()
1138
 
            # try to connect
1139
 
            try:
 
1171
            with server:
1140
1172
                s = context.wrap_socket(socket.socket())
1141
1173
                s.connect((HOST, server.port))
1142
1174
                cert = s.getpeercert()
1159
1191
                after = ssl.cert_time_to_seconds(cert['notAfter'])
1160
1192
                self.assertLess(before, after)
1161
1193
                s.close()
1162
 
            finally:
1163
 
                server.stop()
1164
 
                server.join()
1165
1194
 
1166
1195
        def test_empty_cert(self):
1167
1196
            """Connecting with an empty cert file"""
1289
1318
            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1290
1319
            if hasattr(ssl, 'PROTOCOL_SSLv2'):
1291
1320
                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
1292
 
            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
 
1321
            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
 
1322
                               client_options=ssl.OP_NO_SSLv3)
1293
1323
            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
1294
1324
            if no_sslv2_implies_sslv3_hello():
1295
1325
                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
1307
1337
            if hasattr(ssl, 'PROTOCOL_SSLv2'):
1308
1338
                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
1309
1339
            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1310
 
            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
 
1340
            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
 
1341
                               client_options=ssl.OP_NO_TLSv1)
1311
1342
 
1312
1343
        def test_starttls(self):
1313
1344
            """Switching from clear text to encrypted and back again."""
1318
1349
                                        starttls_server=True,
1319
1350
                                        chatty=True,
1320
1351
                                        connectionchatty=True)
1321
 
            flag = threading.Event()
1322
 
            server.start(flag)
1323
 
            # wait for it to start
1324
 
            flag.wait()
1325
 
            # try to connect
1326
1352
            wrapped = False
1327
 
            try:
 
1353
            with server:
1328
1354
                s = socket.socket()
1329
1355
                s.setblocking(1)
1330
1356
                s.connect((HOST, server.port))
1371
1397
                    conn.close()
1372
1398
                else:
1373
1399
                    s.close()
1374
 
            finally:
1375
 
                server.stop()
1376
 
                server.join()
1377
1400
 
1378
1401
        def test_socketserver(self):
1379
1402
            """Using a SocketServer to create and manage SSL connections."""
1409
1432
 
1410
1433
            indata = b"FOO\n"
1411
1434
            server = AsyncoreEchoServer(CERTFILE)
1412
 
            flag = threading.Event()
1413
 
            server.start(flag)
1414
 
            # wait for it to start
1415
 
            flag.wait()
1416
 
            # try to connect
1417
 
            try:
 
1435
            with server:
1418
1436
                s = ssl.wrap_socket(socket.socket())
1419
1437
                s.connect(('127.0.0.1', server.port))
1420
1438
                if support.verbose:
1435
1453
                s.close()
1436
1454
                if support.verbose:
1437
1455
                    sys.stdout.write(" client:  connection closed.\n")
1438
 
            finally:
1439
 
                if support.verbose:
1440
 
                    sys.stdout.write(" cleanup: stopping server.\n")
1441
 
                server.stop()
1442
 
                if support.verbose:
1443
 
                    sys.stdout.write(" cleanup: joining server thread.\n")
1444
 
                server.join()
1445
 
                if support.verbose:
1446
 
                    sys.stdout.write(" cleanup: successfully joined.\n")
1447
1456
 
1448
1457
        def test_recv_send(self):
1449
1458
            """Test recv(), send() and friends."""
1456
1465
                                        cacerts=CERTFILE,
1457
1466
                                        chatty=True,
1458
1467
                                        connectionchatty=False)
1459
 
            flag = threading.Event()
1460
 
            server.start(flag)
1461
 
            # wait for it to start
1462
 
            flag.wait()
1463
 
            # try to connect
1464
 
            s = ssl.wrap_socket(socket.socket(),
1465
 
                                server_side=False,
1466
 
                                certfile=CERTFILE,
1467
 
                                ca_certs=CERTFILE,
1468
 
                                cert_reqs=ssl.CERT_NONE,
1469
 
                                ssl_version=ssl.PROTOCOL_TLSv1)
1470
 
            s.connect((HOST, server.port))
1471
 
            try:
 
1468
            with server:
 
1469
                s = ssl.wrap_socket(socket.socket(),
 
1470
                                    server_side=False,
 
1471
                                    certfile=CERTFILE,
 
1472
                                    ca_certs=CERTFILE,
 
1473
                                    cert_reqs=ssl.CERT_NONE,
 
1474
                                    ssl_version=ssl.PROTOCOL_TLSv1)
 
1475
                s.connect((HOST, server.port))
1472
1476
                # helper methods for standardising recv* method signatures
1473
1477
                def _recv_into():
1474
1478
                    b = bytearray(b"\0"*100)
1553
1557
                            )
1554
1558
                        # consume data
1555
1559
                        s.read()
1556
 
 
1557
1560
                s.write(b"over\n")
1558
1561
                s.close()
1559
 
            finally:
1560
 
                server.stop()
1561
 
                server.join()
1562
1562
 
1563
1563
        def test_handshake_timeout(self):
1564
1564
            # Issue #5103: SSL handshake must respect the socket timeout
1609
1609
                t.join()
1610
1610
                server.close()
1611
1611
 
 
1612
        def test_default_ciphers(self):
 
1613
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
 
1614
            try:
 
1615
                # Force a set of weak ciphers on our client context
 
1616
                context.set_ciphers("DES")
 
1617
            except ssl.SSLError:
 
1618
                self.skipTest("no DES cipher available")
 
1619
            with ThreadedEchoServer(CERTFILE,
 
1620
                                    ssl_version=ssl.PROTOCOL_SSLv23,
 
1621
                                    chatty=False) as server:
 
1622
                with socket.socket() as sock:
 
1623
                    s = context.wrap_socket(sock)
 
1624
                    with self.assertRaises((OSError, ssl.SSLError)):
 
1625
                        s.connect((HOST, server.port))
 
1626
            self.assertIn("no shared cipher", str(server.conn_errors[0]))
 
1627
 
1612
1628
 
1613
1629
def test_main(verbose=False):
1614
1630
    if support.verbose: