3
# Skip tests if _multiprocessing wasn't built.
4
test.support.import_module('_multiprocessing')
5
# Skip tests if sem_open implementation is broken.
6
test.support.import_module('multiprocessing.synchronize')
7
# import threading after _multiprocessing to raise a more relevant error
8
# message: "No module named _multiprocessing". _multiprocessing is not compiled
9
# without thread support.
10
test.support.import_module('threading')
12
from test.script_helper import assert_python_ok
20
from concurrent import futures
21
from concurrent.futures._base import (
22
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
23
from concurrent.futures.process import BrokenProcessPool
26
def create_future(state=PENDING, exception=None, result=None):
29
f._exception = exception
34
PENDING_FUTURE = create_future(state=PENDING)
35
RUNNING_FUTURE = create_future(state=RUNNING)
36
CANCELLED_FUTURE = create_future(state=CANCELLED)
37
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
38
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
39
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
46
def sleep_and_raise(t):
48
raise Exception('this is an exception')
50
def sleep_and_print(t, msg):
56
class MyObject(object):
67
self.executor = self.executor_type(max_workers=self.worker_count)
68
except NotImplementedError as e:
70
self._prime_executor()
73
self.executor.shutdown(wait=True)
74
dt = time.time() - self.t1
75
if test.support.verbose:
76
print("%.2fs" % dt, end=' ')
77
self.assertLess(dt, 60, "synchronization issue: test lasted too long")
79
def _prime_executor(self):
80
# Make sure that the executor is ready to do work before running the
81
# tests. This should reduce the probability of timeouts in the tests.
82
futures = [self.executor.submit(time.sleep, 0.1)
83
for _ in range(self.worker_count)]
89
class ThreadPoolMixin(ExecutorMixin):
90
executor_type = futures.ThreadPoolExecutor
93
class ProcessPoolMixin(ExecutorMixin):
94
executor_type = futures.ProcessPoolExecutor
97
class ExecutorShutdownTest:
98
def test_run_after_shutdown(self):
99
self.executor.shutdown()
100
self.assertRaises(RuntimeError,
101
self.executor.submit,
104
def test_interpreter_shutdown(self):
105
# Test the atexit hook for shutdown of worker threads and processes
106
rc, out, err = assert_python_ok('-c', """if 1:
107
from concurrent.futures import {executor_type}
108
from time import sleep
109
from test.test_concurrent_futures import sleep_and_print
110
t = {executor_type}(5)
111
t.submit(sleep_and_print, 1.0, "apple")
112
""".format(executor_type=self.executor_type.__name__))
113
# Errors in atexit hooks don't change the process exit code, so check stderr explicitly.
115
self.assertFalse(err)
116
self.assertEqual(out.strip(), b"apple")
118
def test_hang_issue12364(self):
119
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
120
self.executor.shutdown()
125
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
126
def _prime_executor(self):
129
def test_threads_terminate(self):
130
self.executor.submit(mul, 21, 2)
131
self.executor.submit(mul, 6, 7)
132
self.executor.submit(mul, 3, 14)
133
self.assertEqual(len(self.executor._threads), 3)
134
self.executor.shutdown()
135
for t in self.executor._threads:
138
def test_context_manager_shutdown(self):
139
with futures.ThreadPoolExecutor(max_workers=5) as e:
141
self.assertEqual(list(e.map(abs, range(-5, 5))),
142
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
144
for t in executor._threads:
147
def test_del_shutdown(self):
148
executor = futures.ThreadPoolExecutor(max_workers=5)
149
executor.map(abs, range(-5, 5))
150
threads = executor._threads
157
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
158
def _prime_executor(self):
161
def test_processes_terminate(self):
162
self.executor.submit(mul, 21, 2)
163
self.executor.submit(mul, 6, 7)
164
self.executor.submit(mul, 3, 14)
165
self.assertEqual(len(self.executor._processes), 5)
166
processes = self.executor._processes
167
self.executor.shutdown()
169
for p in processes.values():
172
def test_context_manager_shutdown(self):
173
with futures.ProcessPoolExecutor(max_workers=5) as e:
174
processes = e._processes
175
self.assertEqual(list(e.map(abs, range(-5, 5))),
176
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
178
for p in processes.values():
181
def test_del_shutdown(self):
182
executor = futures.ProcessPoolExecutor(max_workers=5)
183
list(executor.map(abs, range(-5, 5)))
184
queue_management_thread = executor._queue_management_thread
185
processes = executor._processes
188
queue_management_thread.join()
189
for p in processes.values():
195
def test_first_completed(self):
    # wait(FIRST_COMPLETED) should hand back the quick multiplication as
    # done, while the sleeping task and the cancelled (but not yet
    # notified) future are reported as not done.
    quick = self.executor.submit(mul, 21, 2)
    slow = self.executor.submit(time.sleep, 1.5)
    watched = [CANCELLED_FUTURE, quick, slow]

    finished, unfinished = futures.wait(
        watched, return_when=futures.FIRST_COMPLETED)

    self.assertEqual({quick}, finished)
    self.assertEqual({CANCELLED_FUTURE, slow}, unfinished)
206
def test_first_completed_some_already_completed(self):
207
future1 = self.executor.submit(time.sleep, 1.5)
209
finished, pending = futures.wait(
210
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
211
return_when=futures.FIRST_COMPLETED)
214
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
216
self.assertEqual(set([future1]), pending)
218
def test_first_exception(self):
    # Three tasks: one that succeeds, one submitted via sleep_and_raise
    # (which ends with an exception), and one long sleep. The wait is
    # expected to return the first two as finished and leave the long
    # sleep pending.
    ok = self.executor.submit(mul, 2, 21)
    failing = self.executor.submit(sleep_and_raise, 1.5)
    slow = self.executor.submit(time.sleep, 3)

    completed, outstanding = futures.wait(
        [ok, failing, slow], return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({ok, failing}, completed)
    self.assertEqual({slow}, outstanding)
230
def test_first_exception_some_already_complete(self):
231
future1 = self.executor.submit(divmod, 21, 0)
232
future2 = self.executor.submit(time.sleep, 1.5)
234
finished, pending = futures.wait(
237
CANCELLED_AND_NOTIFIED_FUTURE,
239
return_when=futures.FIRST_EXCEPTION)
241
self.assertEqual(set([SUCCESSFUL_FUTURE,
242
CANCELLED_AND_NOTIFIED_FUTURE,
244
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
246
def test_first_exception_one_already_failed(self):
    # EXCEPTION_FUTURE is already finished with an error when wait() is
    # called, so it alone should be reported as finished and the
    # sleeping task should still be pending.
    sleeper = self.executor.submit(time.sleep, 2)
    watched = [EXCEPTION_FUTURE, sleeper]

    completed, outstanding = futures.wait(
        watched, return_when=futures.FIRST_EXCEPTION)

    self.assertEqual({EXCEPTION_FUTURE}, completed)
    self.assertEqual({sleeper}, outstanding)
256
def test_all_completed(self):
257
future1 = self.executor.submit(divmod, 2, 0)
258
future2 = self.executor.submit(mul, 2, 21)
260
finished, pending = futures.wait(
262
CANCELLED_AND_NOTIFIED_FUTURE,
266
return_when=futures.ALL_COMPLETED)
268
self.assertEqual(set([SUCCESSFUL_FUTURE,
269
CANCELLED_AND_NOTIFIED_FUTURE,
273
self.assertEqual(set(), pending)
275
def test_timeout(self):
276
future1 = self.executor.submit(mul, 6, 7)
277
future2 = self.executor.submit(time.sleep, 6)
279
finished, pending = futures.wait(
280
[CANCELLED_AND_NOTIFIED_FUTURE,
285
return_when=futures.ALL_COMPLETED)
287
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
291
self.assertEqual(set([future2]), pending)
294
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
296
def test_pending_calls_race(self):
297
# Issue #14406: multi-threaded race condition when waiting on all futures.
299
event = threading.Event()
302
oldswitchinterval = sys.getswitchinterval()
303
sys.setswitchinterval(1e-6)
305
fs = {self.executor.submit(future_func) for i in range(100)}
307
futures.wait(fs, return_when=futures.ALL_COMPLETED)
309
sys.setswitchinterval(oldswitchinterval)
312
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
316
class AsCompletedTests:
317
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
318
def test_no_timeout(self):
319
future1 = self.executor.submit(mul, 2, 21)
320
future2 = self.executor.submit(mul, 7, 6)
322
completed = set(futures.as_completed(
323
[CANCELLED_AND_NOTIFIED_FUTURE,
327
self.assertEqual(set(
328
[CANCELLED_AND_NOTIFIED_FUTURE,
334
def test_zero_timeout(self):
335
future1 = self.executor.submit(time.sleep, 2)
336
completed_futures = set()
338
for future in futures.as_completed(
339
[CANCELLED_AND_NOTIFIED_FUTURE,
344
completed_futures.add(future)
345
except futures.TimeoutError:
348
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
354
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
358
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
363
# Executor.shutdown() and context manager usage is tested by
364
# ExecutorShutdownTest.
365
def test_submit(self):
366
future = self.executor.submit(pow, 2, 8)
367
self.assertEqual(256, future.result())
369
def test_submit_keyword(self):
    # Keyword arguments given to submit() must reach the callable.
    outcome = self.executor.submit(mul, 2, y=8).result()
    self.assertEqual(16, outcome)
375
list(self.executor.map(pow, range(10), range(10))),
376
list(map(pow, range(10), range(10))))
378
def test_map_exception(self):
379
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
380
self.assertEqual(i.__next__(), (0, 1))
381
self.assertEqual(i.__next__(), (0, 1))
382
self.assertRaises(ZeroDivisionError, i.__next__)
384
def test_map_timeout(self):
387
for i in self.executor.map(time.sleep,
391
except futures.TimeoutError:
394
self.fail('expected TimeoutError')
396
self.assertEqual([None, None], results)
398
def test_shutdown_race_issue12456(self):
399
# Issue #12456: race condition at shutdown where trying to post a
400
# sentinel in the call queue blocks (the queue is full while processes
402
self.executor.map(str, [2] * (self.worker_count + 1))
403
self.executor.shutdown()
405
@test.support.cpython_only
406
def test_no_stale_references(self):
407
# Issue #16284: check that the executors don't unnecessarily hang onto stale references.
409
my_object = MyObject()
410
my_object_collected = threading.Event()
411
my_object_callback = weakref.ref(
412
my_object, lambda obj: my_object_collected.set())
413
# Deliberately discarding the future.
414
self.executor.submit(my_object.my_method)
417
collected = my_object_collected.wait(timeout=5.0)
418
self.assertTrue(collected,
419
"Stale reference not collected within timeout.")
422
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
423
def test_map_submits_without_iteration(self):
424
"""Tests verifying issue 11777."""
426
def record_finished(n):
429
self.executor.map(record_finished, range(10))
430
self.executor.shutdown(wait=True)
431
self.assertCountEqual(finished, range(10))
434
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
435
def test_killed_child(self):
436
# When a child process is abruptly terminated, the whole pool becomes broken (BrokenProcessPool).
438
futures = [self.executor.submit(time.sleep, 3)]
439
# Get one of the processes, and terminate (kill) it
440
p = next(iter(self.executor._processes.values()))
443
self.assertRaises(BrokenProcessPool, fut.result)
444
# Submitting other jobs fails as well.
445
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
448
class FutureTests(unittest.TestCase):
449
def test_done_callback_with_result(self):
450
callback_result = None
451
def fn(callback_future):
452
nonlocal callback_result
453
callback_result = callback_future.result()
456
f.add_done_callback(fn)
458
self.assertEqual(5, callback_result)
460
def test_done_callback_with_exception(self):
461
callback_exception = None
462
def fn(callback_future):
463
nonlocal callback_exception
464
callback_exception = callback_future.exception()
467
f.add_done_callback(fn)
468
f.set_exception(Exception('test'))
469
self.assertEqual(('test',), callback_exception.args)
471
def test_done_callback_with_cancel(self):
473
def fn(callback_future):
474
nonlocal was_cancelled
475
was_cancelled = callback_future.cancelled()
478
f.add_done_callback(fn)
479
self.assertTrue(f.cancel())
480
self.assertTrue(was_cancelled)
482
def test_done_callback_raises(self):
483
with test.support.captured_stderr() as stderr:
484
raising_was_called = False
485
fn_was_called = False
487
def raising_fn(callback_future):
488
nonlocal raising_was_called
489
raising_was_called = True
490
raise Exception('doh!')
492
def fn(callback_future):
493
nonlocal fn_was_called
497
f.add_done_callback(raising_fn)
498
f.add_done_callback(fn)
500
self.assertTrue(raising_was_called)
501
self.assertTrue(fn_was_called)
502
self.assertIn('Exception: doh!', stderr.getvalue())
504
def test_done_callback_already_successful(self):
505
callback_result = None
506
def fn(callback_future):
507
nonlocal callback_result
508
callback_result = callback_future.result()
512
f.add_done_callback(fn)
513
self.assertEqual(5, callback_result)
515
def test_done_callback_already_failed(self):
516
callback_exception = None
517
def fn(callback_future):
518
nonlocal callback_exception
519
callback_exception = callback_future.exception()
522
f.set_exception(Exception('test'))
523
f.add_done_callback(fn)
524
self.assertEqual(('test',), callback_exception.args)
526
def test_done_callback_already_cancelled(self):
528
def fn(callback_future):
529
nonlocal was_cancelled
530
was_cancelled = callback_future.cancelled()
533
self.assertTrue(f.cancel())
534
f.add_done_callback(fn)
535
self.assertTrue(was_cancelled)
538
self.assertRegex(repr(PENDING_FUTURE),
539
'<Future at 0x[0-9a-f]+ state=pending>')
540
self.assertRegex(repr(RUNNING_FUTURE),
541
'<Future at 0x[0-9a-f]+ state=running>')
542
self.assertRegex(repr(CANCELLED_FUTURE),
543
'<Future at 0x[0-9a-f]+ state=cancelled>')
544
self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
545
'<Future at 0x[0-9a-f]+ state=cancelled>')
547
repr(EXCEPTION_FUTURE),
548
'<Future at 0x[0-9a-f]+ state=finished raised OSError>')
550
repr(SUCCESSFUL_FUTURE),
551
'<Future at 0x[0-9a-f]+ state=finished returned int>')
554
def test_cancel(self):
    # Each row: (future in a given starting state,
    #            whether cancel() should report success,
    #            state the future must be in afterwards).
    cases = [
        (create_future(state=PENDING), True, CANCELLED),
        (create_future(state=RUNNING), False, RUNNING),
        (create_future(state=CANCELLED), True, CANCELLED),
        (create_future(state=CANCELLED_AND_NOTIFIED), True,
         CANCELLED_AND_NOTIFIED),
        (create_future(state=FINISHED, exception=OSError()), False,
         FINISHED),
        (create_future(state=FINISHED, result=5), False, FINISHED),
    ]

    for future, cancellable, final_state in cases:
        if cancellable:
            self.assertTrue(future.cancel())
        else:
            self.assertFalse(future.cancel())
        self.assertEqual(future._state, final_state)
580
def test_cancelled(self):
    # Only the two cancelled fixtures should report cancelled() as true.
    for fut in (PENDING_FUTURE, RUNNING_FUTURE):
        self.assertFalse(fut.cancelled())
    for fut in (CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE):
        self.assertTrue(fut.cancelled())
    for fut in (EXCEPTION_FUTURE, SUCCESSFUL_FUTURE):
        self.assertFalse(fut.cancelled())
589
self.assertFalse(PENDING_FUTURE.done())
590
self.assertFalse(RUNNING_FUTURE.done())
591
self.assertTrue(CANCELLED_FUTURE.done())
592
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
593
self.assertTrue(EXCEPTION_FUTURE.done())
594
self.assertTrue(SUCCESSFUL_FUTURE.done())
596
def test_running(self):
    # running() should be true for the RUNNING fixture and false for
    # every other fixture state.
    self.assertFalse(PENDING_FUTURE.running())
    self.assertTrue(RUNNING_FUTURE.running())
    not_running = (
        CANCELLED_FUTURE,
        CANCELLED_AND_NOTIFIED_FUTURE,
        EXCEPTION_FUTURE,
        SUCCESSFUL_FUTURE,
    )
    for fut in not_running:
        self.assertFalse(fut.running())
604
def test_result_with_timeout(self):
    # result(timeout=0) on each fixture: unfinished futures time out,
    # cancelled ones raise CancelledError, the failed one re-raises its
    # OSError, and the successful one returns its stored value.
    expected_errors = (
        (PENDING_FUTURE, futures.TimeoutError),
        (RUNNING_FUTURE, futures.TimeoutError),
        (CANCELLED_FUTURE, futures.CancelledError),
        (CANCELLED_AND_NOTIFIED_FUTURE, futures.CancelledError),
        (EXCEPTION_FUTURE, OSError),
    )
    for fut, error in expected_errors:
        with self.assertRaises(error):
            fut.result(timeout=0)

    self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
616
def test_result_with_success(self):
617
# TODO(brian@sweetapp.com): This test is timing dependent.
619
# Wait until the main thread is waiting for the result.
623
f1 = create_future(state=PENDING)
624
t = threading.Thread(target=notification)
627
self.assertEqual(f1.result(timeout=5), 42)
629
def test_result_with_cancel(self):
630
# TODO(brian@sweetapp.com): This test is timing dependent.
632
# Wait until the main thread is waiting for the result.
636
f1 = create_future(state=PENDING)
637
t = threading.Thread(target=notification)
640
self.assertRaises(futures.CancelledError, f1.result, timeout=5)
642
def test_exception_with_timeout(self):
643
self.assertRaises(futures.TimeoutError,
644
PENDING_FUTURE.exception, timeout=0)
645
self.assertRaises(futures.TimeoutError,
646
RUNNING_FUTURE.exception, timeout=0)
647
self.assertRaises(futures.CancelledError,
648
CANCELLED_FUTURE.exception, timeout=0)
649
self.assertRaises(futures.CancelledError,
650
CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
651
self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
653
self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
655
def test_exception_with_success(self):
657
# Wait until the main thread is waiting for the exception.
661
f1._exception = OSError()
662
f1._condition.notify_all()
664
f1 = create_future(state=PENDING)
665
t = threading.Thread(target=notification)
668
self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
670
@test.support.reap_threads
673
test.support.run_unittest(__name__)
675
test.support.reap_children()
677
if __name__ == "__main__":