~ubuntu-branches/ubuntu/quantal/nova/quantal-security

« back to all changes in this revision

Viewing changes to nova/testing/runner.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
 
 
4
 
# Copyright 2010 United States Government as represented by the
5
 
# Administrator of the National Aeronautics and Space Administration.
6
 
# All Rights Reserved.
7
 
#
8
 
#    Licensed under the Apache License, Version 2.0 (the "License");
9
 
#    you may not use this file except in compliance with the License.
10
 
#    You may obtain a copy of the License at
11
 
#
12
 
#        http://www.apache.org/licenses/LICENSE-2.0
13
 
#
14
 
#    Unless required by applicable law or agreed to in writing, software
15
 
#    distributed under the License is distributed on an "AS IS" BASIS,
16
 
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 
#    See the License for the specific language governing permissions and
18
 
#    limitations under the License.
19
 
 
20
 
# Colorizer Code is borrowed from Twisted:
21
 
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
22
 
#
23
 
#    Permission is hereby granted, free of charge, to any person obtaining
24
 
#    a copy of this software and associated documentation files (the
25
 
#    "Software"), to deal in the Software without restriction, including
26
 
#    without limitation the rights to use, copy, modify, merge, publish,
27
 
#    distribute, sublicense, and/or sell copies of the Software, and to
28
 
#    permit persons to whom the Software is furnished to do so, subject to
29
 
#    the following conditions:
30
 
#
31
 
#    The above copyright notice and this permission notice shall be
32
 
#    included in all copies or substantial portions of the Software.
33
 
#
34
 
#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35
 
#    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
36
 
#    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37
 
#    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
38
 
#    LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
39
 
#    OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
40
 
#    WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
41
 
"""Unittest runner for Nova.
42
 
 
43
 
To run all tests
44
 
    python nova/testing/runner.py
45
 
 
46
 
To run a single test module:
47
 
    python nova/testing/runner.py test_compute
48
 
 
49
 
    or
50
 
 
51
 
    python nova/testing/runner.py api.test_wsgi
52
 
 
53
 
To run a single test:
54
 
    python nova/testing/runner.py
55
 
        test_compute:ComputeTestCase.test_run_terminate
56
 
 
57
 
"""
58
 
 
59
 
import gettext
60
 
import heapq
61
 
import os
62
 
import sys
63
 
import time
64
 
import unittest
65
 
 
66
 
import eventlet
67
 
from nose import config
68
 
from nose import core
69
 
from nose import result
70
 
 
71
 
gettext.install('nova', unicode=1)
72
 
reldir = os.path.join(os.path.dirname(__file__), '..', '..')
73
 
absdir = os.path.abspath(reldir)
74
 
sys.path.insert(0, absdir)
75
 
 
76
 
from nova import flags
77
 
from nova import log as logging
78
 
from nova.openstack.common import cfg
79
 
 
80
 
 
81
 
class _AnsiColorizer(object):
82
 
    """
83
 
    A colorizer is an object that loosely wraps around a stream, allowing
84
 
    callers to write text to the stream in a particular color.
85
 
 
86
 
    Colorizer classes must implement C{supported()} and C{write(text, color)}.
87
 
    """
88
 
    _colors = dict(black=30, red=31, green=32, yellow=33,
89
 
                   blue=34, magenta=35, cyan=36, white=37)
90
 
 
91
 
    def __init__(self, stream):
92
 
        self.stream = stream
93
 
 
94
 
    def supported(cls, stream=sys.stdout):
95
 
        """
96
 
        A class method that returns True if the current platform supports
97
 
        coloring terminal output using this method. Returns False otherwise.
98
 
        """
99
 
        if not stream.isatty():
100
 
            return False  # auto color only on TTYs
101
 
        try:
102
 
            import curses
103
 
        except ImportError:
104
 
            return False
105
 
        else:
106
 
            try:
107
 
                try:
108
 
                    return curses.tigetnum("colors") > 2
109
 
                except curses.error:
110
 
                    curses.setupterm()
111
 
                    return curses.tigetnum("colors") > 2
112
 
            except Exception:
113
 
                return False
114
 
    supported = classmethod(supported)
115
 
 
116
 
    def write(self, text, color):
117
 
        """
118
 
        Write the given text to the stream in the given color.
119
 
 
120
 
        @param text: Text to be written to the stream.
121
 
 
122
 
        @param color: A string label for a color. e.g. 'red', 'white'.
123
 
        """
124
 
        color = self._colors[color]
125
 
        self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
126
 
 
127
 
 
128
 
class _Win32Colorizer(object):
129
 
    """
130
 
    See _AnsiColorizer docstring.
131
 
    """
132
 
    def __init__(self, stream):
133
 
        import win32console as win
134
 
        red, green, blue, bold = (win.FOREGROUND_RED, win.FOREGROUND_GREEN,
135
 
                                 win.FOREGROUND_BLUE, win.FOREGROUND_INTENSITY)
136
 
        self.stream = stream
137
 
        self.screenBuffer = win.GetStdHandle(win.STD_OUT_HANDLE)
138
 
        self._colors = {
139
 
            'normal': red | green | blue,
140
 
            'red': red | bold,
141
 
            'green': green | bold,
142
 
            'blue': blue | bold,
143
 
            'yellow': red | green | bold,
144
 
            'magenta': red | blue | bold,
145
 
            'cyan': green | blue | bold,
146
 
            'white': red | green | blue | bold
147
 
            }
148
 
 
149
 
    def supported(cls, stream=sys.stdout):
150
 
        try:
151
 
            import win32console
152
 
            screenBuffer = win32console.GetStdHandle(
153
 
                win32console.STD_OUT_HANDLE)
154
 
        except ImportError:
155
 
            return False
156
 
        import pywintypes
157
 
        try:
158
 
            screenBuffer.SetConsoleTextAttribute(
159
 
                win32console.FOREGROUND_RED |
160
 
                win32console.FOREGROUND_GREEN |
161
 
                win32console.FOREGROUND_BLUE)
162
 
        except pywintypes.error:
163
 
            return False
164
 
        else:
165
 
            return True
166
 
    supported = classmethod(supported)
167
 
 
168
 
    def write(self, text, color):
169
 
        color = self._colors[color]
170
 
        self.screenBuffer.SetConsoleTextAttribute(color)
171
 
        self.stream.write(text)
172
 
        self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
173
 
 
174
 
 
175
 
class _NullColorizer(object):
176
 
    """
177
 
    See _AnsiColorizer docstring.
178
 
    """
179
 
    def __init__(self, stream):
180
 
        self.stream = stream
181
 
 
182
 
    def supported(cls, stream=sys.stdout):
183
 
        return True
184
 
    supported = classmethod(supported)
185
 
 
186
 
    def write(self, text, color):
187
 
        self.stream.write(text)
188
 
 
189
 
 
190
 
def get_elapsed_time_color(elapsed_time):
191
 
    if elapsed_time > 1.0:
192
 
        return 'red'
193
 
    elif elapsed_time > 0.25:
194
 
        return 'yellow'
195
 
    else:
196
 
        return 'green'
197
 
 
198
 
 
199
 
class NovaTestResult(result.TextTestResult):
200
 
    def __init__(self, *args, **kw):
201
 
        self.show_elapsed = kw.pop('show_elapsed')
202
 
        result.TextTestResult.__init__(self, *args, **kw)
203
 
        self.num_slow_tests = 5
204
 
        self.slow_tests = []  # this is a fixed-sized heap
205
 
        self._last_case = None
206
 
        self.colorizer = None
207
 
        # NOTE(vish): reset stdout for the terminal check
208
 
        stdout = sys.stdout
209
 
        sys.stdout = sys.__stdout__
210
 
        for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
211
 
            if colorizer.supported():
212
 
                self.colorizer = colorizer(self.stream)
213
 
                break
214
 
        sys.stdout = stdout
215
 
 
216
 
        # NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
217
 
        # error results in it failing to be initialized later. Otherwise,
218
 
        # _handleElapsedTime will fail, causing the wrong error message to
219
 
        # be outputted.
220
 
        self.start_time = time.time()
221
 
 
222
 
    def getDescription(self, test):
223
 
        return str(test)
224
 
 
225
 
    def _handleElapsedTime(self, test):
226
 
        self.elapsed_time = time.time() - self.start_time
227
 
        item = (self.elapsed_time, test)
228
 
        # Record only the n-slowest tests using heap
229
 
        if len(self.slow_tests) >= self.num_slow_tests:
230
 
            heapq.heappushpop(self.slow_tests, item)
231
 
        else:
232
 
            heapq.heappush(self.slow_tests, item)
233
 
 
234
 
    def _writeElapsedTime(self, test):
235
 
        color = get_elapsed_time_color(self.elapsed_time)
236
 
        self.colorizer.write("  %.2f" % self.elapsed_time, color)
237
 
 
238
 
    def _writeResult(self, test, long_result, color, short_result, success):
239
 
        if self.showAll:
240
 
            self.colorizer.write(long_result, color)
241
 
            if self.show_elapsed and success:
242
 
                self._writeElapsedTime(test)
243
 
            self.stream.writeln()
244
 
        elif self.dots:
245
 
            self.stream.write(short_result)
246
 
            self.stream.flush()
247
 
 
248
 
    # NOTE(vish): copied from unittest with edit to add color
249
 
    def addSuccess(self, test):
250
 
        unittest.TestResult.addSuccess(self, test)
251
 
        self._handleElapsedTime(test)
252
 
        self._writeResult(test, 'OK', 'green', '.', True)
253
 
 
254
 
    # NOTE(vish): copied from unittest with edit to add color
255
 
    def addFailure(self, test, err):
256
 
        unittest.TestResult.addFailure(self, test, err)
257
 
        self._handleElapsedTime(test)
258
 
        self._writeResult(test, 'FAIL', 'red', 'F', False)
259
 
 
260
 
    # NOTE(vish): copied from nose with edit to add color
261
 
    def addError(self, test, err):
262
 
        """Overrides normal addError to add support for
263
 
        errorClasses. If the exception is a registered class, the
264
 
        error will be added to the list for that class, not errors.
265
 
        """
266
 
        self._handleElapsedTime(test)
267
 
        stream = getattr(self, 'stream', None)
268
 
        ec, ev, tb = err
269
 
        try:
270
 
            exc_info = self._exc_info_to_string(err, test)
271
 
        except TypeError:
272
 
            # 2.3 compat
273
 
            exc_info = self._exc_info_to_string(err)
274
 
        for cls, (storage, label, isfail) in self.errorClasses.items():
275
 
            if result.isclass(ec) and issubclass(ec, cls):
276
 
                if isfail:
277
 
                    test.passed = False
278
 
                storage.append((test, exc_info))
279
 
                # Might get patched into a streamless result
280
 
                if stream is not None:
281
 
                    if self.showAll:
282
 
                        message = [label]
283
 
                        detail = result._exception_detail(err[1])
284
 
                        if detail:
285
 
                            message.append(detail)
286
 
                        stream.writeln(": ".join(message))
287
 
                    elif self.dots:
288
 
                        stream.write(label[:1])
289
 
                return
290
 
        self.errors.append((test, exc_info))
291
 
        test.passed = False
292
 
        if stream is not None:
293
 
            self._writeResult(test, 'ERROR', 'red', 'E', False)
294
 
 
295
 
    def startTest(self, test):
296
 
        unittest.TestResult.startTest(self, test)
297
 
        self.start_time = time.time()
298
 
        current_case = test.test.__class__.__name__
299
 
 
300
 
        if self.showAll:
301
 
            if current_case != self._last_case:
302
 
                self.stream.writeln(current_case)
303
 
                self._last_case = current_case
304
 
 
305
 
            self.stream.write(
306
 
                '    %s' % str(test.test._testMethodName).ljust(60))
307
 
            self.stream.flush()
308
 
 
309
 
 
310
 
class NovaTestRunner(core.TextTestRunner):
311
 
    def __init__(self, *args, **kwargs):
312
 
        self.show_elapsed = kwargs.pop('show_elapsed')
313
 
        core.TextTestRunner.__init__(self, *args, **kwargs)
314
 
 
315
 
    def _makeResult(self):
316
 
        return NovaTestResult(self.stream,
317
 
                              self.descriptions,
318
 
                              self.verbosity,
319
 
                              self.config,
320
 
                              show_elapsed=self.show_elapsed)
321
 
 
322
 
    def _writeSlowTests(self, result_):
323
 
        # Pare out 'fast' tests
324
 
        slow_tests = [item for item in result_.slow_tests
325
 
                      if get_elapsed_time_color(item[0]) != 'green']
326
 
        if slow_tests:
327
 
            slow_total_time = sum(item[0] for item in slow_tests)
328
 
            self.stream.writeln("Slowest %i tests took %.2f secs:"
329
 
                                % (len(slow_tests), slow_total_time))
330
 
            for elapsed_time, test in sorted(slow_tests, reverse=True):
331
 
                time_str = "%.2f" % elapsed_time
332
 
                self.stream.writeln("    %s %s" % (time_str.ljust(10), test))
333
 
 
334
 
    def run(self, test):
335
 
        result_ = core.TextTestRunner.run(self, test)
336
 
        if self.show_elapsed:
337
 
            self._writeSlowTests(result_)
338
 
        return result_
339
 
 
340
 
 
341
 
def run():
342
 
    # This is a fix to allow the --hide-elapsed flag while accepting
343
 
    # arbitrary nosetest flags as well
344
 
    argv = [x for x in sys.argv if x != '--hide-elapsed']
345
 
    hide_elapsed = argv != sys.argv
346
 
    logging.setup()
347
 
 
348
 
    # If any argument looks like a test name but doesn't have "nova.tests" in
349
 
    # front of it, automatically add that so we don't have to type as much
350
 
    for i, arg in enumerate(argv):
351
 
        if arg.startswith('test_'):
352
 
            argv[i] = 'nova.tests.%s' % arg
353
 
 
354
 
    testdir = os.path.abspath(os.path.join("nova", "tests"))
355
 
    c = config.Config(stream=sys.stdout,
356
 
                      env=os.environ,
357
 
                      verbosity=3,
358
 
                      workingDir=testdir,
359
 
                      plugins=core.DefaultPluginManager())
360
 
 
361
 
    runner = NovaTestRunner(stream=c.stream,
362
 
                            verbosity=c.verbosity,
363
 
                            config=c,
364
 
                            show_elapsed=not hide_elapsed)
365
 
    sys.exit(not core.run(config=c, testRunner=runner, argv=argv))
366
 
 
367
 
 
368
 
if __name__ == '__main__':
369
 
    eventlet.monkey_patch()
370
 
    run()