4
from test import test_support
13
_print_mutex = thread.allocate_lock()
15
def verbose_print(arg):
16
"""Helper function for printing out debugging output."""
17
if test_support.verbose:
22
class BasicThreadTest(unittest.TestCase):
25
self.done_mutex = thread.allocate_lock()
26
self.done_mutex.acquire()
27
self.running_mutex = thread.allocate_lock()
28
self.random_mutex = thread.allocate_lock()
33
class ThreadRunningTests(BasicThreadTest):
36
with self.running_mutex:
38
verbose_print("creating task %s" % self.next_ident)
39
thread.start_new_thread(self.task, (self.next_ident,))
42
def task(self, ident):
43
with self.random_mutex:
44
delay = random.random() / 10000.0
45
verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
47
verbose_print("task %s done" % ident)
48
with self.running_mutex:
51
self.done_mutex.release()
53
def test_starting_threads(self):
54
# Basic test for thread creation.
55
for i in range(NUMTASKS):
57
verbose_print("waiting for tasks to complete...")
58
self.done_mutex.acquire()
59
verbose_print("all tasks done")
61
def test_stack_size(self):
62
# Various stack size tests.
63
self.assertEquals(thread.stack_size(), 0, "intial stack size is not 0")
66
self.assertEquals(thread.stack_size(), 0, "stack_size not reset to default")
68
if os.name not in ("nt", "os2", "posix"):
73
thread.stack_size(4096)
75
verbose_print("caught expected ValueError setting "
79
verbose_print("platform does not support changing thread stack "
83
fail_msg = "stack_size(%d) failed - should succeed"
84
for tss in (262144, 0x100000, 0):
85
thread.stack_size(tss)
86
self.assertEquals(thread.stack_size(), tss, fail_msg % tss)
87
verbose_print("successfully set stack_size(%d)" % tss)
89
for tss in (262144, 0x100000):
90
verbose_print("trying stack_size = (%d)" % tss)
92
for i in range(NUMTASKS):
95
verbose_print("waiting for all tasks to complete")
96
self.done_mutex.acquire()
97
verbose_print("all tasks done")
103
def __init__(self, num_threads):
104
self.num_threads = num_threads
106
self.checkin_mutex = thread.allocate_lock()
107
self.checkout_mutex = thread.allocate_lock()
108
self.checkout_mutex.acquire()
111
self.checkin_mutex.acquire()
112
self.waiting = self.waiting + 1
113
if self.waiting == self.num_threads:
114
self.waiting = self.num_threads - 1
115
self.checkout_mutex.release()
117
self.checkin_mutex.release()
119
self.checkout_mutex.acquire()
120
self.waiting = self.waiting - 1
121
if self.waiting == 0:
122
self.checkin_mutex.release()
124
self.checkout_mutex.release()
127
class BarrierTest(BasicThreadTest):
129
def test_barrier(self):
130
self.bar = Barrier(NUMTASKS)
131
self.running = NUMTASKS
132
for i in range(NUMTASKS):
133
thread.start_new_thread(self.task2, (i,))
134
verbose_print("waiting for tasks to end")
135
self.done_mutex.acquire()
136
verbose_print("tasks done")
138
def task2(self, ident):
139
for i in range(NUMTRIPS):
141
# give it a good chance to enter the next
142
# barrier before the others are all out
146
with self.random_mutex:
147
delay = random.random() / 10000.0
148
verbose_print("task %s will run for %sus" %
149
(ident, round(delay * 1e6)))
151
verbose_print("task %s entering %s" % (ident, i))
153
verbose_print("task %s leaving barrier" % ident)
154
with self.running_mutex:
156
# Must release mutex before releasing done, else the main thread can
157
# exit and set mutex to None as part of global teardown; then
158
# mutex.release() raises AttributeError.
159
finished = self.running == 0
161
self.done_mutex.release()
165
test_support.run_unittest(ThreadRunningTests, BarrierTest)
167
if __name__ == "__main__":