~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/test/test_multiprocessing.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
#
 
4
# Unit tests for the multiprocessing package
 
5
#
 
6
 
 
7
import unittest
 
8
import threading
 
9
import Queue
 
10
import time
 
11
import sys
 
12
import os
 
13
import gc
 
14
import signal
 
15
import array
 
16
import copy
 
17
import socket
 
18
import random
 
19
import logging
 
20
 
 
21
 
 
22
# Work around broken sem_open implementations
 
23
try:
 
24
    import multiprocessing.synchronize
 
25
except ImportError, e:
 
26
    from test.test_support import TestSkipped
 
27
    raise TestSkipped(e)
 
28
 
 
29
import multiprocessing.dummy
 
30
import multiprocessing.connection
 
31
import multiprocessing.managers
 
32
import multiprocessing.heap
 
33
import multiprocessing.pool
 
34
import _multiprocessing
 
35
 
 
36
from multiprocessing import util
 
37
 
 
38
#
 
39
#
 
40
#
 
41
 
 
42
latin = str
 
43
 
 
44
#
 
45
# Constants
 
46
#
 
47
 
 
48
LOG_LEVEL = util.SUBWARNING
 
49
#LOG_LEVEL = logging.WARNING
 
50
 
 
51
DELTA = 0.1
 
52
CHECK_TIMINGS = False     # making true makes tests take a lot longer
 
53
                          # and can sometimes cause some non-serious
 
54
                          # failures because some calls block a bit
 
55
                          # longer than expected
 
56
if CHECK_TIMINGS:
 
57
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
 
58
else:
 
59
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
 
60
 
 
61
HAVE_GETVALUE = not getattr(_multiprocessing,
 
62
                            'HAVE_BROKEN_SEM_GETVALUE', False)
 
63
 
 
64
#
 
65
# Creates a wrapper for a function which records the time it takes to finish
 
66
#
 
67
 
 
68
class TimingWrapper(object):
 
69
 
 
70
    def __init__(self, func):
 
71
        self.func = func
 
72
        self.elapsed = None
 
73
 
 
74
    def __call__(self, *args, **kwds):
 
75
        t = time.time()
 
76
        try:
 
77
            return self.func(*args, **kwds)
 
78
        finally:
 
79
            self.elapsed = time.time() - t
 
80
 
 
81
#
 
82
# Base class for test cases
 
83
#
 
84
 
 
85
class BaseTestCase(object):
 
86
 
 
87
    ALLOWED_TYPES = ('processes', 'manager', 'threads')
 
88
 
 
89
    def assertTimingAlmostEqual(self, a, b):
 
90
        if CHECK_TIMINGS:
 
91
            self.assertAlmostEqual(a, b, 1)
 
92
 
 
93
    def assertReturnsIfImplemented(self, value, func, *args):
 
94
        try:
 
95
            res = func(*args)
 
96
        except NotImplementedError:
 
97
            pass
 
98
        else:
 
99
            return self.assertEqual(value, res)
 
100
 
 
101
#
 
102
# Return the value of a semaphore
 
103
#
 
104
 
 
105
def get_value(self):
 
106
    try:
 
107
        return self.get_value()
 
108
    except AttributeError:
 
109
        try:
 
110
            return self._Semaphore__value
 
111
        except AttributeError:
 
112
            try:
 
113
                return self._value
 
114
            except AttributeError:
 
115
                raise NotImplementedError
 
116
 
 
117
#
 
118
# Testcases
 
119
#
 
120
 
 
121
class _TestProcess(BaseTestCase):
 
122
 
 
123
    ALLOWED_TYPES = ('processes', 'threads')
 
124
 
 
125
    def test_current(self):
 
126
        if self.TYPE == 'threads':
 
127
            return
 
128
 
 
129
        current = self.current_process()
 
130
        authkey = current.authkey
 
131
 
 
132
        self.assertTrue(current.is_alive())
 
133
        self.assertTrue(not current.daemon)
 
134
        self.assertTrue(isinstance(authkey, bytes))
 
135
        self.assertTrue(len(authkey) > 0)
 
136
        self.assertEqual(current.ident, os.getpid())
 
137
        self.assertEqual(current.exitcode, None)
 
138
 
 
139
    def _test(self, q, *args, **kwds):
 
140
        current = self.current_process()
 
141
        q.put(args)
 
142
        q.put(kwds)
 
143
        q.put(current.name)
 
144
        if self.TYPE != 'threads':
 
145
            q.put(bytes(current.authkey))
 
146
            q.put(current.pid)
 
147
 
 
148
    def test_process(self):
 
149
        q = self.Queue(1)
 
150
        e = self.Event()
 
151
        args = (q, 1, 2)
 
152
        kwargs = {'hello':23, 'bye':2.54}
 
153
        name = 'SomeProcess'
 
154
        p = self.Process(
 
155
            target=self._test, args=args, kwargs=kwargs, name=name
 
156
            )
 
157
        p.daemon = True
 
158
        current = self.current_process()
 
159
 
 
160
        if self.TYPE != 'threads':
 
161
            self.assertEquals(p.authkey, current.authkey)
 
162
        self.assertEquals(p.is_alive(), False)
 
163
        self.assertEquals(p.daemon, True)
 
164
        self.assertTrue(p not in self.active_children())
 
165
        self.assertTrue(type(self.active_children()) is list)
 
166
        self.assertEqual(p.exitcode, None)
 
167
 
 
168
        p.start()
 
169
 
 
170
        self.assertEquals(p.exitcode, None)
 
171
        self.assertEquals(p.is_alive(), True)
 
172
        self.assertTrue(p in self.active_children())
 
173
 
 
174
        self.assertEquals(q.get(), args[1:])
 
175
        self.assertEquals(q.get(), kwargs)
 
176
        self.assertEquals(q.get(), p.name)
 
177
        if self.TYPE != 'threads':
 
178
            self.assertEquals(q.get(), current.authkey)
 
179
            self.assertEquals(q.get(), p.pid)
 
180
 
 
181
        p.join()
 
182
 
 
183
        self.assertEquals(p.exitcode, 0)
 
184
        self.assertEquals(p.is_alive(), False)
 
185
        self.assertTrue(p not in self.active_children())
 
186
 
 
187
    def _test_terminate(self):
 
188
        time.sleep(1000)
 
189
 
 
190
    def test_terminate(self):
 
191
        if self.TYPE == 'threads':
 
192
            return
 
193
 
 
194
        p = self.Process(target=self._test_terminate)
 
195
        p.daemon = True
 
196
        p.start()
 
197
 
 
198
        self.assertEqual(p.is_alive(), True)
 
199
        self.assertTrue(p in self.active_children())
 
200
        self.assertEqual(p.exitcode, None)
 
201
 
 
202
        p.terminate()
 
203
 
 
204
        join = TimingWrapper(p.join)
 
205
        self.assertEqual(join(), None)
 
206
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
 
207
 
 
208
        self.assertEqual(p.is_alive(), False)
 
209
        self.assertTrue(p not in self.active_children())
 
210
 
 
211
        p.join()
 
212
 
 
213
        # XXX sometimes get p.exitcode == 0 on Windows ...
 
214
        #self.assertEqual(p.exitcode, -signal.SIGTERM)
 
215
 
 
216
    def test_cpu_count(self):
 
217
        try:
 
218
            cpus = multiprocessing.cpu_count()
 
219
        except NotImplementedError:
 
220
            cpus = 1
 
221
        self.assertTrue(type(cpus) is int)
 
222
        self.assertTrue(cpus >= 1)
 
223
 
 
224
    def test_active_children(self):
 
225
        self.assertEqual(type(self.active_children()), list)
 
226
 
 
227
        p = self.Process(target=time.sleep, args=(DELTA,))
 
228
        self.assertTrue(p not in self.active_children())
 
229
 
 
230
        p.start()
 
231
        self.assertTrue(p in self.active_children())
 
232
 
 
233
        p.join()
 
234
        self.assertTrue(p not in self.active_children())
 
235
 
 
236
    def _test_recursion(self, wconn, id):
 
237
        from multiprocessing import forking
 
238
        wconn.send(id)
 
239
        if len(id) < 2:
 
240
            for i in range(2):
 
241
                p = self.Process(
 
242
                    target=self._test_recursion, args=(wconn, id+[i])
 
243
                    )
 
244
                p.start()
 
245
                p.join()
 
246
 
 
247
    def test_recursion(self):
 
248
        rconn, wconn = self.Pipe(duplex=False)
 
249
        self._test_recursion(wconn, [])
 
250
 
 
251
        time.sleep(DELTA)
 
252
        result = []
 
253
        while rconn.poll():
 
254
            result.append(rconn.recv())
 
255
 
 
256
        expected = [
 
257
            [],
 
258
              [0],
 
259
                [0, 0],
 
260
                [0, 1],
 
261
              [1],
 
262
                [1, 0],
 
263
                [1, 1]
 
264
            ]
 
265
        self.assertEqual(result, expected)
 
266
 
 
267
#
 
268
#
 
269
#
 
270
 
 
271
class _UpperCaser(multiprocessing.Process):
 
272
 
 
273
    def __init__(self):
 
274
        multiprocessing.Process.__init__(self)
 
275
        self.child_conn, self.parent_conn = multiprocessing.Pipe()
 
276
 
 
277
    def run(self):
 
278
        self.parent_conn.close()
 
279
        for s in iter(self.child_conn.recv, None):
 
280
            self.child_conn.send(s.upper())
 
281
        self.child_conn.close()
 
282
 
 
283
    def submit(self, s):
 
284
        assert type(s) is str
 
285
        self.parent_conn.send(s)
 
286
        return self.parent_conn.recv()
 
287
 
 
288
    def stop(self):
 
289
        self.parent_conn.send(None)
 
290
        self.parent_conn.close()
 
291
        self.child_conn.close()
 
292
 
 
293
class _TestSubclassingProcess(BaseTestCase):
 
294
 
 
295
    ALLOWED_TYPES = ('processes',)
 
296
 
 
297
    def test_subclassing(self):
 
298
        uppercaser = _UpperCaser()
 
299
        uppercaser.start()
 
300
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
 
301
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
 
302
        uppercaser.stop()
 
303
        uppercaser.join()
 
304
 
 
305
#
 
306
#
 
307
#
 
308
 
 
309
def queue_empty(q):
 
310
    if hasattr(q, 'empty'):
 
311
        return q.empty()
 
312
    else:
 
313
        return q.qsize() == 0
 
314
 
 
315
def queue_full(q, maxsize):
 
316
    if hasattr(q, 'full'):
 
317
        return q.full()
 
318
    else:
 
319
        return q.qsize() == maxsize
 
320
 
 
321
 
 
322
class _TestQueue(BaseTestCase):
 
323
 
 
324
 
 
325
    def _test_put(self, queue, child_can_start, parent_can_continue):
 
326
        child_can_start.wait()
 
327
        for i in range(6):
 
328
            queue.get()
 
329
        parent_can_continue.set()
 
330
 
 
331
    def test_put(self):
 
332
        MAXSIZE = 6
 
333
        queue = self.Queue(maxsize=MAXSIZE)
 
334
        child_can_start = self.Event()
 
335
        parent_can_continue = self.Event()
 
336
 
 
337
        proc = self.Process(
 
338
            target=self._test_put,
 
339
            args=(queue, child_can_start, parent_can_continue)
 
340
            )
 
341
        proc.daemon = True
 
342
        proc.start()
 
343
 
 
344
        self.assertEqual(queue_empty(queue), True)
 
345
        self.assertEqual(queue_full(queue, MAXSIZE), False)
 
346
 
 
347
        queue.put(1)
 
348
        queue.put(2, True)
 
349
        queue.put(3, True, None)
 
350
        queue.put(4, False)
 
351
        queue.put(5, False, None)
 
352
        queue.put_nowait(6)
 
353
 
 
354
        # the values may be in buffer but not yet in pipe so sleep a bit
 
355
        time.sleep(DELTA)
 
356
 
 
357
        self.assertEqual(queue_empty(queue), False)
 
358
        self.assertEqual(queue_full(queue, MAXSIZE), True)
 
359
 
 
360
        put = TimingWrapper(queue.put)
 
361
        put_nowait = TimingWrapper(queue.put_nowait)
 
362
 
 
363
        self.assertRaises(Queue.Full, put, 7, False)
 
364
        self.assertTimingAlmostEqual(put.elapsed, 0)
 
365
 
 
366
        self.assertRaises(Queue.Full, put, 7, False, None)
 
367
        self.assertTimingAlmostEqual(put.elapsed, 0)
 
368
 
 
369
        self.assertRaises(Queue.Full, put_nowait, 7)
 
370
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
 
371
 
 
372
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
 
373
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
 
374
 
 
375
        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
 
376
        self.assertTimingAlmostEqual(put.elapsed, 0)
 
377
 
 
378
        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
 
379
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
 
380
 
 
381
        child_can_start.set()
 
382
        parent_can_continue.wait()
 
383
 
 
384
        self.assertEqual(queue_empty(queue), True)
 
385
        self.assertEqual(queue_full(queue, MAXSIZE), False)
 
386
 
 
387
        proc.join()
 
388
 
 
389
    def _test_get(self, queue, child_can_start, parent_can_continue):
 
390
        child_can_start.wait()
 
391
        #queue.put(1)
 
392
        queue.put(2)
 
393
        queue.put(3)
 
394
        queue.put(4)
 
395
        queue.put(5)
 
396
        parent_can_continue.set()
 
397
 
 
398
    def test_get(self):
 
399
        queue = self.Queue()
 
400
        child_can_start = self.Event()
 
401
        parent_can_continue = self.Event()
 
402
 
 
403
        proc = self.Process(
 
404
            target=self._test_get,
 
405
            args=(queue, child_can_start, parent_can_continue)
 
406
            )
 
407
        proc.daemon = True
 
408
        proc.start()
 
409
 
 
410
        self.assertEqual(queue_empty(queue), True)
 
411
 
 
412
        child_can_start.set()
 
413
        parent_can_continue.wait()
 
414
 
 
415
        time.sleep(DELTA)
 
416
        self.assertEqual(queue_empty(queue), False)
 
417
 
 
418
        # Hangs unexpectedly, remove for now
 
419
        #self.assertEqual(queue.get(), 1)
 
420
        self.assertEqual(queue.get(True, None), 2)
 
421
        self.assertEqual(queue.get(True), 3)
 
422
        self.assertEqual(queue.get(timeout=1), 4)
 
423
        self.assertEqual(queue.get_nowait(), 5)
 
424
 
 
425
        self.assertEqual(queue_empty(queue), True)
 
426
 
 
427
        get = TimingWrapper(queue.get)
 
428
        get_nowait = TimingWrapper(queue.get_nowait)
 
429
 
 
430
        self.assertRaises(Queue.Empty, get, False)
 
431
        self.assertTimingAlmostEqual(get.elapsed, 0)
 
432
 
 
433
        self.assertRaises(Queue.Empty, get, False, None)
 
434
        self.assertTimingAlmostEqual(get.elapsed, 0)
 
435
 
 
436
        self.assertRaises(Queue.Empty, get_nowait)
 
437
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
 
438
 
 
439
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
 
440
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
 
441
 
 
442
        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
 
443
        self.assertTimingAlmostEqual(get.elapsed, 0)
 
444
 
 
445
        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
 
446
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
 
447
 
 
448
        proc.join()
 
449
 
 
450
    def _test_fork(self, queue):
 
451
        for i in range(10, 20):
 
452
            queue.put(i)
 
453
        # note that at this point the items may only be buffered, so the
 
454
        # process cannot shutdown until the feeder thread has finished
 
455
        # pushing items onto the pipe.
 
456
 
 
457
    def test_fork(self):
 
458
        # Old versions of Queue would fail to create a new feeder
 
459
        # thread for a forked process if the original process had its
 
460
        # own feeder thread.  This test checks that this no longer
 
461
        # happens.
 
462
 
 
463
        queue = self.Queue()
 
464
 
 
465
        # put items on queue so that main process starts a feeder thread
 
466
        for i in range(10):
 
467
            queue.put(i)
 
468
 
 
469
        # wait to make sure thread starts before we fork a new process
 
470
        time.sleep(DELTA)
 
471
 
 
472
        # fork process
 
473
        p = self.Process(target=self._test_fork, args=(queue,))
 
474
        p.start()
 
475
 
 
476
        # check that all expected items are in the queue
 
477
        for i in range(20):
 
478
            self.assertEqual(queue.get(), i)
 
479
        self.assertRaises(Queue.Empty, queue.get, False)
 
480
 
 
481
        p.join()
 
482
 
 
483
    def test_qsize(self):
 
484
        q = self.Queue()
 
485
        try:
 
486
            self.assertEqual(q.qsize(), 0)
 
487
        except NotImplementedError:
 
488
            return
 
489
        q.put(1)
 
490
        self.assertEqual(q.qsize(), 1)
 
491
        q.put(5)
 
492
        self.assertEqual(q.qsize(), 2)
 
493
        q.get()
 
494
        self.assertEqual(q.qsize(), 1)
 
495
        q.get()
 
496
        self.assertEqual(q.qsize(), 0)
 
497
 
 
498
    def _test_task_done(self, q):
 
499
        for obj in iter(q.get, None):
 
500
            time.sleep(DELTA)
 
501
            q.task_done()
 
502
 
 
503
    def test_task_done(self):
 
504
        queue = self.JoinableQueue()
 
505
 
 
506
        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
 
507
            return
 
508
 
 
509
        workers = [self.Process(target=self._test_task_done, args=(queue,))
 
510
                   for i in xrange(4)]
 
511
 
 
512
        for p in workers:
 
513
            p.start()
 
514
 
 
515
        for i in xrange(10):
 
516
            queue.put(i)
 
517
 
 
518
        queue.join()
 
519
 
 
520
        for p in workers:
 
521
            queue.put(None)
 
522
 
 
523
        for p in workers:
 
524
            p.join()
 
525
 
 
526
#
 
527
#
 
528
#
 
529
 
 
530
class _TestLock(BaseTestCase):
 
531
 
 
532
    def test_lock(self):
 
533
        lock = self.Lock()
 
534
        self.assertEqual(lock.acquire(), True)
 
535
        self.assertEqual(lock.acquire(False), False)
 
536
        self.assertEqual(lock.release(), None)
 
537
        self.assertRaises((ValueError, threading.ThreadError), lock.release)
 
538
 
 
539
    def test_rlock(self):
 
540
        lock = self.RLock()
 
541
        self.assertEqual(lock.acquire(), True)
 
542
        self.assertEqual(lock.acquire(), True)
 
543
        self.assertEqual(lock.acquire(), True)
 
544
        self.assertEqual(lock.release(), None)
 
545
        self.assertEqual(lock.release(), None)
 
546
        self.assertEqual(lock.release(), None)
 
547
        self.assertRaises((AssertionError, RuntimeError), lock.release)
 
548
 
 
549
 
 
550
class _TestSemaphore(BaseTestCase):
 
551
 
 
552
    def _test_semaphore(self, sem):
 
553
        self.assertReturnsIfImplemented(2, get_value, sem)
 
554
        self.assertEqual(sem.acquire(), True)
 
555
        self.assertReturnsIfImplemented(1, get_value, sem)
 
556
        self.assertEqual(sem.acquire(), True)
 
557
        self.assertReturnsIfImplemented(0, get_value, sem)
 
558
        self.assertEqual(sem.acquire(False), False)
 
559
        self.assertReturnsIfImplemented(0, get_value, sem)
 
560
        self.assertEqual(sem.release(), None)
 
561
        self.assertReturnsIfImplemented(1, get_value, sem)
 
562
        self.assertEqual(sem.release(), None)
 
563
        self.assertReturnsIfImplemented(2, get_value, sem)
 
564
 
 
565
    def test_semaphore(self):
 
566
        sem = self.Semaphore(2)
 
567
        self._test_semaphore(sem)
 
568
        self.assertEqual(sem.release(), None)
 
569
        self.assertReturnsIfImplemented(3, get_value, sem)
 
570
        self.assertEqual(sem.release(), None)
 
571
        self.assertReturnsIfImplemented(4, get_value, sem)
 
572
 
 
573
    def test_bounded_semaphore(self):
 
574
        sem = self.BoundedSemaphore(2)
 
575
        self._test_semaphore(sem)
 
576
        # Currently fails on OS/X
 
577
        #if HAVE_GETVALUE:
 
578
        #    self.assertRaises(ValueError, sem.release)
 
579
        #    self.assertReturnsIfImplemented(2, get_value, sem)
 
580
 
 
581
    def test_timeout(self):
 
582
        if self.TYPE != 'processes':
 
583
            return
 
584
 
 
585
        sem = self.Semaphore(0)
 
586
        acquire = TimingWrapper(sem.acquire)
 
587
 
 
588
        self.assertEqual(acquire(False), False)
 
589
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
 
590
 
 
591
        self.assertEqual(acquire(False, None), False)
 
592
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
 
593
 
 
594
        self.assertEqual(acquire(False, TIMEOUT1), False)
 
595
        self.assertTimingAlmostEqual(acquire.elapsed, 0)
 
596
 
 
597
        self.assertEqual(acquire(True, TIMEOUT2), False)
 
598
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
 
599
 
 
600
        self.assertEqual(acquire(timeout=TIMEOUT3), False)
 
601
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
 
602
 
 
603
 
 
604
class _TestCondition(BaseTestCase):
 
605
 
 
606
    def f(self, cond, sleeping, woken, timeout=None):
 
607
        cond.acquire()
 
608
        sleeping.release()
 
609
        cond.wait(timeout)
 
610
        woken.release()
 
611
        cond.release()
 
612
 
 
613
    def check_invariant(self, cond):
 
614
        # this is only supposed to succeed when there are no sleepers
 
615
        if self.TYPE == 'processes':
 
616
            try:
 
617
                sleepers = (cond._sleeping_count.get_value() -
 
618
                            cond._woken_count.get_value())
 
619
                self.assertEqual(sleepers, 0)
 
620
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
 
621
            except NotImplementedError:
 
622
                pass
 
623
 
 
624
    def test_notify(self):
 
625
        cond = self.Condition()
 
626
        sleeping = self.Semaphore(0)
 
627
        woken = self.Semaphore(0)
 
628
 
 
629
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
 
630
        p.daemon = True
 
631
        p.start()
 
632
 
 
633
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
 
634
        p.daemon = True
 
635
        p.start()
 
636
 
 
637
        # wait for both children to start sleeping
 
638
        sleeping.acquire()
 
639
        sleeping.acquire()
 
640
 
 
641
        # check no process/thread has woken up
 
642
        time.sleep(DELTA)
 
643
        self.assertReturnsIfImplemented(0, get_value, woken)
 
644
 
 
645
        # wake up one process/thread
 
646
        cond.acquire()
 
647
        cond.notify()
 
648
        cond.release()
 
649
 
 
650
        # check one process/thread has woken up
 
651
        time.sleep(DELTA)
 
652
        self.assertReturnsIfImplemented(1, get_value, woken)
 
653
 
 
654
        # wake up another
 
655
        cond.acquire()
 
656
        cond.notify()
 
657
        cond.release()
 
658
 
 
659
        # check other has woken up
 
660
        time.sleep(DELTA)
 
661
        self.assertReturnsIfImplemented(2, get_value, woken)
 
662
 
 
663
        # check state is not mucked up
 
664
        self.check_invariant(cond)
 
665
        p.join()
 
666
 
 
667
    def test_notify_all(self):
 
668
        cond = self.Condition()
 
669
        sleeping = self.Semaphore(0)
 
670
        woken = self.Semaphore(0)
 
671
 
 
672
        # start some threads/processes which will timeout
 
673
        for i in range(3):
 
674
            p = self.Process(target=self.f,
 
675
                             args=(cond, sleeping, woken, TIMEOUT1))
 
676
            p.daemon = True
 
677
            p.start()
 
678
 
 
679
            t = threading.Thread(target=self.f,
 
680
                                 args=(cond, sleeping, woken, TIMEOUT1))
 
681
            t.daemon = True
 
682
            t.start()
 
683
 
 
684
        # wait for them all to sleep
 
685
        for i in xrange(6):
 
686
            sleeping.acquire()
 
687
 
 
688
        # check they have all timed out
 
689
        for i in xrange(6):
 
690
            woken.acquire()
 
691
        self.assertReturnsIfImplemented(0, get_value, woken)
 
692
 
 
693
        # check state is not mucked up
 
694
        self.check_invariant(cond)
 
695
 
 
696
        # start some more threads/processes
 
697
        for i in range(3):
 
698
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
 
699
            p.daemon = True
 
700
            p.start()
 
701
 
 
702
            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
 
703
            t.daemon = True
 
704
            t.start()
 
705
 
 
706
        # wait for them to all sleep
 
707
        for i in xrange(6):
 
708
            sleeping.acquire()
 
709
 
 
710
        # check no process/thread has woken up
 
711
        time.sleep(DELTA)
 
712
        self.assertReturnsIfImplemented(0, get_value, woken)
 
713
 
 
714
        # wake them all up
 
715
        cond.acquire()
 
716
        cond.notify_all()
 
717
        cond.release()
 
718
 
 
719
        # check they have all woken
 
720
        time.sleep(DELTA)
 
721
        self.assertReturnsIfImplemented(6, get_value, woken)
 
722
 
 
723
        # check state is not mucked up
 
724
        self.check_invariant(cond)
 
725
 
 
726
    def test_timeout(self):
 
727
        cond = self.Condition()
 
728
        wait = TimingWrapper(cond.wait)
 
729
        cond.acquire()
 
730
        res = wait(TIMEOUT1)
 
731
        cond.release()
 
732
        self.assertEqual(res, None)
 
733
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 
734
 
 
735
 
 
736
class _TestEvent(BaseTestCase):
 
737
 
 
738
    def _test_event(self, event):
 
739
        time.sleep(TIMEOUT2)
 
740
        event.set()
 
741
 
 
742
    def test_event(self):
 
743
        event = self.Event()
 
744
        wait = TimingWrapper(event.wait)
 
745
 
 
746
        # Removed temporaily, due to API shear, this does not
 
747
        # work with threading._Event objects. is_set == isSet
 
748
        #self.assertEqual(event.is_set(), False)
 
749
 
 
750
        self.assertEqual(wait(0.0), None)
 
751
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
 
752
        self.assertEqual(wait(TIMEOUT1), None)
 
753
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 
754
 
 
755
        event.set()
 
756
 
 
757
        # See note above on the API differences
 
758
        # self.assertEqual(event.is_set(), True)
 
759
        self.assertEqual(wait(), None)
 
760
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
 
761
        self.assertEqual(wait(TIMEOUT1), None)
 
762
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
 
763
        # self.assertEqual(event.is_set(), True)
 
764
 
 
765
        event.clear()
 
766
 
 
767
        #self.assertEqual(event.is_set(), False)
 
768
 
 
769
        self.Process(target=self._test_event, args=(event,)).start()
 
770
        self.assertEqual(wait(), None)
 
771
 
 
772
#
 
773
#
 
774
#
 
775
 
 
776
class _TestValue(BaseTestCase):
 
777
 
 
778
    codes_values = [
 
779
        ('i', 4343, 24234),
 
780
        ('d', 3.625, -4.25),
 
781
        ('h', -232, 234),
 
782
        ('c', latin('x'), latin('y'))
 
783
        ]
 
784
 
 
785
    def _test(self, values):
 
786
        for sv, cv in zip(values, self.codes_values):
 
787
            sv.value = cv[2]
 
788
 
 
789
 
 
790
    def test_value(self, raw=False):
 
791
        if self.TYPE != 'processes':
 
792
            return
 
793
 
 
794
        if raw:
 
795
            values = [self.RawValue(code, value)
 
796
                      for code, value, _ in self.codes_values]
 
797
        else:
 
798
            values = [self.Value(code, value)
 
799
                      for code, value, _ in self.codes_values]
 
800
 
 
801
        for sv, cv in zip(values, self.codes_values):
 
802
            self.assertEqual(sv.value, cv[1])
 
803
 
 
804
        proc = self.Process(target=self._test, args=(values,))
 
805
        proc.start()
 
806
        proc.join()
 
807
 
 
808
        for sv, cv in zip(values, self.codes_values):
 
809
            self.assertEqual(sv.value, cv[2])
 
810
 
 
811
    def test_rawvalue(self):
 
812
        self.test_value(raw=True)
 
813
 
 
814
    def test_getobj_getlock(self):
 
815
        if self.TYPE != 'processes':
 
816
            return
 
817
 
 
818
        val1 = self.Value('i', 5)
 
819
        lock1 = val1.get_lock()
 
820
        obj1 = val1.get_obj()
 
821
 
 
822
        val2 = self.Value('i', 5, lock=None)
 
823
        lock2 = val2.get_lock()
 
824
        obj2 = val2.get_obj()
 
825
 
 
826
        lock = self.Lock()
 
827
        val3 = self.Value('i', 5, lock=lock)
 
828
        lock3 = val3.get_lock()
 
829
        obj3 = val3.get_obj()
 
830
        self.assertEqual(lock, lock3)
 
831
 
 
832
        arr4 = self.RawValue('i', 5)
 
833
        self.assertFalse(hasattr(arr4, 'get_lock'))
 
834
        self.assertFalse(hasattr(arr4, 'get_obj'))
 
835
 
 
836
 
 
837
class _TestArray(BaseTestCase):
 
838
 
 
839
    def f(self, seq):
 
840
        for i in range(1, len(seq)):
 
841
            seq[i] += seq[i-1]
 
842
 
 
843
    def test_array(self, raw=False):
 
844
        if self.TYPE != 'processes':
 
845
            return
 
846
 
 
847
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
 
848
        if raw:
 
849
            arr = self.RawArray('i', seq)
 
850
        else:
 
851
            arr = self.Array('i', seq)
 
852
 
 
853
        self.assertEqual(len(arr), len(seq))
 
854
        self.assertEqual(arr[3], seq[3])
 
855
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
 
856
 
 
857
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
 
858
 
 
859
        self.assertEqual(list(arr[:]), seq)
 
860
 
 
861
        self.f(seq)
 
862
 
 
863
        p = self.Process(target=self.f, args=(arr,))
 
864
        p.start()
 
865
        p.join()
 
866
 
 
867
        self.assertEqual(list(arr[:]), seq)
 
868
 
 
869
    def test_rawarray(self):
 
870
        self.test_array(raw=True)
 
871
 
 
872
    def test_getobj_getlock_obj(self):
 
873
        if self.TYPE != 'processes':
 
874
            return
 
875
 
 
876
        arr1 = self.Array('i', range(10))
 
877
        lock1 = arr1.get_lock()
 
878
        obj1 = arr1.get_obj()
 
879
 
 
880
        arr2 = self.Array('i', range(10), lock=None)
 
881
        lock2 = arr2.get_lock()
 
882
        obj2 = arr2.get_obj()
 
883
 
 
884
        lock = self.Lock()
 
885
        arr3 = self.Array('i', range(10), lock=lock)
 
886
        lock3 = arr3.get_lock()
 
887
        obj3 = arr3.get_obj()
 
888
        self.assertEqual(lock, lock3)
 
889
 
 
890
        arr4 = self.RawArray('i', range(10))
 
891
        self.assertFalse(hasattr(arr4, 'get_lock'))
 
892
        self.assertFalse(hasattr(arr4, 'get_obj'))
 
893
 
 
894
#
 
895
#
 
896
#
 
897
 
 
898
class _TestContainers(BaseTestCase):
 
899
 
 
900
    ALLOWED_TYPES = ('manager',)
 
901
 
 
902
    def test_list(self):
 
903
        a = self.list(range(10))
 
904
        self.assertEqual(a[:], range(10))
 
905
 
 
906
        b = self.list()
 
907
        self.assertEqual(b[:], [])
 
908
 
 
909
        b.extend(range(5))
 
910
        self.assertEqual(b[:], range(5))
 
911
 
 
912
        self.assertEqual(b[2], 2)
 
913
        self.assertEqual(b[2:10], [2,3,4])
 
914
 
 
915
        b *= 2
 
916
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
 
917
 
 
918
        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
 
919
 
 
920
        self.assertEqual(a[:], range(10))
 
921
 
 
922
        d = [a, b]
 
923
        e = self.list(d)
 
924
        self.assertEqual(
 
925
            e[:],
 
926
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
 
927
            )
 
928
 
 
929
        f = self.list([a])
 
930
        a.append('hello')
 
931
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
 
932
 
 
933
    def test_dict(self):
 
934
        d = self.dict()
 
935
        indices = range(65, 70)
 
936
        for i in indices:
 
937
            d[i] = chr(i)
 
938
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
 
939
        self.assertEqual(sorted(d.keys()), indices)
 
940
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
 
941
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
 
942
 
 
943
    def test_namespace(self):
 
944
        n = self.Namespace()
 
945
        n.name = 'Bob'
 
946
        n.job = 'Builder'
 
947
        n._hidden = 'hidden'
 
948
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
 
949
        del n.job
 
950
        self.assertEqual(str(n), "Namespace(name='Bob')")
 
951
        self.assertTrue(hasattr(n, 'name'))
 
952
        self.assertTrue(not hasattr(n, 'job'))
 
953
 
 
954
#
 
955
#
 
956
#
 
957
 
 
958
def sqr(x, wait=0.0):
 
959
    time.sleep(wait)
 
960
    return x*x
 
961
class _TestPool(BaseTestCase):
 
962
 
 
963
    def test_apply(self):
 
964
        papply = self.pool.apply
 
965
        self.assertEqual(papply(sqr, (5,)), sqr(5))
 
966
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
 
967
 
 
968
    def test_map(self):
 
969
        pmap = self.pool.map
 
970
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
 
971
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
 
972
                         map(sqr, range(100)))
 
973
 
 
974
    def test_async(self):
 
975
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
 
976
        get = TimingWrapper(res.get)
 
977
        self.assertEqual(get(), 49)
 
978
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
 
979
 
 
980
    def test_async_timeout(self):
 
981
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
 
982
        get = TimingWrapper(res.get)
 
983
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
 
984
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
 
985
 
 
986
    def test_imap(self):
 
987
        it = self.pool.imap(sqr, range(10))
 
988
        self.assertEqual(list(it), map(sqr, range(10)))
 
989
 
 
990
        it = self.pool.imap(sqr, range(10))
 
991
        for i in range(10):
 
992
            self.assertEqual(it.next(), i*i)
 
993
        self.assertRaises(StopIteration, it.next)
 
994
 
 
995
        it = self.pool.imap(sqr, range(1000), chunksize=100)
 
996
        for i in range(1000):
 
997
            self.assertEqual(it.next(), i*i)
 
998
        self.assertRaises(StopIteration, it.next)
 
999
 
 
1000
    def test_imap_unordered(self):
 
1001
        it = self.pool.imap_unordered(sqr, range(1000))
 
1002
        self.assertEqual(sorted(it), map(sqr, range(1000)))
 
1003
 
 
1004
        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
 
1005
        self.assertEqual(sorted(it), map(sqr, range(1000)))
 
1006
 
 
1007
    def test_make_pool(self):
 
1008
        p = multiprocessing.Pool(3)
 
1009
        self.assertEqual(3, len(p._pool))
 
1010
        p.close()
 
1011
        p.join()
 
1012
 
 
1013
    def test_terminate(self):
 
1014
        if self.TYPE == 'manager':
 
1015
            # On Unix a forked process increfs each shared object to
 
1016
            # which its parent process held a reference.  If the
 
1017
            # forked process gets terminated then there is likely to
 
1018
            # be a reference leak.  So to prevent
 
1019
            # _TestZZZNumberOfObjects from failing we skip this test
 
1020
            # when using a manager.
 
1021
            return
 
1022
 
 
1023
        result = self.pool.map_async(
 
1024
            time.sleep, [0.1 for i in range(10000)], chunksize=1
 
1025
            )
 
1026
        self.pool.terminate()
 
1027
        join = TimingWrapper(self.pool.join)
 
1028
        join()
 
1029
        self.assertTrue(join.elapsed < 0.2)
 
1030
#
 
1031
# Test that manager has expected number of shared objects left
 
1032
#
 
1033
 
 
1034
class _TestZZZNumberOfObjects(BaseTestCase):
 
1035
    # Because test cases are sorted alphabetically, this one will get
 
1036
    # run after all the other tests for the manager.  It tests that
 
1037
    # there have been no "reference leaks" for the manager's shared
 
1038
    # objects.  Note the comment in _TestPool.test_terminate().
 
1039
    ALLOWED_TYPES = ('manager',)
 
1040
 
 
1041
    def test_number_of_objects(self):
 
1042
        EXPECTED_NUMBER = 1                # the pool object is still alive
 
1043
        multiprocessing.active_children()  # discard dead process objs
 
1044
        gc.collect()                       # do garbage collection
 
1045
        refs = self.manager._number_of_objects()
 
1046
        if refs != EXPECTED_NUMBER:
 
1047
            print self.manager._debug_info()
 
1048
 
 
1049
        self.assertEqual(refs, EXPECTED_NUMBER)
 
1050
 
 
1051
#
 
1052
# Test of creating a customized manager class
 
1053
#
 
1054
 
 
1055
from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
 
1056
 
 
1057
class FooBar(object):
 
1058
    def f(self):
 
1059
        return 'f()'
 
1060
    def g(self):
 
1061
        raise ValueError
 
1062
    def _h(self):
 
1063
        return '_h()'
 
1064
 
 
1065
def baz():
 
1066
    for i in xrange(10):
 
1067
        yield i*i
 
1068
 
 
1069
class IteratorProxy(BaseProxy):
 
1070
    _exposed_ = ('next', '__next__')
 
1071
    def __iter__(self):
 
1072
        return self
 
1073
    def next(self):
 
1074
        return self._callmethod('next')
 
1075
    def __next__(self):
 
1076
        return self._callmethod('__next__')
 
1077
 
 
1078
class MyManager(BaseManager):
 
1079
    pass
 
1080
 
 
1081
MyManager.register('Foo', callable=FooBar)
 
1082
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
 
1083
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
 
1084
 
 
1085
 
 
1086
class _TestMyManager(BaseTestCase):
 
1087
 
 
1088
    ALLOWED_TYPES = ('manager',)
 
1089
 
 
1090
    def test_mymanager(self):
 
1091
        manager = MyManager()
 
1092
        manager.start()
 
1093
 
 
1094
        foo = manager.Foo()
 
1095
        bar = manager.Bar()
 
1096
        baz = manager.baz()
 
1097
 
 
1098
        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
 
1099
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
 
1100
 
 
1101
        self.assertEqual(foo_methods, ['f', 'g'])
 
1102
        self.assertEqual(bar_methods, ['f', '_h'])
 
1103
 
 
1104
        self.assertEqual(foo.f(), 'f()')
 
1105
        self.assertRaises(ValueError, foo.g)
 
1106
        self.assertEqual(foo._callmethod('f'), 'f()')
 
1107
        self.assertRaises(RemoteError, foo._callmethod, '_h')
 
1108
 
 
1109
        self.assertEqual(bar.f(), 'f()')
 
1110
        self.assertEqual(bar._h(), '_h()')
 
1111
        self.assertEqual(bar._callmethod('f'), 'f()')
 
1112
        self.assertEqual(bar._callmethod('_h'), '_h()')
 
1113
 
 
1114
        self.assertEqual(list(baz), [i*i for i in range(10)])
 
1115
 
 
1116
        manager.shutdown()
 
1117
 
 
1118
#
 
1119
# Test of connecting to a remote server and using xmlrpclib for serialization
 
1120
#
 
1121
 
 
1122
_queue = Queue.Queue()
 
1123
def get_queue():
 
1124
    return _queue
 
1125
 
 
1126
class QueueManager(BaseManager):
 
1127
    '''manager class used by server process'''
 
1128
QueueManager.register('get_queue', callable=get_queue)
 
1129
 
 
1130
class QueueManager2(BaseManager):
 
1131
    '''manager class which specifies the same interface as QueueManager'''
 
1132
QueueManager2.register('get_queue')
 
1133
 
 
1134
 
 
1135
SERIALIZER = 'xmlrpclib'
 
1136
 
 
1137
class _TestRemoteManager(BaseTestCase):
 
1138
 
 
1139
    ALLOWED_TYPES = ('manager',)
 
1140
 
 
1141
    def _putter(self, address, authkey):
 
1142
        manager = QueueManager2(
 
1143
            address=address, authkey=authkey, serializer=SERIALIZER
 
1144
            )
 
1145
        manager.connect()
 
1146
        queue = manager.get_queue()
 
1147
        queue.put(('hello world', None, True, 2.25))
 
1148
 
 
1149
    def test_remote(self):
 
1150
        authkey = os.urandom(32)
 
1151
 
 
1152
        manager = QueueManager(
 
1153
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
 
1154
            )
 
1155
        manager.start()
 
1156
 
 
1157
        p = self.Process(target=self._putter, args=(manager.address, authkey))
 
1158
        p.start()
 
1159
 
 
1160
        manager2 = QueueManager2(
 
1161
            address=manager.address, authkey=authkey, serializer=SERIALIZER
 
1162
            )
 
1163
        manager2.connect()
 
1164
        queue = manager2.get_queue()
 
1165
 
 
1166
        # Note that xmlrpclib will deserialize object as a list not a tuple
 
1167
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
 
1168
 
 
1169
        # Because we are using xmlrpclib for serialization instead of
 
1170
        # pickle this will cause a serialization error.
 
1171
        self.assertRaises(Exception, queue.put, time.sleep)
 
1172
 
 
1173
        # Make queue finalizer run before the server is stopped
 
1174
        del queue
 
1175
        manager.shutdown()
 
1176
 
 
1177
#
 
1178
#
 
1179
#
 
1180
 
 
1181
SENTINEL = latin('')
 
1182
 
 
1183
class _TestConnection(BaseTestCase):
 
1184
 
 
1185
    ALLOWED_TYPES = ('processes', 'threads')
 
1186
 
 
1187
    def _echo(self, conn):
 
1188
        for msg in iter(conn.recv_bytes, SENTINEL):
 
1189
            conn.send_bytes(msg)
 
1190
        conn.close()
 
1191
 
 
1192
    def test_connection(self):
 
1193
        conn, child_conn = self.Pipe()
 
1194
 
 
1195
        p = self.Process(target=self._echo, args=(child_conn,))
 
1196
        p.daemon = True
 
1197
        p.start()
 
1198
 
 
1199
        seq = [1, 2.25, None]
 
1200
        msg = latin('hello world')
 
1201
        longmsg = msg * 10
 
1202
        arr = array.array('i', range(4))
 
1203
 
 
1204
        if self.TYPE == 'processes':
 
1205
            self.assertEqual(type(conn.fileno()), int)
 
1206
 
 
1207
        self.assertEqual(conn.send(seq), None)
 
1208
        self.assertEqual(conn.recv(), seq)
 
1209
 
 
1210
        self.assertEqual(conn.send_bytes(msg), None)
 
1211
        self.assertEqual(conn.recv_bytes(), msg)
 
1212
 
 
1213
        if self.TYPE == 'processes':
 
1214
            buffer = array.array('i', [0]*10)
 
1215
            expected = list(arr) + [0] * (10 - len(arr))
 
1216
            self.assertEqual(conn.send_bytes(arr), None)
 
1217
            self.assertEqual(conn.recv_bytes_into(buffer),
 
1218
                             len(arr) * buffer.itemsize)
 
1219
            self.assertEqual(list(buffer), expected)
 
1220
 
 
1221
            buffer = array.array('i', [0]*10)
 
1222
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
 
1223
            self.assertEqual(conn.send_bytes(arr), None)
 
1224
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
 
1225
                             len(arr) * buffer.itemsize)
 
1226
            self.assertEqual(list(buffer), expected)
 
1227
 
 
1228
            buffer = bytearray(latin(' ' * 40))
 
1229
            self.assertEqual(conn.send_bytes(longmsg), None)
 
1230
            try:
 
1231
                res = conn.recv_bytes_into(buffer)
 
1232
            except multiprocessing.BufferTooShort, e:
 
1233
                self.assertEqual(e.args, (longmsg,))
 
1234
            else:
 
1235
                self.fail('expected BufferTooShort, got %s' % res)
 
1236
 
 
1237
        poll = TimingWrapper(conn.poll)
 
1238
 
 
1239
        self.assertEqual(poll(), False)
 
1240
        self.assertTimingAlmostEqual(poll.elapsed, 0)
 
1241
 
 
1242
        self.assertEqual(poll(TIMEOUT1), False)
 
1243
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
 
1244
 
 
1245
        conn.send(None)
 
1246
 
 
1247
        self.assertEqual(poll(TIMEOUT1), True)
 
1248
        self.assertTimingAlmostEqual(poll.elapsed, 0)
 
1249
 
 
1250
        self.assertEqual(conn.recv(), None)
 
1251
 
 
1252
        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
 
1253
        conn.send_bytes(really_big_msg)
 
1254
        self.assertEqual(conn.recv_bytes(), really_big_msg)
 
1255
 
 
1256
        conn.send_bytes(SENTINEL)                          # tell child to quit
 
1257
        child_conn.close()
 
1258
 
 
1259
        if self.TYPE == 'processes':
 
1260
            self.assertEqual(conn.readable, True)
 
1261
            self.assertEqual(conn.writable, True)
 
1262
            self.assertRaises(EOFError, conn.recv)
 
1263
            self.assertRaises(EOFError, conn.recv_bytes)
 
1264
 
 
1265
        p.join()
 
1266
 
 
1267
    def test_duplex_false(self):
 
1268
        reader, writer = self.Pipe(duplex=False)
 
1269
        self.assertEqual(writer.send(1), None)
 
1270
        self.assertEqual(reader.recv(), 1)
 
1271
        if self.TYPE == 'processes':
 
1272
            self.assertEqual(reader.readable, True)
 
1273
            self.assertEqual(reader.writable, False)
 
1274
            self.assertEqual(writer.readable, False)
 
1275
            self.assertEqual(writer.writable, True)
 
1276
            self.assertRaises(IOError, reader.send, 2)
 
1277
            self.assertRaises(IOError, writer.recv)
 
1278
            self.assertRaises(IOError, writer.poll)
 
1279
 
 
1280
    def test_spawn_close(self):
 
1281
        # We test that a pipe connection can be closed by parent
 
1282
        # process immediately after child is spawned.  On Windows this
 
1283
        # would have sometimes failed on old versions because
 
1284
        # child_conn would be closed before the child got a chance to
 
1285
        # duplicate it.
 
1286
        conn, child_conn = self.Pipe()
 
1287
 
 
1288
        p = self.Process(target=self._echo, args=(child_conn,))
 
1289
        p.start()
 
1290
        child_conn.close()    # this might complete before child initializes
 
1291
 
 
1292
        msg = latin('hello')
 
1293
        conn.send_bytes(msg)
 
1294
        self.assertEqual(conn.recv_bytes(), msg)
 
1295
 
 
1296
        conn.send_bytes(SENTINEL)
 
1297
        conn.close()
 
1298
        p.join()
 
1299
 
 
1300
    def test_sendbytes(self):
 
1301
        if self.TYPE != 'processes':
 
1302
            return
 
1303
 
 
1304
        msg = latin('abcdefghijklmnopqrstuvwxyz')
 
1305
        a, b = self.Pipe()
 
1306
 
 
1307
        a.send_bytes(msg)
 
1308
        self.assertEqual(b.recv_bytes(), msg)
 
1309
 
 
1310
        a.send_bytes(msg, 5)
 
1311
        self.assertEqual(b.recv_bytes(), msg[5:])
 
1312
 
 
1313
        a.send_bytes(msg, 7, 8)
 
1314
        self.assertEqual(b.recv_bytes(), msg[7:7+8])
 
1315
 
 
1316
        a.send_bytes(msg, 26)
 
1317
        self.assertEqual(b.recv_bytes(), latin(''))
 
1318
 
 
1319
        a.send_bytes(msg, 26, 0)
 
1320
        self.assertEqual(b.recv_bytes(), latin(''))
 
1321
 
 
1322
        self.assertRaises(ValueError, a.send_bytes, msg, 27)
 
1323
 
 
1324
        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
 
1325
 
 
1326
        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
 
1327
 
 
1328
        self.assertRaises(ValueError, a.send_bytes, msg, -1)
 
1329
 
 
1330
        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
 
1331
 
 
1332
class _TestListenerClient(BaseTestCase):
 
1333
 
 
1334
    ALLOWED_TYPES = ('processes', 'threads')
 
1335
 
 
1336
    def _test(self, address):
 
1337
        conn = self.connection.Client(address)
 
1338
        conn.send('hello')
 
1339
        conn.close()
 
1340
 
 
1341
    def test_listener_client(self):
 
1342
        for family in self.connection.families:
 
1343
            l = self.connection.Listener(family=family)
 
1344
            p = self.Process(target=self._test, args=(l.address,))
 
1345
            p.daemon = True
 
1346
            p.start()
 
1347
            conn = l.accept()
 
1348
            self.assertEqual(conn.recv(), 'hello')
 
1349
            p.join()
 
1350
            l.close()
 
1351
#
 
1352
# Test of sending connection and socket objects between processes
 
1353
#
 
1354
"""
 
1355
class _TestPicklingConnections(BaseTestCase):
 
1356
 
 
1357
    ALLOWED_TYPES = ('processes',)
 
1358
 
 
1359
    def _listener(self, conn, families):
 
1360
        for fam in families:
 
1361
            l = self.connection.Listener(family=fam)
 
1362
            conn.send(l.address)
 
1363
            new_conn = l.accept()
 
1364
            conn.send(new_conn)
 
1365
 
 
1366
        if self.TYPE == 'processes':
 
1367
            l = socket.socket()
 
1368
            l.bind(('localhost', 0))
 
1369
            conn.send(l.getsockname())
 
1370
            l.listen(1)
 
1371
            new_conn, addr = l.accept()
 
1372
            conn.send(new_conn)
 
1373
 
 
1374
        conn.recv()
 
1375
 
 
1376
    def _remote(self, conn):
 
1377
        for (address, msg) in iter(conn.recv, None):
 
1378
            client = self.connection.Client(address)
 
1379
            client.send(msg.upper())
 
1380
            client.close()
 
1381
 
 
1382
        if self.TYPE == 'processes':
 
1383
            address, msg = conn.recv()
 
1384
            client = socket.socket()
 
1385
            client.connect(address)
 
1386
            client.sendall(msg.upper())
 
1387
            client.close()
 
1388
 
 
1389
        conn.close()
 
1390
 
 
1391
    def test_pickling(self):
 
1392
        try:
 
1393
            multiprocessing.allow_connection_pickling()
 
1394
        except ImportError:
 
1395
            return
 
1396
 
 
1397
        families = self.connection.families
 
1398
 
 
1399
        lconn, lconn0 = self.Pipe()
 
1400
        lp = self.Process(target=self._listener, args=(lconn0, families))
 
1401
        lp.start()
 
1402
        lconn0.close()
 
1403
 
 
1404
        rconn, rconn0 = self.Pipe()
 
1405
        rp = self.Process(target=self._remote, args=(rconn0,))
 
1406
        rp.start()
 
1407
        rconn0.close()
 
1408
 
 
1409
        for fam in families:
 
1410
            msg = ('This connection uses family %s' % fam).encode('ascii')
 
1411
            address = lconn.recv()
 
1412
            rconn.send((address, msg))
 
1413
            new_conn = lconn.recv()
 
1414
            self.assertEqual(new_conn.recv(), msg.upper())
 
1415
 
 
1416
        rconn.send(None)
 
1417
 
 
1418
        if self.TYPE == 'processes':
 
1419
            msg = latin('This connection uses a normal socket')
 
1420
            address = lconn.recv()
 
1421
            rconn.send((address, msg))
 
1422
            if hasattr(socket, 'fromfd'):
 
1423
                new_conn = lconn.recv()
 
1424
                self.assertEqual(new_conn.recv(100), msg.upper())
 
1425
            else:
 
1426
                # XXX On Windows with Py2.6 need to backport fromfd()
 
1427
                discard = lconn.recv_bytes()
 
1428
 
 
1429
        lconn.send(None)
 
1430
 
 
1431
        rconn.close()
 
1432
        lconn.close()
 
1433
 
 
1434
        lp.join()
 
1435
        rp.join()
 
1436
"""
 
1437
#
 
1438
#
 
1439
#
 
1440
 
 
1441
class _TestHeap(BaseTestCase):
 
1442
 
 
1443
    ALLOWED_TYPES = ('processes',)
 
1444
 
 
1445
    def test_heap(self):
 
1446
        iterations = 5000
 
1447
        maxblocks = 50
 
1448
        blocks = []
 
1449
 
 
1450
        # create and destroy lots of blocks of different sizes
 
1451
        for i in xrange(iterations):
 
1452
            size = int(random.lognormvariate(0, 1) * 1000)
 
1453
            b = multiprocessing.heap.BufferWrapper(size)
 
1454
            blocks.append(b)
 
1455
            if len(blocks) > maxblocks:
 
1456
                i = random.randrange(maxblocks)
 
1457
                del blocks[i]
 
1458
 
 
1459
        # get the heap object
 
1460
        heap = multiprocessing.heap.BufferWrapper._heap
 
1461
 
 
1462
        # verify the state of the heap
 
1463
        all = []
 
1464
        occupied = 0
 
1465
        for L in heap._len_to_seq.values():
 
1466
            for arena, start, stop in L:
 
1467
                all.append((heap._arenas.index(arena), start, stop,
 
1468
                            stop-start, 'free'))
 
1469
        for arena, start, stop in heap._allocated_blocks:
 
1470
            all.append((heap._arenas.index(arena), start, stop,
 
1471
                        stop-start, 'occupied'))
 
1472
            occupied += (stop-start)
 
1473
 
 
1474
        all.sort()
 
1475
 
 
1476
        for i in range(len(all)-1):
 
1477
            (arena, start, stop) = all[i][:3]
 
1478
            (narena, nstart, nstop) = all[i+1][:3]
 
1479
            self.assertTrue((arena != narena and nstart == 0) or
 
1480
                            (stop == nstart))
 
1481
 
 
1482
#
 
1483
#
 
1484
#
 
1485
 
 
1486
try:
 
1487
    from ctypes import Structure, Value, copy, c_int, c_double
 
1488
except ImportError:
 
1489
    Structure = object
 
1490
    c_int = c_double = None
 
1491
 
 
1492
class _Foo(Structure):
 
1493
    _fields_ = [
 
1494
        ('x', c_int),
 
1495
        ('y', c_double)
 
1496
        ]
 
1497
 
 
1498
class _TestSharedCTypes(BaseTestCase):
 
1499
 
 
1500
    ALLOWED_TYPES = ('processes',)
 
1501
 
 
1502
    def _double(self, x, y, foo, arr, string):
 
1503
        x.value *= 2
 
1504
        y.value *= 2
 
1505
        foo.x *= 2
 
1506
        foo.y *= 2
 
1507
        string.value *= 2
 
1508
        for i in range(len(arr)):
 
1509
            arr[i] *= 2
 
1510
 
 
1511
    def test_sharedctypes(self, lock=False):
 
1512
        if c_int is None:
 
1513
            return
 
1514
 
 
1515
        x = Value('i', 7, lock=lock)
 
1516
        y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
 
1517
        foo = Value(_Foo, 3, 2, lock=lock)
 
1518
        arr = Array('d', range(10), lock=lock)
 
1519
        string = Array('c', 20, lock=lock)
 
1520
        string.value = 'hello'
 
1521
 
 
1522
        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
 
1523
        p.start()
 
1524
        p.join()
 
1525
 
 
1526
        self.assertEqual(x.value, 14)
 
1527
        self.assertAlmostEqual(y.value, 2.0/3.0)
 
1528
        self.assertEqual(foo.x, 6)
 
1529
        self.assertAlmostEqual(foo.y, 4.0)
 
1530
        for i in range(10):
 
1531
            self.assertAlmostEqual(arr[i], i*2)
 
1532
        self.assertEqual(string.value, latin('hellohello'))
 
1533
 
 
1534
    def test_synchronize(self):
 
1535
        self.test_sharedctypes(lock=True)
 
1536
 
 
1537
    def test_copy(self):
 
1538
        if c_int is None:
 
1539
            return
 
1540
 
 
1541
        foo = _Foo(2, 5.0)
 
1542
        bar = copy(foo)
 
1543
        foo.x = 0
 
1544
        foo.y = 0
 
1545
        self.assertEqual(bar.x, 2)
 
1546
        self.assertAlmostEqual(bar.y, 5.0)
 
1547
 
 
1548
#
 
1549
#
 
1550
#
 
1551
 
 
1552
class _TestFinalize(BaseTestCase):
 
1553
 
 
1554
    ALLOWED_TYPES = ('processes',)
 
1555
 
 
1556
    def _test_finalize(self, conn):
 
1557
        class Foo(object):
 
1558
            pass
 
1559
 
 
1560
        a = Foo()
 
1561
        util.Finalize(a, conn.send, args=('a',))
 
1562
        del a           # triggers callback for a
 
1563
 
 
1564
        b = Foo()
 
1565
        close_b = util.Finalize(b, conn.send, args=('b',))
 
1566
        close_b()       # triggers callback for b
 
1567
        close_b()       # does nothing because callback has already been called
 
1568
        del b           # does nothing because callback has already been called
 
1569
 
 
1570
        c = Foo()
 
1571
        util.Finalize(c, conn.send, args=('c',))
 
1572
 
 
1573
        d10 = Foo()
 
1574
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
 
1575
 
 
1576
        d01 = Foo()
 
1577
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
 
1578
        d02 = Foo()
 
1579
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
 
1580
        d03 = Foo()
 
1581
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
 
1582
 
 
1583
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
 
1584
 
 
1585
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
 
1586
 
 
1587
        # call mutliprocessing's cleanup function then exit process without
 
1588
        # garbage collecting locals
 
1589
        util._exit_function()
 
1590
        conn.close()
 
1591
        os._exit(0)
 
1592
 
 
1593
    def test_finalize(self):
 
1594
        conn, child_conn = self.Pipe()
 
1595
 
 
1596
        p = self.Process(target=self._test_finalize, args=(child_conn,))
 
1597
        p.start()
 
1598
        p.join()
 
1599
 
 
1600
        result = [obj for obj in iter(conn.recv, 'STOP')]
 
1601
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
 
1602
 
 
1603
#
 
1604
# Test that from ... import * works for each module
 
1605
#
 
1606
 
 
1607
class _TestImportStar(BaseTestCase):
 
1608
 
 
1609
    ALLOWED_TYPES = ('processes',)
 
1610
 
 
1611
    def test_import(self):
 
1612
        modules = (
 
1613
            'multiprocessing', 'multiprocessing.connection',
 
1614
            'multiprocessing.heap', 'multiprocessing.managers',
 
1615
            'multiprocessing.pool', 'multiprocessing.process',
 
1616
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
 
1617
            'multiprocessing.synchronize', 'multiprocessing.util'
 
1618
            )
 
1619
 
 
1620
        for name in modules:
 
1621
            __import__(name)
 
1622
            mod = sys.modules[name]
 
1623
 
 
1624
            for attr in getattr(mod, '__all__', ()):
 
1625
                self.assertTrue(
 
1626
                    hasattr(mod, attr),
 
1627
                    '%r does not have attribute %r' % (mod, attr)
 
1628
                    )
 
1629
 
 
1630
#
 
1631
# Quick test that logging works -- does not test logging output
 
1632
#
 
1633
 
 
1634
class _TestLogging(BaseTestCase):
 
1635
 
 
1636
    ALLOWED_TYPES = ('processes',)
 
1637
 
 
1638
    def test_enable_logging(self):
 
1639
        logger = multiprocessing.get_logger()
 
1640
        logger.setLevel(util.SUBWARNING)
 
1641
        self.assertTrue(logger is not None)
 
1642
        logger.debug('this will not be printed')
 
1643
        logger.info('nor will this')
 
1644
        logger.setLevel(LOG_LEVEL)
 
1645
 
 
1646
    def _test_level(self, conn):
 
1647
        logger = multiprocessing.get_logger()
 
1648
        conn.send(logger.getEffectiveLevel())
 
1649
 
 
1650
    def test_level(self):
 
1651
        LEVEL1 = 32
 
1652
        LEVEL2 = 37
 
1653
 
 
1654
        logger = multiprocessing.get_logger()
 
1655
        root_logger = logging.getLogger()
 
1656
        root_level = root_logger.level
 
1657
 
 
1658
        reader, writer = multiprocessing.Pipe(duplex=False)
 
1659
 
 
1660
        logger.setLevel(LEVEL1)
 
1661
        self.Process(target=self._test_level, args=(writer,)).start()
 
1662
        self.assertEqual(LEVEL1, reader.recv())
 
1663
 
 
1664
        logger.setLevel(logging.NOTSET)
 
1665
        root_logger.setLevel(LEVEL2)
 
1666
        self.Process(target=self._test_level, args=(writer,)).start()
 
1667
        self.assertEqual(LEVEL2, reader.recv())
 
1668
 
 
1669
        root_logger.setLevel(root_level)
 
1670
        logger.setLevel(level=LOG_LEVEL)
 
1671
 
 
1672
#
 
1673
# Functions used to create test cases from the base ones in this module
 
1674
#
 
1675
 
 
1676
def get_attributes(Source, names):
 
1677
    d = {}
 
1678
    for name in names:
 
1679
        obj = getattr(Source, name)
 
1680
        if type(obj) == type(get_attributes):
 
1681
            obj = staticmethod(obj)
 
1682
        d[name] = obj
 
1683
    return d
 
1684
 
 
1685
def create_test_cases(Mixin, type):
 
1686
    result = {}
 
1687
    glob = globals()
 
1688
    Type = type[0].upper() + type[1:]
 
1689
 
 
1690
    for name in glob.keys():
 
1691
        if name.startswith('_Test'):
 
1692
            base = glob[name]
 
1693
            if type in base.ALLOWED_TYPES:
 
1694
                newname = 'With' + Type + name[1:]
 
1695
                class Temp(base, unittest.TestCase, Mixin):
 
1696
                    pass
 
1697
                result[newname] = Temp
 
1698
                Temp.__name__ = newname
 
1699
                Temp.__module__ = Mixin.__module__
 
1700
    return result
 
1701
 
 
1702
#
 
1703
# Create test cases
 
1704
#
 
1705
 
 
1706
class ProcessesMixin(object):
 
1707
    TYPE = 'processes'
 
1708
    Process = multiprocessing.Process
 
1709
    locals().update(get_attributes(multiprocessing, (
 
1710
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
 
1711
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
 
1712
        'RawArray', 'current_process', 'active_children', 'Pipe',
 
1713
        'connection', 'JoinableQueue'
 
1714
        )))
 
1715
 
 
1716
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
 
1717
globals().update(testcases_processes)
 
1718
 
 
1719
 
 
1720
class ManagerMixin(object):
 
1721
    TYPE = 'manager'
 
1722
    Process = multiprocessing.Process
 
1723
    manager = object.__new__(multiprocessing.managers.SyncManager)
 
1724
    locals().update(get_attributes(manager, (
 
1725
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
 
1726
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
 
1727
        'Namespace', 'JoinableQueue'
 
1728
        )))
 
1729
 
 
1730
testcases_manager = create_test_cases(ManagerMixin, type='manager')
 
1731
globals().update(testcases_manager)
 
1732
 
 
1733
 
 
1734
class ThreadsMixin(object):
 
1735
    TYPE = 'threads'
 
1736
    Process = multiprocessing.dummy.Process
 
1737
    locals().update(get_attributes(multiprocessing.dummy, (
 
1738
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
 
1739
        'Condition', 'Event', 'Value', 'Array', 'current_process',
 
1740
        'active_children', 'Pipe', 'connection', 'dict', 'list',
 
1741
        'Namespace', 'JoinableQueue'
 
1742
        )))
 
1743
 
 
1744
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
 
1745
globals().update(testcases_threads)
 
1746
 
 
1747
class OtherTest(unittest.TestCase):
 
1748
    # TODO: add more tests for deliver/answer challenge.
 
1749
    def test_deliver_challenge_auth_failure(self):
 
1750
        class _FakeConnection(object):
 
1751
            def recv_bytes(self, size):
 
1752
                return b'something bogus'
 
1753
            def send_bytes(self, data):
 
1754
                pass
 
1755
        self.assertRaises(multiprocessing.AuthenticationError,
 
1756
                          multiprocessing.connection.deliver_challenge,
 
1757
                          _FakeConnection(), b'abc')
 
1758
 
 
1759
    def test_answer_challenge_auth_failure(self):
 
1760
        class _FakeConnection(object):
 
1761
            def __init__(self):
 
1762
                self.count = 0
 
1763
            def recv_bytes(self, size):
 
1764
                self.count += 1
 
1765
                if self.count == 1:
 
1766
                    return multiprocessing.connection.CHALLENGE
 
1767
                elif self.count == 2:
 
1768
                    return b'something bogus'
 
1769
                return b''
 
1770
            def send_bytes(self, data):
 
1771
                pass
 
1772
        self.assertRaises(multiprocessing.AuthenticationError,
 
1773
                          multiprocessing.connection.answer_challenge,
 
1774
                          _FakeConnection(), b'abc')
 
1775
 
 
1776
testcases_other = [OtherTest]
 
1777
 
 
1778
#
 
1779
#
 
1780
#
 
1781
 
 
1782
def test_main(run=None):
 
1783
    if sys.platform.startswith("linux"):
 
1784
        try:
 
1785
            lock = multiprocessing.RLock()
 
1786
        except OSError:
 
1787
            from test.test_support import TestSkipped
 
1788
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
 
1789
 
 
1790
    if run is None:
 
1791
        from test.test_support import run_unittest as run
 
1792
 
 
1793
    util.get_temp_dir()     # creates temp directory for use by all processes
 
1794
 
 
1795
    multiprocessing.get_logger().setLevel(LOG_LEVEL)
 
1796
 
 
1797
    ProcessesMixin.pool = multiprocessing.Pool(4)
 
1798
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
 
1799
    ManagerMixin.manager.__init__()
 
1800
    ManagerMixin.manager.start()
 
1801
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)
 
1802
 
 
1803
    testcases = (
 
1804
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
 
1805
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
 
1806
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
 
1807
        testcases_other
 
1808
        )
 
1809
 
 
1810
    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
 
1811
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
 
1812
    run(suite)
 
1813
 
 
1814
    ThreadsMixin.pool.terminate()
 
1815
    ProcessesMixin.pool.terminate()
 
1816
    ManagerMixin.pool.terminate()
 
1817
    ManagerMixin.manager.shutdown()
 
1818
 
 
1819
    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
 
1820
 
 
1821
def main():
 
1822
    test_main(unittest.TextTestRunner(verbosity=2).run)
 
1823
 
 
1824
if __name__ == '__main__':
 
1825
    main()