~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/test/test_thread.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
import unittest
 
3
import random
 
4
from test import test_support
 
5
import thread
 
6
import time
 
7
 
 
8
 
 
9
NUMTASKS = 10
 
10
NUMTRIPS = 3
 
11
 
 
12
 
 
13
_print_mutex = thread.allocate_lock()
 
14
 
 
15
def verbose_print(arg):
 
16
    """Helper function for printing out debugging output."""
 
17
    if test_support.verbose:
 
18
        with _print_mutex:
 
19
            print arg
 
20
 
 
21
 
 
22
class BasicThreadTest(unittest.TestCase):
 
23
 
 
24
    def setUp(self):
 
25
        self.done_mutex = thread.allocate_lock()
 
26
        self.done_mutex.acquire()
 
27
        self.running_mutex = thread.allocate_lock()
 
28
        self.random_mutex = thread.allocate_lock()
 
29
        self.running = 0
 
30
        self.next_ident = 0
 
31
 
 
32
 
 
33
class ThreadRunningTests(BasicThreadTest):
 
34
 
 
35
    def newtask(self):
 
36
        with self.running_mutex:
 
37
            self.next_ident += 1
 
38
            verbose_print("creating task %s" % self.next_ident)
 
39
            thread.start_new_thread(self.task, (self.next_ident,))
 
40
            self.running += 1
 
41
 
 
42
    def task(self, ident):
 
43
        with self.random_mutex:
 
44
            delay = random.random() / 10000.0
 
45
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
 
46
        time.sleep(delay)
 
47
        verbose_print("task %s done" % ident)
 
48
        with self.running_mutex:
 
49
            self.running -= 1
 
50
            if self.running == 0:
 
51
                self.done_mutex.release()
 
52
 
 
53
    def test_starting_threads(self):
 
54
        # Basic test for thread creation.
 
55
        for i in range(NUMTASKS):
 
56
            self.newtask()
 
57
        verbose_print("waiting for tasks to complete...")
 
58
        self.done_mutex.acquire()
 
59
        verbose_print("all tasks done")
 
60
 
 
61
    def test_stack_size(self):
 
62
        # Various stack size tests.
 
63
        self.assertEquals(thread.stack_size(), 0, "intial stack size is not 0")
 
64
 
 
65
        thread.stack_size(0)
 
66
        self.assertEquals(thread.stack_size(), 0, "stack_size not reset to default")
 
67
 
 
68
        if os.name not in ("nt", "os2", "posix"):
 
69
            return
 
70
 
 
71
        tss_supported = True
 
72
        try:
 
73
            thread.stack_size(4096)
 
74
        except ValueError:
 
75
            verbose_print("caught expected ValueError setting "
 
76
                            "stack_size(4096)")
 
77
        except thread.error:
 
78
            tss_supported = False
 
79
            verbose_print("platform does not support changing thread stack "
 
80
                            "size")
 
81
 
 
82
        if tss_supported:
 
83
            fail_msg = "stack_size(%d) failed - should succeed"
 
84
            for tss in (262144, 0x100000, 0):
 
85
                thread.stack_size(tss)
 
86
                self.assertEquals(thread.stack_size(), tss, fail_msg % tss)
 
87
                verbose_print("successfully set stack_size(%d)" % tss)
 
88
 
 
89
            for tss in (262144, 0x100000):
 
90
                verbose_print("trying stack_size = (%d)" % tss)
 
91
                self.next_ident = 0
 
92
                for i in range(NUMTASKS):
 
93
                    self.newtask()
 
94
 
 
95
                verbose_print("waiting for all tasks to complete")
 
96
                self.done_mutex.acquire()
 
97
                verbose_print("all tasks done")
 
98
 
 
99
            thread.stack_size(0)
 
100
 
 
101
 
 
102
class Barrier:
 
103
    def __init__(self, num_threads):
 
104
        self.num_threads = num_threads
 
105
        self.waiting = 0
 
106
        self.checkin_mutex  = thread.allocate_lock()
 
107
        self.checkout_mutex = thread.allocate_lock()
 
108
        self.checkout_mutex.acquire()
 
109
 
 
110
    def enter(self):
 
111
        self.checkin_mutex.acquire()
 
112
        self.waiting = self.waiting + 1
 
113
        if self.waiting == self.num_threads:
 
114
            self.waiting = self.num_threads - 1
 
115
            self.checkout_mutex.release()
 
116
            return
 
117
        self.checkin_mutex.release()
 
118
 
 
119
        self.checkout_mutex.acquire()
 
120
        self.waiting = self.waiting - 1
 
121
        if self.waiting == 0:
 
122
            self.checkin_mutex.release()
 
123
            return
 
124
        self.checkout_mutex.release()
 
125
 
 
126
 
 
127
class BarrierTest(BasicThreadTest):
 
128
 
 
129
    def test_barrier(self):
 
130
        self.bar = Barrier(NUMTASKS)
 
131
        self.running = NUMTASKS
 
132
        for i in range(NUMTASKS):
 
133
            thread.start_new_thread(self.task2, (i,))
 
134
        verbose_print("waiting for tasks to end")
 
135
        self.done_mutex.acquire()
 
136
        verbose_print("tasks done")
 
137
 
 
138
    def task2(self, ident):
 
139
        for i in range(NUMTRIPS):
 
140
            if ident == 0:
 
141
                # give it a good chance to enter the next
 
142
                # barrier before the others are all out
 
143
                # of the current one
 
144
                delay = 0
 
145
            else:
 
146
                with self.random_mutex:
 
147
                    delay = random.random() / 10000.0
 
148
            verbose_print("task %s will run for %sus" %
 
149
                          (ident, round(delay * 1e6)))
 
150
            time.sleep(delay)
 
151
            verbose_print("task %s entering %s" % (ident, i))
 
152
            self.bar.enter()
 
153
            verbose_print("task %s leaving barrier" % ident)
 
154
        with self.running_mutex:
 
155
            self.running -= 1
 
156
            # Must release mutex before releasing done, else the main thread can
 
157
            # exit and set mutex to None as part of global teardown; then
 
158
            # mutex.release() raises AttributeError.
 
159
            finished = self.running == 0
 
160
        if finished:
 
161
            self.done_mutex.release()
 
162
 
 
163
 
 
164
def test_main():
 
165
    test_support.run_unittest(ThreadRunningTests, BarrierTest)
 
166
 
 
167
if __name__ == "__main__":
 
168
    test_main()