~pythonregexp2.7/python/issue2636-19

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/reduction.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 13:47:31 UTC
  • mfrom: (39021.1.404 Regexp-2.7)
  • Revision ID: darklord@timehorse.com-20080921134731-rudomuzeh1b2tz1y
Merged in changes from the latest Python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Module to allow connection and socket objects to be transferred
 
3
# between processes
 
4
#
 
5
# multiprocessing/reduction.py
 
6
#
 
7
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
 
8
#
 
9
 
 
10
__all__ = []
 
11
 
 
12
import os
 
13
import sys
 
14
import socket
 
15
import threading
 
16
 
 
17
import _multiprocessing
 
18
from multiprocessing import current_process
 
19
from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
 
20
from multiprocessing.util import register_after_fork, debug, sub_debug
 
21
from multiprocessing.connection import Client, Listener
 
22
 
 
23
 
 
24
#
 
25
#
 
26
#
 
27
 
 
28
# Refuse to import unless handles can actually be transferred: either we
# are on Windows, or `_multiprocessing` provides `recvfd`.
if sys.platform != 'win32' and not hasattr(_multiprocessing, 'recvfd'):
    raise ImportError('pickling of connections not supported')
 
30
 
 
31
#
 
32
# Platform specific definitions
 
33
#
 
34
 
 
35
if sys.platform == 'win32':
    import _subprocess
    # `_multiprocessing` is the top-level extension module imported at the
    # head of this file, not a submodule of this package, so it has to be
    # imported absolutely; an explicit relative import cannot resolve it.
    from _multiprocessing import win32

    def send_handle(conn, handle, destination_pid):
        """Send `handle` over `conn` for use by process `destination_pid`.

        Opens the destination process, calls `duplicate(handle,
        process_handle)` (presumably duplicating into that process --
        confirm against `forking.duplicate`) and sends the resulting
        handle value over `conn`.  The process handle is always closed.
        """
        process_handle = win32.OpenProcess(
            win32.PROCESS_ALL_ACCESS, False, destination_pid
            )
        try:
            new_handle = duplicate(handle, process_handle)
            conn.send(new_handle)
        finally:
            close(process_handle)

    def recv_handle(conn):
        """Return the handle value that the peer sent over `conn`."""
        return conn.recv()

else:
    def send_handle(conn, handle, destination_pid):
        """Pass descriptor `handle` over `conn` with `_multiprocessing.sendfd`.

        `destination_pid` is accepted for signature parity with the
        Windows implementation and is not used here.
        """
        _multiprocessing.sendfd(conn.fileno(), handle)

    def recv_handle(conn):
        """Return a descriptor received over `conn` via `_multiprocessing.recvfd`."""
        return _multiprocessing.recvfd(conn.fileno())
 
58
 
 
59
#
 
60
# Support for a per-process server thread which caches pickled handles
 
61
#
 
62
 
 
63
# Duplicated handles that have been handed out in pickled form and not yet
# collected by the process that unpickles them.
_cache = set()

def _reset(obj):
    """Throw away this process's handle-sharing state.

    Every handle still in `_cache` is closed and the cache emptied; then a
    fresh lock is installed and the listener is marked as not yet created.
    `obj` is ignored; it exists so the function can be used as a
    `register_after_fork` callback.
    """
    global _lock, _listener, _cache
    for cached_handle in _cache:
        close(cached_handle)
    _cache.clear()
    _lock, _listener = threading.Lock(), None

# Set up the initial state now, and register `_reset` with
# `register_after_fork` (presumably so a forked child starts from a clean
# state instead of sharing the parent's lock, listener and cache).
_reset(None)
register_after_fork(_reset, _reset)
 
75
 
 
76
def _get_listener():
    """Return the process-wide `Listener`, creating it on first use.

    The first call also starts a daemon thread running `_serve`.  Creation
    happens under `_lock`, so concurrent first calls create only one
    listener and one thread.
    """
    global _listener

    # Fast path: already created, no need to take the lock.
    if _listener is not None:
        return _listener

    with _lock:
        # Check again now that the lock is held; another thread may have
        # created the listener while we were waiting.
        if _listener is None:
            debug('starting listener and thread for sending handles')
            _listener = Listener(authkey=current_process().authkey)
            server = threading.Thread(target=_serve)
            server.daemon = True
            server.start()

    return _listener
 
92
 
 
93
def _serve():
    """Serve cached handles to requesting processes, forever.

    Each accepted connection is expected to deliver one
    `(handle_wanted, destination_pid)` pair.  The handle is removed from
    `_cache`, transferred with `send_handle`, and then closed locally.

    Any exception raised while serving a request is reported through
    `sub_warning` (unless the process is exiting) and the loop carries on.
    The connection is closed whether or not the request succeeded, and a
    handle already taken out of `_cache` is closed even if sending fails.
    """
    from .util import is_exiting, sub_warning

    while 1:
        try:
            conn = _listener.accept()
            try:
                handle_wanted, destination_pid = conn.recv()
                _cache.remove(handle_wanted)
                try:
                    send_handle(conn, handle_wanted, destination_pid)
                finally:
                    # Once removed from `_cache` nothing else will close
                    # this handle, so release it even if the send failed.
                    close(handle_wanted)
            finally:
                # Always drop the connection so a failed request
                # disconnects the peer instead of leaving it waiting on
                # an open socket.
                conn.close()
        except:
            # Deliberately broad: this thread must keep running whatever a
            # single request does.  Errors during shutdown are ignored.
            if not is_exiting():
                import traceback
                sub_warning(
                    'thread for sharing handles raised exception :\n' +
                    '-'*79 + '\n' + traceback.format_exc() + '-'*79
                    )
 
111
 
 
112
#
 
113
# Functions to be used for pickling/unpickling objects with handles
 
114
#
 
115
 
 
116
def reduce_handle(handle):
    """Return a picklable `(address, handle, inherited)` triple for `handle`.

    While `Popen.thread_is_spawning()` is true the result is
    `(None, Popen.duplicate_for_child(handle), True)`.  Otherwise the
    handle is duplicated, the duplicate is stored in `_cache`, and the
    result is `(listener_address, duplicate, False)` so that
    `rebuild_handle` can request it from this process's listener.
    """
    if not Popen.thread_is_spawning():
        copy = duplicate(handle)
        _cache.add(copy)
        sub_debug('reducing handle %d', handle)
        address = _get_listener().address
        return (address, copy, False)

    child_handle = Popen.duplicate_for_child(handle)
    return (None, child_handle, True)
 
123
 
 
124
def rebuild_handle(pickled_data):
    """Turn the triple produced by `reduce_handle` back into a usable handle.

    `pickled_data` is `(address, handle, inherited)`.  When `inherited` is
    true, `handle` is returned as is.  Otherwise this connects to the
    listener at `address`, sends `(handle, os.getpid())`, and returns the
    handle received in reply.  The connection is closed even if the
    exchange raises.
    """
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().authkey)
    try:
        conn.send((handle, os.getpid()))
        return recv_handle(conn)
    finally:
        # Do not leak the client connection when send/recv fails.
        conn.close()
 
134
 
 
135
#
 
136
# Register `_multiprocessing.Connection` with `ForkingPickler`
 
137
#
 
138
 
 
139
def reduce_connection(conn):
    """Reduce `conn` to `(rebuild_connection, args)` for pickling.

    `args` holds the reduced form of the connection's file handle plus its
    `readable` and `writable` flags.
    """
    reduced = reduce_handle(conn.fileno())
    rebuild_args = (reduced, conn.readable, conn.writable)
    return rebuild_connection, rebuild_args
 
142
 
 
143
def rebuild_connection(reduced_handle, readable, writable):
    """Recreate a `_multiprocessing.Connection` from its reduced form.

    The handle is recovered with `rebuild_handle` and wrapped in a new
    connection carrying the given `readable` / `writable` flags.
    """
    handle = rebuild_handle(reduced_handle)
    flags = {'readable': readable, 'writable': writable}
    return _multiprocessing.Connection(handle, **flags)

# Use `reduce_connection` whenever `ForkingPickler` pickles a `Connection`.
ForkingPickler.register(_multiprocessing.Connection, reduce_connection)
 
150
 
 
151
#
 
152
# Register `socket.socket` with `ForkingPickler`
 
153
#
 
154
 
 
155
def fromfd(fd, family, type_, proto=0):
    """Build a `socket.socket` from descriptor `fd`.

    Calls `socket.fromfd`; if the result's class is not exactly
    `socket.socket`, it is wrapped with `socket.socket(_sock=...)` so that
    callers always get a `socket.socket` instance.
    """
    raw = socket.fromfd(fd, family, type_, proto)
    if raw.__class__ is socket.socket:
        return raw
    return socket.socket(_sock=raw)
 
160
 
 
161
def reduce_socket(s):
    """Reduce socket `s` to `(rebuild_socket, args)` for pickling.

    `args` holds the reduced form of the socket's descriptor followed by
    its family, type and protocol.
    """
    handle_info = reduce_handle(s.fileno())
    rebuild_args = (handle_info, s.family, s.type, s.proto)
    return rebuild_socket, rebuild_args
 
164
 
 
165
def rebuild_socket(reduced_handle, family, type_, proto):
    """Recreate a socket from the data produced by `reduce_socket`.

    The descriptor is recovered with `rebuild_handle` and passed to
    `fromfd`.  The recovered descriptor is then closed -- `socket.fromfd`
    works on a duplicate -- and this now happens even when `fromfd`
    raises, so a failed rebuild no longer leaks the descriptor.
    """
    fd = rebuild_handle(reduced_handle)
    try:
        return fromfd(fd, family, type_, proto)
    finally:
        close(fd)

# Use `reduce_socket` whenever `ForkingPickler` pickles a `socket.socket`.
ForkingPickler.register(socket.socket, reduce_socket)
 
172
 
 
173
#
 
174
# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
 
175
#
 
176
 
 
177
if sys.platform == 'win32':

    def reduce_pipe_connection(conn):
        """Reduce a `PipeConnection` to `(rebuild_pipe_connection, args)`.

        `args` holds the reduced form of the connection's handle plus its
        `readable` and `writable` flags.
        """
        reduced = reduce_handle(conn.fileno())
        rebuild_args = (reduced, conn.readable, conn.writable)
        return rebuild_pipe_connection, rebuild_args

    def rebuild_pipe_connection(reduced_handle, readable, writable):
        """Recreate a `_multiprocessing.PipeConnection` from its reduced form."""
        handle = rebuild_handle(reduced_handle)
        flags = {'readable': readable, 'writable': writable}
        return _multiprocessing.PipeConnection(handle, **flags)

    # Use `reduce_pipe_connection` whenever `ForkingPickler` pickles a
    # `PipeConnection`.
    ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection)