~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_concurrent_futures.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
Import upstream version 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import test.support
 
2
 
 
3
# Skip tests if _multiprocessing wasn't built.
 
4
test.support.import_module('_multiprocessing')
 
5
# Skip tests if sem_open implementation is broken.
 
6
test.support.import_module('multiprocessing.synchronize')
 
7
# import threading after _multiprocessing to raise a more relevant error
 
8
# message: "No module named _multiprocessing". _multiprocessing is not compiled
 
9
# without thread support.
 
10
test.support.import_module('threading')
 
11
 
 
12
from test.script_helper import assert_python_ok
 
13
 
 
14
import sys
 
15
import threading
 
16
import time
 
17
import unittest
 
18
import weakref
 
19
 
 
20
from concurrent import futures
 
21
from concurrent.futures._base import (
 
22
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
 
23
from concurrent.futures.process import BrokenProcessPool
 
24
 
 
25
 
 
26
def create_future(state=PENDING, exception=None, result=None):
    """Return a new Future forced directly into the requested state.

    The private ``_state``, ``_exception`` and ``_result`` attributes are
    assigned directly, without going through an executor, so a future in
    any state can be fabricated for the tests.
    """
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f
 
32
 
 
33
 
 
34
# Module-level fixture futures, one per state, built once at import time
# and referenced by the test classes below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 
40
 
 
41
 
 
42
def mul(x, y):
    """Return ``x * y``; a trivial module-level callable submitted to executors."""
    return x * y
 
44
 
 
45
 
 
46
def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise ``Exception('this is an exception')``."""
    time.sleep(t)
    raise Exception('this is an exception')
 
49
 
 
50
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* to stdout and flush it."""
    time.sleep(t)
    # Flush immediately so the output is not left sitting in a buffer.
    print(msg, flush=True)
 
54
 
 
55
 
 
56
class MyObject:
    """Minimal object whose bound method is submitted to an executor."""

    def my_method(self):
        """Do nothing and return None."""
        pass
 
59
 
 
60
 
 
61
class ExecutorMixin:
    """Mixin giving each test a fresh executor in ``self.executor``.

    Subclasses set ``executor_type`` to the executor class to instantiate.
    setUp() creates it with ``worker_count`` workers and tearDown() shuts
    it down and checks how long the test took.
    """

    # Passed to the executor as ``max_workers``.
    worker_count = 5

    def setUp(self):
        """Create the executor, or skip the test if it is not implemented."""
        # Use a monotonic clock: the elapsed-time check in tearDown() must
        # not be disturbed by wall-clock adjustments during the test.
        self.t1 = time.monotonic()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as e:
            self.skipTest(str(e))
        self._prime_executor()

    def tearDown(self):
        """Shut the executor down and fail if the test ran 60s or longer."""
        self.executor.shutdown(wait=True)
        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (Local name chosen so it does not shadow the ``futures`` module.)
        primers = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]

        for f in primers:
            f.result()
 
87
 
 
88
 
 
89
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin bound to ``futures.ThreadPoolExecutor``."""

    executor_type = futures.ThreadPoolExecutor
 
91
 
 
92
 
 
93
class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin bound to ``futures.ProcessPoolExecutor``."""

    executor_type = futures.ProcessPoolExecutor
 
95
 
 
96
 
 
97
class ExecutorShutdownTest:
    """Shutdown tests shared by both executor flavours.

    Intended to be mixed into a TestCase that also provides
    ``self.executor`` and ``self.executor_type``.
    """

    def test_run_after_shutdown(self):
        """submit() after shutdown() raises RuntimeError."""
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            t = {executor_type}(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_issue12364(self):
        """Results of work queued before shutdown() can still be collected."""
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()
 
123
 
 
124
 
 
125
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    """Shutdown tests specific to ThreadPoolExecutor."""

    def _prime_executor(self):
        # No priming here. NOTE(review): presumably because
        # test_threads_terminate counts the threads created by its own
        # three submissions -- confirm.
        pass

    def test_threads_terminate(self):
        """Three submissions start three threads, all joinable after shutdown."""
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        """Worker threads can be joined once the ``with`` block has exited."""
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        """Worker threads can be joined after the executor name is deleted."""
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        # Keep the thread collection; the executor name itself is dropped.
        threads = executor._threads
        del executor

        for t in threads:
            t.join()
 
155
 
 
156
 
 
157
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    """Shutdown tests specific to ProcessPoolExecutor."""

    def _prime_executor(self):
        # No priming for the shutdown tests; each test submits its own work.
        pass

    def test_processes_terminate(self):
        """Five worker processes exist and all are joinable after shutdown."""
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        # Grab the mapping before shutdown() so the processes can be joined.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        """Worker processes can be joined once the ``with`` block has exited."""
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        """Management thread and workers are joinable after ``del executor``."""
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor

        queue_management_thread.join()
        for p in processes.values():
            p.join()
 
191
 
 
192
 
 
193
class WaitTests:
    """Tests of futures.wait() shared by both executor flavours.

    Intended to be mixed into a TestCase that provides ``self.executor``.
    Each test checks the (done, not_done) pair returned by wait() for one
    ``return_when`` mode.
    """

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, future1, future2],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({future1}, done)
        self.assertEqual({CANCELLED_FUTURE, future2}, not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({future1}, pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [future1, future2, future3],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({future1, future2}, finished)
        self.assertEqual({future3}, pending)

    def test_first_exception_some_already_complete(self):
        # divmod(21, 0) raises ZeroDivisionError in the worker.
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             future1, future2],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          future1}, finished)
        self.assertEqual({CANCELLED_FUTURE, future2}, pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, future1],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({future1}, pending)

    def test_all_completed(self):
        # divmod(2, 0) raises ZeroDivisionError in the worker.
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          future1,
                          future2}, finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        # Sleeps longer than the 5 second wait() timeout below.
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1}, finished)
        self.assertEqual({future2}, pending)
 
292
 
 
293
 
 
294
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):
    """WaitTests run against ThreadPoolExecutor, plus a thread race test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        # Very short switch interval for the duration of the test; the
        # previous value is restored in the ``finally`` clause.
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for _ in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)
 
310
 
 
311
 
 
312
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
    """WaitTests run against ProcessPoolExecutor."""
    pass
 
314
 
 
315
 
 
316
class AsCompletedTests:
    """Tests of futures.as_completed() shared by both executor flavours.

    Intended to be mixed into a TestCase that provides ``self.executor``.
    """

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        """Without a timeout, every future passed in is eventually yielded."""
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]))
        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2},
            completed)

    def test_zero_timeout(self):
        """With timeout=0, only the already-finished futures are yielded."""
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            # Tolerated: the assertion below checks what was yielded
            # before any timeout.
            pass

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         completed_futures)
 
352
 
 
353
 
 
354
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
    """AsCompletedTests run against ThreadPoolExecutor."""
    pass
 
356
 
 
357
 
 
358
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
    """AsCompletedTests run against ProcessPoolExecutor."""
    pass
 
360
 
 
361
 
 
362
class ExecutorTest:
    """submit()/map() tests shared by both executor flavours.

    Intended to be mixed into a TestCase that provides ``self.executor``
    and ``self.worker_count``.
    """

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())

    def test_map(self):
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        """map() yields results in order and re-raises the worker's error."""
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(i), (0, 1))
        self.assertEqual(next(i), (0, 1))
        # Third call is divmod(1, 0).
        self.assertRaises(ZeroDivisionError, next, i)

    def test_map_timeout(self):
        """Iterating map() raises TimeoutError once the timeout elapses."""
        results = []
        with self.assertRaises(futures.TimeoutError):
            # The last call sleeps 6s, longer than the 5s timeout.
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref callback sets the event when my_object is collected.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")
 
420
 
 
421
 
 
422
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
    """ExecutorTest run against ThreadPoolExecutor, plus a map() check."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        # The iterator returned by map() is deliberately never consumed;
        # every call must still have run once shutdown(wait=True) returns.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))
 
432
 
 
433
 
 
434
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
    """ExecutorTest run against ProcessPoolExecutor, plus a broken-pool test."""

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # (Local name chosen so it does not shadow the ``futures`` module.)
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
 
446
 
 
447
 
 
448
class FutureTests(unittest.TestCase):
    """Tests of the Future class itself; no executor is involved."""

    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        """A raising callback is reported on stderr; later callbacks still run."""
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
            repr(EXCEPTION_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
            repr(SUCCESSFUL_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        """cancel() returns True for pending/cancelled futures, else False."""
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even if the assertion below fails, so no
        # thread is left running after the test.
        self.addCleanup(t.join)

        self.assertEqual(f1.result(timeout=5), 42)

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even if the assertion below fails.
        self.addCleanup(t.join)

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Finish the future by hand: set the state and exception under
            # the future's condition, then wake any waiters.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even if the assertion below fails.
        self.addCleanup(t.join)

        self.assertIsInstance(f1.exception(timeout=5), OSError)
 
669
 
 
670
@test.support.reap_threads
def test_main():
    """Run every test in this module, then reap leftover child processes."""
    try:
        test.support.run_unittest(__name__)
    finally:
        # Runs whether or not run_unittest() raised.
        test.support.reap_children()
 
676
 
 
677
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()