~pythonregexp2.7/python/issue2636-09-01+10

« back to all changes in this revision

Viewing changes to Doc/includes/mp_pool.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-22 21:39:45 UTC
  • mfrom: (39055.1.33 Regexp-2.7)
  • Revision ID: darklord@timehorse.com-20080922213945-23717m5eiqpamcyn
Merged in changes from the Single-Loop Engine branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# A test of `multiprocessing.Pool` class
 
3
#
 
4
 
 
5
import multiprocessing
 
6
import time
 
7
import random
 
8
import sys
 
9
 
 
10
#
 
11
# Functions used by test code
 
12
#
 
13
 
 
14
def calculate(func, args):
 
15
    result = func(*args)
 
16
    return '%s says that %s%s = %s' % (
 
17
        multiprocessing.current_process().name,
 
18
        func.__name__, args, result
 
19
        )
 
20
 
 
21
def calculatestar(args):
 
22
    return calculate(*args)
 
23
 
 
24
def mul(a, b):
 
25
    time.sleep(0.5*random.random())
 
26
    return a * b
 
27
 
 
28
def plus(a, b):
 
29
    time.sleep(0.5*random.random())
 
30
    return a + b
 
31
 
 
32
def f(x):
 
33
    return 1.0 / (x-5.0)
 
34
 
 
35
def pow3(x):
 
36
    return x**3
 
37
 
 
38
def noop(x):
 
39
    pass
 
40
 
 
41
#
 
42
# Test code
 
43
#
 
44
 
 
45
def test():
 
46
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
 
47
 
 
48
    #
 
49
    # Create pool
 
50
    #
 
51
 
 
52
    PROCESSES = 4
 
53
    print 'Creating pool with %d processes\n' % PROCESSES
 
54
    pool = multiprocessing.Pool(PROCESSES)
 
55
    print 'pool = %s' % pool
 
56
    print
 
57
 
 
58
    #
 
59
    # Tests
 
60
    #
 
61
 
 
62
    TASKS = [(mul, (i, 7)) for i in range(10)] + \
 
63
            [(plus, (i, 8)) for i in range(10)]
 
64
 
 
65
    results = [pool.apply_async(calculate, t) for t in TASKS]
 
66
    imap_it = pool.imap(calculatestar, TASKS)
 
67
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
 
68
 
 
69
    print 'Ordered results using pool.apply_async():'
 
70
    for r in results:
 
71
        print '\t', r.get()
 
72
    print
 
73
 
 
74
    print 'Ordered results using pool.imap():'
 
75
    for x in imap_it:
 
76
        print '\t', x
 
77
    print
 
78
 
 
79
    print 'Unordered results using pool.imap_unordered():'
 
80
    for x in imap_unordered_it:
 
81
        print '\t', x
 
82
    print
 
83
 
 
84
    print 'Ordered results using pool.map() --- will block till complete:'
 
85
    for x in pool.map(calculatestar, TASKS):
 
86
        print '\t', x
 
87
    print
 
88
 
 
89
    #
 
90
    # Simple benchmarks
 
91
    #
 
92
 
 
93
    N = 100000
 
94
    print 'def pow3(x): return x**3'
 
95
 
 
96
    t = time.time()
 
97
    A = map(pow3, xrange(N))
 
98
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
 
99
          (N, time.time() - t)
 
100
 
 
101
    t = time.time()
 
102
    B = pool.map(pow3, xrange(N))
 
103
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
 
104
          (N, time.time() - t)
 
105
 
 
106
    t = time.time()
 
107
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
 
108
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
 
109
          ' seconds' % (N, N//8, time.time() - t)
 
110
 
 
111
    assert A == B == C, (len(A), len(B), len(C))
 
112
    print
 
113
 
 
114
    L = [None] * 1000000
 
115
    print 'def noop(x): pass'
 
116
    print 'L = [None] * 1000000'
 
117
 
 
118
    t = time.time()
 
119
    A = map(noop, L)
 
120
    print '\tmap(noop, L):\n\t\t%s seconds' % \
 
121
          (time.time() - t)
 
122
 
 
123
    t = time.time()
 
124
    B = pool.map(noop, L)
 
125
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
 
126
          (time.time() - t)
 
127
 
 
128
    t = time.time()
 
129
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
 
130
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
 
131
          (len(L)//8, time.time() - t)
 
132
 
 
133
    assert A == B == C, (len(A), len(B), len(C))
 
134
    print
 
135
 
 
136
    del A, B, C, L
 
137
 
 
138
    #
 
139
    # Test error handling
 
140
    #
 
141
 
 
142
    print 'Testing error handling:'
 
143
 
 
144
    try:
 
145
        print pool.apply(f, (5,))
 
146
    except ZeroDivisionError:
 
147
        print '\tGot ZeroDivisionError as expected from pool.apply()'
 
148
    else:
 
149
        raise AssertionError, 'expected ZeroDivisionError'
 
150
 
 
151
    try:
 
152
        print pool.map(f, range(10))
 
153
    except ZeroDivisionError:
 
154
        print '\tGot ZeroDivisionError as expected from pool.map()'
 
155
    else:
 
156
        raise AssertionError, 'expected ZeroDivisionError'
 
157
 
 
158
    try:
 
159
        print list(pool.imap(f, range(10)))
 
160
    except ZeroDivisionError:
 
161
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
 
162
    else:
 
163
        raise AssertionError, 'expected ZeroDivisionError'
 
164
 
 
165
    it = pool.imap(f, range(10))
 
166
    for i in range(10):
 
167
        try:
 
168
            x = it.next()
 
169
        except ZeroDivisionError:
 
170
            if i == 5:
 
171
                pass
 
172
        except StopIteration:
 
173
            break
 
174
        else:
 
175
            if i == 5:
 
176
                raise AssertionError, 'expected ZeroDivisionError'
 
177
 
 
178
    assert i == 9
 
179
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
 
180
    print
 
181
 
 
182
    #
 
183
    # Testing timeouts
 
184
    #
 
185
 
 
186
    print 'Testing ApplyResult.get() with timeout:',
 
187
    res = pool.apply_async(calculate, TASKS[0])
 
188
    while 1:
 
189
        sys.stdout.flush()
 
190
        try:
 
191
            sys.stdout.write('\n\t%s' % res.get(0.02))
 
192
            break
 
193
        except multiprocessing.TimeoutError:
 
194
            sys.stdout.write('.')
 
195
    print
 
196
    print
 
197
 
 
198
    print 'Testing IMapIterator.next() with timeout:',
 
199
    it = pool.imap(calculatestar, TASKS)
 
200
    while 1:
 
201
        sys.stdout.flush()
 
202
        try:
 
203
            sys.stdout.write('\n\t%s' % it.next(0.02))
 
204
        except StopIteration:
 
205
            break
 
206
        except multiprocessing.TimeoutError:
 
207
            sys.stdout.write('.')
 
208
    print
 
209
    print
 
210
 
 
211
    #
 
212
    # Testing callback
 
213
    #
 
214
 
 
215
    print 'Testing callback:'
 
216
 
 
217
    A = []
 
218
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
 
219
 
 
220
    r = pool.apply_async(mul, (7, 8), callback=A.append)
 
221
    r.wait()
 
222
 
 
223
    r = pool.map_async(pow3, range(10), callback=A.extend)
 
224
    r.wait()
 
225
 
 
226
    if A == B:
 
227
        print '\tcallbacks succeeded\n'
 
228
    else:
 
229
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
 
230
 
 
231
    #
 
232
    # Check there are no outstanding tasks
 
233
    #
 
234
 
 
235
    assert not pool._cache, 'cache = %r' % pool._cache
 
236
 
 
237
    #
 
238
    # Check close() methods
 
239
    #
 
240
 
 
241
    print 'Testing close():'
 
242
 
 
243
    for worker in pool._pool:
 
244
        assert worker.is_alive()
 
245
 
 
246
    result = pool.apply_async(time.sleep, [0.5])
 
247
    pool.close()
 
248
    pool.join()
 
249
 
 
250
    assert result.get() is None
 
251
 
 
252
    for worker in pool._pool:
 
253
        assert not worker.is_alive()
 
254
 
 
255
    print '\tclose() succeeded\n'
 
256
 
 
257
    #
 
258
    # Check terminate() method
 
259
    #
 
260
 
 
261
    print 'Testing terminate():'
 
262
 
 
263
    pool = multiprocessing.Pool(2)
 
264
    DELTA = 0.1
 
265
    ignore = pool.apply(pow3, [2])
 
266
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
 
267
    pool.terminate()
 
268
    pool.join()
 
269
 
 
270
    for worker in pool._pool:
 
271
        assert not worker.is_alive()
 
272
 
 
273
    print '\tterminate() succeeded\n'
 
274
 
 
275
    #
 
276
    # Check garbage collection
 
277
    #
 
278
 
 
279
    print 'Testing garbage collection:'
 
280
 
 
281
    pool = multiprocessing.Pool(2)
 
282
    DELTA = 0.1
 
283
    processes = pool._pool
 
284
    ignore = pool.apply(pow3, [2])
 
285
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
 
286
 
 
287
    results = pool = None
 
288
 
 
289
    time.sleep(DELTA * 2)
 
290
 
 
291
    for worker in processes:
 
292
        assert not worker.is_alive()
 
293
 
 
294
    print '\tgarbage collection succeeded\n'
 
295
 
 
296
 
 
297
if __name__ == '__main__':
 
298
    multiprocessing.freeze_support()
 
299
 
 
300
    assert len(sys.argv) in (1, 2)
 
301
 
 
302
    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
 
303
        print ' Using processes '.center(79, '-')
 
304
    elif sys.argv[1] == 'threads':
 
305
        print ' Using threads '.center(79, '-')
 
306
        import multiprocessing.dummy as multiprocessing
 
307
    else:
 
308
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
 
309
        raise SystemExit(2)
 
310
 
 
311
    test()