3
"""Unit tests for M2Crypto.SSL.
5
Copyright (c) 2000-2004 Ng Pheng Siong. All rights reserved."""
20
import os, socket, string, sys, tempfile, thread, time, unittest
21
from M2Crypto import Rand, SSL, m2, Err
23
from fips import fips_mode
25
# Host name the tests connect to; it is combined with srv_port to build the
# server address tuple and the https:// URL used by the client test cases.
srv_host = 'localhost'
28
def verify_cb_new_function(ok, store):
31
err = store.get_error()
32
assert err in [m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
33
m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
34
m2.X509_V_ERR_CERT_UNTRUSTED,
35
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE]
36
assert store.get_error_depth() == 0
37
app_data = m2.x509_store_ctx_get_app_data(store.ctx)
39
x509 = store.get_current_cert()
41
stack = store.get1_chain()
42
assert len(stack) == 1
43
assert stack[0].as_pem() == x509.as_pem()
44
except AssertionError, e:
45
# If we let exceptions propagate from here the
46
# caller may see strange errors. This is cleaner.
51
def __call__(self, ok, store):
    """Forward (ok, store) to verify_cb_new_function and return its result.

    The instance itself carries no state that is consulted here; the call
    is a straight delegation so that an object can stand in wherever the
    plain function would be accepted.
    """
    return verify_cb_new_function(ok, store)
54
# Delay value read from the M2CRYPTO_TEST_SSL_SLEEP environment variable and
# converted to float; falls back to 0.5 when the variable is unset.
# NOTE(review): presumably the number of seconds to wait for the openssl
# s_server child to come up -- its use is not visible here, confirm.
sleepTime = float(os.getenv('M2CRYPTO_TEST_SSL_SLEEP', 0.5))
57
if os.name == 'nt' or sys.platform == 'cygwin':
58
openssl = 'openssl.exe'
62
plist = os.environ['PATH'].split(os.pathsep)
72
class BaseSSLClientTestCase(unittest.TestCase):
74
openssl_in_path = find_openssl()
76
def start_server(self, args):
77
if not self.openssl_in_path:
78
raise Exception('openssl command not in PATH')
82
# openssl must be started in the tests directory for it
83
# to find the .pem files
86
os.execvp('openssl', args)
94
def stop_server(self, pid):
98
def http_get(self, s):
99
s.send('GET / HTTP/1.0\n\n')
106
except SSL.SSLError: # s_server throws an 'unexpected eof'...
112
self.srv_host = srv_host
113
self.srv_port = srv_port
114
self.srv_addr = (srv_host, srv_port)
115
self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)
116
self.args = ['s_server', '-quiet', '-www',
117
#'-cert', 'server.pem', Implicitly using this
118
'-accept', str(self.srv_port)]
122
srv_port = srv_port - 1
125
class PassSSLClientTestCase(BaseSSLClientTestCase):
130
class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
132
def test_HTTPSConnection(self):
133
pid = self.start_server(self.args)
135
from M2Crypto import httpslib
136
c = httpslib.HTTPSConnection(srv_host, srv_port)
137
c.request('GET', '/')
138
data = c.getresponse().read()
141
self.stop_server(pid)
142
self.failIf(string.find(data, 's_server -quiet -www') == -1)
144
def test_HTTPSConnection_resume_session(self):
145
pid = self.start_server(self.args)
147
from M2Crypto import httpslib
149
ctx.load_verify_locations(cafile='tests/ca.pem')
150
ctx.load_cert('tests/x509.pem')
151
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
152
ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
153
c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
154
c.request('GET', '/')
155
ses = c.get_session()
157
data = c.getresponse().read()
158
# Apparently closing the connection here breaks the session; Ali Polatel?
162
ctx2.load_verify_locations(cafile='tests/ca.pem')
163
ctx2.load_cert('tests/x509.pem')
164
ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
165
ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
166
c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)
168
c2.request('GET', '/')
169
ses2 = c2.get_session()
171
data = c2.getresponse().read()
174
assert t == t2, "Sessions did not match"
176
self.stop_server(pid)
177
self.failIf(string.find(data, 's_server -quiet -www') == -1)
179
def test_HTTPSConnection_secure_context(self):
180
pid = self.start_server(self.args)
182
from M2Crypto import httpslib
184
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
185
ctx.load_verify_locations('tests/ca.pem')
186
c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
187
c.request('GET', '/')
188
data = c.getresponse().read()
191
self.stop_server(pid)
192
self.failIf(string.find(data, 's_server -quiet -www') == -1)
194
def test_HTTPSConnection_secure_context_fail(self):
195
pid = self.start_server(self.args)
197
from M2Crypto import httpslib
199
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
200
ctx.load_verify_locations('tests/server.pem')
201
c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)
202
self.assertRaises(SSL.SSLError, c.request, 'GET', '/')
205
self.stop_server(pid)
207
def test_HTTPS(self):
208
pid = self.start_server(self.args)
210
from M2Crypto import httpslib
211
c = httpslib.HTTPS(srv_host, srv_port)
212
c.putrequest('GET', '/')
213
c.putheader('Accept', 'text/html')
214
c.putheader('Accept', 'text/plain')
216
err, msg, headers = c.getreply()
217
assert err == 200, err
222
self.stop_server(pid)
223
self.failIf(string.find(data, 's_server -quiet -www') == -1)
225
def test_HTTPS_secure_context(self):
226
pid = self.start_server(self.args)
228
from M2Crypto import httpslib
230
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
231
ctx.load_verify_locations('tests/ca.pem')
232
c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)
233
c.putrequest('GET', '/')
234
c.putheader('Accept', 'text/html')
235
c.putheader('Accept', 'text/plain')
237
err, msg, headers = c.getreply()
238
assert err == 200, err
243
self.stop_server(pid)
244
self.failIf(string.find(data, 's_server -quiet -www') == -1)
246
def test_HTTPS_secure_context_fail(self):
247
pid = self.start_server(self.args)
249
from M2Crypto import httpslib
251
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
252
ctx.load_verify_locations('tests/server.pem')
253
c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)
254
c.putrequest('GET', '/')
255
c.putheader('Accept', 'text/html')
256
c.putheader('Accept', 'text/plain')
257
self.assertRaises(SSL.SSLError, c.endheaders)
260
self.stop_server(pid)
262
def test_HTTPSConnection_illegalkeywordarg(self):
263
from M2Crypto import httpslib
264
self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',
268
class MiscSSLClientTestCase(BaseSSLClientTestCase):
270
def test_no_connection(self):
272
s = SSL.Connection(ctx)
274
def test_server_simple(self):
275
pid = self.start_server(self.args)
277
self.assertRaises(ValueError, SSL.Context, 'tlsv5')
279
s = SSL.Connection(ctx)
280
s.connect(self.srv_addr)
281
self.assertRaises(ValueError, s.read, 0)
282
data = self.http_get(s)
285
self.stop_server(pid)
286
self.failIf(string.find(data, 's_server -quiet -www') == -1)
288
def test_server_simple_secure_context(self):
289
pid = self.start_server(self.args)
292
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
293
ctx.load_verify_locations('tests/ca.pem')
294
s = SSL.Connection(ctx)
295
s.connect(self.srv_addr)
296
data = self.http_get(s)
299
self.stop_server(pid)
300
self.failIf(string.find(data, 's_server -quiet -www') == -1)
302
def test_server_simple_secure_context_fail(self):
303
pid = self.start_server(self.args)
306
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
307
ctx.load_verify_locations('tests/server.pem')
308
s = SSL.Connection(ctx)
309
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
312
self.stop_server(pid)
314
def test_server_simple_timeouts(self):
315
pid = self.start_server(self.args)
317
self.assertRaises(ValueError, SSL.Context, 'tlsv5')
319
s = SSL.Connection(ctx)
321
r = s.get_socket_read_timeout()
322
w = s.get_socket_write_timeout()
323
assert r.sec == 0, r.sec
324
assert r.microsec == 0, r.microsec
325
assert w.sec == 0, w.sec
326
assert w.microsec == 0, w.microsec
328
s.set_socket_read_timeout(SSL.timeout())
329
s.set_socket_write_timeout(SSL.timeout(909,9))
330
r = s.get_socket_read_timeout()
331
w = s.get_socket_write_timeout()
332
assert r.sec == 600, r.sec
333
assert r.microsec == 0, r.microsec
334
assert w.sec == 909, w.sec
335
#assert w.microsec == 9, w.microsec XXX 4000
337
s.connect(self.srv_addr)
338
data = self.http_get(s)
341
self.stop_server(pid)
342
self.failIf(string.find(data, 's_server -quiet -www') == -1)
344
def test_tls1_nok(self):
345
if fips_mode: # TLS is required in FIPS mode
347
self.args.append('-no_tls1')
348
pid = self.start_server(self.args)
350
ctx = SSL.Context('tlsv1')
351
s = SSL.Connection(ctx)
353
s.connect(self.srv_addr)
354
except SSL.SSLError, e:
355
self.failUnlessEqual(e[0], 'wrong version number')
358
self.stop_server(pid)
360
def test_tls1_ok(self):
361
self.args.append('-tls1')
362
pid = self.start_server(self.args)
364
ctx = SSL.Context('tlsv1')
365
s = SSL.Connection(ctx)
366
s.connect(self.srv_addr)
367
data = self.http_get(s)
370
self.stop_server(pid)
371
self.failIf(string.find(data, 's_server -quiet -www') == -1)
373
def test_sslv23_no_v2(self):
374
if fips_mode: # TLS is required in FIPS mode
376
self.args.append('-no_tls1')
377
pid = self.start_server(self.args)
379
ctx = SSL.Context('sslv23')
380
s = SSL.Connection(ctx)
381
s.connect(self.srv_addr)
382
self.failUnlessEqual(s.get_version(), 'SSLv3')
385
self.stop_server(pid)
387
def test_sslv23_no_v2_no_service(self):
388
if fips_mode: # TLS is required in FIPS mode
390
self.args = self.args + ['-no_tls1', '-no_ssl3']
391
pid = self.start_server(self.args)
393
ctx = SSL.Context('sslv23')
394
s = SSL.Connection(ctx)
395
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
398
self.stop_server(pid)
400
def disabled_test_sslv23_weak_crypto(self):
401
if fips_mode: # TLS is required in FIPS mode
403
self.args = self.args + ['-no_tls1', '-no_ssl3']
404
pid = self.start_server(self.args)
406
ctx = SSL.Context('sslv23', weak_crypto=1)
407
s = SSL.Connection(ctx)
408
s.connect(self.srv_addr)
409
self.failUnlessEqual(s.get_version(), 'SSLv2')
412
self.stop_server(pid)
414
def test_cipher_mismatch(self):
415
self.args = self.args + ['-cipher', 'AES256-SHA']
416
pid = self.start_server(self.args)
419
s = SSL.Connection(ctx)
420
s.set_cipher_list('AES128-SHA')
422
s.connect(self.srv_addr)
423
except SSL.SSLError, e:
424
self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
427
self.stop_server(pid)
429
def test_no_such_cipher(self):
430
self.args = self.args + ['-cipher', 'AES128-SHA']
431
pid = self.start_server(self.args)
434
s = SSL.Connection(ctx)
435
s.set_cipher_list('EXP-RC2-MD5')
437
s.connect(self.srv_addr)
438
except SSL.SSLError, e:
439
self.failUnlessEqual(e[0], 'no ciphers available')
442
self.stop_server(pid)
444
def test_no_weak_cipher(self):
445
if fips_mode: # Weak ciphers are prohibited
447
self.args = self.args + ['-cipher', 'EXP']
448
pid = self.start_server(self.args)
451
s = SSL.Connection(ctx)
453
s.connect(self.srv_addr)
454
except SSL.SSLError, e:
455
self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')
458
self.stop_server(pid)
460
def test_use_weak_cipher(self):
461
if fips_mode: # Weak ciphers are prohibited
463
self.args = self.args + ['-cipher', 'EXP']
464
pid = self.start_server(self.args)
466
ctx = SSL.Context(weak_crypto=1)
467
s = SSL.Connection(ctx)
468
s.connect(self.srv_addr)
469
data = self.http_get(s)
472
self.stop_server(pid)
473
self.failIf(string.find(data, 's_server -quiet -www') == -1)
475
def test_cipher_ok(self):
476
self.args = self.args + ['-cipher', 'AES128-SHA']
477
pid = self.start_server(self.args)
480
s = SSL.Connection(ctx)
481
s.set_cipher_list('AES128-SHA')
482
s.connect(self.srv_addr)
483
data = self.http_get(s)
485
assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()
487
cipher_stack = s.get_ciphers()
488
assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()
489
self.assertRaises(IndexError, cipher_stack.__getitem__, 2)
490
# For some reason there are 2 entries in the stack
491
#assert len(cipher_stack) == 1, len(cipher_stack)
492
assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()
494
# Test Cipher_Stack iterator
496
for cipher in cipher_stack:
498
assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()
499
self.assertEqual('AES128-SHA-128', str(cipher))
500
# For some reason there are 2 entries in the stack
502
self.assertEqual(i, len(cipher_stack))
506
self.stop_server(pid)
507
self.failIf(string.find(data, 's_server -quiet -www') == -1)
509
def verify_cb_new(self, ok, store):
    """Bound-method form of the verification callback.

    Passes (ok, store) straight through to verify_cb_new_function and
    returns whatever that function returns.
    """
    return verify_cb_new_function(ok, store)
512
def test_verify_cb_new(self):
513
pid = self.start_server(self.args)
516
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
518
s = SSL.Connection(ctx)
520
s.connect(self.srv_addr)
521
except SSL.SSLError, e:
523
data = self.http_get(s)
526
self.stop_server(pid)
527
self.failIf(string.find(data, 's_server -quiet -www') == -1)
529
def test_verify_cb_new_class(self):
530
pid = self.start_server(self.args)
533
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
535
s = SSL.Connection(ctx)
537
s.connect(self.srv_addr)
538
except SSL.SSLError, e:
540
data = self.http_get(s)
543
self.stop_server(pid)
544
self.failIf(string.find(data, 's_server -quiet -www') == -1)
546
def test_verify_cb_new_function(self):
547
pid = self.start_server(self.args)
550
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
551
verify_cb_new_function)
552
s = SSL.Connection(ctx)
554
s.connect(self.srv_addr)
555
except SSL.SSLError, e:
557
data = self.http_get(s)
560
self.stop_server(pid)
561
self.failIf(string.find(data, 's_server -quiet -www') == -1)
563
def test_verify_cb_lambda(self):
564
pid = self.start_server(self.args)
567
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
569
s = SSL.Connection(ctx)
571
s.connect(self.srv_addr)
572
except SSL.SSLError, e:
574
data = self.http_get(s)
577
self.stop_server(pid)
578
self.failIf(string.find(data, 's_server -quiet -www') == -1)
580
def verify_cb_exception(self, ok, store):
    """Verification callback that always raises.

    Used by test_verify_cb_exception, which installs it via
    ctx.set_verify() and expects the subsequent connect to fail with
    SSL.SSLError.

    Raises:
        Exception: always, with the message 'We should fail verification'.
    """
    # Call form of raise: equivalent to the old ``raise Exception, 'msg'``
    # statement but accepted by both Python 2 and Python 3.
    raise Exception('We should fail verification')
583
def test_verify_cb_exception(self):
584
pid = self.start_server(self.args)
587
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
588
self.verify_cb_exception)
589
s = SSL.Connection(ctx)
590
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
593
self.stop_server(pid)
595
def test_verify_cb_not_callable(self):
597
self.assertRaises(TypeError,
599
SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
603
def test_verify_cb_wrong_callable(self):
604
pid = self.start_server(self.args)
607
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
609
s = SSL.Connection(ctx)
610
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
613
self.stop_server(pid)
615
def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):
617
from M2Crypto import X509
619
assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \
620
err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
621
err == m2.X509_V_ERR_CERT_UNTRUSTED or \
622
err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
623
assert m2.ssl_ctx_get_cert_store(ctx_ptr)
624
assert X509.X509(x509_ptr).as_pem()
625
except AssertionError:
626
# If we let exceptions propagate from here the
627
# caller may see strange errors. This is cleaner.
631
def test_verify_cb_old(self):
632
pid = self.start_server(self.args)
635
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
637
s = SSL.Connection(ctx)
639
s.connect(self.srv_addr)
640
except SSL.SSLError, e:
642
data = self.http_get(s)
645
self.stop_server(pid)
646
self.failIf(string.find(data, 's_server -quiet -www') == -1)
648
def test_verify_allow_unknown_old(self):
649
pid = self.start_server(self.args)
652
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
653
SSL.cb.ssl_verify_callback)
654
ctx.set_allow_unknown_ca(1)
655
s = SSL.Connection(ctx)
657
s.connect(self.srv_addr)
658
except SSL.SSLError, e:
660
data = self.http_get(s)
663
self.stop_server(pid)
664
self.failIf(string.find(data, 's_server -quiet -www') == -1)
666
def test_verify_allow_unknown_new(self):
667
pid = self.start_server(self.args)
670
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
671
SSL.cb.ssl_verify_callback_allow_unknown_ca)
672
s = SSL.Connection(ctx)
674
s.connect(self.srv_addr)
675
except SSL.SSLError, e:
677
data = self.http_get(s)
680
self.stop_server(pid)
681
self.failIf(string.find(data, 's_server -quiet -www') == -1)
683
def test_verify_cert(self):
684
pid = self.start_server(self.args)
687
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
688
ctx.load_verify_locations('tests/ca.pem')
689
s = SSL.Connection(ctx)
691
s.connect(self.srv_addr)
692
except SSL.SSLError, e:
694
data = self.http_get(s)
697
self.stop_server(pid)
698
self.failIf(string.find(data, 's_server -quiet -www') == -1)
700
def test_verify_cert_fail(self):
701
pid = self.start_server(self.args)
704
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
705
ctx.load_verify_locations('tests/server.pem')
706
s = SSL.Connection(ctx)
707
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
710
self.stop_server(pid)
712
def test_verify_cert_mutual_auth(self):
713
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])
714
pid = self.start_server(self.args)
717
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
718
ctx.load_verify_locations('tests/ca.pem')
719
ctx.load_cert('tests/x509.pem')
720
s = SSL.Connection(ctx)
722
s.connect(self.srv_addr)
723
except SSL.SSLError, e:
725
data = self.http_get(s)
728
self.stop_server(pid)
729
self.failIf(string.find(data, 's_server -quiet -www') == -1)
731
def test_verify_cert_mutual_auth_servernbio(self):
732
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])
733
pid = self.start_server(self.args)
736
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
737
ctx.load_verify_locations('tests/ca.pem')
738
ctx.load_cert('tests/x509.pem')
739
s = SSL.Connection(ctx)
741
s.connect(self.srv_addr)
742
except SSL.SSLError, e:
744
data = self.http_get(s)
747
self.stop_server(pid)
748
self.failIf(string.find(data, 's_server -quiet -www') == -1)
750
def test_verify_cert_mutual_auth_fail(self):
751
self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])
752
pid = self.start_server(self.args)
755
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
756
ctx.load_verify_locations('tests/ca.pem')
757
s = SSL.Connection(ctx)
758
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
761
self.stop_server(pid)
763
def test_verify_nocert_fail(self):
764
self.args.extend(['-nocert'])
765
pid = self.start_server(self.args)
768
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
769
ctx.load_verify_locations('tests/ca.pem')
770
s = SSL.Connection(ctx)
771
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
774
self.stop_server(pid)
776
def test_blocking0(self):
777
pid = self.start_server(self.args)
780
s = SSL.Connection(ctx)
782
self.assertRaises(Exception, s.connect, self.srv_addr)
785
self.stop_server(pid)
787
def test_blocking1(self):
788
pid = self.start_server(self.args)
791
s = SSL.Connection(ctx)
794
s.connect(self.srv_addr)
795
except SSL.SSLError, e:
797
data = self.http_get(s)
800
self.stop_server(pid)
801
self.failIf(string.find(data, 's_server -quiet -www') == -1)
803
def test_makefile(self):
804
pid = self.start_server(self.args)
807
s = SSL.Connection(ctx)
809
s.connect(self.srv_addr)
810
except SSL.SSLError, e:
812
bio = s.makefile('rw')
813
#s.close() # XXX bug 6628?
814
bio.write('GET / HTTP/1.0\n\n')
820
self.stop_server(pid)
821
self.failIf(string.find(data, 's_server -quiet -www') == -1)
823
def test_makefile_err(self):
824
pid = self.start_server(self.args)
827
s = SSL.Connection(ctx)
829
s.connect(self.srv_addr)
830
except SSL.SSLError, e:
833
data = self.http_get(s)
837
err_code = Err.peek_error_code()
838
assert not err_code, 'Unexpected error: %s' % err_code
839
err = Err.get_error()
840
assert not err, 'Unexpected error: %s' % err
842
self.stop_server(pid)
843
self.failIf(string.find(data, 's_server -quiet -www') == -1)
845
def test_info_callback(self):
846
pid = self.start_server(self.args)
849
ctx.set_info_callback()
850
s = SSL.Connection(ctx)
851
s.connect(self.srv_addr)
852
data = self.http_get(s)
855
self.stop_server(pid)
856
self.failIf(string.find(data, 's_server -quiet -www') == -1)
859
class UrllibSSLClientTestCase(BaseSSLClientTestCase):
861
def test_urllib(self):
862
pid = self.start_server(self.args)
864
from M2Crypto import m2urllib
865
url = m2urllib.FancyURLopener()
866
url.addheader('Connection', 'close')
867
u = url.open('https://%s:%s/' % (srv_host, srv_port))
871
self.stop_server(pid)
872
self.failIf(string.find(data, 's_server -quiet -www') == -1)
874
# XXX Don't actually know how to use m2urllib safely!
875
#def test_urllib_secure_context(self):
876
#def test_urllib_secure_context_fail(self):
878
# XXX Don't actually know how to use m2urllib safely!
879
#def test_urllib_safe_context(self):
880
#def test_urllib_safe_context_fail(self):
883
class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
885
if sys.version_info >= (2,4):
886
def test_urllib2(self):
887
pid = self.start_server(self.args)
889
from M2Crypto import m2urllib2
890
opener = m2urllib2.build_opener()
891
opener.addheaders = [('Connection', 'close')]
892
u = opener.open('https://%s:%s/' % (srv_host, srv_port))
896
self.stop_server(pid)
897
self.failIf(string.find(data, 's_server -quiet -www') == -1)
899
def test_urllib2_secure_context(self):
900
pid = self.start_server(self.args)
903
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
904
ctx.load_verify_locations('tests/ca.pem')
906
from M2Crypto import m2urllib2
907
opener = m2urllib2.build_opener(ctx)
908
opener.addheaders = [('Connection', 'close')]
909
u = opener.open('https://%s:%s/' % (srv_host, srv_port))
913
self.stop_server(pid)
914
self.failIf(string.find(data, 's_server -quiet -www') == -1)
916
def test_urllib2_secure_context_fail(self):
917
pid = self.start_server(self.args)
920
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
921
ctx.load_verify_locations('tests/server.pem')
923
from M2Crypto import m2urllib2
924
opener = m2urllib2.build_opener(ctx)
925
opener.addheaders = [('Connection', 'close')]
926
self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))
928
self.stop_server(pid)
930
def test_z_urllib2_opener(self):
931
pid = self.start_server(self.args)
935
from M2Crypto import m2urllib2
936
opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())
937
m2urllib2.install_opener(opener)
938
req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))
939
u = m2urllib2.urlopen(req)
943
self.stop_server(pid)
944
self.failIf(string.find(data, 's_server -quiet -www') == -1)
946
def test_urllib2_opener_handlers(self):
949
from M2Crypto import m2urllib2
950
opener = m2urllib2.build_opener(ctx,
951
m2urllib2.HTTPBasicAuthHandler())
953
def test_urllib2_leak(self):
954
pid = self.start_server(self.args)
957
from M2Crypto import m2urllib2
958
o = m2urllib2.build_opener()
959
r = o.open('https://%s:%s/' % (srv_host, srv_port))
962
self.assertEqual(len(gc.get_referrers(s[0])), 1)
964
self.stop_server(pid)
967
class TwistedSSLClientTestCase(BaseSSLClientTestCase):
969
def test_twisted_wrapper(self):
970
# Test only when twisted and ZopeInterfaces are present
972
from twisted.internet.protocol import ClientFactory
973
from twisted.protocols.basic import LineReceiver
974
from twisted.internet import reactor
975
import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
978
warnings.warn('Skipping twisted wrapper test because twisted not found')
981
class EchoClient(LineReceiver):
982
def connectionMade(self):
983
self.sendLine('GET / HTTP/1.0\n\n')
985
def lineReceived(self, line):
989
class EchoClientFactory(ClientFactory):
990
protocol = EchoClient
992
def clientConnectionFailed(self, connector, reason):
996
def clientConnectionLost(self, connector, reason):
999
pid = self.start_server(self.args)
1001
class ContextFactory:
1002
def getContext(self):
1003
return SSL.Context()
1009
contextFactory = ContextFactory()
1010
factory = EchoClientFactory()
1011
wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)
1012
reactor.run() # This will block until reactor.stop() is called
1014
self.stop_server(pid)
1015
self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)
1021
class XmlRpcLibTestCase(unittest.TestCase):
1023
from M2Crypto import m2xmlrpclib
1024
m2xmlrpclib.SSL_Transport()
1025
# XXX need server to test against
1028
class FtpsLibTestCase(unittest.TestCase):
1030
from M2Crypto import ftpslib
1032
# XXX need server to test against
1035
class CheckerTestCase(unittest.TestCase):
1036
def test_checker(self):
1037
from M2Crypto.SSL import Checker
1038
from M2Crypto import X509
1040
check = Checker.Checker(host=srv_host,
1041
peerCertHash='7B754EFA41A264AAD370D43460BC8229F9354ECE')
1042
x509 = X509.load_cert('tests/server.pem')
1043
assert check(x509, srv_host)
1044
self.assertRaises(Checker.WrongHost, check, x509, 'example.com')
1047
doctest.testmod(Checker)
1050
class ContextTestCase(unittest.TestCase):
1051
def test_ctx_load_verify_locations(self):
1053
self.assertRaises(ValueError, ctx.load_verify_locations, None, None)
1056
from M2Crypto.SSL.Context import map, _ctxmap
1057
assert isinstance(map(), _ctxmap)
1061
assert map() is _ctxmap.singleton
1063
def test_certstore(self):
1065
ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
1066
ctx.load_verify_locations('tests/ca.pem')
1067
ctx.load_cert('tests/x509.pem')
1069
from M2Crypto import X509
1070
store = ctx.get_cert_store()
1071
assert isinstance(store, X509.X509_Store)
1074
class SessionTestCase(unittest.TestCase):
1075
def test_session_load_bad(self):
1076
self.assertRaises(SSL.SSLError, SSL.Session.load_session,
1079
class FtpslibTestCase(unittest.TestCase):
1080
def test_26_compat(self):
    """Connecting FTP_TLS to an unresolvable host raises socket.gaierror.

    Regression check: the original comment records that under Python 2.6
    this call used to raise AttributeError instead.
    """
    from M2Crypto import ftpslib
    ftps = ftpslib.FTP_TLS()
    # 2.6 used to raise AttributeError:
    self.assertRaises(socket.gaierror, ftps.connect,
                      'no-such-host-dfgHJK56789', 990)
1088
suite = unittest.TestSuite()
1089
suite.addTest(unittest.makeSuite(CheckerTestCase))
1090
suite.addTest(unittest.makeSuite(ContextTestCase))
1091
suite.addTest(unittest.makeSuite(SessionTestCase))
1092
suite.addTest(unittest.makeSuite(XmlRpcLibTestCase))
1093
suite.addTest(unittest.makeSuite(FtpsLibTestCase))
1094
suite.addTest(unittest.makeSuite(PassSSLClientTestCase))
1095
suite.addTest(unittest.makeSuite(HttpslibSSLClientTestCase))
1096
suite.addTest(unittest.makeSuite(UrllibSSLClientTestCase))
1097
suite.addTest(unittest.makeSuite(Urllib2SSLClientTestCase))
1098
suite.addTest(unittest.makeSuite(MiscSSLClientTestCase))
1099
suite.addTest(unittest.makeSuite(FtpslibTestCase))
1101
import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
1102
suite.addTest(unittest.makeSuite(TwistedSSLClientTestCase))
1110
fn = tempfile.mktemp()
1111
cmd = 'ps | egrep %s > %s' % (s, fn)
1118
chunk = string.split(ps)
1119
pid, cmd = chunk[0], chunk[4]
1121
os.kill(int(pid), 1)
1126
if __name__ == '__main__':
1132
gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL)
1135
Rand.load_file('randpool.dat', -1)
1136
unittest.TextTestRunner().run(suite())
1137
Rand.save_file('randpool.dat')
1143
alltests.dump_garbage()