~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_importlib/test_locks.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
Import upstream version 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from . import util
 
2
# Fetch two importlib module objects from the test helper and keep a handle on
# each one's private ``_bootstrap`` attribute for the test classes below.
# NOTE(review): the names suggest a frozen and a pure-source implementation --
# confirm against util.import_importlib.
frozen_init, source_init = util.import_importlib('importlib')
frozen_bootstrap, source_bootstrap = (frozen_init._bootstrap,
                                      source_init._bootstrap)
 
5
 
 
6
import sys
 
7
import time
 
8
import unittest
 
9
import weakref
 
10
 
 
11
from test import support
 
12
 
 
13
try:
 
14
    import threading
 
15
except ImportError:
 
16
    threading = None
 
17
else:
 
18
    from test import lock_tests
 
19
 
 
20
if threading is None:
    # The threading import failed, so lock_tests is unavailable.  Define empty
    # test cases under the same names so code referring to them still works.
    class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
        pass

    class Source_ModuleLockAsRLockTests(unittest.TestCase):
        pass

else:
    class ModuleLockAsRLockTests:
        """Mixin that runs lock_tests.RLockTests against a module lock type.

        Concrete subclasses supply ``LockType``.  Inherited tests for
        operations the lock type does not support are overridden with None.
        """

        @classmethod
        def locktype(cls):
            """Return a new ``cls.LockType`` instance named "some_lock"."""
            return cls.LockType("some_lock")

        # _is_owned() unsupported
        test__is_owned = None
        # acquire(blocking=False) unsupported
        test_try_acquire = test_try_acquire_contended = None
        # `with` unsupported
        test_with = None
        # acquire(timeout=...) unsupported
        test_timeout = None
        # _release_save() unsupported
        test_release_save_unacquired = None

    class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests,
                                        lock_tests.RLockTests):
        """RLock tests applied to ``frozen_bootstrap._ModuleLock``."""

        LockType = frozen_bootstrap._ModuleLock

    class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests,
                                        lock_tests.RLockTests):
        """RLock tests applied to ``source_bootstrap._ModuleLock``."""

        LockType = source_bootstrap._ModuleLock
 
48
 
 
49
 
 
50
class DeadlockAvoidanceTests:
    """Mixin checking deadlock detection when threads take locks in a ring.

    Subclasses provide ``LockType`` (the lock class under test) and
    ``DeadlockError`` (the exception its ``acquire()`` raises on deadlock).
    """

    def setUp(self):
        """Save the current switch interval and lower it to one microsecond.

        ``old_switchinterval`` is set to None when ``sys`` does not offer the
        switch-interval functions.
        """
        try:
            previous = sys.getswitchinterval()
            self.old_switchinterval = previous
            sys.setswitchinterval(0.000001)
        except AttributeError:
            self.old_switchinterval = None

    def tearDown(self):
        """Put back the switch interval saved by setUp(), if there was one."""
        saved = self.old_switchinterval
        if saved is None:
            return
        sys.setswitchinterval(saved)

    def run_deadlock_avoidance_test(self, create_deadlock):
        """Have several threads each acquire two neighbouring locks.

        Ten locks form a ring.  Every thread takes one (lock, next lock)
        pair, acquires the first, waits on a barrier for the other threads,
        then tries the second.  With ``create_deadlock`` true there is one
        thread per pair, so the ring is closed; otherwise one pair is left
        unused.

        Returns a list holding one ``(first_ok, second_ok)`` tuple per
        thread, where False means that acquire raised ``self.DeadlockError``.
        """
        NLOCKS = 10
        ring = [self.LockType(str(index)) for index in range(NLOCKS)]
        # Pair each lock with its successor, wrapping around at the end.
        pairs = list(zip(ring, ring[1:] + ring[:1]))
        NTHREADS = NLOCKS if create_deadlock else NLOCKS - 1
        barrier = threading.Barrier(NTHREADS)
        results = []

        def _acquire(lock):
            """Acquire *lock*; report False instead of raising on deadlock."""
            try:
                lock.acquire()
            except self.DeadlockError:
                return False
            return True

        def worker():
            first, second = pairs.pop()
            got_first = _acquire(first)
            # Every thread has attempted its first lock before any of them
            # goes for its second one.
            barrier.wait()
            got_second = _acquire(second)
            results.append((got_first, got_second))
            if got_second:
                second.release()
            if got_first:
                first.release()

        lock_tests.Bunch(worker, NTHREADS).wait_for_finished()
        self.assertEqual(len(results), NTHREADS)
        return results

    def test_deadlock(self):
        """A closed ring of lock pairs must trigger deadlock detection."""
        outcomes = self.run_deadlock_avoidance_test(True)
        # At least one of the threads detected a potential deadlock on its
        # second acquire() call.  It may be several of them, because the
        # deadlock avoidance mechanism is conservative.
        detected = outcomes.count((True, False))
        self.assertGreaterEqual(detected, 1)
        fully_acquired = outcomes.count((True, True))
        self.assertEqual(fully_acquired, len(outcomes) - detected)

    def test_no_deadlock(self):
        """With one pair left out, every thread acquires both of its locks."""
        outcomes = self.run_deadlock_avoidance_test(False)
        detected = outcomes.count((True, False))
        self.assertEqual(detected, 0)
        fully_acquired = outcomes.count((True, True))
        self.assertEqual(fully_acquired, len(outcomes))
 
108
 
 
109
@unittest.skipIf(threading is None, "threads needed for this test")
class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
    """Deadlock avoidance tests using the classes from ``frozen_bootstrap``."""

    DeadlockError = frozen_bootstrap._DeadlockError
    LockType = frozen_bootstrap._ModuleLock
 
113
 
 
114
@unittest.skipIf(threading is None, "threads needed for this test")
class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
    """Deadlock avoidance tests using the classes from ``source_bootstrap``."""

    DeadlockError = source_bootstrap._DeadlockError
    LockType = source_bootstrap._ModuleLock
 
118
 
 
119
 
 
120
class LifetimeTests:
    """Mixin checking the contents of ``bootstrap._module_locks``.

    Subclasses provide ``bootstrap``, the module whose lock table is
    inspected.
    """

    def test_lock_lifetime(self):
        """A lock's table entry exists only while the lock is referenced."""
        bootstrap = self.bootstrap
        name = "xyzzy"
        self.assertNotIn(name, bootstrap._module_locks)
        lock = bootstrap._get_module_lock(name)
        self.assertIn(name, bootstrap._module_locks)
        # Track the lock through a weak reference only, then drop the sole
        # strong reference and collect.
        lock_ref = weakref.ref(lock)
        del lock
        support.gc_collect()
        self.assertNotIn(name, bootstrap._module_locks)
        self.assertIsNone(lock_ref())

    def test_all_locks(self):
        """After a collection the lock table is expected to be empty."""
        support.gc_collect()
        remaining = self.bootstrap._module_locks
        # The table itself is passed as the failure message.
        self.assertEqual(0, len(remaining), remaining)
 
137
 
 
138
class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase):
    """Lock lifetime tests run against ``frozen_bootstrap``."""

    bootstrap = frozen_bootstrap
 
140
 
 
141
class Source_LifetimeTests(LifetimeTests, unittest.TestCase):
    """Lock lifetime tests run against ``source_bootstrap``."""

    bootstrap = source_bootstrap
 
143
 
 
144
 
 
145
@support.reap_threads
def test_main():
    """Run all test cases of this module through support.run_unittest()."""
    test_cases = (
        Frozen_ModuleLockAsRLockTests,
        Source_ModuleLockAsRLockTests,
        Frozen_DeadlockAvoidanceTests,
        Source_DeadlockAvoidanceTests,
        Frozen_LifetimeTests,
        Source_LifetimeTests,
    )
    support.run_unittest(*test_cases)
 
153
 
 
154
 
 
155
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()