~ubuntu-branches/ubuntu/vivid/python-gevent/vivid

« back to all changes in this revision

Viewing changes to greentest/testrunner.py

  • Committer: Bazaar Package Importer
  • Author(s): Örjan Persson
  • Date: 2011-05-17 16:43:20 UTC
  • mto: (14.1.1 sid)
  • mto: This revision was merged to the branch mainline in revision 7.
  • Revision ID: james.westby@ubuntu.com-20110517164320-jyr5vamkqi3jfeab
Tags: upstream-0.13.6
Import upstream version 0.13.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
The --db option, when provided, specifies sqlite3 database that holds the test results.
13
13
By default 'testresults.sqlite3' is used in the current directory.
14
 
If the a mercurial repository is detected and the current working copy is "dirty", that is,
15
 
has uncommited changes, then '/tmp/testresults.sqlite3' is used.
16
14
 
17
15
The results are stored in the following 2 tables:
18
16
 
39
37
DEFAULT_TIMEOUT = 60
40
38
 
41
39
# the number of bytes of output that is recorded; the rest is thrown away
42
 
OUTPUT_LIMIT = 15*1024
 
40
OUTPUT_LIMIT = 50000
43
41
 
44
42
ignore_tracebacks = ['ExpectedException', 'test_support.TestSkipped', 'test.test_support.TestSkipped']
45
43
 
53
51
 
54
52
try:
55
53
    import sqlite3
56
 
except ImportError:
 
54
except ImportError, ex:
 
55
    sys.stderr.write('Failed to import sqlite3: %s\n' % ex)
57
56
    try:
58
57
        import pysqlite2.dbapi2 as sqlite3
59
 
    except ImportError:
 
58
    except ImportError, ex:
 
59
        sys.stderr.write('Failed to import pysqlite2.dbapi2: %s\n' % ex)
60
60
        sqlite3 = None
61
61
 
62
62
_column_types = {'time': 'real'}
170
170
    return libevent_version
171
171
 
172
172
 
173
 
def get_libevent_method():
174
 
    from gevent import core
175
 
    return core.get_method()
176
 
 
177
 
 
178
173
def get_tempnam():
179
174
    import warnings
180
175
    warnings.filterwarnings('ignore', 'tempnam is a potential security risk to your program')
186
181
 
187
182
 
188
183
def run_tests(options, args):
189
 
    if len(args) != 1:
190
 
        sys.exit('--record requires exactly one test module to run')
191
184
    arg = args[0]
192
185
    module_name = arg
193
186
    if module_name.endswith('.py'):
194
187
        module_name = module_name[:-3]
 
188
 
195
189
    class _runner(object):
 
190
 
196
191
        def __new__(cls, *args, **kawrgs):
197
192
            return DatabaseTestRunner(database_path=options.db, runid=options.runid, module_name=module_name, verbosity=options.verbosity)
 
193
 
198
194
    if options.db:
199
195
        import unittest
200
196
        unittest.TextTestRunner = _runner
201
197
        import test_support
202
198
        test_support.BasicTestRunner = _runner
 
199
 
 
200
    sys.argv = args
 
201
    globals()['__file__'] = arg
 
202
 
203
203
    if os.path.exists(arg):
204
 
        sys.argv = args
205
 
        saved_globals = {'__file__': __file__}
206
 
        try:
207
 
            globals()['__file__'] = arg
208
 
            # QQQ this makes tests reported as if they are from __main__ and screws up warnings location
209
 
            execfile(arg, globals())
210
 
        finally:
211
 
            globals().update(saved_globals)
 
204
        execfile(arg, globals())
212
205
    else:
213
206
        test = defaultTestLoader.loadTestsFromName(arg)
214
207
        result = _runner().run(test)
215
208
        sys.exit(not result.wasSuccessful())
216
209
 
217
210
 
218
 
def run_subprocess(arg, options):
 
211
def run_subprocess(args, options):
219
212
    from threading import Timer
220
213
    from mysubprocess import Popen, PIPE, STDOUT
221
214
 
224
217
                  '--verbosity', options.verbosity]
225
218
    if options.db:
226
219
        popen_args += ['--db', options.db]
227
 
    popen_args += [arg]
 
220
    popen_args += args
228
221
    popen_args = [str(x) for x in popen_args]
229
222
    if options.capture:
230
223
        popen = Popen(popen_args, stdout=PIPE, stderr=STDOUT, shell=False)
235
228
 
236
229
    def killer():
237
230
        retcode.append('TIMEOUT')
238
 
        print >> sys.stderr, 'Killing %s (%s) because of timeout' % (popen.pid, arg)
 
231
        print >> sys.stderr, 'Killing %s (%s) because of timeout' % (popen.pid, args)
239
232
        popen.kill()
240
233
 
241
234
    timeout = Timer(options.timeout, killer)
260
253
    finally:
261
254
        timeout.cancel()
262
255
    # QQQ compensating for run_tests' screw up
263
 
    module_name = arg
 
256
    module_name = args[0]
264
257
    if module_name.endswith('.py'):
265
258
        module_name = module_name[:-3]
266
259
    output = output.replace(' (__main__.', ' (' + module_name + '.')
267
260
    return retcode[0], output, output_printed
268
261
 
269
262
 
270
 
def spawn_subprocess(arg, options, base_params):
 
263
def spawn_subprocess(args, options, base_params):
271
264
    success = False
272
265
    if options.db:
273
 
        module_name = arg
 
266
        module_name = args[0]
274
267
        if module_name.endswith('.py'):
275
268
            module_name = module_name[:-3]
276
269
        from datetime import datetime
279
272
                       'test': module_name})
280
273
        row_id = store_record(options.db, 'test', params)
281
274
        params['id'] = row_id
282
 
    retcode, output, output_printed = run_subprocess(arg, options)
 
275
    retcode, output, output_printed = run_subprocess(args, options)
283
276
    if len(output) > OUTPUT_LIMIT:
284
 
        output = output[:OUTPUT_LIMIT] + '<AbridgedOutputWarning>'
 
277
        warn = '<AbridgedOutputWarning>'
 
278
        output = output[:OUTPUT_LIMIT - len(warn)] + warn
285
279
    if retcode:
286
280
        if retcode == 1 and 'test_support.TestSkipped' in output:
287
281
            pass
288
282
        else:
289
283
            if not output_printed and options.verbosity >= -1:
290
284
                sys.stdout.write(output)
291
 
            print '%s failed with code %s' % (arg, retcode)
 
285
            print '%s failed with code %s' % (' '.join(args), retcode)
292
286
    elif retcode == 0:
293
287
        if not output_printed and options.verbosity >= 1:
294
288
            sys.stdout.write(output)
295
289
        if options.verbosity >= 0:
296
 
            print '%s passed' % arg
 
290
            print '%s passed' % ' '.join(args)
297
291
        success = True
298
292
    else:
299
 
        print '%s timed out' % arg
 
293
        print '%s timed out' % ' '.join(args)
300
294
    if options.db:
301
295
        params['output'] = output
302
296
        params['retcode'] = retcode
309
303
              'python': '%s.%s.%s' % sys.version_info[:3],
310
304
              'changeset': get_changeset(),
311
305
              'libevent_version': get_libevent_version(),
312
 
              'libevent_method': get_libevent_method(),
313
306
              'uname': platform.uname()[0],
314
307
              'retcode': 'TIMEOUT'}
315
308
    success = True
316
309
    if not args:
317
310
        args = glob.glob('test_*.py')
318
311
        args.remove('test_support.py')
 
312
    real_args = []
319
313
    for arg in args:
 
314
        if os.path.exists(arg):
 
315
            real_args.append([arg])
 
316
        else:
 
317
            real_args[-1].append(arg)
 
318
    for arg in real_args:
320
319
        try:
321
320
            success = spawn_subprocess(arg, options, params) and success
322
321
        except Exception:
356
355
 
357
356
 
358
357
_warning_re = re.compile('\w*warning', re.I)
359
 
_error_re = re.compile(r'(?P<prefix>\s*)Traceback \(most recent call last\):' +
360
 
                       r'(\n(?P=prefix)[ \t]+[^\n]*)+\n(?P=prefix)(?P<error>[\w\.]+)')
361
358
 
362
359
 
363
360
def get_warnings(output):
379
376
    ... ZeroDivisionError: integer division or modulo by zero''')
380
377
    ['ZeroDivisionError']
381
378
    """
382
 
    return [x.group('error') for x in _error_re.finditer(output)]
 
379
    errors = []
 
380
    readtb = False
 
381
    for line in output.split('\n'):
 
382
        if 'Traceback (most recent call last):' in line:
 
383
            readtb = True
 
384
        else:
 
385
            if readtb:
 
386
                if line[:1] == ' ':
 
387
                    pass
 
388
                else:
 
389
                    errors.append(line.split(':')[0])
 
390
                    readtb = False
 
391
    return errors
383
392
 
384
393
 
385
394
def get_warning_stats(output):
421
430
        traceback_count -= 1
422
431
    items = counter.items()
423
432
    items.sort(key=lambda (a, b): -b)
424
 
    if traceback_count>0:
 
433
    if traceback_count > 0:
425
434
        items.append(('other traceback', traceback_count))
426
435
    result = []
427
436
    for name, count in items:
433
442
 
434
443
 
435
444
def get_info(output, test):
436
 
    output = output[:OUTPUT_LIMIT*2]
 
445
    output = output[:OUTPUT_LIMIT]
437
446
    traceback_stats, ignored_list = get_traceback_stats(output, test)
438
447
    warning_stats = get_warning_stats(output)
439
448
    result = traceback_stats + warning_stats
494
503
    parser.add_option('-v', '--verbose', default=0, action='count')
495
504
    parser.add_option('-q', '--quiet', default=0, action='count')
496
505
    parser.add_option('--verbosity', default=0, type='int', help=optparse.SUPPRESS_HELP)
497
 
    parser.add_option('--db')
 
506
    parser.add_option('--db', default='testresults.sqlite3')
 
507
    parser.add_option('--no-db', dest='db', action='store_false')
498
508
    parser.add_option('--runid')
499
509
    parser.add_option('--record', default=False, action='store_true')
500
510
    parser.add_option('--no-capture', dest='capture', default=True, action='store_false')
504
514
    options, args = parser.parse_args()
505
515
    options.verbosity += options.verbose - options.quiet
506
516
 
507
 
    if not options.db and sqlite3:
508
 
        if get_changeset().endswith('+'):
509
 
            options.db = get_tempnam()
 
517
    if options.db:
 
518
        if sqlite3:
 
519
            options.db = os.path.abspath(options.db)
 
520
            print 'Using the database: %s' % options.db
510
521
        else:
511
 
            options.db = 'testresults.sqlite3'
512
 
        print 'Using the database: %s' % options.db
513
 
    elif options.db and not sqlite3:
514
 
        sys.exit('Cannot access the database %r: no sqlite3 module found.' % (options.db, ))
 
522
            sys.stderr.write('Cannot access the database %r: no sqlite3 module found.\n' % (options.db, ))
 
523
            options.db = False
515
524
 
516
525
    if options.db:
517
526
        db = sqlite3.connect(options.db)