# -*- coding: utf-8 -*-
#
# Self-tests for the user-friendly Crypto.Random interface
#
# Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
#
# ===================================================================
# The contents of this file are dedicated to the public domain. To
# the extent that dedication to the public domain is not available,
# everyone is granted a worldwide, perpetual, royalty-free,
# non-exclusive license to exercise all rights associated with the
# contents of this file for any purpose whatsoever.
# No rights are reserved.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ===================================================================

"""Self-test suite for generic Crypto.Random stuff """

from __future__ import nested_scopes
36
# On Python 2.1 only, pull in the names exported by Crypto.Util.py21compat.
# The star-import must be indented under the `if`, otherwise the module does
# not even parse.
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
    from Crypto.Util.py21compat import *
38
from Crypto.Util.py3compat import *
41
# multiprocessing is optional: the stdlib module was only added in Python 2.6.
# Fall back to None when it cannot be imported so that get_tests() can skip
# the multiprocessing-based test.  Assigning None unconditionally (without
# the try/except) would disable that test on every interpreter.
try:
    import multiprocessing
except ImportError:
    multiprocessing = None
45
import Crypto.Random._UserFriendlyRNG
46
import Crypto.Random.random
48
class RNGForkTest(unittest.TestCase):
50
def _get_reseed_count(self):
52
Get `FortunaAccumulator.reseed_count`, the global count of the
53
number of times that the PRNG has been reseeded.
55
rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
56
rng_singleton._lock.acquire()
58
return rng_singleton._fa.reseed_count
60
rng_singleton._lock.release()
63
# Regression test for CVE-2013-1445. We had a bug where, under the
64
# right conditions, two processes might see the same random sequence.
66
if sys.platform.startswith('win'): # windows can't fork
67
assert not hasattr(os, 'fork') # ... right?
70
# Wait 150 ms so that we don't trigger the rate-limit prematurely.
73
reseed_count_before = self._get_reseed_count()
75
# One or both of these calls together should trigger a reseed right here.
76
Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
77
Crypto.Random.get_random_bytes(1)
79
reseed_count_after = self._get_reseed_count()
80
self.assertNotEqual(reseed_count_before, reseed_count_after) # sanity check: test should reseed parent before forking
88
f = os.fdopen(wfd, "wb")
90
Crypto.Random.atfork()
92
data = Crypto.Random.get_random_bytes(16)
99
rfiles.append(os.fdopen(rfd, "rb"))
104
data = binascii.hexlify(f.read())
106
results_dict[data] = 1
109
if len(results) != len(results_dict.keys()):
110
raise AssertionError("RNG output duplicated across fork():\n%s" %
111
(pprint.pformat(results)))
114
# For RNGMultiprocessingForkTest
def _task_main(q):
    """Worker task: put two hex-encoded 16-byte random samples on `q`.

    The samples are drawn 100 ms apart, then a final None is put.
    NOTE(review): the caller in this file creates each queue with
    manager.Queue(1); presumably that makes each put() block until the
    parent has taken the previous item, which is what the final put relies
    on -- confirm.
    """
    a = Crypto.Random.get_random_bytes(16)
    time.sleep(0.1)     # wait 100 ms
    b = Crypto.Random.get_random_bytes(16)
    q.put(binascii.b2a_hex(a))
    q.put(binascii.b2a_hex(b))
    q.put(None)      # Wait for acknowledgment
124
class RNGMultiprocessingForkTest(unittest.TestCase):
127
# Another regression test for CVE-2013-1445. This is basically the
128
# same as RNGForkTest, but less compatible with old versions of Python,
129
# and a little easier to read.
132
manager = multiprocessing.Manager()
133
queues = [manager.Queue(1) for i in range(n_procs)]
137
Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
138
Crypto.Random.get_random_bytes(1)
140
# Start the child processes
141
pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
142
map_result = pool.map_async(_task_main, queues)
144
# Get the results, ensuring that no pool processes are reused.
145
aa = [queues[i].get(30) for i in range(n_procs)]
146
bb = [queues[i].get(30) for i in range(n_procs)]
147
res = list(zip(aa, bb))
154
# Check that the results are unique
155
if len(set(aa)) != len(aa) or len(set(res)) != len(res):
156
raise AssertionError("RNG output duplicated across fork():\n%s" %
157
(pprint.pformat(res),))
160
def get_tests(config={}):
    """Return the list of test-case instances defined in this module.

    `config` is accepted but not used by this function; its shared default
    dict is never mutated here.  RNGMultiprocessingForkTest is included only
    when the module-level name `multiprocessing` is not None.
    """
    tests = []
    tests += [RNGForkTest()]
    if multiprocessing is not None:
        tests += [RNGMultiprocessingForkTest()]
    return tests
167
if __name__ == '__main__':
    # unittest.main() resolves defaultTest='suite' by name in this module,
    # so `suite` must be a module-level callable returning the test suite.
    def suite():
        return unittest.TestSuite(get_tests())
    unittest.main(defaultTest='suite')
171
# vim:set ts=4 sw=4 sts=4 expandtab: