2
# A test of `multiprocessing.Pool` class
11
# Functions used by test code
14
def calculate(func, args):
16
return '%s says that %s%s = %s' % (
17
multiprocessing.current_process().name,
18
func.__name__, args, result
21
def calculatestar(args):
22
return calculate(*args)
25
time.sleep(0.5*random.random())
29
time.sleep(0.5*random.random())
46
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
53
print 'Creating pool with %d processes\n' % PROCESSES
54
pool = multiprocessing.Pool(PROCESSES)
55
print 'pool = %s' % pool
62
TASKS = [(mul, (i, 7)) for i in range(10)] + \
63
[(plus, (i, 8)) for i in range(10)]
65
results = [pool.apply_async(calculate, t) for t in TASKS]
66
imap_it = pool.imap(calculatestar, TASKS)
67
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
69
print 'Ordered results using pool.apply_async():'
74
print 'Ordered results using pool.imap():'
79
print 'Unordered results using pool.imap_unordered():'
80
for x in imap_unordered_it:
84
print 'Ordered results using pool.map() --- will block till complete:'
85
for x in pool.map(calculatestar, TASKS):
94
print 'def pow3(x): return x**3'
97
A = map(pow3, xrange(N))
98
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
102
B = pool.map(pow3, xrange(N))
103
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
107
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
108
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
109
' seconds' % (N, N//8, time.time() - t)
111
assert A == B == C, (len(A), len(B), len(C))
115
print 'def noop(x): pass'
116
print 'L = [None] * 1000000'
120
print '\tmap(noop, L):\n\t\t%s seconds' % \
124
B = pool.map(noop, L)
125
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
129
C = list(pool.imap(noop, L, chunksize=len(L)//8))
130
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
131
(len(L)//8, time.time() - t)
133
assert A == B == C, (len(A), len(B), len(C))
139
# Test error handling
142
print 'Testing error handling:'
145
print pool.apply(f, (5,))
146
except ZeroDivisionError:
147
print '\tGot ZeroDivisionError as expected from pool.apply()'
149
raise AssertionError, 'expected ZeroDivisionError'
152
print pool.map(f, range(10))
153
except ZeroDivisionError:
154
print '\tGot ZeroDivisionError as expected from pool.map()'
156
raise AssertionError, 'expected ZeroDivisionError'
159
print list(pool.imap(f, range(10)))
160
except ZeroDivisionError:
161
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
163
raise AssertionError, 'expected ZeroDivisionError'
165
it = pool.imap(f, range(10))
169
except ZeroDivisionError:
172
except StopIteration:
176
raise AssertionError, 'expected ZeroDivisionError'
179
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
186
print 'Testing ApplyResult.get() with timeout:',
187
res = pool.apply_async(calculate, TASKS[0])
191
sys.stdout.write('\n\t%s' % res.get(0.02))
193
except multiprocessing.TimeoutError:
194
sys.stdout.write('.')
198
print 'Testing IMapIterator.next() with timeout:',
199
it = pool.imap(calculatestar, TASKS)
203
sys.stdout.write('\n\t%s' % it.next(0.02))
204
except StopIteration:
206
except multiprocessing.TimeoutError:
207
sys.stdout.write('.')
215
print 'Testing callback:'
218
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
220
r = pool.apply_async(mul, (7, 8), callback=A.append)
223
r = pool.map_async(pow3, range(10), callback=A.extend)
227
print '\tcallbacks succeeded\n'
229
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
232
# Check there are no outstanding tasks
235
assert not pool._cache, 'cache = %r' % pool._cache
238
# Check close() methods
241
print 'Testing close():'
243
for worker in pool._pool:
244
assert worker.is_alive()
246
result = pool.apply_async(time.sleep, [0.5])
250
assert result.get() is None
252
for worker in pool._pool:
253
assert not worker.is_alive()
255
print '\tclose() succeeded\n'
258
# Check terminate() method
261
print 'Testing terminate():'
263
pool = multiprocessing.Pool(2)
265
ignore = pool.apply(pow3, [2])
266
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
270
for worker in pool._pool:
271
assert not worker.is_alive()
273
print '\tterminate() succeeded\n'
276
# Check garbage collection
279
print 'Testing garbage collection:'
281
pool = multiprocessing.Pool(2)
283
processes = pool._pool
284
ignore = pool.apply(pow3, [2])
285
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
287
results = pool = None
289
time.sleep(DELTA * 2)
291
for worker in processes:
292
assert not worker.is_alive()
294
print '\tgarbage collection succeeded\n'
297
if __name__ == '__main__':
298
multiprocessing.freeze_support()
300
assert len(sys.argv) in (1, 2)
302
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
303
print ' Using processes '.center(79, '-')
304
elif sys.argv[1] == 'threads':
305
print ' Using threads '.center(79, '-')
306
import multiprocessing.dummy as multiprocessing
308
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]