4
# Unit tests for the multiprocessing package
22
# Work around broken sem_open implementations
24
import multiprocessing.synchronize
25
except ImportError, e:
26
from test.test_support import TestSkipped
29
import multiprocessing.dummy
30
import multiprocessing.connection
31
import multiprocessing.managers
32
import multiprocessing.heap
33
import multiprocessing.pool
34
import _multiprocessing
36
from multiprocessing import util
48
LOG_LEVEL = util.SUBWARNING
49
#LOG_LEVEL = logging.WARNING
52
CHECK_TIMINGS = False # making true makes tests take a lot longer
53
# and can sometimes cause some non-serious
54
# failures because some calls block a bit
55
# longer than expected
57
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
59
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
61
HAVE_GETVALUE = not getattr(_multiprocessing,
62
'HAVE_BROKEN_SEM_GETVALUE', False)
65
# Creates a wrapper for a function which records the time it takes to finish
68
class TimingWrapper(object):
70
def __init__(self, func):
74
def __call__(self, *args, **kwds):
77
return self.func(*args, **kwds)
79
self.elapsed = time.time() - t
82
# Base class for test cases
85
class BaseTestCase(object):
87
ALLOWED_TYPES = ('processes', 'manager', 'threads')
89
def assertTimingAlmostEqual(self, a, b):
91
self.assertAlmostEqual(a, b, 1)
93
def assertReturnsIfImplemented(self, value, func, *args):
96
except NotImplementedError:
99
return self.assertEqual(value, res)
102
# Return the value of a semaphore
107
return self.get_value()
108
except AttributeError:
110
return self._Semaphore__value
111
except AttributeError:
114
except AttributeError:
115
raise NotImplementedError
121
class _TestProcess(BaseTestCase):
123
ALLOWED_TYPES = ('processes', 'threads')
125
def test_current(self):
126
if self.TYPE == 'threads':
129
current = self.current_process()
130
authkey = current.authkey
132
self.assertTrue(current.is_alive())
133
self.assertTrue(not current.daemon)
134
self.assertTrue(isinstance(authkey, bytes))
135
self.assertTrue(len(authkey) > 0)
136
self.assertEqual(current.ident, os.getpid())
137
self.assertEqual(current.exitcode, None)
139
def _test(self, q, *args, **kwds):
140
current = self.current_process()
144
if self.TYPE != 'threads':
145
q.put(bytes(current.authkey))
148
def test_process(self):
152
kwargs = {'hello':23, 'bye':2.54}
155
target=self._test, args=args, kwargs=kwargs, name=name
158
current = self.current_process()
160
if self.TYPE != 'threads':
161
self.assertEquals(p.authkey, current.authkey)
162
self.assertEquals(p.is_alive(), False)
163
self.assertEquals(p.daemon, True)
164
self.assertTrue(p not in self.active_children())
165
self.assertTrue(type(self.active_children()) is list)
166
self.assertEqual(p.exitcode, None)
170
self.assertEquals(p.exitcode, None)
171
self.assertEquals(p.is_alive(), True)
172
self.assertTrue(p in self.active_children())
174
self.assertEquals(q.get(), args[1:])
175
self.assertEquals(q.get(), kwargs)
176
self.assertEquals(q.get(), p.name)
177
if self.TYPE != 'threads':
178
self.assertEquals(q.get(), current.authkey)
179
self.assertEquals(q.get(), p.pid)
183
self.assertEquals(p.exitcode, 0)
184
self.assertEquals(p.is_alive(), False)
185
self.assertTrue(p not in self.active_children())
187
def _test_terminate(self):
190
def test_terminate(self):
191
if self.TYPE == 'threads':
194
p = self.Process(target=self._test_terminate)
198
self.assertEqual(p.is_alive(), True)
199
self.assertTrue(p in self.active_children())
200
self.assertEqual(p.exitcode, None)
204
join = TimingWrapper(p.join)
205
self.assertEqual(join(), None)
206
self.assertTimingAlmostEqual(join.elapsed, 0.0)
208
self.assertEqual(p.is_alive(), False)
209
self.assertTrue(p not in self.active_children())
213
# XXX sometimes get p.exitcode == 0 on Windows ...
214
#self.assertEqual(p.exitcode, -signal.SIGTERM)
216
def test_cpu_count(self):
218
cpus = multiprocessing.cpu_count()
219
except NotImplementedError:
221
self.assertTrue(type(cpus) is int)
222
self.assertTrue(cpus >= 1)
224
def test_active_children(self):
225
self.assertEqual(type(self.active_children()), list)
227
p = self.Process(target=time.sleep, args=(DELTA,))
228
self.assertTrue(p not in self.active_children())
231
self.assertTrue(p in self.active_children())
234
self.assertTrue(p not in self.active_children())
236
def _test_recursion(self, wconn, id):
237
from multiprocessing import forking
242
target=self._test_recursion, args=(wconn, id+[i])
247
def test_recursion(self):
248
rconn, wconn = self.Pipe(duplex=False)
249
self._test_recursion(wconn, [])
254
result.append(rconn.recv())
265
self.assertEqual(result, expected)
271
class _UpperCaser(multiprocessing.Process):
274
multiprocessing.Process.__init__(self)
275
self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
self.parent_conn.close()
279
for s in iter(self.child_conn.recv, None):
280
self.child_conn.send(s.upper())
281
self.child_conn.close()
284
assert type(s) is str
285
self.parent_conn.send(s)
286
return self.parent_conn.recv()
289
self.parent_conn.send(None)
290
self.parent_conn.close()
291
self.child_conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
295
ALLOWED_TYPES = ('processes',)
297
def test_subclassing(self):
298
uppercaser = _UpperCaser()
300
self.assertEqual(uppercaser.submit('hello'), 'HELLO')
301
self.assertEqual(uppercaser.submit('world'), 'WORLD')
310
if hasattr(q, 'empty'):
313
return q.qsize() == 0
315
def queue_full(q, maxsize):
316
if hasattr(q, 'full'):
319
return q.qsize() == maxsize
322
class _TestQueue(BaseTestCase):
325
def _test_put(self, queue, child_can_start, parent_can_continue):
326
child_can_start.wait()
329
parent_can_continue.set()
333
queue = self.Queue(maxsize=MAXSIZE)
334
child_can_start = self.Event()
335
parent_can_continue = self.Event()
338
target=self._test_put,
339
args=(queue, child_can_start, parent_can_continue)
344
self.assertEqual(queue_empty(queue), True)
345
self.assertEqual(queue_full(queue, MAXSIZE), False)
349
queue.put(3, True, None)
351
queue.put(5, False, None)
354
# the values may be in buffer but not yet in pipe so sleep a bit
357
self.assertEqual(queue_empty(queue), False)
358
self.assertEqual(queue_full(queue, MAXSIZE), True)
360
put = TimingWrapper(queue.put)
361
put_nowait = TimingWrapper(queue.put_nowait)
363
self.assertRaises(Queue.Full, put, 7, False)
364
self.assertTimingAlmostEqual(put.elapsed, 0)
366
self.assertRaises(Queue.Full, put, 7, False, None)
367
self.assertTimingAlmostEqual(put.elapsed, 0)
369
self.assertRaises(Queue.Full, put_nowait, 7)
370
self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
372
self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
373
self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
375
self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
376
self.assertTimingAlmostEqual(put.elapsed, 0)
378
self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
379
self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
381
child_can_start.set()
382
parent_can_continue.wait()
384
self.assertEqual(queue_empty(queue), True)
385
self.assertEqual(queue_full(queue, MAXSIZE), False)
389
def _test_get(self, queue, child_can_start, parent_can_continue):
390
child_can_start.wait()
396
parent_can_continue.set()
400
child_can_start = self.Event()
401
parent_can_continue = self.Event()
404
target=self._test_get,
405
args=(queue, child_can_start, parent_can_continue)
410
self.assertEqual(queue_empty(queue), True)
412
child_can_start.set()
413
parent_can_continue.wait()
416
self.assertEqual(queue_empty(queue), False)
418
# Hangs unexpectedly, remove for now
419
#self.assertEqual(queue.get(), 1)
420
self.assertEqual(queue.get(True, None), 2)
421
self.assertEqual(queue.get(True), 3)
422
self.assertEqual(queue.get(timeout=1), 4)
423
self.assertEqual(queue.get_nowait(), 5)
425
self.assertEqual(queue_empty(queue), True)
427
get = TimingWrapper(queue.get)
428
get_nowait = TimingWrapper(queue.get_nowait)
430
self.assertRaises(Queue.Empty, get, False)
431
self.assertTimingAlmostEqual(get.elapsed, 0)
433
self.assertRaises(Queue.Empty, get, False, None)
434
self.assertTimingAlmostEqual(get.elapsed, 0)
436
self.assertRaises(Queue.Empty, get_nowait)
437
self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
439
self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
440
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
442
self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
443
self.assertTimingAlmostEqual(get.elapsed, 0)
445
self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
446
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
450
def _test_fork(self, queue):
451
for i in range(10, 20):
453
# note that at this point the items may only be buffered, so the
454
# process cannot shutdown until the feeder thread has finished
455
# pushing items onto the pipe.
458
# Old versions of Queue would fail to create a new feeder
459
# thread for a forked process if the original process had its
460
# own feeder thread. This test checks that this no longer
465
# put items on queue so that main process starts a feeder thread
469
# wait to make sure thread starts before we fork a new process
473
p = self.Process(target=self._test_fork, args=(queue,))
476
# check that all expected items are in the queue
478
self.assertEqual(queue.get(), i)
479
self.assertRaises(Queue.Empty, queue.get, False)
483
def test_qsize(self):
486
self.assertEqual(q.qsize(), 0)
487
except NotImplementedError:
490
self.assertEqual(q.qsize(), 1)
492
self.assertEqual(q.qsize(), 2)
494
self.assertEqual(q.qsize(), 1)
496
self.assertEqual(q.qsize(), 0)
498
def _test_task_done(self, q):
499
for obj in iter(q.get, None):
503
def test_task_done(self):
504
queue = self.JoinableQueue()
506
if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
509
workers = [self.Process(target=self._test_task_done, args=(queue,))
530
class _TestLock(BaseTestCase):
534
self.assertEqual(lock.acquire(), True)
535
self.assertEqual(lock.acquire(False), False)
536
self.assertEqual(lock.release(), None)
537
self.assertRaises((ValueError, threading.ThreadError), lock.release)
539
def test_rlock(self):
541
self.assertEqual(lock.acquire(), True)
542
self.assertEqual(lock.acquire(), True)
543
self.assertEqual(lock.acquire(), True)
544
self.assertEqual(lock.release(), None)
545
self.assertEqual(lock.release(), None)
546
self.assertEqual(lock.release(), None)
547
self.assertRaises((AssertionError, RuntimeError), lock.release)
550
class _TestSemaphore(BaseTestCase):
552
# Shared acquire/release walk-through for a semaphore `sem` whose reported
# value is expected to start at 2 (both visible callers construct it with 2).
# Value checks go through assertReturnsIfImplemented with the module-level
# get_value helper; NOTE(review): only a fragment of that assertion helper is
# visible here (it has an `except NotImplementedError` branch) -- confirm how
# it behaves when the value cannot be read.
def _test_semaphore(self, sem):
553
self.assertReturnsIfImplemented(2, get_value, sem)
554
# Two acquires are expected to succeed, taking the reported value 2 -> 1 -> 0.
self.assertEqual(sem.acquire(), True)
555
self.assertReturnsIfImplemented(1, get_value, sem)
556
self.assertEqual(sem.acquire(), True)
557
self.assertReturnsIfImplemented(0, get_value, sem)
558
# With the value at 0, acquire(False) is expected to return False and leave
# the reported value unchanged.
self.assertEqual(sem.acquire(False), False)
559
self.assertReturnsIfImplemented(0, get_value, sem)
560
# Two releases (each expected to return None) bring the value back to 2.
self.assertEqual(sem.release(), None)
561
self.assertReturnsIfImplemented(1, get_value, sem)
562
self.assertEqual(sem.release(), None)
563
self.assertReturnsIfImplemented(2, get_value, sem)
565
# Runs the shared _test_semaphore checks on self.Semaphore(2), then releases
# twice more and expects the reported value to go to 3 and then 4, i.e. above
# the initial value of 2.
def test_semaphore(self):
566
sem = self.Semaphore(2)
567
self._test_semaphore(sem)
568
# Each extra release is expected to return None and raise the value by one.
self.assertEqual(sem.release(), None)
569
self.assertReturnsIfImplemented(3, get_value, sem)
570
self.assertEqual(sem.release(), None)
571
self.assertReturnsIfImplemented(4, get_value, sem)
573
def test_bounded_semaphore(self):
574
sem = self.BoundedSemaphore(2)
575
self._test_semaphore(sem)
576
# Currently fails on OS/X
578
# self.assertRaises(ValueError, sem.release)
579
# self.assertReturnsIfImplemented(2, get_value, sem)
581
def test_timeout(self):
582
if self.TYPE != 'processes':
585
sem = self.Semaphore(0)
586
acquire = TimingWrapper(sem.acquire)
588
self.assertEqual(acquire(False), False)
589
self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
591
self.assertEqual(acquire(False, None), False)
592
self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
594
self.assertEqual(acquire(False, TIMEOUT1), False)
595
self.assertTimingAlmostEqual(acquire.elapsed, 0)
597
self.assertEqual(acquire(True, TIMEOUT2), False)
598
self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
600
self.assertEqual(acquire(timeout=TIMEOUT3), False)
601
self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
604
class _TestCondition(BaseTestCase):
606
def f(self, cond, sleeping, woken, timeout=None):
613
def check_invariant(self, cond):
614
# this is only supposed to succeed when there are no sleepers
615
if self.TYPE == 'processes':
617
sleepers = (cond._sleeping_count.get_value() -
618
cond._woken_count.get_value())
619
self.assertEqual(sleepers, 0)
620
self.assertEqual(cond._wait_semaphore.get_value(), 0)
621
except NotImplementedError:
624
def test_notify(self):
625
cond = self.Condition()
626
sleeping = self.Semaphore(0)
627
woken = self.Semaphore(0)
629
p = self.Process(target=self.f, args=(cond, sleeping, woken))
633
p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
637
# wait for both children to start sleeping
641
# check no process/thread has woken up
643
self.assertReturnsIfImplemented(0, get_value, woken)
645
# wake up one process/thread
650
# check one process/thread has woken up
652
self.assertReturnsIfImplemented(1, get_value, woken)
659
# check other has woken up
661
self.assertReturnsIfImplemented(2, get_value, woken)
663
# check state is not mucked up
664
self.check_invariant(cond)
667
def test_notify_all(self):
668
cond = self.Condition()
669
sleeping = self.Semaphore(0)
670
woken = self.Semaphore(0)
672
# start some threads/processes which will time out
674
p = self.Process(target=self.f,
675
args=(cond, sleeping, woken, TIMEOUT1))
679
t = threading.Thread(target=self.f,
680
args=(cond, sleeping, woken, TIMEOUT1))
684
# wait for them all to sleep
688
# check they have all timed out
691
self.assertReturnsIfImplemented(0, get_value, woken)
693
# check state is not mucked up
694
self.check_invariant(cond)
696
# start some more threads/processes
698
p = self.Process(target=self.f, args=(cond, sleeping, woken))
702
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
706
# wait for them to all sleep
710
# check no process/thread has woken up
712
self.assertReturnsIfImplemented(0, get_value, woken)
719
# check they have all woken
721
self.assertReturnsIfImplemented(6, get_value, woken)
723
# check state is not mucked up
724
self.check_invariant(cond)
726
def test_timeout(self):
727
cond = self.Condition()
728
wait = TimingWrapper(cond.wait)
732
self.assertEqual(res, None)
733
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
736
class _TestEvent(BaseTestCase):
738
def _test_event(self, event):
742
def test_event(self):
744
wait = TimingWrapper(event.wait)
746
# Removed temporarily, due to API shear, this does not
747
# work with threading._Event objects. is_set == isSet
748
#self.assertEqual(event.is_set(), False)
750
self.assertEqual(wait(0.0), None)
751
self.assertTimingAlmostEqual(wait.elapsed, 0.0)
752
self.assertEqual(wait(TIMEOUT1), None)
753
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
757
# See note above on the API differences
758
# self.assertEqual(event.is_set(), True)
759
self.assertEqual(wait(), None)
760
self.assertTimingAlmostEqual(wait.elapsed, 0.0)
761
self.assertEqual(wait(TIMEOUT1), None)
762
self.assertTimingAlmostEqual(wait.elapsed, 0.0)
763
# self.assertEqual(event.is_set(), True)
767
#self.assertEqual(event.is_set(), False)
769
self.Process(target=self._test_event, args=(event,)).start()
770
self.assertEqual(wait(), None)
776
class _TestValue(BaseTestCase):
782
('c', latin('x'), latin('y'))
785
def _test(self, values):
786
for sv, cv in zip(values, self.codes_values):
790
def test_value(self, raw=False):
791
if self.TYPE != 'processes':
795
values = [self.RawValue(code, value)
796
for code, value, _ in self.codes_values]
798
values = [self.Value(code, value)
799
for code, value, _ in self.codes_values]
801
for sv, cv in zip(values, self.codes_values):
802
self.assertEqual(sv.value, cv[1])
804
proc = self.Process(target=self._test, args=(values,))
808
for sv, cv in zip(values, self.codes_values):
809
self.assertEqual(sv.value, cv[2])
811
# Re-runs test_value with raw=True.
# NOTE(review): presumably this selects the self.RawValue construction inside
# test_value; the branch condition is not visible in this view -- confirm.
def test_rawvalue(self):
812
self.test_value(raw=True)
814
def test_getobj_getlock(self):
815
if self.TYPE != 'processes':
818
val1 = self.Value('i', 5)
819
lock1 = val1.get_lock()
820
obj1 = val1.get_obj()
822
val2 = self.Value('i', 5, lock=None)
823
lock2 = val2.get_lock()
824
obj2 = val2.get_obj()
827
val3 = self.Value('i', 5, lock=lock)
828
lock3 = val3.get_lock()
829
obj3 = val3.get_obj()
830
self.assertEqual(lock, lock3)
832
arr4 = self.RawValue('i', 5)
833
self.assertFalse(hasattr(arr4, 'get_lock'))
834
self.assertFalse(hasattr(arr4, 'get_obj'))
837
class _TestArray(BaseTestCase):
840
for i in range(1, len(seq)):
843
def test_array(self, raw=False):
844
if self.TYPE != 'processes':
847
seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
849
arr = self.RawArray('i', seq)
851
arr = self.Array('i', seq)
853
self.assertEqual(len(arr), len(seq))
854
self.assertEqual(arr[3], seq[3])
855
self.assertEqual(list(arr[2:7]), list(seq[2:7]))
857
arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
859
self.assertEqual(list(arr[:]), seq)
863
p = self.Process(target=self.f, args=(arr,))
867
self.assertEqual(list(arr[:]), seq)
869
# Re-runs test_array with raw=True.
# NOTE(review): presumably this selects the self.RawArray construction inside
# test_array; the branch condition is not visible in this view -- confirm.
def test_rawarray(self):
870
self.test_array(raw=True)
872
def test_getobj_getlock_obj(self):
873
if self.TYPE != 'processes':
876
arr1 = self.Array('i', range(10))
877
lock1 = arr1.get_lock()
878
obj1 = arr1.get_obj()
880
arr2 = self.Array('i', range(10), lock=None)
881
lock2 = arr2.get_lock()
882
obj2 = arr2.get_obj()
885
arr3 = self.Array('i', range(10), lock=lock)
886
lock3 = arr3.get_lock()
887
obj3 = arr3.get_obj()
888
self.assertEqual(lock, lock3)
890
arr4 = self.RawArray('i', range(10))
891
self.assertFalse(hasattr(arr4, 'get_lock'))
892
self.assertFalse(hasattr(arr4, 'get_obj'))
898
class _TestContainers(BaseTestCase):
900
ALLOWED_TYPES = ('manager',)
903
a = self.list(range(10))
904
self.assertEqual(a[:], range(10))
907
self.assertEqual(b[:], [])
910
self.assertEqual(b[:], range(5))
912
self.assertEqual(b[2], 2)
913
self.assertEqual(b[2:10], [2,3,4])
916
self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
918
self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
920
self.assertEqual(a[:], range(10))
926
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
931
self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
935
indices = range(65, 70)
938
self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
939
self.assertEqual(sorted(d.keys()), indices)
940
self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
941
self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
943
def test_namespace(self):
948
self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
950
self.assertEqual(str(n), "Namespace(name='Bob')")
951
self.assertTrue(hasattr(n, 'name'))
952
self.assertTrue(not hasattr(n, 'job'))
958
def sqr(x, wait=0.0):
961
class _TestPool(BaseTestCase):
963
# Checks that self.pool.apply returns the same result as calling sqr directly,
# once with a positional argument tuple and once with a keyword-argument dict.
def test_apply(self):
964
papply = self.pool.apply
965
self.assertEqual(papply(sqr, (5,)), sqr(5))
966
# Empty positional tuple: the argument is supplied purely by keyword.
self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
970
self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
971
self.assertEqual(pmap(sqr, range(100), chunksize=20),
972
map(sqr, range(100)))
974
# Submits sqr(7, TIMEOUT1) through self.pool.apply_async and expects get() on
# the result to return 49. The get call is wrapped in TimingWrapper so its
# elapsed time can be compared with TIMEOUT1.
# NOTE(review): sqr's second parameter is named `wait`; its body is not
# visible here -- presumably it delays by that amount, confirm.
def test_async(self):
975
res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
976
get = TimingWrapper(res.get)
977
self.assertEqual(get(), 49)
978
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
980
# Submits sqr(6, TIMEOUT2 + 0.2) through self.pool.apply_async and expects
# get(timeout=TIMEOUT2) on the result to raise multiprocessing.TimeoutError.
# The elapsed time of that get call is then compared with TIMEOUT2.
def test_async_timeout(self):
981
# The second argument exceeds the get() timeout used below by 0.2.
res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
982
get = TimingWrapper(res.get)
983
self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
984
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
987
it = self.pool.imap(sqr, range(10))
988
self.assertEqual(list(it), map(sqr, range(10)))
990
it = self.pool.imap(sqr, range(10))
992
self.assertEqual(it.next(), i*i)
993
self.assertRaises(StopIteration, it.next)
995
it = self.pool.imap(sqr, range(1000), chunksize=100)
996
for i in range(1000):
997
self.assertEqual(it.next(), i*i)
998
self.assertRaises(StopIteration, it.next)
1000
def test_imap_unordered(self):
1001
it = self.pool.imap_unordered(sqr, range(1000))
1002
self.assertEqual(sorted(it), map(sqr, range(1000)))
1004
it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1005
self.assertEqual(sorted(it), map(sqr, range(1000)))
1007
def test_make_pool(self):
1008
p = multiprocessing.Pool(3)
1009
self.assertEqual(3, len(p._pool))
1013
def test_terminate(self):
1014
if self.TYPE == 'manager':
1015
# On Unix a forked process increfs each shared object to
1016
# which its parent process held a reference. If the
1017
# forked process gets terminated then there is likely to
1018
# be a reference leak. So to prevent
1019
# _TestZZZNumberOfObjects from failing we skip this test
1020
# when using a manager.
1023
result = self.pool.map_async(
1024
time.sleep, [0.1 for i in range(10000)], chunksize=1
1026
self.pool.terminate()
1027
join = TimingWrapper(self.pool.join)
1029
self.assertTrue(join.elapsed < 0.2)
1031
# Test that manager has expected number of shared objects left
1034
class _TestZZZNumberOfObjects(BaseTestCase):
1035
# Because test cases are sorted alphabetically, this one will get
1036
# run after all the other tests for the manager. It tests that
1037
# there have been no "reference leaks" for the manager's shared
1038
# objects. Note the comment in _TestPool.test_terminate().
1039
ALLOWED_TYPES = ('manager',)
1041
def test_number_of_objects(self):
1042
EXPECTED_NUMBER = 1 # the pool object is still alive
1043
multiprocessing.active_children() # discard dead process objs
1044
gc.collect() # do garbage collection
1045
refs = self.manager._number_of_objects()
1046
if refs != EXPECTED_NUMBER:
1047
print self.manager._debug_info()
1049
self.assertEqual(refs, EXPECTED_NUMBER)
1052
# Test of creating a customized manager class
1055
from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1057
class FooBar(object):
1066
for i in xrange(10):
1069
class IteratorProxy(BaseProxy):
1070
_exposed_ = ('next', '__next__')
1074
return self._callmethod('next')
1076
return self._callmethod('__next__')
1078
class MyManager(BaseManager):
1081
MyManager.register('Foo', callable=FooBar)
1082
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1083
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1086
class _TestMyManager(BaseTestCase):
1088
ALLOWED_TYPES = ('manager',)
1090
def test_mymanager(self):
1091
manager = MyManager()
1098
foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1099
bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1101
self.assertEqual(foo_methods, ['f', 'g'])
1102
self.assertEqual(bar_methods, ['f', '_h'])
1104
self.assertEqual(foo.f(), 'f()')
1105
self.assertRaises(ValueError, foo.g)
1106
self.assertEqual(foo._callmethod('f'), 'f()')
1107
self.assertRaises(RemoteError, foo._callmethod, '_h')
1109
self.assertEqual(bar.f(), 'f()')
1110
self.assertEqual(bar._h(), '_h()')
1111
self.assertEqual(bar._callmethod('f'), 'f()')
1112
self.assertEqual(bar._callmethod('_h'), '_h()')
1114
self.assertEqual(list(baz), [i*i for i in range(10)])
1119
# Test of connecting to a remote server and using xmlrpclib for serialization
1122
_queue = Queue.Queue()
1126
class QueueManager(BaseManager):
1127
'''manager class used by server process'''
1128
QueueManager.register('get_queue', callable=get_queue)
1130
class QueueManager2(BaseManager):
1131
'''manager class which specifies the same interface as QueueManager'''
1132
QueueManager2.register('get_queue')
1135
SERIALIZER = 'xmlrpclib'
1137
class _TestRemoteManager(BaseTestCase):
1139
ALLOWED_TYPES = ('manager',)
1141
def _putter(self, address, authkey):
1142
manager = QueueManager2(
1143
address=address, authkey=authkey, serializer=SERIALIZER
1146
queue = manager.get_queue()
1147
queue.put(('hello world', None, True, 2.25))
1149
def test_remote(self):
1150
authkey = os.urandom(32)
1152
manager = QueueManager(
1153
address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1157
p = self.Process(target=self._putter, args=(manager.address, authkey))
1160
manager2 = QueueManager2(
1161
address=manager.address, authkey=authkey, serializer=SERIALIZER
1164
queue = manager2.get_queue()
1166
# Note that xmlrpclib will deserialize object as a list not a tuple
1167
self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1169
# Because we are using xmlrpclib for serialization instead of
1170
# pickle this will cause a serialization error.
1171
self.assertRaises(Exception, queue.put, time.sleep)
1173
# Make queue finalizer run before the server is stopped
1181
SENTINEL = latin('')
1183
class _TestConnection(BaseTestCase):
1185
ALLOWED_TYPES = ('processes', 'threads')
1187
def _echo(self, conn):
1188
for msg in iter(conn.recv_bytes, SENTINEL):
1189
conn.send_bytes(msg)
1192
def test_connection(self):
1193
conn, child_conn = self.Pipe()
1195
p = self.Process(target=self._echo, args=(child_conn,))
1199
seq = [1, 2.25, None]
1200
msg = latin('hello world')
1202
arr = array.array('i', range(4))
1204
if self.TYPE == 'processes':
1205
self.assertEqual(type(conn.fileno()), int)
1207
self.assertEqual(conn.send(seq), None)
1208
self.assertEqual(conn.recv(), seq)
1210
self.assertEqual(conn.send_bytes(msg), None)
1211
self.assertEqual(conn.recv_bytes(), msg)
1213
if self.TYPE == 'processes':
1214
buffer = array.array('i', [0]*10)
1215
expected = list(arr) + [0] * (10 - len(arr))
1216
self.assertEqual(conn.send_bytes(arr), None)
1217
self.assertEqual(conn.recv_bytes_into(buffer),
1218
len(arr) * buffer.itemsize)
1219
self.assertEqual(list(buffer), expected)
1221
buffer = array.array('i', [0]*10)
1222
expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1223
self.assertEqual(conn.send_bytes(arr), None)
1224
self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1225
len(arr) * buffer.itemsize)
1226
self.assertEqual(list(buffer), expected)
1228
buffer = bytearray(latin(' ' * 40))
1229
self.assertEqual(conn.send_bytes(longmsg), None)
1231
res = conn.recv_bytes_into(buffer)
1232
except multiprocessing.BufferTooShort, e:
1233
self.assertEqual(e.args, (longmsg,))
1235
self.fail('expected BufferTooShort, got %s' % res)
1237
poll = TimingWrapper(conn.poll)
1239
self.assertEqual(poll(), False)
1240
self.assertTimingAlmostEqual(poll.elapsed, 0)
1242
self.assertEqual(poll(TIMEOUT1), False)
1243
self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1247
self.assertEqual(poll(TIMEOUT1), True)
1248
self.assertTimingAlmostEqual(poll.elapsed, 0)
1250
self.assertEqual(conn.recv(), None)
1252
really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1253
conn.send_bytes(really_big_msg)
1254
self.assertEqual(conn.recv_bytes(), really_big_msg)
1256
conn.send_bytes(SENTINEL) # tell child to quit
1259
if self.TYPE == 'processes':
1260
self.assertEqual(conn.readable, True)
1261
self.assertEqual(conn.writable, True)
1262
self.assertRaises(EOFError, conn.recv)
1263
self.assertRaises(EOFError, conn.recv_bytes)
1267
# Checks a pipe created with self.Pipe(duplex=False): a value sent on the
# second returned connection is expected to be received on the first.
def test_duplex_false(self):
1268
reader, writer = self.Pipe(duplex=False)
1269
self.assertEqual(writer.send(1), None)
1270
self.assertEqual(reader.recv(), 1)
1271
# The remaining checks follow a TYPE == 'processes' test: the readable and
# writable flags of each end, and IOError from wrong-direction calls.
# NOTE(review): indentation is lost in this view, so confirm which of these
# statements the branch actually covers.
if self.TYPE == 'processes':
1272
self.assertEqual(reader.readable, True)
1273
self.assertEqual(reader.writable, False)
1274
self.assertEqual(writer.readable, False)
1275
self.assertEqual(writer.writable, True)
1276
# Sending on the read end, and receiving or polling on the write end, are
# each expected to raise IOError.
self.assertRaises(IOError, reader.send, 2)
1277
self.assertRaises(IOError, writer.recv)
1278
self.assertRaises(IOError, writer.poll)
1280
def test_spawn_close(self):
1281
# We test that a pipe connection can be closed by parent
1282
# process immediately after child is spawned. On Windows this
1283
# would have sometimes failed on old versions because
1284
# child_conn would be closed before the child got a chance to
1286
conn, child_conn = self.Pipe()
1288
p = self.Process(target=self._echo, args=(child_conn,))
1290
child_conn.close() # this might complete before child initializes
1292
msg = latin('hello')
1293
conn.send_bytes(msg)
1294
self.assertEqual(conn.recv_bytes(), msg)
1296
conn.send_bytes(SENTINEL)
1300
def test_sendbytes(self):
1301
if self.TYPE != 'processes':
1304
msg = latin('abcdefghijklmnopqrstuvwxyz')
1308
self.assertEqual(b.recv_bytes(), msg)
1310
a.send_bytes(msg, 5)
1311
self.assertEqual(b.recv_bytes(), msg[5:])
1313
a.send_bytes(msg, 7, 8)
1314
self.assertEqual(b.recv_bytes(), msg[7:7+8])
1316
a.send_bytes(msg, 26)
1317
self.assertEqual(b.recv_bytes(), latin(''))
1319
a.send_bytes(msg, 26, 0)
1320
self.assertEqual(b.recv_bytes(), latin(''))
1322
self.assertRaises(ValueError, a.send_bytes, msg, 27)
1324
self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1326
self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1328
self.assertRaises(ValueError, a.send_bytes, msg, -1)
1330
self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1332
class _TestListenerClient(BaseTestCase):
1334
ALLOWED_TYPES = ('processes', 'threads')
1336
def _test(self, address):
1337
conn = self.connection.Client(address)
1341
def test_listener_client(self):
1342
for family in self.connection.families:
1343
l = self.connection.Listener(family=family)
1344
p = self.Process(target=self._test, args=(l.address,))
1348
self.assertEqual(conn.recv(), 'hello')
1352
# Test of sending connection and socket objects between processes
1355
class _TestPicklingConnections(BaseTestCase):
1357
ALLOWED_TYPES = ('processes',)
1359
def _listener(self, conn, families):
1360
for fam in families:
1361
l = self.connection.Listener(family=fam)
1362
conn.send(l.address)
1363
new_conn = l.accept()
1366
if self.TYPE == 'processes':
1368
l.bind(('localhost', 0))
1369
conn.send(l.getsockname())
1371
new_conn, addr = l.accept()
1376
def _remote(self, conn):
1377
for (address, msg) in iter(conn.recv, None):
1378
client = self.connection.Client(address)
1379
client.send(msg.upper())
1382
if self.TYPE == 'processes':
1383
address, msg = conn.recv()
1384
client = socket.socket()
1385
client.connect(address)
1386
client.sendall(msg.upper())
1391
def test_pickling(self):
1393
multiprocessing.allow_connection_pickling()
1397
families = self.connection.families
1399
lconn, lconn0 = self.Pipe()
1400
lp = self.Process(target=self._listener, args=(lconn0, families))
1404
rconn, rconn0 = self.Pipe()
1405
rp = self.Process(target=self._remote, args=(rconn0,))
1409
for fam in families:
1410
msg = ('This connection uses family %s' % fam).encode('ascii')
1411
address = lconn.recv()
1412
rconn.send((address, msg))
1413
new_conn = lconn.recv()
1414
self.assertEqual(new_conn.recv(), msg.upper())
1418
if self.TYPE == 'processes':
1419
msg = latin('This connection uses a normal socket')
1420
address = lconn.recv()
1421
rconn.send((address, msg))
1422
if hasattr(socket, 'fromfd'):
1423
new_conn = lconn.recv()
1424
self.assertEqual(new_conn.recv(100), msg.upper())
1426
# XXX On Windows with Py2.6 need to backport fromfd()
1427
discard = lconn.recv_bytes()
1441
class _TestHeap(BaseTestCase):
1443
ALLOWED_TYPES = ('processes',)
1445
def test_heap(self):
1450
# create and destroy lots of blocks of different sizes
1451
for i in xrange(iterations):
1452
size = int(random.lognormvariate(0, 1) * 1000)
1453
b = multiprocessing.heap.BufferWrapper(size)
1455
if len(blocks) > maxblocks:
1456
i = random.randrange(maxblocks)
1459
# get the heap object
1460
heap = multiprocessing.heap.BufferWrapper._heap
1462
# verify the state of the heap
1465
for L in heap._len_to_seq.values():
1466
for arena, start, stop in L:
1467
all.append((heap._arenas.index(arena), start, stop,
1468
stop-start, 'free'))
1469
for arena, start, stop in heap._allocated_blocks:
1470
all.append((heap._arenas.index(arena), start, stop,
1471
stop-start, 'occupied'))
1472
occupied += (stop-start)
1476
for i in range(len(all)-1):
1477
(arena, start, stop) = all[i][:3]
1478
(narena, nstart, nstop) = all[i+1][:3]
1479
self.assertTrue((arena != narena and nstart == 0) or
1487
from ctypes import Structure, Value, copy, c_int, c_double
1490
c_int = c_double = None
1492
class _Foo(Structure):
1498
class _TestSharedCTypes(BaseTestCase):
1500
ALLOWED_TYPES = ('processes',)
1502
def _double(self, x, y, foo, arr, string):
1508
for i in range(len(arr)):
1511
def test_sharedctypes(self, lock=False):
1515
x = Value('i', 7, lock=lock)
1516
y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1517
foo = Value(_Foo, 3, 2, lock=lock)
1518
arr = Array('d', range(10), lock=lock)
1519
string = Array('c', 20, lock=lock)
1520
string.value = 'hello'
1522
p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1526
self.assertEqual(x.value, 14)
1527
self.assertAlmostEqual(y.value, 2.0/3.0)
1528
self.assertEqual(foo.x, 6)
1529
self.assertAlmostEqual(foo.y, 4.0)
1531
self.assertAlmostEqual(arr[i], i*2)
1532
self.assertEqual(string.value, latin('hellohello'))
1534
def test_synchronize(self):
# Re-run test_sharedctypes with lock=True; that test defaults to lock=False,
# so this exercises the same checks with locking requested.
1535
self.test_sharedctypes(lock=True)
1537
def test_copy(self):
1545
self.assertEqual(bar.x, 2)
1546
self.assertAlmostEqual(bar.y, 5.0)
1552
class _TestFinalize(BaseTestCase):
1554
ALLOWED_TYPES = ('processes',)
1556
def _test_finalize(self, conn):
1561
util.Finalize(a, conn.send, args=('a',))
1562
del a # triggers callback for a
1565
close_b = util.Finalize(b, conn.send, args=('b',))
1566
close_b() # triggers callback for b
1567
close_b() # does nothing because callback has already been called
1568
del b # does nothing because callback has already been called
1571
util.Finalize(c, conn.send, args=('c',))
1574
util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1577
util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1579
util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1581
util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1583
util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1585
util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1587
# call multiprocessing's cleanup function then exit process without
1588
# garbage collecting locals
1589
util._exit_function()
1593
def test_finalize(self):
1594
conn, child_conn = self.Pipe()
1596
p = self.Process(target=self._test_finalize, args=(child_conn,))
1600
result = [obj for obj in iter(conn.recv, 'STOP')]
1601
self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1604
# Test that from ... import * works for each module
1607
class _TestImportStar(BaseTestCase):
1609
ALLOWED_TYPES = ('processes',)
1611
def test_import(self):
1613
'multiprocessing', 'multiprocessing.connection',
1614
'multiprocessing.heap', 'multiprocessing.managers',
1615
'multiprocessing.pool', 'multiprocessing.process',
1616
'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1617
'multiprocessing.synchronize', 'multiprocessing.util'
1620
for name in modules:
1622
mod = sys.modules[name]
1624
for attr in getattr(mod, '__all__', ()):
1627
'%r does not have attribute %r' % (mod, attr)
1631
# Quick test that logging works -- does not test logging output
1634
class _TestLogging(BaseTestCase):
1636
ALLOWED_TYPES = ('processes',)
1638
def test_enable_logging(self):
# Smoke test: obtain the multiprocessing logger, check that it is not None,
# and emit a debug and an info record. Nothing about the log output itself
# is asserted.
1639
logger = multiprocessing.get_logger()
1640
# Set the level to util.SUBWARNING before emitting the records below.
logger.setLevel(util.SUBWARNING)
1641
self.assertTrue(logger is not None)
1642
# Per the message text, these two records are not expected to be printed
# at the level set above.
logger.debug('this will not be printed')
1643
logger.info('nor will this')
1644
# Put the logger back on the module-wide LOG_LEVEL setting.
logger.setLevel(LOG_LEVEL)
1646
def _test_level(self, conn):
# Send the effective level of the multiprocessing logger over `conn`.
# Used as a Process target by test_level, which reads the value from the
# other end of the pipe.
1647
logger = multiprocessing.get_logger()
1648
conn.send(logger.getEffectiveLevel())
1650
def test_level(self):
1654
logger = multiprocessing.get_logger()
1655
root_logger = logging.getLogger()
1656
root_level = root_logger.level
1658
reader, writer = multiprocessing.Pipe(duplex=False)
1660
logger.setLevel(LEVEL1)
1661
self.Process(target=self._test_level, args=(writer,)).start()
1662
self.assertEqual(LEVEL1, reader.recv())
1664
logger.setLevel(logging.NOTSET)
1665
root_logger.setLevel(LEVEL2)
1666
self.Process(target=self._test_level, args=(writer,)).start()
1667
self.assertEqual(LEVEL2, reader.recv())
1669
root_logger.setLevel(root_level)
1670
logger.setLevel(level=LOG_LEVEL)
1673
# Functions used to create test cases from the base ones in this module
1676
def get_attributes(Source, names):
1679
obj = getattr(Source, name)
1680
if type(obj) == type(get_attributes):
1681
obj = staticmethod(obj)
1685
def create_test_cases(Mixin, type):
1688
Type = type[0].upper() + type[1:]
1690
for name in glob.keys():
1691
if name.startswith('_Test'):
1693
if type in base.ALLOWED_TYPES:
1694
newname = 'With' + Type + name[1:]
1695
class Temp(base, unittest.TestCase, Mixin):
1697
result[newname] = Temp
1698
Temp.__name__ = newname
1699
Temp.__module__ = Mixin.__module__
1706
class ProcessesMixin(object):
1708
Process = multiprocessing.Process
1709
locals().update(get_attributes(multiprocessing, (
1710
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1711
'Condition', 'Event', 'Value', 'Array', 'RawValue',
1712
'RawArray', 'current_process', 'active_children', 'Pipe',
1713
'connection', 'JoinableQueue'
1716
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1717
globals().update(testcases_processes)
1720
class ManagerMixin(object):
1722
Process = multiprocessing.Process
1723
manager = object.__new__(multiprocessing.managers.SyncManager)
1724
locals().update(get_attributes(manager, (
1725
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1726
'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1727
'Namespace', 'JoinableQueue'
1730
testcases_manager = create_test_cases(ManagerMixin, type='manager')
1731
globals().update(testcases_manager)
1734
class ThreadsMixin(object):
1736
Process = multiprocessing.dummy.Process
1737
locals().update(get_attributes(multiprocessing.dummy, (
1738
'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1739
'Condition', 'Event', 'Value', 'Array', 'current_process',
1740
'active_children', 'Pipe', 'connection', 'dict', 'list',
1741
'Namespace', 'JoinableQueue'
1744
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1745
globals().update(testcases_threads)
1747
class OtherTest(unittest.TestCase):
1748
# TODO: add more tests for deliver/answer challenge.
1749
def test_deliver_challenge_auth_failure(self):
1750
class _FakeConnection(object):
1751
def recv_bytes(self, size):
1752
return b'something bogus'
1753
def send_bytes(self, data):
1755
self.assertRaises(multiprocessing.AuthenticationError,
1756
multiprocessing.connection.deliver_challenge,
1757
_FakeConnection(), b'abc')
1759
def test_answer_challenge_auth_failure(self):
1760
class _FakeConnection(object):
1763
def recv_bytes(self, size):
1766
return multiprocessing.connection.CHALLENGE
1767
elif self.count == 2:
1768
return b'something bogus'
1770
def send_bytes(self, data):
1772
self.assertRaises(multiprocessing.AuthenticationError,
1773
multiprocessing.connection.answer_challenge,
1774
_FakeConnection(), b'abc')
1776
testcases_other = [OtherTest]
1782
def test_main(run=None):
1783
if sys.platform.startswith("linux"):
1785
lock = multiprocessing.RLock()
1787
from test.test_support import TestSkipped
1788
raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
1791
from test.test_support import run_unittest as run
1793
util.get_temp_dir() # creates temp directory for use by all processes
1795
multiprocessing.get_logger().setLevel(LOG_LEVEL)
1797
ProcessesMixin.pool = multiprocessing.Pool(4)
1798
ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1799
ManagerMixin.manager.__init__()
1800
ManagerMixin.manager.start()
1801
ManagerMixin.pool = ManagerMixin.manager.Pool(4)
1804
sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1805
sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
1806
sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
1810
loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1811
suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1814
ThreadsMixin.pool.terminate()
1815
ProcessesMixin.pool.terminate()
1816
ManagerMixin.pool.terminate()
1817
ManagerMixin.manager.shutdown()
1819
del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
1822
test_main(unittest.TextTestRunner(verbosity=2).run)
1824
if __name__ == '__main__':