~ubuntu-branches/ubuntu/wily/python-crypto/wily-proposed

« back to all changes in this revision

Viewing changes to lib/Crypto/SelfTest/Random/test__UserFriendlyRNG.py

  • Committer: Package Import Robot
  • Author(s): Sebastian Ramacher, Jakub Wilk, Sebastian Ramacher
  • Date: 2013-10-17 18:24:51 UTC
  • mfrom: (1.1.8)
  • Revision ID: package-import@ubuntu.com-20131017182451-3zdvbd6pnum0anrm
Tags: 2.6.1-1
[ Jakub Wilk ]
* Drop obsolete Conflicts/Replaces with python2.3-crypto and
  python2.4-crypto.

[ Sebastian Ramacher ]
* New upstream release.
  - Fixes CVE-2013-1445: PRNG not correctly reseeded in some situations.
* Set urgency to high since this upload fixes a security issue.
* debian/python3-crypto.install: Fix lintian warning
  brace-expansion-in-debhelper-config-file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
# Self-tests for the user-friendly Crypto.Random interface
 
3
#
 
4
# Written in 2013 by Dwayne C. Litzenberger <dlitz@dlitz.net>
 
5
#
 
6
# ===================================================================
 
7
# The contents of this file are dedicated to the public domain.  To
 
8
# the extent that dedication to the public domain is not available,
 
9
# everyone is granted a worldwide, perpetual, royalty-free,
 
10
# non-exclusive license to exercise all rights associated with the
 
11
# contents of this file for any purpose whatsoever.
 
12
# No rights are reserved.
 
13
#
 
14
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 
15
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 
16
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 
17
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 
18
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 
19
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 
20
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 
21
# SOFTWARE.
 
22
# ===================================================================
 
23
 
 
24
"""Self-test suite for generic Crypto.Random stuff """
 
25
 
 
26
from __future__ import nested_scopes
 
27
 
 
28
__revision__ = "$Id$"
 
29
 
 
30
import binascii
 
31
import pprint
 
32
import unittest
 
33
import os
 
34
import time
 
35
import sys
 
36
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
 
37
    from Crypto.Util.py21compat import *
 
38
from Crypto.Util.py3compat import *
 
39
 
 
40
try:
 
41
    import multiprocessing
 
42
except ImportError:
 
43
    multiprocessing = None
 
44
 
 
45
import Crypto.Random._UserFriendlyRNG
 
46
import Crypto.Random.random
 
47
 
 
48
class RNGForkTest(unittest.TestCase):
 
49
 
 
50
    def _get_reseed_count(self):
 
51
        """
 
52
        Get `FortunaAccumulator.reseed_count`, the global count of the
 
53
        number of times that the PRNG has been reseeded.
 
54
        """
 
55
        rng_singleton = Crypto.Random._UserFriendlyRNG._get_singleton()
 
56
        rng_singleton._lock.acquire()
 
57
        try:
 
58
            return rng_singleton._fa.reseed_count
 
59
        finally:
 
60
            rng_singleton._lock.release()
 
61
 
 
62
    def runTest(self):
 
63
        # Regression test for CVE-2013-1445.  We had a bug where, under the
 
64
        # right conditions, two processes might see the same random sequence.
 
65
 
 
66
        if sys.platform.startswith('win'):  # windows can't fork
 
67
            assert not hasattr(os, 'fork')    # ... right?
 
68
            return
 
69
 
 
70
        # Wait 150 ms so that we don't trigger the rate-limit prematurely.
 
71
        time.sleep(0.15)
 
72
 
 
73
        reseed_count_before = self._get_reseed_count()
 
74
 
 
75
        # One or both of these calls together should trigger a reseed right here.
 
76
        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
 
77
        Crypto.Random.get_random_bytes(1)
 
78
 
 
79
        reseed_count_after = self._get_reseed_count()
 
80
        self.assertNotEqual(reseed_count_before, reseed_count_after)  # sanity check: test should reseed parent before forking
 
81
 
 
82
        rfiles = []
 
83
        for i in range(10):
 
84
            rfd, wfd = os.pipe()
 
85
            if os.fork() == 0:
 
86
                # child
 
87
                os.close(rfd)
 
88
                f = os.fdopen(wfd, "wb")
 
89
 
 
90
                Crypto.Random.atfork()
 
91
 
 
92
                data = Crypto.Random.get_random_bytes(16)
 
93
 
 
94
                f.write(data)
 
95
                f.close()
 
96
                os._exit(0)
 
97
            # parent
 
98
            os.close(wfd)
 
99
            rfiles.append(os.fdopen(rfd, "rb"))
 
100
 
 
101
        results = []
 
102
        results_dict = {}
 
103
        for f in rfiles:
 
104
            data = binascii.hexlify(f.read())
 
105
            results.append(data)
 
106
            results_dict[data] = 1
 
107
            f.close()
 
108
 
 
109
        if len(results) != len(results_dict.keys()):
 
110
            raise AssertionError("RNG output duplicated across fork():\n%s" %
 
111
                                 (pprint.pformat(results)))
 
112
 
 
113
 
 
114
# For RNGMultiprocessingForkTest
 
115
def _task_main(q):
 
116
    a = Crypto.Random.get_random_bytes(16)
 
117
    time.sleep(0.1)     # wait 100 ms
 
118
    b = Crypto.Random.get_random_bytes(16)
 
119
    q.put(binascii.b2a_hex(a))
 
120
    q.put(binascii.b2a_hex(b))
 
121
    q.put(None)      # Wait for acknowledgment
 
122
 
 
123
 
 
124
class RNGMultiprocessingForkTest(unittest.TestCase):
 
125
 
 
126
    def runTest(self):
 
127
        # Another regression test for CVE-2013-1445.  This is basically the
 
128
        # same as RNGForkTest, but less compatible with old versions of Python,
 
129
        # and a little easier to read.
 
130
 
 
131
        n_procs = 5
 
132
        manager = multiprocessing.Manager()
 
133
        queues = [manager.Queue(1) for i in range(n_procs)]
 
134
 
 
135
        # Reseed the pool
 
136
        time.sleep(0.15)
 
137
        Crypto.Random._UserFriendlyRNG._get_singleton().reinit()
 
138
        Crypto.Random.get_random_bytes(1)
 
139
 
 
140
        # Start the child processes
 
141
        pool = multiprocessing.Pool(processes=n_procs, initializer=Crypto.Random.atfork)
 
142
        map_result = pool.map_async(_task_main, queues)
 
143
 
 
144
        # Get the results, ensuring that no pool processes are reused.
 
145
        aa = [queues[i].get(30) for i in range(n_procs)]
 
146
        bb = [queues[i].get(30) for i in range(n_procs)]
 
147
        res = list(zip(aa, bb))
 
148
 
 
149
        # Shut down the pool
 
150
        map_result.get(30)
 
151
        pool.close()
 
152
        pool.join()
 
153
 
 
154
        # Check that the results are unique
 
155
        if len(set(aa)) != len(aa) or len(set(res)) != len(res):
 
156
            raise AssertionError("RNG output duplicated across fork():\n%s" %
 
157
                                 (pprint.pformat(res),))
 
158
 
 
159
 
 
160
def get_tests(config={}):
 
161
    tests = []
 
162
    tests += [RNGForkTest()]
 
163
    if multiprocessing is not None:
 
164
        tests += [RNGMultiprocessingForkTest()]
 
165
    return tests
 
166
 
 
167
if __name__ == '__main__':
 
168
    suite = lambda: unittest.TestSuite(get_tests())
 
169
    unittest.main(defaultTest='suite')
 
170
 
 
171
# vim:set ts=4 sw=4 sts=4 expandtab: