2
# util.import_importlib() returns a pair of objects, bound here as the
# "frozen" and "source" variants of importlib (the helper itself lives in
# util and is not visible here).
frozen_init, source_init = util.import_importlib('importlib')
# The private _bootstrap attribute of each variant; the tests in this file
# exercise its _ModuleLock, _DeadlockError, _module_locks and
# _get_module_lock members.
frozen_bootstrap = frozen_init._bootstrap
source_bootstrap = source_init._bootstrap
11
from test import support
18
from test import lock_tests
20
# NOTE(review): this extract has lost its indentation and carries stray
# integer lines that look like original line numbers; several numbers are
# skipped, so statements may be missing -- confirm against the full file.
# The lock-compatibility mixin is only set up when `threading` is not None.
if threading is not None:
21
# Mixin that adapts self.LockType to the `locktype` factory attribute; the
# Frozen_/Source_ subclasses combine it with lock_tests.RLockTests.
class ModuleLockAsRLockTests:
22
# Factory: builds a LockType instance named "some_lock".
locktype = classmethod(lambda cls: cls.LockType("some_lock"))
24
# _is_owned() unsupported
# NOTE(review): no assignment follows this comment in the visible text.
26
# acquire(blocking=False) unsupported
# (setting a test_* name to None presumably disables the inherited test of
# that name -- confirm against lock_tests.RLockTests)
27
test_try_acquire = None
28
test_try_acquire_contended = None
31
# acquire(timeout=...) unsupported
# NOTE(review): no assignment follows this comment in the visible text.
33
# _release_save() unsupported
34
test_release_save_unacquired = None
36
class Frozen_ModuleLockAsRLockTests(ModuleLockAsRLockTests,
                                    lock_tests.RLockTests):
    """Run lock_tests.RLockTests against frozen_bootstrap._ModuleLock."""

    # Lock class under test; instantiated by the mixin's `locktype` factory.
    LockType = frozen_bootstrap._ModuleLock
39
class Source_ModuleLockAsRLockTests(ModuleLockAsRLockTests,
                                    lock_tests.RLockTests):
    """Run lock_tests.RLockTests against source_bootstrap._ModuleLock."""

    # Lock class under test; instantiated by the mixin's `locktype` factory.
    LockType = source_bootstrap._ModuleLock
43
# NOTE(review): these two headers reuse the names of the lock-test classes
# defined earlier but derive only from unittest.TestCase. Their bodies, and
# any branch that encloses them, are not visible in this extract --
# presumably this is the fallback used when `threading` is None; confirm
# against the full file.
class Frozen_ModuleLockAsRLockTests(unittest.TestCase):
46
class Source_ModuleLockAsRLockTests(unittest.TestCase):
50
# Shared deadlock-avoidance test logic. Concrete subclasses provide the
# LockType and DeadlockError attributes used by the methods below.
# NOTE(review): the `def` and `try:` lines that should enclose the
# statements below are not visible in this extract -- presumably
# setUp/tearDown-style fixtures; confirm against the full file.
class DeadlockAvoidanceTests:
54
# Remember the current thread switch interval, then shrink it to 0.000001 s
# (presumably to force frequent thread switches during the test).
self.old_switchinterval = sys.getswitchinterval()
55
sys.setswitchinterval(0.000001)
56
# If the sys switch-interval calls raise AttributeError, record that there
# is nothing to restore.
except AttributeError:
57
self.old_switchinterval = None
60
# Put the saved switch interval back, if one was recorded.
if self.old_switchinterval is not None:
61
sys.setswitchinterval(self.old_switchinterval)
63
# Build NLOCKS locks arranged in a wrap-around chain, run `f` through
# lock_tests.Bunch with NTHREADS, and check that NTHREADS (ra, rb) result
# tuples were collected.
# NOTE(review): most of this method is absent from the extract -- the
# definitions of NLOCKS, NTHREADS, `results` and `f`, the use of
# `create_deadlock`, and the return statement are not visible. Callers use
# the return value as a list of result tuples; confirm against the full file.
def run_deadlock_avoidance_test(self, create_deadlock):
65
# One lock per index, named by the string form of the index.
locks = [self.LockType(str(i)) for i in range(NLOCKS)]
66
# Pair every lock with its successor; the last lock wraps to the first.
pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
71
# Barrier created for NTHREADS parties.
barrier = threading.Barrier(NTHREADS)
74
# NOTE(review): the string below reads like the docstring of a helper whose
# `def` line is not visible here.
"""Try to acquire the lock. Return True on success, False on deadlock."""
77
except self.DeadlockError:
86
# Record the outcome pair (presumably the results of the first and second
# acquisition attempts -- the assignments to ra/rb are not visible).
results.append((ra, rb))
91
# Hand `f` and NTHREADS to lock_tests.Bunch and block until it reports that
# everything has finished.
lock_tests.Bunch(f, NTHREADS).wait_for_finished()
92
# Exactly NTHREADS results must have been recorded.
self.assertEqual(len(results), NTHREADS)
95
def test_deadlock(self):
    # Run the shared scenario with create_deadlock=True and classify the
    # per-thread (first_acquire, second_acquire) result tuples.
    results = self.run_deadlock_avoidance_test(True)
    # At least one of the threads detected a potential deadlock on its
    # second acquire() call.  It may be several of them, because the
    # deadlock avoidance mechanism is conservative.
    nb_deadlocks = results.count((True, False))
    self.assertGreaterEqual(nb_deadlocks, 1)
    # Every result that is not a detected deadlock must be a full success.
    self.assertEqual(results.count((True, True)),
                     len(results) - nb_deadlocks)
104
def test_no_deadlock(self):
    # Run the shared scenario with create_deadlock=False: no thread may
    # report a refused second acquire, and every thread must report success
    # on both acquisitions.
    results = self.run_deadlock_avoidance_test(False)
    self.assertEqual(results.count((True, False)), 0)
    self.assertEqual(results.count((True, True)), len(results))
109
@unittest.skipUnless(threading, "threads needed for this test")
class Frozen_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
    """Deadlock-avoidance tests bound to the frozen_bootstrap lock types."""

    # Lock class instantiated by run_deadlock_avoidance_test.
    LockType = frozen_bootstrap._ModuleLock
    # Exception type caught as self.DeadlockError by the shared test logic.
    DeadlockError = frozen_bootstrap._DeadlockError
114
@unittest.skipUnless(threading, "threads needed for this test")
class Source_DeadlockAvoidanceTests(DeadlockAvoidanceTests, unittest.TestCase):
    """Deadlock-avoidance tests bound to the source_bootstrap lock types."""

    # Lock class instantiated by run_deadlock_avoidance_test.
    LockType = source_bootstrap._ModuleLock
    # Exception type caught as self.DeadlockError by the shared test logic.
    DeadlockError = source_bootstrap._DeadlockError
122
# Check that an entry in self.bootstrap._module_locks exists only while the
# lock obtained from _get_module_lock() is around.
# NOTE(review): `name` is never assigned in the visible lines, and nothing
# visibly releases `lock` between creating the weak reference and the final
# assertions; numbered lines appear to be missing -- confirm against the
# full file.
def test_lock_lifetime(self):
124
# Before the lock is requested there must be no entry for `name`.
self.assertNotIn(name, self.bootstrap._module_locks)
125
lock = self.bootstrap._get_module_lock(name)
126
# Requesting the lock must have added an entry under `name`.
self.assertIn(name, self.bootstrap._module_locks)
127
# Weak reference used below to verify that the lock object has gone away.
wr = weakref.ref(lock)
130
# Both the entry and the lock object itself must now be gone.
self.assertNotIn(name, self.bootstrap._module_locks)
131
self.assertIsNone(wr())
133
# Assert that self.bootstrap._module_locks is empty; the container itself is
# passed as the assertion message so a failure shows what is left in it.
# NOTE(review): one numbered line between the `def` and the assertion is
# absent from this extract -- confirm against the full file.
def test_all_locks(self):
135
self.assertEqual(0, len(self.bootstrap._module_locks),
136
self.bootstrap._module_locks)
138
class Frozen_LifetimeTests(LifetimeTests, unittest.TestCase):
    """Lock-lifetime tests run against frozen_bootstrap."""

    # Object whose _module_locks and _get_module_lock the tests inspect.
    bootstrap = frozen_bootstrap
141
class Source_LifetimeTests(LifetimeTests, unittest.TestCase):
    """Lock-lifetime tests run against source_bootstrap."""

    # Object whose _module_locks and _get_module_lock the tests inspect.
    bootstrap = source_bootstrap
145
# NOTE(review): the `def` line this decorator applies to is not visible in
# the extract (a numbered line appears to be missing) -- confirm against the
# full file.
@support.reap_threads
147
# Pass all six concrete test classes (frozen and source variants of each
# test group) to support.run_unittest.
support.run_unittest(Frozen_ModuleLockAsRLockTests,
148
Source_ModuleLockAsRLockTests,
149
Frozen_DeadlockAvoidanceTests,
150
Source_DeadlockAvoidanceTests,
151
Frozen_LifetimeTests,
152
Source_LifetimeTests)
155
if __name__ == '__main__':