~pythonregexp2.7/python/issue2636-18

« back to all changes in this revision

Viewing changes to Lib/test/test_socket_ssl.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 13:47:31 UTC
  • mfrom: (39021.1.404 Regexp-2.7)
  • Revision ID: darklord@timehorse.com-20080921134731-rudomuzeh1b2tz1y
Merged in changes from the latest python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Test just the SSL support in the socket module, in a moderately bogus way.
2
 
 
3
 
import sys
4
 
import unittest
5
 
from test import test_support
6
 
import socket
7
 
import errno
8
 
import threading
9
 
import subprocess
10
 
import time
11
 
import os
12
 
import urllib
13
 
import warnings
14
 
 
15
 
warnings.filterwarnings(
16
 
    'ignore',
17
 
    'socket.ssl.. is deprecated.  Use ssl.wrap_socket.. instead.',
18
 
    DeprecationWarning)
19
 
 
20
 
# Optionally test SSL support, if we have it in the tested platform
21
 
skip_expected = not hasattr(socket, "ssl")
22
 
 
23
 
HOST = test_support.HOST
24
 
 
25
 
class ConnectedTests(unittest.TestCase):
26
 
 
27
 
    def urlopen(self, host, *args, **kwargs):
28
 
        # Connecting to remote hosts is flaky.  Make it more robust
29
 
        # by retrying the connection several times.
30
 
        for i in range(3):
31
 
            try:
32
 
                return urllib.urlopen(host, *args, **kwargs)
33
 
            except IOError, e:
34
 
                last_exc = e
35
 
                continue
36
 
            except:
37
 
                raise
38
 
        raise last_exc
39
 
 
40
 
    def testBasic(self):
41
 
        socket.RAND_status()
42
 
        try:
43
 
            socket.RAND_egd(1)
44
 
        except TypeError:
45
 
            pass
46
 
        else:
47
 
            print "didn't raise TypeError"
48
 
        socket.RAND_add("this is a random string", 75.0)
49
 
 
50
 
        with test_support.transient_internet():
51
 
            f = self.urlopen('https://sf.net')
52
 
        buf = f.read()
53
 
        f.close()
54
 
 
55
 
    def testTimeout(self):
56
 
        def error_msg(extra_msg):
57
 
            print >> sys.stderr, """\
58
 
        WARNING:  an attempt to connect to %r %s, in
59
 
        testTimeout.  That may be legitimate, but is not the outcome we
60
 
        hoped for.  If this message is seen often, testTimeout should be
61
 
        changed to use a more reliable address.""" % (ADDR, extra_msg)
62
 
 
63
 
        # A service which issues a welcome banner (without need to write
64
 
        # anything).
65
 
        ADDR = "pop.gmail.com", 995
66
 
 
67
 
        s = socket.socket()
68
 
        s.settimeout(30.0)
69
 
        try:
70
 
            s.connect(ADDR)
71
 
        except socket.timeout:
72
 
            error_msg('timed out')
73
 
            return
74
 
        except socket.error, exc:  # In case connection is refused.
75
 
            if exc.args[0] == errno.ECONNREFUSED:
76
 
                error_msg('was refused')
77
 
                return
78
 
            else:
79
 
                raise
80
 
 
81
 
        ss = socket.ssl(s)
82
 
        # Read part of return welcome banner twice.
83
 
        ss.read(1)
84
 
        ss.read(1)
85
 
        s.close()
86
 
 
87
 
class BasicTests(unittest.TestCase):
88
 
 
89
 
    def testRudeShutdown(self):
90
 
        listener_ready = threading.Event()
91
 
        listener_gone = threading.Event()
92
 
        sock = socket.socket()
93
 
        port = test_support.bind_port(sock)
94
 
 
95
 
        # `listener` runs in a thread.  It opens a socket and sits in accept()
96
 
        # until the main thread connects.  Then it rudely closes the socket,
97
 
        # and sets Event `listener_gone` to let the main thread know the socket
98
 
        # is gone.
99
 
        def listener(s):
100
 
            s.listen(5)
101
 
            listener_ready.set()
102
 
            s.accept()
103
 
            s = None # reclaim the socket object, which also closes it
104
 
            listener_gone.set()
105
 
 
106
 
        def connector():
107
 
            listener_ready.wait()
108
 
            s = socket.socket()
109
 
            s.connect((HOST, port))
110
 
            listener_gone.wait()
111
 
            try:
112
 
                ssl_sock = socket.ssl(s)
113
 
            except socket.sslerror:
114
 
                pass
115
 
            else:
116
 
                raise test_support.TestFailed(
117
 
                      'connecting to closed SSL socket should have failed')
118
 
 
119
 
        t = threading.Thread(target=listener, args=(sock,))
120
 
        t.start()
121
 
        connector()
122
 
        t.join()
123
 
 
124
 
    def connect(self, s, host_port):
125
 
        # Connecting to remote hosts is flaky.  Make it more robust
126
 
        # by retrying the connection several times.
127
 
        for i in range(3):
128
 
            try:
129
 
                return s.connect(host_port)
130
 
            except IOError, e:
131
 
                last_exc = e
132
 
                continue
133
 
            except:
134
 
                raise
135
 
        raise last_exc
136
 
 
137
 
    def test_978833(self):
138
 
        if not test_support.is_resource_enabled("network"):
139
 
            return
140
 
        if test_support.verbose:
141
 
            print "test_978833 ..."
142
 
 
143
 
        import os, httplib, ssl
144
 
        with test_support.transient_internet():
145
 
            s = socket.socket(socket.AF_INET)
146
 
            try:
147
 
                self.connect(s, ("svn.python.org", 443))
148
 
            except IOError:
149
 
                print >> sys.stderr, """\
150
 
        WARNING:  an attempt to connect to svn.python.org:443 failed, in
151
 
        test_978833.  That may be legitimate, but is not the outcome we
152
 
        hoped for.  If this message is seen often, test_978833 should be
153
 
        changed to use a more reliable address."""
154
 
                return
155
 
            fd = s._sock.fileno()
156
 
            sock = ssl.wrap_socket(s)
157
 
            s = None
158
 
            sock.close()
159
 
            try:
160
 
                os.fstat(fd)
161
 
            except OSError:
162
 
                pass
163
 
            else:
164
 
                raise test_support.TestFailed("Failed to close socket")
165
 
 
166
 
class OpenSSLTests(unittest.TestCase):
167
 
 
168
 
    def testBasic(self):
169
 
        s = socket.socket()
170
 
        s.connect((HOST, OpenSSLServer.PORT))
171
 
        ss = socket.ssl(s)
172
 
        ss.write("Foo\n")
173
 
        i = ss.read(4)
174
 
        self.assertEqual(i, "Foo\n")
175
 
        s.close()
176
 
 
177
 
    def testMethods(self):
178
 
        # read & write is already tried in the Basic test
179
 
        # now we'll try to get the server info about certificates
180
 
        # this came from the certificate I used, one I found in /usr/share/openssl
181
 
        info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi"
182
 
 
183
 
        s = socket.socket()
184
 
        s.connect((HOST, OpenSSLServer.PORT))
185
 
        ss = socket.ssl(s)
186
 
        cert = ss.server()
187
 
        self.assertEqual(cert, info)
188
 
        cert = ss.issuer()
189
 
        self.assertEqual(cert, info)
190
 
        s.close()
191
 
 
192
 
 
193
 
class OpenSSLServer(threading.Thread):
194
 
    PORT = None
195
 
    def __init__(self):
196
 
        self.s = None
197
 
        self.keepServing = True
198
 
        self._external()
199
 
        if self.haveServer:
200
 
            threading.Thread.__init__(self)
201
 
 
202
 
    def _external(self):
203
 
        # let's find the .pem files
204
 
        curdir = os.path.dirname(__file__) or os.curdir
205
 
        cert_file = os.path.join(curdir, "ssl_cert.pem")
206
 
        if not os.access(cert_file, os.F_OK):
207
 
            raise ValueError("No cert file found! (tried %r)" % cert_file)
208
 
        key_file = os.path.join(curdir, "ssl_key.pem")
209
 
        if not os.access(key_file, os.F_OK):
210
 
            raise ValueError("No key file found! (tried %r)" % key_file)
211
 
 
212
 
        try:
213
 
            # XXX TODO: on Windows, this should make more effort to use the
214
 
            # openssl.exe that would have been built by the pcbuild.sln.
215
 
            OpenSSLServer.PORT = test_support.find_unused_port()
216
 
            args = (OpenSSLServer.PORT, cert_file, key_file)
217
 
            cmd = "openssl s_server -accept %d -cert %s -key %s -quiet" % args
218
 
            self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
219
 
                                       stdout=subprocess.PIPE,
220
 
                                       stderr=subprocess.STDOUT)
221
 
            time.sleep(1)
222
 
        except:
223
 
            self.haveServer = False
224
 
        else:
225
 
            # let's try if it is actually up
226
 
            try:
227
 
                s = socket.socket()
228
 
                s.connect((HOST, OpenSSLServer.PORT))
229
 
                s.close()
230
 
                if self.s.stdout.readline() != "ERROR\n":
231
 
                    raise ValueError
232
 
            except:
233
 
                self.haveServer = False
234
 
            else:
235
 
                self.haveServer = True
236
 
 
237
 
    def run(self):
238
 
        while self.keepServing:
239
 
            time.sleep(.5)
240
 
            l = self.s.stdout.readline()
241
 
            self.s.stdin.write(l)
242
 
 
243
 
    def shutdown(self):
244
 
        self.keepServing = False
245
 
        if not self.s:
246
 
            return
247
 
        if sys.platform == "win32":
248
 
            subprocess.TerminateProcess(int(self.s._handle), -1)
249
 
        else:
250
 
            os.kill(self.s.pid, 15)
251
 
 
252
 
def test_main():
253
 
    if not hasattr(socket, "ssl"):
254
 
        raise test_support.TestSkipped("socket module has no ssl support")
255
 
 
256
 
    tests = [BasicTests]
257
 
 
258
 
    if test_support.is_resource_enabled('network'):
259
 
        tests.append(ConnectedTests)
260
 
 
261
 
    # in these platforms we can kill the openssl process
262
 
    if sys.platform in ("sunos5", "darwin", "linux1",
263
 
                        "linux2", "win32", "hp-ux11"):
264
 
 
265
 
        server = OpenSSLServer()
266
 
        if server.haveServer:
267
 
            tests.append(OpenSSLTests)
268
 
            server.start()
269
 
    else:
270
 
        server = None
271
 
 
272
 
    thread_info = test_support.threading_setup()
273
 
 
274
 
    try:
275
 
        test_support.run_unittest(*tests)
276
 
    finally:
277
 
        if server is not None and server.haveServer:
278
 
            server.shutdown()
279
 
 
280
 
    test_support.threading_cleanup(*thread_info)
281
 
 
282
 
if __name__ == "__main__":
283
 
    test_main()