~pythonregexp2.7/python/issue2636-18

« back to all changes in this revision

Viewing changes to Doc/library/multiprocessing.rst

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 13:47:31 UTC
  • mfrom: (39021.1.404 Regexp-2.7)
  • Revision ID: darklord@timehorse.com-20080921134731-rudomuzeh1b2tz1y
Merged in changes from the latest python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
:mod:`multiprocessing` --- Process-based "threading" interface
 
2
==============================================================
 
3
 
 
4
.. module:: multiprocessing
 
5
   :synopsis: Process-based "threading" interface.
 
6
 
 
7
.. versionadded:: 2.6
 
8
 
 
9
 
 
10
Introduction
 
11
----------------------
 
12
 
 
13
:mod:`multiprocessing` is a package that supports spawning processes using an
 
14
API similar to the :mod:`threading` module.  The :mod:`multiprocessing` package
 
15
offers both local and remote concurrency, effectively side-stepping the
 
16
:term:`Global Interpreter Lock` by using subprocesses instead of threads.  Due
 
17
to this, the :mod:`multiprocessing` module allows the programmer to fully
 
18
leverage multiple processors on a given machine.  It runs on both Unix and
 
19
Windows.
 
20
 
 
21
 
 
22
The :class:`Process` class
 
23
~~~~~~~~~~~~~~~~~~~~~~~~~~
 
24
 
 
25
In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process`
 
26
object and then calling its :meth:`~Process.start` method.  :class:`Process`
 
27
follows the API of :class:`threading.Thread`.  A trivial example of a
 
28
multiprocess program is ::
 
29
 
 
30
   from multiprocessing import Process
 
31
 
 
32
   def f(name):
 
33
       print 'hello', name
 
34
 
 
35
   if __name__ == '__main__':
 
36
       p = Process(target=f, args=('bob',))
 
37
       p.start()
 
38
       p.join()
 
39
 
 
40
Here the function ``f`` is run in a child process.
 
41
 
 
42
For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
 
43
necessary, see :ref:`multiprocessing-programming`.
 
44
 
 
45
 
 
46
 
 
47
Exchanging objects between processes
 
48
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
49
 
 
50
:mod:`multiprocessing` supports two types of communication channel between
 
51
processes:
 
52
 
 
53
**Queues**
 
54
 
 
55
   The :class:`Queue` class is a near clone of :class:`Queue.Queue`.  For
 
56
   example::
 
57
 
 
58
      from multiprocessing import Process, Queue
 
59
 
 
60
      def f(q):
 
61
          q.put([42, None, 'hello'])
 
62
 
 
63
       if __name__ == '__main__':
 
64
           q = Queue()
 
65
           p = Process(target=f, args=(q,))
 
66
           p.start()
 
67
           print q.get()    # prints "[42, None, 'hello']"
 
68
           p.join()
 
69
 
 
70
   Queues are thread and process safe.
 
71
 
 
72
**Pipes**
 
73
 
 
74
   The :func:`Pipe` function returns a pair of connection objects connected by a
 
75
   pipe which by default is duplex (two-way).  For example::
 
76
 
 
77
      from multiprocessing import Process, Pipe
 
78
 
 
79
      def f(conn):
 
80
          conn.send([42, None, 'hello'])
 
81
          conn.close()
 
82
 
 
83
      if __name__ == '__main__':
 
84
          parent_conn, child_conn = Pipe()
 
85
          p = Process(target=f, args=(child_conn,))
 
86
          p.start()
 
87
          print parent_conn.recv()   # prints "[42, None, 'hello']"
 
88
          p.join()
 
89
 
 
90
   The two connection objects returned by :func:`Pipe` represent the two ends of
 
91
   the pipe.  Each connection object has :meth:`~Connection.send` and
 
92
   :meth:`~Connection.recv` methods (among others).  Note that data in a pipe
 
93
   may become corrupted if two processes (or threads) try to read from or write
 
94
   to the *same* end of the pipe at the same time.  Of course there is no risk
 
95
   of corruption from processes using different ends of the pipe at the same
 
96
   time.
 
97
 
 
98
 
 
99
Synchronization between processes
 
100
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
101
 
 
102
:mod:`multiprocessing` contains equivalents of all the synchronization
 
103
primitives from :mod:`threading`.  For instance one can use a lock to ensure
 
104
that only one process prints to standard output at a time::
 
105
 
 
106
   from multiprocessing import Process, Lock
 
107
 
 
108
   def f(l, i):
 
109
       l.acquire()
 
110
       print 'hello world', i
 
111
       l.release()
 
112
 
 
113
   if __name__ == '__main__':
 
114
       lock = Lock()
 
115
 
 
116
       for num in range(10):
 
117
           Process(target=f, args=(lock, num)).start()
 
118
 
 
119
Without using the lock output from the different processes is liable to get all
 
120
mixed up.
 
121
 
 
122
 
 
123
Sharing state between processes
 
124
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
125
 
 
126
As mentioned above, when doing concurrent programming it is usually best to
 
127
avoid using shared state as far as possible.  This is particularly true when
 
128
using multiple processes.
 
129
 
 
130
However, if you really do need to use some shared data then
 
131
:mod:`multiprocessing` provides a couple of ways of doing so.
 
132
 
 
133
**Shared memory**
 
134
 
 
135
   Data can be stored in a shared memory map using :class:`Value` or
 
136
   :class:`Array`.  For example, the following code ::
 
137
 
 
138
      from multiprocessing import Process, Value, Array
 
139
 
 
140
      def f(n, a):
 
141
          n.value = 3.1415927
 
142
          for i in range(len(a)):
 
143
              a[i] = -a[i]
 
144
 
 
145
      if __name__ == '__main__':
 
146
          num = Value('d', 0.0)
 
147
          arr = Array('i', range(10))
 
148
 
 
149
          p = Process(target=f, args=(num, arr))
 
150
          p.start()
 
151
          p.join()
 
152
 
 
153
          print num.value
 
154
          print arr[:]
 
155
 
 
156
   will print ::
 
157
 
 
158
      3.1415927
 
159
      [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
 
160
 
 
161
   The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are
 
162
   typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a
 
163
   double precision float and ``'i'`` indicates a signed integer.  These shared
 
164
   objects will be process and thread safe.
 
165
 
 
166
   For more flexibility in using shared memory one can use the
 
167
   :mod:`multiprocessing.sharedctypes` module which supports the creation of
 
168
   arbitrary ctypes objects allocated from shared memory.
 
169
 
 
170
**Server process**
 
171
 
 
172
   A manager object returned by :func:`Manager` controls a server process which
 
173
   holds Python objects and allows other processes to manipulate them using
 
174
   proxies.
 
175
 
 
176
   A manager returned by :func:`Manager` will support types :class:`list`,
 
177
   :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`,
 
178
   :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
 
179
   :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`.  For
 
180
   example, ::
 
181
 
 
182
      from multiprocessing import Process, Manager
 
183
 
 
184
      def f(d, l):
 
185
          d[1] = '1'
 
186
          d['2'] = 2
 
187
          d[0.25] = None
 
188
          l.reverse()
 
189
 
 
190
      if __name__ == '__main__':
 
191
          manager = Manager()
 
192
 
 
193
          d = manager.dict()
 
194
          l = manager.list(range(10))
 
195
 
 
196
          p = Process(target=f, args=(d, l))
 
197
          p.start()
 
198
          p.join()
 
199
 
 
200
          print d
 
201
          print l
 
202
 
 
203
   will print ::
 
204
 
 
205
       {0.25: None, 1: '1', '2': 2}
 
206
       [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
 
207
 
 
208
   Server process managers are more flexible than using shared memory objects
 
209
   because they can be made to support arbitrary object types.  Also, a single
 
210
   manager can be shared by processes on different computers over a network.
 
211
   They are, however, slower than using shared memory.
 
212
 
 
213
 
 
214
Using a pool of workers
 
215
~~~~~~~~~~~~~~~~~~~~~~~
 
216
 
 
217
The :class:`~multiprocessing.pool.Pool` class represents a pool of worker
 
218
processes.  It has methods which allows tasks to be offloaded to the worker
 
219
processes in a few different ways.
 
220
 
 
221
For example::
 
222
 
 
223
   from multiprocessing import Pool
 
224
 
 
225
   def f(x):
 
226
       return x*x
 
227
 
 
228
   if __name__ == '__main__':
 
229
       pool = Pool(processes=4)              # start 4 worker processes
 
230
       result = pool.applyAsync(f, [10])     # evaluate "f(10)" asynchronously
 
231
       print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
 
232
       print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
 
233
 
 
234
 
 
235
Reference
 
236
---------
 
237
 
 
238
The :mod:`multiprocessing` package mostly replicates the API of the
 
239
:mod:`threading` module.
 
240
 
 
241
 
 
242
:class:`Process` and exceptions
 
243
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
244
 
 
245
.. class:: Process([group[, target[, name[, args[, kwargs]]]]])
 
246
 
 
247
   Process objects represent activity that is run in a separate process. The
 
248
   :class:`Process` class has equivalents of all the methods of
 
249
   :class:`threading.Thread`.
 
250
 
 
251
   The constructor should always be called with keyword arguments. *group*
 
252
   should always be ``None``; it exists solely for compatibility with
 
253
   :class:`threading.Thread`.  *target* is the callable object to be invoked by
 
254
   the :meth:`run()` method.  It defaults to ``None``, meaning nothing is
 
255
   called. *name* is the process name.  By default, a unique name is constructed
 
256
   of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
 
257
   :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
 
258
   is determined by the *generation* of the process.  *args* is the argument
 
259
   tuple for the target invocation.  *kwargs* is a dictionary of keyword
 
260
   arguments for the target invocation.  By default, no arguments are passed to
 
261
   *target*.
 
262
 
 
263
   If a subclass overrides the constructor, it must make sure it invokes the
 
264
   base class constructor (:meth:`Process.__init__`) before doing anything else
 
265
   to the process.
 
266
 
 
267
   .. method:: run()
 
268
 
 
269
      Method representing the process's activity.
 
270
 
 
271
      You may override this method in a subclass.  The standard :meth:`run`
 
272
      method invokes the callable object passed to the object's constructor as
 
273
      the target argument, if any, with sequential and keyword arguments taken
 
274
      from the *args* and *kwargs* arguments, respectively.
 
275
 
 
276
   .. method:: start()
 
277
 
 
278
      Start the process's activity.
 
279
 
 
280
      This must be called at most once per process object.  It arranges for the
 
281
      object's :meth:`run` method to be invoked in a separate process.
 
282
 
 
283
   .. method:: join([timeout])
 
284
 
 
285
      Block the calling thread until the process whose :meth:`join` method is
 
286
      called terminates or until the optional timeout occurs.
 
287
 
 
288
      If *timeout* is ``None`` then there is no timeout.
 
289
 
 
290
      A process can be joined many times.
 
291
 
 
292
      A process cannot join itself because this would cause a deadlock.  It is
 
293
      an error to attempt to join a process before it has been started.
 
294
 
 
295
   .. attribute:: name
 
296
 
 
297
      The process's name.
 
298
 
 
299
      The name is a string used for identification purposes only.  It has no
 
300
      semantics.  Multiple processes may be given the same name.  The initial
 
301
      name is set by the constructor.
 
302
 
 
303
   .. method:: is_alive()
 
304
 
 
305
      Return whether the process is alive.
 
306
 
 
307
      Roughly, a process object is alive from the moment the :meth:`start`
 
308
      method returns until the child process terminates.
 
309
 
 
310
   .. attribute:: daemon
 
311
 
 
312
      The process's daemon flag, a Boolean value.  This must be called before
 
313
      :meth:`start` is called.
 
314
 
 
315
      The initial value is inherited from the creating process.
 
316
 
 
317
      When a process exits, it attempts to terminate all of its daemonic child
 
318
      processes.
 
319
 
 
320
      Note that a daemonic process is not allowed to create child processes.
 
321
      Otherwise a daemonic process would leave its children orphaned if it gets
 
322
      terminated when its parent process exits.
 
323
 
 
324
   In addition to the  :class:`Threading.Thread` API, :class:`Process` objects
 
325
   also support the following attributes and methods:
 
326
 
 
327
   .. attribute:: pid
 
328
 
 
329
      Return the process ID.  Before the process is spawned, this will be
 
330
      ``None``.
 
331
 
 
332
   .. attribute:: exitcode
 
333
 
 
334
      The child's exit code.  This will be ``None`` if the process has not yet
 
335
      terminated.  A negative value *-N* indicates that the child was terminated
 
336
      by signal *N*.
 
337
 
 
338
   .. attribute:: authkey
 
339
 
 
340
      The process's authentication key (a byte string).
 
341
 
 
342
      When :mod:`multiprocessing` is initialized the main process is assigned a
 
343
      random string using :func:`os.random`.
 
344
 
 
345
      When a :class:`Process` object is created, it will inherit the
 
346
      authentication key of its parent process, although this may be changed by
 
347
      setting :attr:`authkey` to another byte string.
 
348
 
 
349
      See :ref:`multiprocessing-auth-keys`.
 
350
 
 
351
   .. method:: terminate()
 
352
 
 
353
      Terminate the process.  On Unix this is done using the ``SIGTERM`` signal;
 
354
      on Windows :cfunc:`TerminateProcess` is used.  Note that exit handlers and
 
355
      finally clauses, etc., will not be executed.
 
356
 
 
357
      Note that descendant processes of the process will *not* be terminated --
 
358
      they will simply become orphaned.
 
359
 
 
360
      .. warning::
 
361
 
 
362
         If this method is used when the associated process is using a pipe or
 
363
         queue then the pipe or queue is liable to become corrupted and may
 
364
         become unusable by other process.  Similarly, if the process has
 
365
         acquired a lock or semaphore etc. then terminating it is liable to
 
366
         cause other processes to deadlock.
 
367
 
 
368
   Note that the :meth:`start`, :meth:`join`, :meth:`is_alive` and
 
369
   :attr:`exit_code` methods should only be called by the process that created
 
370
   the process object.
 
371
 
 
372
   Example usage of some of the methods of :class:`Process`::
 
373
 
 
374
       >>> import processing, time, signal
 
375
       >>> p = processing.Process(target=time.sleep, args=(1000,))
 
376
       >>> print p, p.is_alive()
 
377
       <Process(Process-1, initial)> False
 
378
       >>> p.start()
 
379
       >>> print p, p.is_alive()
 
380
       <Process(Process-1, started)> True
 
381
       >>> p.terminate()
 
382
       >>> print p, p.is_alive()
 
383
       <Process(Process-1, stopped[SIGTERM])> False
 
384
       >>> p.exitcode == -signal.SIGTERM
 
385
       True
 
386
 
 
387
 
 
388
.. exception:: BufferTooShort
 
389
 
 
390
   Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied
 
391
   buffer object is too small for the message read.
 
392
 
 
393
   If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
 
394
   the message as a byte string.
 
395
 
 
396
 
 
397
Pipes and Queues
 
398
~~~~~~~~~~~~~~~~
 
399
 
 
400
When using multiple processes, one generally uses message passing for
 
401
communication between processes and avoids having to use any synchronization
 
402
primitives like locks.
 
403
 
 
404
For passing messages one can use :func:`Pipe` (for a connection between two
 
405
processes) or a queue (which allows multiple producers and consumers).
 
406
 
 
407
The :class:`Queue` and :class:`JoinableQueue` types are multi-producer,
 
408
multi-consumer FIFO queues modelled on the :class:`Queue.Queue` class in the
 
409
standard library.  They differ in that :class:`Queue` lacks the
 
410
:meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join` methods introduced
 
411
into Python 2.5's :class:`Queue.Queue` class.
 
412
 
 
413
If you use :class:`JoinableQueue` then you **must** call
 
414
:meth:`JoinableQueue.task_done` for each task removed from the queue or else the
 
415
semaphore used to count the number of unfinished tasks may eventually overflow
 
416
raising an exception.
 
417
 
 
418
Note that one can also create a shared queue by using a manager object -- see
 
419
:ref:`multiprocessing-managers`.
 
420
 
 
421
.. note::
 
422
 
 
423
   :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and
 
424
   :exc:`Queue.Full` exceptions to signal a timeout.  They are not available in
 
425
   the :mod:`multiprocessing` namespace so you need to import them from
 
426
   :mod:`Queue`.
 
427
 
 
428
 
 
429
.. warning::
 
430
 
 
431
   If a process is killed using :meth:`Process.terminate` or :func:`os.kill`
 
432
   while it is trying to use a :class:`Queue`, then the data in the queue is
 
433
   likely to become corrupted.  This may cause any other processes to get an
 
434
   exception when it tries to use the queue later on.
 
435
 
 
436
.. warning::
 
437
 
 
438
   As mentioned above, if a child process has put items on a queue (and it has
 
439
   not used :meth:`JoinableQueue.cancel_join_thread`), then that process will
 
440
   not terminate until all buffered items have been flushed to the pipe.
 
441
 
 
442
   This means that if you try joining that process you may get a deadlock unless
 
443
   you are sure that all items which have been put on the queue have been
 
444
   consumed.  Similarly, if the child process is non-daemonic then the parent
 
445
   process may hang on exit when it tries to join all its non-daemonic children.
 
446
 
 
447
   Note that a queue created using a manager does not have this issue.  See
 
448
   :ref:`multiprocessing-programming`.
 
449
 
 
450
For an example of the usage of queues for interprocess communication see
 
451
:ref:`multiprocessing-examples`.
 
452
 
 
453
 
 
454
.. function:: Pipe([duplex])
 
455
 
 
456
   Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing
 
457
   the ends of a pipe.
 
458
 
 
459
   If *duplex* is ``True`` (the default) then the pipe is bidirectional.  If
 
460
   *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be
 
461
   used for receiving messages and ``conn2`` can only be used for sending
 
462
   messages.
 
463
 
 
464
 
 
465
.. class:: Queue([maxsize])
 
466
 
 
467
   Returns a process shared queue implemented using a pipe and a few
 
468
   locks/semaphores.  When a process first puts an item on the queue a feeder
 
469
   thread is started which transfers objects from a buffer into the pipe.
 
470
 
 
471
   The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the
 
472
   standard library's :mod:`Queue` module are raised to signal timeouts.
 
473
 
 
474
   :class:`Queue` implements all the methods of :class:`Queue.Queue` except for
 
475
   :meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join`.
 
476
 
 
477
   .. method:: qsize()
 
478
 
 
479
      Return the approximate size of the queue.  Because of
 
480
      multithreading/multiprocessing semantics, this number is not reliable.
 
481
 
 
482
      Note that this may raise :exc:`NotImplementedError` on Unix platforms like
 
483
      Mac OS X where ``sem_getvalue()`` is not implemented.
 
484
 
 
485
   .. method:: empty()
 
486
 
 
487
      Return ``True`` if the queue is empty, ``False`` otherwise.  Because of
 
488
      multithreading/multiprocessing semantics, this is not reliable.
 
489
 
 
490
   .. method:: full()
 
491
 
 
492
      Return ``True`` if the queue is full, ``False`` otherwise.  Because of
 
493
      multithreading/multiprocessing semantics, this is not reliable.
 
494
 
 
495
   .. method:: put(item[, block[, timeout]])
 
496
 
 
497
      Put item into the queue.  If the optional argument *block* is ``True`` 
 
498
      (the default) and *timeout* is ``None`` (the default), block if necessary until
 
499
      a free slot is available.  If *timeout* is a positive number, it blocks at
 
500
      most *timeout* seconds and raises the :exc:`Queue.Full` exception if no
 
501
      free slot was available within that time.  Otherwise (*block* is
 
502
      ``False``), put an item on the queue if a free slot is immediately
 
503
      available, else raise the :exc:`Queue.Full` exception (*timeout* is
 
504
      ignored in that case).
 
505
 
 
506
   .. method:: put_nowait(item)
 
507
 
 
508
      Equivalent to ``put(item, False)``.
 
509
 
 
510
   .. method:: get([block[, timeout]])
 
511
 
 
512
      Remove and return an item from the queue.  If optional args *block* is
 
513
      ``True`` (the default) and *timeout* is ``None`` (the default), block if
 
514
      necessary until an item is available.  If *timeout* is a positive number,
 
515
      it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty`
 
516
      exception if no item was available within that time.  Otherwise (block is
 
517
      ``False``), return an item if one is immediately available, else raise the
 
518
      :exc:`Queue.Empty` exception (*timeout* is ignored in that case).
 
519
 
 
520
   .. method:: get_nowait()
 
521
               get_no_wait()
 
522
 
 
523
      Equivalent to ``get(False)``.
 
524
 
 
525
   :class:`multiprocessing.Queue` has a few additional methods not found in
 
526
   :class:`Queue.Queue`.  These methods are usually unnecessary for most
 
527
   code:
 
528
 
 
529
   .. method:: close()
 
530
 
 
531
      Indicate that no more data will be put on this queue by the current
 
532
      process.  The background thread will quit once it has flushed all buffered
 
533
      data to the pipe.  This is called automatically when the queue is garbage
 
534
      collected.
 
535
 
 
536
   .. method:: join_thread()
 
537
 
 
538
      Join the background thread.  This can only be used after :meth:`close` has
 
539
      been called.  It blocks until the background thread exits, ensuring that
 
540
      all data in the buffer has been flushed to the pipe.
 
541
 
 
542
      By default if a process is not the creator of the queue then on exit it
 
543
      will attempt to join the queue's background thread.  The process can call
 
544
      :meth:`cancel_join_thread` to make :meth:`join_thread` do nothing.
 
545
 
 
546
   .. method:: cancel_join_thread()
 
547
 
 
548
      Prevent :meth:`join_thread` from blocking.  In particular, this prevents
 
549
      the background thread from being joined automatically when the process
 
550
      exits -- see :meth:`join_thread`.
 
551
 
 
552
 
 
553
.. class:: JoinableQueue([maxsize])
 
554
 
 
555
   :class:`JoinableQueue`, a :class:`Queue` subclass, is a queue which
 
556
   additionally has :meth:`task_done` and :meth:`join` methods.
 
557
 
 
558
   .. method:: task_done()
 
559
 
 
560
      Indicate that a formerly enqueued task is complete. Used by queue consumer
 
561
      threads.  For each :meth:`~Queue.get` used to fetch a task, a subsequent
 
562
      call to :meth:`task_done` tells the queue that the processing on the task
 
563
      is complete.
 
564
 
 
565
      If a :meth:`~Queue.join` is currently blocking, it will resume when all
 
566
      items have been processed (meaning that a :meth:`task_done` call was
 
567
      received for every item that had been :meth:`~Queue.put` into the queue).
 
568
 
 
569
      Raises a :exc:`ValueError` if called more times than there were items
 
570
      placed in the queue.
 
571
 
 
572
 
 
573
   .. method:: join()
 
574
 
 
575
      Block until all items in the queue have been gotten and processed.
 
576
 
 
577
      The count of unfinished tasks goes up whenever an item is added to the
 
578
      queue.  The count goes down whenever a consumer thread calls
 
579
      :meth:`task_done` to indicate that the item was retrieved and all work on
 
580
      it is complete.  When the count of unfinished tasks drops to zero,
 
581
      :meth:`~Queue.join` unblocks.
 
582
 
 
583
 
 
584
Miscellaneous
 
585
~~~~~~~~~~~~~
 
586
 
 
587
.. function:: active_children()
 
588
 
 
589
   Return list of all live children of the current process.
 
590
 
 
591
   Calling this has the side affect of "joining" any processes which have
 
592
   already finished.
 
593
 
 
594
.. function:: cpu_count()
 
595
 
 
596
   Return the number of CPUs in the system.  May raise
 
597
   :exc:`NotImplementedError`.
 
598
 
 
599
.. function:: current_process()
 
600
 
 
601
   Return the :class:`Process` object corresponding to the current process.
 
602
 
 
603
   An analogue of :func:`threading.current_thread`.
 
604
 
 
605
.. function:: freeze_support()
 
606
 
 
607
   Add support for when a program which uses :mod:`multiprocessing` has been
 
608
   frozen to produce a Windows executable.  (Has been tested with **py2exe**,
 
609
   **PyInstaller** and **cx_Freeze**.)
 
610
 
 
611
   One needs to call this function straight after the ``if __name__ ==
 
612
   '__main__'`` line of the main module.  For example::
 
613
 
 
614
      from multiprocessing import Process, freeze_support
 
615
 
 
616
      def f():
 
617
          print 'hello world!'
 
618
 
 
619
      if __name__ == '__main__':
 
620
          freeze_support()
 
621
          Process(target=f).start()
 
622
 
 
623
   If the ``freeze_support()`` line is missed out then trying to run the frozen
 
624
   executable will raise :exc:`RuntimeError`.
 
625
 
 
626
   If the module is being run normally by the Python interpreter then
 
627
   :func:`freeze_support` has no effect.
 
628
 
 
629
.. function:: set_executable()
 
630
 
 
631
   Sets the path of the python interpreter to use when starting a child process.
 
632
   (By default :data:`sys.executable` is used).  Embedders will probably need to
 
633
   do some thing like ::
 
634
 
 
635
      setExecutable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
 
636
 
 
637
    before they can create child processes.  (Windows only)
 
638
 
 
639
 
 
640
.. note::
 
641
 
 
642
   :mod:`multiprocessing` contains no analogues of
 
643
   :func:`threading.active_count`, :func:`threading.enumerate`,
 
644
   :func:`threading.settrace`, :func:`threading.setprofile`,
 
645
   :class:`threading.Timer`, or :class:`threading.local`.
 
646
 
 
647
 
 
648
Connection Objects
 
649
~~~~~~~~~~~~~~~~~~
 
650
 
 
651
Connection objects allow the sending and receiving of picklable objects or
 
652
strings.  They can be thought of as message oriented connected sockets.
 
653
 
 
654
Connection objects usually created using :func:`Pipe` -- see also
 
655
:ref:`multiprocessing-listeners-clients`.
 
656
 
 
657
.. class:: Connection
 
658
 
 
659
   .. method:: send(obj)
 
660
 
 
661
      Send an object to the other end of the connection which should be read
 
662
      using :meth:`recv`.
 
663
 
 
664
      The object must be picklable.
 
665
 
 
666
   .. method:: recv()
 
667
 
 
668
      Return an object sent from the other end of the connection using
 
669
      :meth:`send`.  Raises :exc:`EOFError` if there is nothing left to receive
 
670
      and the other end was closed.
 
671
 
 
672
   .. method:: fileno()
 
673
 
 
674
      Returns the file descriptor or handle used by the connection.
 
675
 
 
676
   .. method:: close()
 
677
 
 
678
      Close the connection.
 
679
 
 
680
      This is called automatically when the connection is garbage collected.
 
681
 
 
682
   .. method:: poll([timeout])
 
683
 
 
684
      Return whether there is any data available to be read.
 
685
 
 
686
      If *timeout* is not specified then it will return immediately.  If
 
687
      *timeout* is a number then this specifies the maximum time in seconds to
 
688
      block.  If *timeout* is ``None`` then an infinite timeout is used.
 
689
 
 
690
   .. method:: send_bytes(buffer[, offset[, size]])
 
691
 
 
692
      Send byte data from an object supporting the buffer interface as a
 
693
      complete message.
 
694
 
 
695
      If *offset* is given then data is read from that position in *buffer*.  If
 
696
      *size* is given then that many bytes will be read from buffer.
 
697
 
 
698
   .. method:: recv_bytes([maxlength])
 
699
 
 
700
      Return a complete message of byte data sent from the other end of the
 
701
      connection as a string.  Raises :exc:`EOFError` if there is nothing left
 
702
      to receive and the other end has closed.
 
703
 
 
704
      If *maxlength* is specified and the message is longer than *maxlength*
 
705
      then :exc:`IOError` is raised and the connection will no longer be
 
706
      readable.
 
707
 
 
708
   .. method:: recv_bytes_into(buffer[, offset])
 
709
 
 
710
      Read into *buffer* a complete message of byte data sent from the other end
 
711
      of the connection and return the number of bytes in the message.  Raises
 
712
      :exc:`EOFError` if there is nothing left to receive and the other end was
 
713
      closed.
 
714
 
 
715
      *buffer* must be an object satisfying the writable buffer interface.  If
 
716
      *offset* is given then the message will be written into the buffer from
 
717
      *that position.  Offset must be a non-negative integer less than the
 
718
      *length of *buffer* (in bytes).
 
719
 
 
720
      If the buffer is too short then a :exc:`BufferTooShort` exception is
 
721
      raised and the complete message is available as ``e.args[0]`` where ``e``
 
722
      is the exception instance.
 
723
 
 
724
 
 
725
For example:
 
726
 
 
727
    >>> from multiprocessing import Pipe
 
728
    >>> a, b = Pipe()
 
729
    >>> a.send([1, 'hello', None])
 
730
    >>> b.recv()
 
731
    [1, 'hello', None]
 
732
    >>> b.send_bytes('thank you')
 
733
    >>> a.recv_bytes()
 
734
    'thank you'
 
735
    >>> import array
 
736
    >>> arr1 = array.array('i', range(5))
 
737
    >>> arr2 = array.array('i', [0] * 10)
 
738
    >>> a.send_bytes(arr1)
 
739
    >>> count = b.recv_bytes_into(arr2)
 
740
    >>> assert count == len(arr1) * arr1.itemsize
 
741
    >>> arr2
 
742
    array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
 
743
 
 
744
 
 
745
.. warning::
 
746
 
 
747
    The :meth:`Connection.recv` method automatically unpickles the data it
 
748
    receives, which can be a security risk unless you can trust the process
 
749
    which sent the message.
 
750
 
 
751
    Therefore, unless the connection object was produced using :func:`Pipe` you
 
752
    should only use the :meth:`~Connection.recv` and :meth:`~Connection.send`
 
753
    methods after performing some sort of authentication.  See
 
754
    :ref:`multiprocessing-auth-keys`.
 
755
 
 
756
.. warning::
 
757
 
 
758
    If a process is killed while it is trying to read or write to a pipe then
 
759
    the data in the pipe is likely to become corrupted, because it may become
 
760
    impossible to be sure where the message boundaries lie.
 
761
 
 
762
 
 
763
Synchronization primitives
 
764
~~~~~~~~~~~~~~~~~~~~~~~~~~
 
765
 
 
766
Generally synchronization primitives are not as necessary in a multiprocess
 
767
program as they are in a multithreaded program.  See the documentation for
 
768
:mod:`threading` module.
 
769
 
 
770
Note that one can also create synchronization primitives by using a manager
 
771
object -- see :ref:`multiprocessing-managers`.
 
772
 
 
773
.. class:: BoundedSemaphore([value])
 
774
 
 
775
   A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`.
 
776
 
 
777
   (On Mac OS X this is indistinguishable from :class:`Semaphore` because
 
778
   ``sem_getvalue()`` is not implemented on that platform).
 
779
 
 
780
.. class:: Condition([lock])
 
781
 
 
782
   A condition variable: a clone of :class:`threading.Condition`.
 
783
 
 
784
   If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
 
785
   object from :mod:`multiprocessing`.
 
786
 
 
787
.. class:: Event()
 
788
 
 
789
   A clone of :class:`threading.Event`.
 
790
 
 
791
.. class:: Lock()
 
792
 
 
793
   A non-recursive lock object: a clone of :class:`threading.Lock`.
 
794
 
 
795
.. class:: RLock()
 
796
 
 
797
   A recursive lock object: a clone of :class:`threading.RLock`.
 
798
 
 
799
.. class:: Semaphore([value])
 
800
 
 
801
   A bounded semaphore object: a clone of :class:`threading.Semaphore`.
 
802
 
 
803
.. note::
 
804
 
 
805
   The :meth:`acquire` method of :class:`BoundedSemaphore`, :class:`Lock`,
 
806
   :class:`RLock` and :class:`Semaphore` has a timeout parameter not supported
 
807
   by the equivalents in :mod:`threading`.  The signature is
 
808
   ``acquire(block=True, timeout=None)`` with keyword parameters being
 
809
   acceptable.  If *block* is ``True`` and *timeout* is not ``None`` then it
 
810
   specifies a timeout in seconds.  If *block* is ``False`` then *timeout* is
 
811
   ignored.
 
812
 
 
813
.. note::
 
814
 
 
815
   If the SIGINT signal generated by Ctrl-C arrives while the main thread is
 
816
   blocked by a call to :meth:`BoundedSemaphore.acquire`, :meth:`Lock.acquire`,
 
817
   :meth:`RLock.acquire`, :meth:`Semaphore.acquire`, :meth:`Condition.acquire`
 
818
   or :meth:`Condition.wait` then the call will be immediately interrupted and
 
819
   :exc:`KeyboardInterrupt` will be raised.
 
820
 
 
821
   This differs from the behaviour of :mod:`threading` where SIGINT will be
 
822
   ignored while the equivalent blocking calls are in progress.
 
823
 
 
824
 
 
825
Shared :mod:`ctypes` Objects
 
826
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
827
 
 
828
It is possible to create shared objects using shared memory which can be
 
829
inherited by child processes.
 
830
 
 
831
.. function:: Value(typecode_or_type[, *args, lock]])
 
832
 
 
833
   Return a :mod:`ctypes` object allocated from shared memory.  By default the
 
834
   return value is actually a synchronized wrapper for the object.
 
835
 
 
836
   *typecode_or_type* determines the type of the returned object: it is either a
 
837
   ctypes type or a one character typecode of the kind used by the :mod:`array`
 
838
   module.  *\*args* is passed on to the constructor for the type.
 
839
 
 
840
   If *lock* is ``True`` (the default) then a new lock object is created to
 
841
   synchronize access to the value.  If *lock* is a :class:`Lock` or
 
842
   :class:`RLock` object then that will be used to synchronize access to the
 
843
   value.  If *lock* is ``False`` then access to the returned object will not be
 
844
   automatically protected by a lock, so it will not necessarily be
 
845
   "process-safe".
 
846
 
 
847
   Note that *lock* is a keyword-only argument.
 
848
 
 
849
.. function:: Array(typecode_or_type, size_or_initializer, *, lock=True)
 
850
 
 
851
   Return a ctypes array allocated from shared memory.  By default the return
 
852
   value is actually a synchronized wrapper for the array.
 
853
 
 
854
   *typecode_or_type* determines the type of the elements of the returned array:
 
855
   it is either a ctypes type or a one character typecode of the kind used by
 
856
   the :mod:`array` module.  If *size_or_initializer* is an integer, then it
 
857
   determines the length of the array, and the array will be initially zeroed.
 
858
   Otherwise, *size_or_initializer* is a sequence which is used to initialize
 
859
   the array and whose length determines the length of the array.
 
860
 
 
861
   If *lock* is ``True`` (the default) then a new lock object is created to
 
862
   synchronize access to the value.  If *lock* is a :class:`Lock` or
 
863
   :class:`RLock` object then that will be used to synchronize access to the
 
864
   value.  If *lock* is ``False`` then access to the returned object will not be
 
865
   automatically protected by a lock, so it will not necessarily be
 
866
   "process-safe".
 
867
 
 
868
   Note that *lock* is a keyword only argument.
 
869
 
 
870
   Note that an array of :data:`ctypes.c_char` has *value* and *rawvalue*
 
871
   attributes which allow one to use it to store and retrieve strings.
 
872
 
 
873
 
 
874
The :mod:`multiprocessing.sharedctypes` module
 
875
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 
876
 
 
877
.. module:: multiprocessing.sharedctypes
 
878
   :synopsis: Allocate ctypes objects from shared memory.
 
879
 
 
880
The :mod:`multiprocessing.sharedctypes` module provides functions for allocating
 
881
:mod:`ctypes` objects from shared memory which can be inherited by child
 
882
processes.
 
883
 
 
884
.. note::
 
885
 
 
886
   Although it is possible to store a pointer in shared memory remember that
 
887
   this will refer to a location in the address space of a specific process.
 
888
   However, the pointer is quite likely to be invalid in the context of a second
 
889
   process and trying to dereference the pointer from the second process may
 
890
   cause a crash.
 
891
 
 
892
.. function:: RawArray(typecode_or_type, size_or_initializer)
 
893
 
 
894
   Return a ctypes array allocated from shared memory.
 
895
 
 
896
   *typecode_or_type* determines the type of the elements of the returned array:
 
897
   it is either a ctypes type or a one character typecode of the kind used by
 
898
   the :mod:`array` module.  If *size_or_initializer* is an integer then it
 
899
   determines the length of the array, and the array will be initially zeroed.
 
900
   Otherwise *size_or_initializer* is a sequence which is used to initialize the
 
901
   array and whose length determines the length of the array.
 
902
 
 
903
   Note that setting and getting an element is potentially non-atomic -- use
 
904
   :func:`Array` instead to make sure that access is automatically synchronized
 
905
   using a lock.
 
906
 
 
907
.. function:: RawValue(typecode_or_type, *args)
 
908
 
 
909
   Return a ctypes object allocated from shared memory.
 
910
 
 
911
   *typecode_or_type* determines the type of the returned object: it is either a
 
912
   ctypes type or a one character typecode of the kind used by the :mod:`array`
 
913
   module.  */*args* is passed on to the constructor for the type.
 
914
 
 
915
   Note that setting and getting the value is potentially non-atomic -- use
 
916
   :func:`Value` instead to make sure that access is automatically synchronized
 
917
   using a lock.
 
918
 
 
919
   Note that an array of :data:`ctypes.c_char` has ``value`` and ``rawvalue``
 
920
   attributes which allow one to use it to store and retrieve strings -- see
 
921
   documentation for :mod:`ctypes`.
 
922
 
 
923
.. function:: Array(typecode_or_type, size_or_initializer[, *args[, lock]])
 
924
 
 
925
   The same as :func:`RawArray` except that depending on the value of *lock* a
 
926
   process-safe synchronization wrapper may be returned instead of a raw ctypes
 
927
   array.
 
928
 
 
929
   If *lock* is ``True`` (the default) then a new lock object is created to
 
930
   synchronize access to the value.  If *lock* is a :class:`Lock` or
 
931
   :class:`RLock` object then that will be used to synchronize access to the
 
932
   value.  If *lock* is ``False`` then access to the returned object will not be
 
933
   automatically protected by a lock, so it will not necessarily be
 
934
   "process-safe".
 
935
 
 
936
   Note that *lock* is a keyword-only argument.
 
937
 
 
938
.. function:: Value(typecode_or_type, *args[, lock])
 
939
 
 
940
   The same as :func:`RawValue` except that depending on the value of *lock* a
 
941
   process-safe synchronization wrapper may be returned instead of a raw ctypes
 
942
   object.
 
943
 
 
944
   If *lock* is ``True`` (the default) then a new lock object is created to
 
945
   synchronize access to the value.  If *lock* is a :class:`Lock` or
 
946
   :class:`RLock` object then that will be used to synchronize access to the
 
947
   value.  If *lock* is ``False`` then access to the returned object will not be
 
948
   automatically protected by a lock, so it will not necessarily be
 
949
   "process-safe".
 
950
 
 
951
   Note that *lock* is a keyword-only argument.
 
952
 
 
953
.. function:: copy(obj)
 
954
 
 
955
   Return a ctypes object allocated from shared memory which is a copy of the
 
956
   ctypes object *obj*.
 
957
 
 
958
.. function:: synchronized(obj[, lock])
 
959
 
 
960
   Return a process-safe wrapper object for a ctypes object which uses *lock* to
 
961
   synchronize access.  If *lock* is ``None`` (the default) then a
 
962
   :class:`multiprocessing.RLock` object is created automatically.
 
963
 
 
964
   A synchronized wrapper will have two methods in addition to those of the
 
965
   object it wraps: :meth:`get_obj` returns the wrapped object and
 
966
   :meth:`get_lock` returns the lock object used for synchronization.
 
967
 
 
968
   Note that accessing the ctypes object through the wrapper can be a lot slower
 
969
   than accessing the raw ctypes object.
 
970
 
 
971
 
 
972
The table below compares the syntax for creating shared ctypes objects from
 
973
shared memory with the normal ctypes syntax.  (In the table ``MyStruct`` is some
 
974
subclass of :class:`ctypes.Structure`.)
 
975
 
 
976
==================== ========================== ===========================
 
977
ctypes               sharedctypes using type    sharedctypes using typecode
 
978
==================== ========================== ===========================
 
979
c_double(2.4)        RawValue(c_double, 2.4)    RawValue('d', 2.4)
 
980
MyStruct(4, 6)       RawValue(MyStruct, 4, 6)
 
981
(c_short * 7)()      RawArray(c_short, 7)       RawArray('h', 7)
 
982
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))
 
983
==================== ========================== ===========================
 
984
 
 
985
 
 
986
Below is an example where a number of ctypes objects are modified by a child
 
987
process::
 
988
 
 
989
   from multiprocessing import Process, Lock
 
990
   from multiprocessing.sharedctypes import Value, Array
 
991
   from ctypes import Structure, c_double
 
992
 
 
993
   class Point(Structure):
 
994
       _fields_ = [('x', c_double), ('y', c_double)]
 
995
 
 
996
   def modify(n, x, s, A):
 
997
       n.value **= 2
 
998
       x.value **= 2
 
999
       s.value = s.value.upper()
 
1000
       for a in A:
 
1001
           a.x **= 2
 
1002
           a.y **= 2
 
1003
 
 
1004
   if __name__ == '__main__':
 
1005
       lock = Lock()
 
1006
 
 
1007
       n = Value('i', 7)
 
1008
       x = Value(ctypes.c_double, 1.0/3.0, lock=False)
 
1009
       s = Array('c', 'hello world', lock=lock)
 
1010
       A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
 
1011
 
 
1012
       p = Process(target=modify, args=(n, x, s, A))
 
1013
       p.start()
 
1014
       p.join()
 
1015
 
 
1016
       print n.value
 
1017
       print x.value
 
1018
       print s.value
 
1019
       print [(a.x, a.y) for a in A]
 
1020
 
 
1021
 
 
1022
.. highlightlang:: none
 
1023
 
 
1024
The results printed are ::
 
1025
 
 
1026
    49
 
1027
    0.1111111111111111
 
1028
    HELLO WORLD
 
1029
    [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
 
1030
 
 
1031
.. highlightlang:: python
 
1032
 
 
1033
 
 
1034
.. _multiprocessing-managers:
 
1035
 
 
1036
Managers
 
1037
~~~~~~~~
 
1038
 
 
1039
Managers provide a way to create data which can be shared between different
 
1040
processes. A manager object controls a server process which manages *shared
 
1041
objects*.  Other processes can access the shared objects by using proxies.
 
1042
 
 
1043
.. function:: multiprocessing.Manager()
 
1044
 
 
1045
   Returns a started :class:`~multiprocessing.managers.SyncManager` object which
 
1046
   can be used for sharing objects between processes.  The returned manager
 
1047
   object corresponds to a spawned child process and has methods which will
 
1048
   create shared objects and return corresponding proxies.
 
1049
 
 
1050
.. module:: multiprocessing.managers
 
1051
   :synopsis: Share data between process with shared objects.
 
1052
 
 
1053
Manager processes will be shutdown as soon as they are garbage collected or
 
1054
their parent process exits.  The manager classes are defined in the
 
1055
:mod:`multiprocessing.managers` module:
 
1056
 
 
1057
.. class:: BaseManager([address[, authkey]])
 
1058
 
 
1059
   Create a BaseManager object.
 
1060
 
 
1061
   Once created one should call :meth:`start` or :meth:`serve_forever` to ensure
 
1062
   that the manager object refers to a started manager process.
 
1063
 
 
1064
   *address* is the address on which the manager process listens for new
 
1065
   connections.  If *address* is ``None`` then an arbitrary one is chosen.
 
1066
 
 
1067
   *authkey* is the authentication key which will be used to check the validity
 
1068
   of incoming connections to the server process.  If *authkey* is ``None`` then
 
1069
   ``current_process().authkey``.  Otherwise *authkey* is used and it
 
1070
   must be a string.
 
1071
 
 
1072
   .. method:: start()
 
1073
 
 
1074
      Start a subprocess to start the manager.
 
1075
 
 
1076
   .. method:: serve_forever()
 
1077
 
 
1078
      Run the server in the current process.
 
1079
 
 
1080
   .. method:: from_address(address, authkey)
 
1081
 
 
1082
      A class method which creates a manager object referring to a pre-existing
 
1083
      server process which is using the given address and authentication key.
 
1084
 
 
1085
   .. method:: shutdown()
 
1086
 
 
1087
      Stop the process used by the manager.  This is only available if
 
1088
      :meth:`start` has been used to start the server process.
 
1089
 
 
1090
      This can be called multiple times.
 
1091
 
 
1092
   .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
 
1093
 
 
1094
      A classmethod which can be used for registering a type or callable with
 
1095
      the manager class.
 
1096
 
 
1097
      *typeid* is a "type identifier" which is used to identify a particular
 
1098
      type of shared object.  This must be a string.
 
1099
 
 
1100
      *callable* is a callable used for creating objects for this type
 
1101
      identifier.  If a manager instance will be created using the
 
1102
      :meth:`from_address` classmethod or if the *create_method* argument is
 
1103
      ``False`` then this can be left as ``None``.
 
1104
 
 
1105
      *proxytype* is a subclass of :class:`BaseProxy` which is used to create
 
1106
      proxies for shared objects with this *typeid*.  If ``None`` then a proxy
 
1107
      class is created automatically.
 
1108
 
 
1109
      *exposed* is used to specify a sequence of method names which proxies for
 
1110
      this typeid should be allowed to access using
 
1111
      :meth:`BaseProxy._callMethod`.  (If *exposed* is ``None`` then
 
1112
      :attr:`proxytype._exposed_` is used instead if it exists.)  In the case
 
1113
      where no exposed list is specified, all "public methods" of the shared
 
1114
      object will be accessible.  (Here a "public method" means any attribute
 
1115
      which has a :meth:`__call__` method and whose name does not begin with
 
1116
      ``'_'``.)
 
1117
 
 
1118
      *method_to_typeid* is a mapping used to specify the return type of those
 
1119
      exposed methods which should return a proxy.  It maps method names to
 
1120
      typeid strings.  (If *method_to_typeid* is ``None`` then
 
1121
      :attr:`proxytype._method_to_typeid_` is used instead if it exists.)  If a
 
1122
      method's name is not a key of this mapping or if the mapping is ``None``
 
1123
      then the object returned by the method will be copied by value.
 
1124
 
 
1125
      *create_method* determines whether a method should be created with name
 
1126
      *typeid* which can be used to tell the server process to create a new
 
1127
      shared object and return a proxy for it.  By default it is ``True``.
 
1128
 
 
1129
   :class:`BaseManager` instances also have one read-only property:
 
1130
 
 
1131
   .. attribute:: address
 
1132
 
 
1133
      The address used by the manager.
 
1134
 
 
1135
 
 
1136
.. class:: SyncManager
 
1137
 
 
1138
   A subclass of :class:`BaseManager` which can be used for the synchronization
 
1139
   of processes.  Objects of this type are returned by
 
1140
   :func:`multiprocessing.Manager`.
 
1141
 
 
1142
   It also supports creation of shared lists and dictionaries.
 
1143
 
 
1144
   .. method:: BoundedSemaphore([value])
 
1145
 
 
1146
      Create a shared :class:`threading.BoundedSemaphore` object and return a
 
1147
      proxy for it.
 
1148
 
 
1149
   .. method:: Condition([lock])
 
1150
 
 
1151
      Create a shared :class:`threading.Condition` object and return a proxy for
 
1152
      it.
 
1153
 
 
1154
      If *lock* is supplied then it should be a proxy for a
 
1155
      :class:`threading.Lock` or :class:`threading.RLock` object.
 
1156
 
 
1157
   .. method:: Event()
 
1158
 
 
1159
      Create a shared :class:`threading.Event` object and return a proxy for it.
 
1160
 
 
1161
   .. method:: Lock()
 
1162
 
 
1163
      Create a shared :class:`threading.Lock` object and return a proxy for it.
 
1164
 
 
1165
   .. method:: Namespace()
 
1166
 
 
1167
      Create a shared :class:`Namespace` object and return a proxy for it.
 
1168
 
 
1169
   .. method:: Queue([maxsize])
 
1170
 
 
1171
      Create a shared :class:`Queue.Queue` object and return a proxy for it.
 
1172
 
 
1173
   .. method:: RLock()
 
1174
 
 
1175
      Create a shared :class:`threading.RLock` object and return a proxy for it.
 
1176
 
 
1177
   .. method:: Semaphore([value])
 
1178
 
 
1179
      Create a shared :class:`threading.Semaphore` object and return a proxy for
 
1180
      it.
 
1181
 
 
1182
   .. method:: Array(typecode, sequence)
 
1183
 
 
1184
      Create an array and return a proxy for it.
 
1185
 
 
1186
   .. method:: Value(typecode, value)
 
1187
 
 
1188
      Create an object with a writable ``value`` attribute and return a proxy
 
1189
      for it.
 
1190
 
 
1191
   .. method:: dict()
 
1192
               dict(mapping)
 
1193
               dict(sequence)
 
1194
 
 
1195
      Create a shared ``dict`` object and return a proxy for it.
 
1196
 
 
1197
   .. method:: list()
 
1198
               list(sequence)
 
1199
 
 
1200
      Create a shared ``list`` object and return a proxy for it.
 
1201
 
 
1202
 
 
1203
Namespace objects
 
1204
>>>>>>>>>>>>>>>>>
 
1205
 
 
1206
A namespace object has no public methods, but does have writable attributes.
 
1207
Its representation shows the values of its attributes.
 
1208
 
 
1209
However, when using a proxy for a namespace object, an attribute beginning with
 
1210
``'_'`` will be an attribute of the proxy and not an attribute of the referent::
 
1211
 
 
1212
   >>> manager = multiprocessing.Manager()
 
1213
   >>> Global = manager.Namespace()
 
1214
   >>> Global.x = 10
 
1215
   >>> Global.y = 'hello'
 
1216
   >>> Global._z = 12.3    # this is an attribute of the proxy
 
1217
   >>> print Global
 
1218
   Namespace(x=10, y='hello')
 
1219
 
 
1220
 
 
1221
Customized managers
 
1222
>>>>>>>>>>>>>>>>>>>
 
1223
 
 
1224
To create one's own manager, one creates a subclass of :class:`BaseManager` and
 
1225
use the :meth:`~BaseManager.resgister` classmethod to register new types or
 
1226
callables with the manager class.  For example::
 
1227
 
 
1228
   from multiprocessing.managers import BaseManager
 
1229
 
 
1230
   class MathsClass(object):
 
1231
       def add(self, x, y):
 
1232
           return x + y
 
1233
       def mul(self, x, y):
 
1234
           return x * y
 
1235
 
 
1236
   class MyManager(BaseManager):
 
1237
       pass
 
1238
 
 
1239
   MyManager.register('Maths', MathsClass)
 
1240
 
 
1241
   if __name__ == '__main__':
 
1242
       manager = MyManager()
 
1243
       manager.start()
 
1244
       maths = manager.Maths()
 
1245
       print maths.add(4, 3)         # prints 7
 
1246
       print maths.mul(7, 8)         # prints 56
 
1247
 
 
1248
 
 
1249
Using a remote manager
 
1250
>>>>>>>>>>>>>>>>>>>>>>
 
1251
 
 
1252
It is possible to run a manager server on one machine and have clients use it
 
1253
from other machines (assuming that the firewalls involved allow it).
 
1254
 
 
1255
Running the following commands creates a server for a single shared queue which
 
1256
remote clients can access::
 
1257
 
 
1258
   >>> from multiprocessing.managers import BaseManager
 
1259
   >>> import Queue
 
1260
   >>> queue = Queue.Queue()
 
1261
   >>> class QueueManager(BaseManager): pass
 
1262
   ...
 
1263
   >>> QueueManager.register('getQueue', callable=lambda:queue)
 
1264
   >>> m = QueueManager(address=('', 50000), authkey='abracadabra')
 
1265
   >>> m.serveForever()
 
1266
 
 
1267
One client can access the server as follows::
 
1268
 
 
1269
   >>> from multiprocessing.managers import BaseManager
 
1270
   >>> class QueueManager(BaseManager): pass
 
1271
   ...
 
1272
   >>> QueueManager.register('getQueue')
 
1273
   >>> m = QueueManager.from_address(address=('foo.bar.org', 50000),
 
1274
   >>> authkey='abracadabra')
 
1275
   >>> queue = m.getQueue()
 
1276
   >>> queue.put('hello')
 
1277
 
 
1278
Another client can also use it::
 
1279
 
 
1280
   >>> from multiprocessing.managers import BaseManager
 
1281
   >>> class QueueManager(BaseManager): pass
 
1282
   ...
 
1283
   >>> QueueManager.register('getQueue')
 
1284
   >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra')
 
1285
   >>> queue = m.getQueue()
 
1286
   >>> queue.get()
 
1287
   'hello'
 
1288
 
 
1289
 
 
1290
Proxy Objects
 
1291
~~~~~~~~~~~~~
 
1292
 
 
1293
A proxy is an object which *refers* to a shared object which lives (presumably)
 
1294
in a different process.  The shared object is said to be the *referent* of the
 
1295
proxy.  Multiple proxy objects may have the same referent.
 
1296
 
 
1297
A proxy object has methods which invoke corresponding methods of its referent
 
1298
(although not every method of the referent will necessarily be available through
 
1299
the proxy).  A proxy can usually be used in most of the same ways that its
 
1300
referent can::
 
1301
 
 
1302
   >>> from multiprocessing import Manager
 
1303
   >>> manager = Manager()
 
1304
   >>> l = manager.list([i*i for i in range(10)])
 
1305
   >>> print l
 
1306
   [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
 
1307
   >>> print repr(l)
 
1308
   <ListProxy object, typeid 'list' at 0xb799974c>
 
1309
   >>> l[4]
 
1310
   16
 
1311
   >>> l[2:5]
 
1312
   [4, 9, 16]
 
1313
 
 
1314
Notice that applying :func:`str` to a proxy will return the representation of
 
1315
the referent, whereas applying :func:`repr` will return the representation of
 
1316
the proxy.
 
1317
 
 
1318
An important feature of proxy objects is that they are picklable so they can be
 
1319
passed between processes.  Note, however, that if a proxy is sent to the
 
1320
corresponding manager's process then unpickling it will produce the referent
 
1321
itself.  This means, for example, that one shared object can contain a second::
 
1322
 
 
1323
   >>> a = manager.list()
 
1324
   >>> b = manager.list()
 
1325
   >>> a.append(b)         # referent of a now contains referent of b
 
1326
   >>> print a, b
 
1327
   [[]] []
 
1328
   >>> b.append('hello')
 
1329
   >>> print a, b
 
1330
   [['hello']] ['hello']
 
1331
 
 
1332
.. note::
 
1333
 
 
1334
   The proxy types in :mod:`multiprocessing` do nothing to support comparisons
 
1335
   by value.  So, for instance, ::
 
1336
 
 
1337
       manager.list([1,2,3]) == [1,2,3]
 
1338
 
 
1339
   will return ``False``.  One should just use a copy of the referent instead
 
1340
   when making comparisons.
 
1341
 
 
1342
.. class:: BaseProxy
 
1343
 
 
1344
   Proxy objects are instances of subclasses of :class:`BaseProxy`.
 
1345
 
 
1346
   .. method:: _call_method(methodname[, args[, kwds]])
 
1347
 
 
1348
      Call and return the result of a method of the proxy's referent.
 
1349
 
 
1350
      If ``proxy`` is a proxy whose referent is ``obj`` then the expression ::
 
1351
 
 
1352
         proxy._call_method(methodname, args, kwds)
 
1353
 
 
1354
      will evaluate the expression ::
 
1355
 
 
1356
         getattr(obj, methodname)(*args, **kwds)
 
1357
 
 
1358
      in the manager's process.
 
1359
 
 
1360
      The returned value will be a copy of the result of the call or a proxy to
 
1361
      a new shared object -- see documentation for the *method_to_typeid*
 
1362
      argument of :meth:`BaseManager.register`.
 
1363
 
 
1364
      If an exception is raised by the call, then then is re-raised by
 
1365
      :meth:`_call_method`.  If some other exception is raised in the manager's
 
1366
      process then this is converted into a :exc:`RemoteError` exception and is
 
1367
      raised by :meth:`_call_method`.
 
1368
 
 
1369
      Note in particular that an exception will be raised if *methodname* has
 
1370
      not been *exposed*
 
1371
 
 
1372
      An example of the usage of :meth:`_call_method`::
 
1373
 
 
1374
         >>> l = manager.list(range(10))
 
1375
         >>> l._call_method('__len__')
 
1376
         10
 
1377
         >>> l._call_method('__getslice__', (2, 7))   # equiv to `l[2:7]`
 
1378
         [2, 3, 4, 5, 6]
 
1379
         >>> l._call_method('__getitem__', (20,))     # equiv to `l[20]`
 
1380
         Traceback (most recent call last):
 
1381
         ...
 
1382
         IndexError: list index out of range
 
1383
 
 
1384
   .. method:: _get_value()
 
1385
 
 
1386
      Return a copy of the referent.
 
1387
 
 
1388
      If the referent is unpicklable then this will raise an exception.
 
1389
 
 
1390
   .. method:: __repr__
 
1391
 
 
1392
      Return a representation of the proxy object.
 
1393
 
 
1394
   .. method:: __str__
 
1395
 
 
1396
      Return the representation of the referent.
 
1397
 
 
1398
 
 
1399
Cleanup
 
1400
>>>>>>>
 
1401
 
 
1402
A proxy object uses a weakref callback so that when it gets garbage collected it
 
1403
deregisters itself from the manager which owns its referent.
 
1404
 
 
1405
A shared object gets deleted from the manager process when there are no longer
 
1406
any proxies referring to it.
 
1407
 
 
1408
 
 
1409
Process Pools
 
1410
~~~~~~~~~~~~~
 
1411
 
 
1412
.. module:: multiprocessing.pool
 
1413
   :synopsis: Create pools of processes.
 
1414
 
 
1415
One can create a pool of processes which will carry out tasks submitted to it
 
1416
with the :class:`Pool` class.
 
1417
 
 
1418
.. class:: multiprocessing.Pool([processes[, initializer[, initargs]]])
 
1419
 
 
1420
   A process pool object which controls a pool of worker processes to which jobs
 
1421
   can be submitted.  It supports asynchronous results with timeouts and
 
1422
   callbacks and has a parallel map implementation.
 
1423
 
 
1424
   *processes* is the number of worker processes to use.  If *processes* is
 
1425
   ``None`` then the number returned by :func:`cpu_count` is used.  If
 
1426
   *initializer* is not ``None`` then each worker process will call
 
1427
   ``initializer(*initargs)`` when it starts.
 
1428
 
 
1429
   .. method:: apply(func[, args[, kwds]])
 
1430
 
 
1431
      Equivalent of the :func:`apply` builtin function.  It blocks till the
 
1432
      result is ready.
 
1433
 
 
1434
   .. method:: apply_async(func[, args[, kwds[, callback]]])
 
1435
 
 
1436
      A variant of the :meth:`apply` method which returns a result object.
 
1437
 
 
1438
      If *callback* is specified then it should be a callable which accepts a
 
1439
      single argument.  When the result becomes ready *callback* is applied to
 
1440
      it (unless the call failed).  *callback* should complete immediately since
 
1441
      otherwise the thread which handles the results will get blocked.
 
1442
 
 
1443
   .. method:: map(func, iterable[, chunksize])
 
1444
 
 
1445
      A parallel equivalent of the :func:`map` builtin function.  It blocks till
 
1446
      the result is ready.
 
1447
 
 
1448
      This method chops the iterable into a number of chunks which it submits to
 
1449
      the process pool as separate tasks.  The (approximate) size of these
 
1450
      chunks can be specified by setting *chunksize* to a positive integer.
 
1451
 
 
1452
   .. method:: map_async(func, iterable[, chunksize[, callback]])
 
1453
 
 
1454
      A variant of the :meth:`map` method which returns a result object.
 
1455
 
 
1456
      If *callback* is specified then it should be a callable which accepts a
 
1457
      single argument.  When the result becomes ready *callback* is applied to
 
1458
      it (unless the call failed).  *callback* should complete immediately since
 
1459
      otherwise the thread which handles the results will get blocked.
 
1460
 
 
1461
   .. method:: imap(func, iterable[, chunksize])
 
1462
 
 
1463
      An equivalent of :func:`itertools.imap`.
 
1464
 
 
1465
      The *chunksize* argument is the same as the one used by the :meth:`.map`
 
1466
      method.  For very long iterables using a large value for *chunksize* can
 
1467
      make make the job complete **much** faster than using the default value of
 
1468
      ``1``.
 
1469
 
 
1470
      Also if *chunksize* is ``1`` then the :meth:`next` method of the iterator
 
1471
      returned by the :meth:`imap` method has an optional *timeout* parameter:
 
1472
      ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the
 
1473
      result cannot be returned within *timeout* seconds.
 
1474
 
 
1475
   .. method:: imap_unordered(func, iterable[, chunksize])
 
1476
 
 
1477
      The same as :meth:`imap` except that the ordering of the results from the
 
1478
      returned iterator should be considered arbitrary.  (Only when there is
 
1479
      only one worker process is the order guaranteed to be "correct".)
 
1480
 
 
1481
   .. method:: close()
 
1482
 
 
1483
      Prevents any more tasks from being submitted to the pool.  Once all the
 
1484
      tasks have been completed the worker processes will exit.
 
1485
 
 
1486
   .. method:: terminate()
 
1487
 
 
1488
      Stops the worker processes immediately without completing outstanding
 
1489
      work.  When the pool object is garbage collected :meth:`terminate` will be
 
1490
      called immediately.
 
1491
 
 
1492
   .. method:: join()
 
1493
 
 
1494
      Wait for the worker processes to exit.  One must call :meth:`close` or
 
1495
      :meth:`terminate` before using :meth:`join`.
 
1496
 
 
1497
 
 
1498
.. class:: AsyncResult
 
1499
 
 
1500
   The class of the result returned by :meth:`Pool.apply_async` and
 
1501
   :meth:`Pool.map_async`.
 
1502
 
 
1503
   .. method:: get([timeout)
 
1504
 
 
1505
      Return the result when it arrives.  If *timeout* is not ``None`` and the
 
1506
      result does not arrive within *timeout* seconds then
 
1507
      :exc:`multiprocessing.TimeoutError` is raised.  If the remote call raised
 
1508
      an exception then that exception will be reraised by :meth:`get`.
 
1509
 
 
1510
   .. method:: wait([timeout])
 
1511
 
 
1512
      Wait until the result is available or until *timeout* seconds pass.
 
1513
 
 
1514
   .. method:: ready()
 
1515
 
 
1516
      Return whether the call has completed.
 
1517
 
 
1518
   .. method:: successful()
 
1519
 
 
1520
      Return whether the call completed without raising an exception.  Will
 
1521
      raise :exc:`AssertionError` if the result is not ready.
 
1522
 
 
1523
The following example demonstrates the use of a pool::
 
1524
 
 
1525
   from multiprocessing import Pool
 
1526
 
 
1527
   def f(x):
 
1528
       return x*x
 
1529
 
 
1530
   if __name__ == '__main__':
 
1531
       pool = Pool(processes=4)              # start 4 worker processes
 
1532
 
 
1533
       result = pool.applyAsync(f, (10,))    # evaluate "f(10)" asynchronously
 
1534
       print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
 
1535
 
 
1536
       print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
 
1537
 
 
1538
       it = pool.imap(f, range(10))
 
1539
       print it.next()                       # prints "0"
 
1540
       print it.next()                       # prints "1"
 
1541
       print it.next(timeout=1)              # prints "4" unless your computer is *very* slow
 
1542
 
 
1543
       import time
 
1544
       result = pool.applyAsync(time.sleep, (10,))
 
1545
       print result.get(timeout=1)           # raises TimeoutError
 
1546
 
 
1547
 
 
1548
.. _multiprocessing-listeners-clients:
 
1549
 
 
1550
Listeners and Clients
 
1551
~~~~~~~~~~~~~~~~~~~~~
 
1552
 
 
1553
.. module:: multiprocessing.connection
 
1554
   :synopsis: API for dealing with sockets.
 
1555
 
 
1556
Usually message passing between processes is done using queues or by using
 
1557
:class:`Connection` objects returned by :func:`Pipe`.
 
1558
 
 
1559
However, the :mod:`multiprocessing.connection` module allows some extra
 
1560
flexibility.  It basically gives a high level message oriented API for dealing
 
1561
with sockets or Windows named pipes, and also has support for *digest
 
1562
authentication* using the :mod:`hmac` module.
 
1563
 
 
1564
 
 
1565
.. function:: deliver_challenge(connection, authkey)
 
1566
 
 
1567
   Send a randomly generated message to the other end of the connection and wait
 
1568
   for a reply.
 
1569
 
 
1570
   If the reply matches the digest of the message using *authkey* as the key
 
1571
   then a welcome message is sent to the other end of the connection.  Otherwise
 
1572
   :exc:`AuthenticationError` is raised.
 
1573
 
 
1574
.. function:: answerChallenge(connection, authkey)
 
1575
 
 
1576
   Receive a message, calculate the digest of the message using *authkey* as the
 
1577
   key, and then send the digest back.
 
1578
 
 
1579
   If a welcome message is not received, then :exc:`AuthenticationError` is
 
1580
   raised.
 
1581
 
 
1582
.. function:: Client(address[, family[, authenticate[, authkey]]])
 
1583
 
 
1584
   Attempt to set up a connection to the listener which is using address
 
1585
   *address*, returning a :class:`~multiprocessing.Connection`.
 
1586
 
 
1587
   The type of the connection is determined by *family* argument, but this can
 
1588
   generally be omitted since it can usually be inferred from the format of
 
1589
   *address*. (See :ref:`multiprocessing-address-formats`)
 
1590
 
 
1591
   If *authentication* is ``True`` or *authkey* is a string then digest
 
1592
   authentication is used.  The key used for authentication will be either
 
1593
   *authkey* or ``current_process().authkey)`` if *authkey* is ``None``.
 
1594
   If authentication fails then :exc:`AuthenticationError` is raised.  See
 
1595
   :ref:`multiprocessing-auth-keys`.
 
1596
 
 
1597
.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]])
 
1598
 
 
1599
   A wrapper for a bound socket or Windows named pipe which is 'listening' for
 
1600
   connections.
 
1601
 
 
1602
   *address* is the address to be used by the bound socket or named pipe of the
 
1603
   listener object.
 
1604
 
 
1605
   *family* is the type of socket (or named pipe) to use.  This can be one of
 
1606
   the strings ``'AF_INET'`` (for a TCP socket), ``'AF_UNIX'`` (for a Unix
 
1607
   domain socket) or ``'AF_PIPE'`` (for a Windows named pipe).  Of these only
 
1608
   the first is guaranteed to be available.  If *family* is ``None`` then the
 
1609
   family is inferred from the format of *address*.  If *address* is also
 
1610
   ``None`` then a default is chosen.  This default is the family which is
 
1611
   assumed to be the fastest available.  See
 
1612
   :ref:`multiprocessing-address-formats`.  Note that if *family* is
 
1613
   ``'AF_UNIX'`` and address is ``None`` then the socket will be created in a
 
1614
   private temporary directory created using :func:`tempfile.mkstemp`.
 
1615
 
 
1616
   If the listener object uses a socket then *backlog* (1 by default) is passed
 
1617
   to the :meth:`listen` method of the socket once it has been bound.
 
1618
 
 
1619
   If *authenticate* is ``True`` (``False`` by default) or *authkey* is not
 
1620
   ``None`` then digest authentication is used.
 
1621
 
 
1622
   If *authkey* is a string then it will be used as the authentication key;
 
1623
   otherwise it must be *None*.
 
1624
 
 
1625
   If *authkey* is ``None`` and *authenticate* is ``True`` then
 
1626
   ``current_process().authkey`` is used as the authentication key.  If
 
1627
   *authkey* is ``None`` and *authentication* is ``False`` then no
 
1628
   authentication is done.  If authentication fails then
 
1629
   :exc:`AuthenticationError` is raised.  See :ref:`multiprocessing-auth-keys`.
 
1630
 
 
1631
   .. method:: accept()
 
1632
 
 
1633
      Accept a connection on the bound socket or named pipe of the listener
 
1634
      object and return a :class:`Connection` object.  If authentication is
 
1635
      attempted and fails, then :exc:`AuthenticationError` is raised.
 
1636
 
 
1637
   .. method:: close()
 
1638
 
 
1639
      Close the bound socket or named pipe of the listener object.  This is
 
1640
      called automatically when the listener is garbage collected.  However it
 
1641
      is advisable to call it explicitly.
 
1642
 
 
1643
   Listener objects have the following read-only properties:
 
1644
 
 
1645
   .. attribute:: address
 
1646
 
 
1647
      The address which is being used by the Listener object.
 
1648
 
 
1649
   .. attribute:: last_accepted
 
1650
 
 
1651
      The address from which the last accepted connection came.  If this is
 
1652
      unavailable then it is ``None``.
 
1653
 
 
1654
 
 
1655
The module defines two exceptions:
 
1656
 
 
1657
.. exception:: AuthenticationError
 
1658
 
 
1659
   Exception raised when there is an authentication error.
 
1660
 
 
1661
 
 
1662
**Examples**
 
1663
 
 
1664
The following server code creates a listener which uses ``'secret password'`` as
 
1665
an authentication key.  It then waits for a connection and sends some data to
 
1666
the client::
 
1667
 
 
1668
   from multiprocessing.connection import Listener
 
1669
   from array import array
 
1670
 
 
1671
   address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
 
1672
   listener = Listener(address, authkey='secret password')
 
1673
 
 
1674
   conn = listener.accept()
 
1675
   print 'connection accepted from', listener.last_accepted
 
1676
 
 
1677
   conn.send([2.25, None, 'junk', float])
 
1678
 
 
1679
   conn.send_bytes('hello')
 
1680
 
 
1681
   conn.send_bytes(array('i', [42, 1729]))
 
1682
 
 
1683
   conn.close()
 
1684
   listener.close()
 
1685
 
 
1686
The following code connects to the server and receives some data from the
 
1687
server::
 
1688
 
 
1689
   from multiprocessing.connection import Client
 
1690
   from array import array
 
1691
 
 
1692
   address = ('localhost', 6000)
 
1693
   conn = Client(address, authkey='secret password')
 
1694
 
 
1695
   print conn.recv()                 # => [2.25, None, 'junk', float]
 
1696
 
 
1697
   print conn.recv_bytes()            # => 'hello'
 
1698
 
 
1699
   arr = array('i', [0, 0, 0, 0, 0])
 
1700
   print conn.recv_bytes_into(arr)     # => 8
 
1701
   print arr                         # => array('i', [42, 1729, 0, 0, 0])
 
1702
 
 
1703
   conn.close()
 
1704
 
 
1705
 
 
1706
.. _multiprocessing-address-formats:
 
1707
 
 
1708
Address Formats
 
1709
>>>>>>>>>>>>>>>
 
1710
 
 
1711
* An ``'AF_INET'`` address is a tuple of the form ``(hostname, port)`` where
 
1712
  *hostname* is a string and *port* is an integer.
 
1713
 
 
1714
* An ``'AF_UNIX'`` address is a string representing a filename on the
 
1715
  filesystem.
 
1716
 
 
1717
* An ``'AF_PIPE'`` address is a string of the form
 
1718
   ``r'\\\\.\\pipe\\PipeName'``.  To use :func:`Client` to connect to a named
 
1719
   pipe on a remote computer called ServerName* one should use an address of the
 
1720
   form ``r'\\\\ServerName\\pipe\\PipeName'`` instead.
 
1721
 
 
1722
Note that any string beginning with two backslashes is assumed by default to be
 
1723
an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address.
 
1724
 
 
1725
 
 
1726
.. _multiprocessing-auth-keys:
 
1727
 
 
1728
Authentication keys
 
1729
~~~~~~~~~~~~~~~~~~~
 
1730
 
 
1731
When one uses :meth:`Connection.recv`, the data received is automatically
 
1732
unpickled.  Unfortunately unpickling data from an untrusted source is a security
 
1733
risk.  Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module
 
1734
to provide digest authentication.
 
1735
 
 
1736
An authentication key is a string which can be thought of as a password: once a
 
1737
connection is established both ends will demand proof that the other knows the
 
1738
authentication key.  (Demonstrating that both ends are using the same key does
 
1739
**not** involve sending the key over the connection.)
 
1740
 
 
1741
If authentication is requested but do authentication key is specified then the
 
1742
return value of ``current_process().authkey`` is used (see
 
1743
:class:`~multiprocessing.Process`).  This value will automatically inherited by
 
1744
any :class:`~multiprocessing.Process` object that the current process creates.
 
1745
This means that (by default) all processes of a multi-process program will share
 
1746
a single authentication key which can be used when setting up connections
 
1747
between the themselves.
 
1748
 
 
1749
Suitable authentication keys can also be generated by using :func:`os.urandom`.
 
1750
 
 
1751
 
 
1752
Logging
 
1753
~~~~~~~
 
1754
 
 
1755
Some support for logging is available.  Note, however, that the :mod:`logging`
 
1756
package does not use process shared locks so it is possible (depending on the
 
1757
handler type) for messages from different processes to get mixed up.
 
1758
 
 
1759
.. currentmodule:: multiprocessing
 
1760
.. function:: get_logger()
 
1761
 
 
1762
   Returns the logger used by :mod:`multiprocessing`.  If necessary, a new one
 
1763
   will be created.
 
1764
 
 
1765
   When first created the logger has level :data:`logging.NOTSET` and has a
 
1766
   handler which sends output to :data:`sys.stderr` using format
 
1767
   ``'[%(levelname)s/%(processName)s] %(message)s'``.  (The logger allows use of
 
1768
   the non-standard ``'%(processName)s'`` format.)  Message sent to this logger
 
1769
   will not by default propagate to the root logger.
 
1770
 
 
1771
   Note that on Windows child processes will only inherit the level of the
 
1772
   parent process's logger -- any other customization of the logger will not be
 
1773
   inherited.
 
1774
 
 
1775
Below is an example session with logging turned on::
 
1776
 
 
1777
    >>> import processing, logging
 
1778
    >>> logger = processing.getLogger()
 
1779
    >>> logger.setLevel(logging.INFO)
 
1780
    >>> logger.warning('doomed')
 
1781
    [WARNING/MainProcess] doomed
 
1782
    >>> m = processing.Manager()
 
1783
    [INFO/SyncManager-1] child process calling self.run()
 
1784
    [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa'
 
1785
    >>> del m
 
1786
    [INFO/MainProcess] sending shutdown message to manager
 
1787
    [INFO/SyncManager-1] manager exiting with exitcode 0
 
1788
 
 
1789
 
 
1790
The :mod:`multiprocessing.dummy` module
 
1791
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
1792
 
 
1793
.. module:: multiprocessing.dummy
 
1794
   :synopsis: Dumb wrapper around threading.
 
1795
 
 
1796
:mod:`multiprocessing.dummy` replicates the API of :mod:`multiprocessing` but is
 
1797
no more than a wrapper around the :mod:`threading` module.
 
1798
 
 
1799
 
 
1800
.. _multiprocessing-programming:
 
1801
 
 
1802
Programming guidelines
 
1803
----------------------
 
1804
 
 
1805
There are certain guidelines and idioms which should be adhered to when using
 
1806
:mod:`multiprocessing`.
 
1807
 
 
1808
 
 
1809
All platforms
 
1810
~~~~~~~~~~~~~
 
1811
 
 
1812
Avoid shared state
 
1813
 
 
1814
    As far as possible one should try to avoid shifting large amounts of data
 
1815
    between processes.
 
1816
 
 
1817
    It is probably best to stick to using queues or pipes for communication
 
1818
    between processes rather than using the lower level synchronization
 
1819
    primitives from the :mod:`threading` module.
 
1820
 
 
1821
Picklability
 
1822
 
 
1823
    Ensure that the arguments to the methods of proxies are picklable.
 
1824
 
 
1825
Thread safety of proxies
 
1826
 
 
1827
    Do not use a proxy object from more than one thread unless you protect it
 
1828
    with a lock.
 
1829
 
 
1830
    (There is never a problem with different processes using the *same* proxy.)
 
1831
 
 
1832
Joining zombie processes
 
1833
 
 
1834
    On Unix when a process finishes but has not been joined it becomes a zombie.
 
1835
    There should never be very many because each time a new process starts (or
 
1836
    :func:`active_children` is called) all completed processes which have not
 
1837
    yet been joined will be joined.  Also calling a finished process's
 
1838
    :meth:`Process.is_alive` will join the process.  Even so it is probably good
 
1839
    practice to explicitly join all the processes that you start.
 
1840
 
 
1841
Better to inherit than pickle/unpickle
 
1842
 
 
1843
    On Windows many types from :mod:`multiprocessing` need to be picklable so
 
1844
    that child processes can use them.  However, one should generally avoid
 
1845
    sending shared objects to other processes using pipes or queues.  Instead
 
1846
    you should arrange the program so that a process which need access to a
 
1847
    shared resource created elsewhere can inherit it from an ancestor process.
 
1848
 
 
1849
Avoid terminating processes
 
1850
 
 
1851
    Using the :meth:`Process.terminate` method to stop a process is liable to
 
1852
    cause any shared resources (such as locks, semaphores, pipes and queues)
 
1853
    currently being used by the process to become broken or unavailable to other
 
1854
    processes.
 
1855
 
 
1856
    Therefore it is probably best to only consider using
 
1857
    :meth:`Process.terminate` on processes which never use any shared resources.
 
1858
 
 
1859
Joining processes that use queues
 
1860
 
 
1861
    Bear in mind that a process that has put items in a queue will wait before
 
1862
    terminating until all the buffered items are fed by the "feeder" thread to
 
1863
    the underlying pipe.  (The child process can call the
 
1864
    :meth:`Queue.cancel_join_thread` method of the queue to avoid this behaviour.)
 
1865
 
 
1866
    This means that whenever you use a queue you need to make sure that all
 
1867
    items which have been put on the queue will eventually be removed before the
 
1868
    process is joined.  Otherwise you cannot be sure that processes which have
 
1869
    put items on the queue will terminate.  Remember also that non-daemonic
 
1870
    processes will be automatically be joined.
 
1871
 
 
1872
    An example which will deadlock is the following::
 
1873
 
 
1874
        from multiprocessing import Process, Queue
 
1875
 
 
1876
        def f(q):
 
1877
            q.put('X' * 1000000)
 
1878
 
 
1879
        if __name__ == '__main__':
 
1880
            queue = Queue()
 
1881
            p = Process(target=f, args=(queue,))
 
1882
            p.start()
 
1883
            p.join()                    # this deadlocks
 
1884
            obj = queue.get()
 
1885
 
 
1886
    A fix here would be to swap the last two lines round (or simply remove the
 
1887
    ``p.join()`` line).
 
1888
 
 
1889
Explicitly pass resources to child processes
 
1890
 
 
1891
    On Unix a child process can make use of a shared resource created in a
 
1892
    parent process using a global resource.  However, it is better to pass the
 
1893
    object as an argument to the constructor for the child process.
 
1894
 
 
1895
    Apart from making the code (potentially) compatible with Windows this also
 
1896
    ensures that as long as the child process is still alive the object will not
 
1897
    be garbage collected in the parent process.  This might be important if some
 
1898
    resource is freed when the object is garbage collected in the parent
 
1899
    process.
 
1900
 
 
1901
    So for instance ::
 
1902
 
 
1903
        from multiprocessing import Process, Lock
 
1904
 
 
1905
        def f():
 
1906
            ... do something using "lock" ...
 
1907
 
 
1908
        if __name__ == '__main__':
 
1909
           lock = Lock()
 
1910
           for i in range(10):
 
1911
                Process(target=f).start()
 
1912
 
 
1913
    should be rewritten as ::
 
1914
 
 
1915
        from multiprocessing import Process, Lock
 
1916
 
 
1917
        def f(l):
 
1918
            ... do something using "l" ...
 
1919
 
 
1920
        if __name__ == '__main__':
 
1921
           lock = Lock()
 
1922
           for i in range(10):
 
1923
                Process(target=f, args=(lock,)).start()
 
1924
 
 
1925
 
 
1926
Windows
 
1927
~~~~~~~
 
1928
 
 
1929
Since Windows lacks :func:`os.fork` it has a few extra restrictions:
 
1930
 
 
1931
More picklability
 
1932
 
 
1933
    Ensure that all arguments to :meth:`Process.__init__` are picklable.  This
 
1934
    means, in particular, that bound or unbound methods cannot be used directly
 
1935
    as the ``target`` argument on Windows --- just define a function and use
 
1936
    that instead.
 
1937
 
 
1938
    Also, if you subclass :class:`Process` then make sure that instances will be
 
1939
    picklable when the :meth:`Process.start` method is called.
 
1940
 
 
1941
Global variables
 
1942
 
 
1943
    Bear in mind that if code run in a child process tries to access a global
 
1944
    variable, then the value it sees (if any) may not be the same as the value
 
1945
    in the parent process at the time that :meth:`Process.start` was called.
 
1946
 
 
1947
    However, global variables which are just module level constants cause no
 
1948
    problems.
 
1949
 
 
1950
Safe importing of main module
 
1951
 
 
1952
    Make sure that the main module can be safely imported by a new Python
 
1953
    interpreter without causing unintended side effects (such a starting a new
 
1954
    process).
 
1955
 
 
1956
    For example, under Windows running the following module would fail with a
 
1957
    :exc:`RuntimeError`::
 
1958
 
 
1959
        from multiprocessing import Process
 
1960
 
 
1961
        def foo():
 
1962
            print 'hello'
 
1963
 
 
1964
        p = Process(target=foo)
 
1965
        p.start()
 
1966
 
 
1967
    Instead one should protect the "entry point" of the program by using ``if
 
1968
    __name__ == '__main__':`` as follows::
 
1969
 
 
1970
       from multiprocessing import Process, freeze_support
 
1971
 
 
1972
       def foo():
 
1973
           print 'hello'
 
1974
 
 
1975
       if __name__ == '__main__':
 
1976
           freeze_support()
 
1977
           p = Process(target=foo)
 
1978
           p.start()
 
1979
 
 
1980
    (The ``freeze_support()`` line can be omitted if the program will be run
 
1981
    normally instead of frozen.)
 
1982
 
 
1983
    This allows the newly spawned Python interpreter to safely import the module
 
1984
    and then run the module's ``foo()`` function.
 
1985
 
 
1986
    Similar restrictions apply if a pool or manager is created in the main
 
1987
    module.
 
1988
 
 
1989
 
 
1990
.. _multiprocessing-examples:
 
1991
 
 
1992
Examples
 
1993
--------
 
1994
 
 
1995
Demonstration of how to create and use customized managers and proxies:
 
1996
 
 
1997
.. literalinclude:: ../includes/mp_newtype.py
 
1998
 
 
1999
 
 
2000
Using :class:`Pool`:
 
2001
 
 
2002
.. literalinclude:: ../includes/mp_pool.py
 
2003
 
 
2004
 
 
2005
Synchronization types like locks, conditions and queues:
 
2006
 
 
2007
.. literalinclude:: ../includes/mp_synchronize.py
 
2008
 
 
2009
 
 
2010
An showing how to use queues to feed tasks to a collection of worker process and
 
2011
collect the results:
 
2012
 
 
2013
.. literalinclude:: ../includes/mp_workers.py
 
2014
 
 
2015
 
 
2016
An example of how a pool of worker processes can each run a
 
2017
:class:`SimpleHTTPServer.HttpServer` instance while sharing a single listening
 
2018
socket.
 
2019
 
 
2020
.. literalinclude:: ../includes/mp_webserver.py
 
2021
 
 
2022
 
 
2023
Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
 
2024
 
 
2025
.. literalinclude:: ../includes/mp_benchmarks.py
 
2026
 
 
2027
An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process`
 
2028
and others to build a system which can distribute processes and work via a 
 
2029
distributed queue to a "cluster" of machines on a network, accessible via SSH.
 
2030
You will need to have private key authentication for all hosts configured for
 
2031
this to work.
 
2032
 
 
2033
.. literalinclude:: ../includes/mp_distributing.py