~ubuntu-branches/debian/sid/python2.7/sid

« back to all changes in this revision

Viewing changes to Lib/test/test_support.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2015-05-10 23:40:03 UTC
  • mfrom: (1.1.22)
  • Revision ID: package-import@ubuntu.com-20150510234003-8tohs1y1swc56hwf
Tags: 2.7.10~rc1-1
* Python 2.7.10 release candidate 1.
* Re-enable running the tests, re-enable the pgo/lto build.

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
29
29
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
30
30
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
31
 
           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
 
31
           "is_resource_enabled", "requires", "requires_mac_ver",
 
32
           "find_unused_port", "bind_port",
32
33
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
33
34
           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
34
35
           "open_urlresource", "check_warnings", "check_py3k_warnings",
36
37
           "captured_stdout", "TransientResource", "transient_internet",
37
38
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
38
39
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
39
 
           "threading_cleanup", "reap_children", "cpython_only",
 
40
           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
40
41
           "check_impl_detail", "get_attribute", "py3k_bytes",
41
42
           "import_fresh_module", "threading_cleanup", "reap_children",
42
43
           "strip_python_stderr", "IPV6_ENABLED"]
361
362
            msg = "Use of the `%s' resource not enabled" % resource
362
363
        raise ResourceDenied(msg)
363
364
 
 
365
def requires_mac_ver(*min_version):
    """Return a decorator that skips a test on Mac OS X releases that are
    older than min_version.

    min_version is given as separate integers: @requires_mac_ver(10, 5)
    raises unittest.SkipTest when the test is called on an OS X version
    lower than 10.5.

    The check is made each time the decorated function is called.  On any
    other platform, or when the version reported by platform.mac_ver()
    cannot be parsed as dot-separated integers, the function simply runs.
    The requested version is exposed on the wrapper as 'min_version'.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(int(part)
                                    for part in version_txt.split('.'))
                except ValueError:
                    # Unknown or malformed version string: do not skip.
                    version = None
                if version is not None and version < min_version:
                    raise unittest.SkipTest(
                        "Mac OS X %s or higher required, not %s"
                        % ('.'.join(str(part) for part in min_version),
                           version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
 
391
 
364
392
 
365
393
# Don't use "localhost", since resolving it uses the DNS under recent
366
394
# Windows versions (see issue #18792).
1481
1509
                break
1482
1510
 
1483
1511
@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Context manager that starts the given threads on entry and joins
    them on exit.

    threads -- iterable of threads that have not been started yet.
    unlock  -- optional callable taking no arguments; it is called on exit,
               before the threads are joined.

    If a thread cannot be started, the exception is re-raised once the
    threads that did start have been joined.  On exit, joining is retried
    in up to 15 rounds, each extending the deadline by 60 seconds.  If the
    with-block completed without an exception and some threads are still
    alive after that, AssertionError is raised.
    """
    threads = list(threads)
    started = []
    try:
        try:
            for t in threads:
                t.start()
                started.append(t)
        except:
            # Bare except is deliberate: whatever interrupted the start-up
            # is reported (in verbose mode) and then re-raised unchanged.
            if verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(threads), len(started)))
            raise
        yield
    finally:
        if unlock:
            unlock()
        endtime = time.time()
        for timeout in range(1, 16):
            # Each round gives all remaining threads one more shared minute.
            endtime += 60
            for t in started:
                # Always wait a little, even when the deadline has passed.
                t.join(max(endtime - time.time(), 0.01))
            # is_alive() replaces the deprecated camelCase isAlive() alias.
            started = [t for t in started if t.is_alive()]
            if not started:
                break
            if verbose:
                print('Unable to join %d threads during a period of '
                      '%d minutes' % (len(started), timeout))
    # Only reached when neither start-up nor the with-block raised.
    started = [t for t in started if t.is_alive()]
    if started:
        raise AssertionError('Unable to join %d threads' % len(started))
 
1543
 
 
1544
@contextlib.contextmanager
1484
1545
def swap_attr(obj, attr, new_val):
1485
1546
    """Temporary swap out an attribute with a new object.
1486
1547