~pythonregexp2.7/python/issue2636-11

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/connection.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 13:47:31 UTC
  • mfrom: (39021.1.404 Regexp-2.7)
  • mto: This revision was merged to the branch mainline in revision 39030.
  • Revision ID: darklord@timehorse.com-20080921134731-rudomuzeh1b2tz1y
Merged in changes from the latest python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# A higher level module for using sockets (or Windows named pipes)
 
3
#
 
4
# multiprocessing/connection.py
 
5
#
 
6
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
 
7
#
 
8
 
 
9
__all__ = [ 'Client', 'Listener', 'Pipe' ]
 
10
 
 
11
import os
 
12
import sys
 
13
import socket
 
14
import errno
 
15
import time
 
16
import tempfile
 
17
import itertools
 
18
 
 
19
import _multiprocessing
 
20
from multiprocessing import current_process, AuthenticationError
 
21
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
 
22
from multiprocessing.forking import duplicate, close
 
23
 
 
24
 
 
25
#
 
26
#
 
27
#
 
28
 
 
29
BUFSIZE = 8192
 
30
 
 
31
_mmap_counter = itertools.count()
 
32
 
 
33
default_family = 'AF_INET'
 
34
families = ['AF_INET']
 
35
 
 
36
if hasattr(socket, 'AF_UNIX'):
 
37
    default_family = 'AF_UNIX'
 
38
    families += ['AF_UNIX']
 
39
 
 
40
if sys.platform == 'win32':
 
41
    default_family = 'AF_PIPE'
 
42
    families += ['AF_PIPE']
 
43
 
 
44
#
 
45
#
 
46
#
 
47
 
 
48
def arbitrary_address(family):
 
49
    '''
 
50
    Return an arbitrary free address for the given family
 
51
    '''
 
52
    if family == 'AF_INET':
 
53
        return ('localhost', 0)
 
54
    elif family == 'AF_UNIX':
 
55
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
 
56
    elif family == 'AF_PIPE':
 
57
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
 
58
                               (os.getpid(), _mmap_counter.next()))
 
59
    else:
 
60
        raise ValueError('unrecognized family')
 
61
 
 
62
 
 
63
def address_type(address):
 
64
    '''
 
65
    Return the types of the address
 
66
 
 
67
    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
 
68
    '''
 
69
    if type(address) == tuple:
 
70
        return 'AF_INET'
 
71
    elif type(address) is str and address.startswith('\\\\'):
 
72
        return 'AF_PIPE'
 
73
    elif type(address) is str:
 
74
        return 'AF_UNIX'
 
75
    else:
 
76
        raise ValueError('address type of %r unrecognized' % address)
 
77
 
 
78
#
 
79
# Public functions
 
80
#
 
81
 
 
82
class Listener(object):
 
83
    '''
 
84
    Returns a listener object.
 
85
 
 
86
    This is a wrapper for a bound socket which is 'listening' for
 
87
    connections, or for a Windows named pipe.
 
88
    '''
 
89
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
 
90
        family = family or (address and address_type(address)) \
 
91
                 or default_family
 
92
        address = address or arbitrary_address(family)
 
93
 
 
94
        if family == 'AF_PIPE':
 
95
            self._listener = PipeListener(address, backlog)
 
96
        else:
 
97
            self._listener = SocketListener(address, family, backlog)
 
98
 
 
99
        if authkey is not None and not isinstance(authkey, bytes):
 
100
            raise TypeError, 'authkey should be a byte string'
 
101
 
 
102
        self._authkey = authkey
 
103
 
 
104
    def accept(self):
 
105
        '''
 
106
        Accept a connection on the bound socket or named pipe of `self`.
 
107
 
 
108
        Returns a `Connection` object.
 
109
        '''
 
110
        c = self._listener.accept()
 
111
        if self._authkey:
 
112
            deliver_challenge(c, self._authkey)
 
113
            answer_challenge(c, self._authkey)
 
114
        return c
 
115
 
 
116
    def close(self):
 
117
        '''
 
118
        Close the bound socket or named pipe of `self`.
 
119
        '''
 
120
        return self._listener.close()
 
121
 
 
122
    address = property(lambda self: self._listener._address)
 
123
    last_accepted = property(lambda self: self._listener._last_accepted)
 
124
 
 
125
 
 
126
def Client(address, family=None, authkey=None):
 
127
    '''
 
128
    Returns a connection to the address of a `Listener`
 
129
    '''
 
130
    family = family or address_type(address)
 
131
    if family == 'AF_PIPE':
 
132
        c = PipeClient(address)
 
133
    else:
 
134
        c = SocketClient(address)
 
135
 
 
136
    if authkey is not None and not isinstance(authkey, bytes):
 
137
        raise TypeError, 'authkey should be a byte string'
 
138
 
 
139
    if authkey is not None:
 
140
        answer_challenge(c, authkey)
 
141
        deliver_challenge(c, authkey)
 
142
 
 
143
    return c
 
144
 
 
145
 
 
146
if sys.platform != 'win32':
 
147
 
 
148
    def Pipe(duplex=True):
 
149
        '''
 
150
        Returns pair of connection objects at either end of a pipe
 
151
        '''
 
152
        if duplex:
 
153
            s1, s2 = socket.socketpair()
 
154
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
 
155
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
 
156
            s1.close()
 
157
            s2.close()
 
158
        else:
 
159
            fd1, fd2 = os.pipe()
 
160
            c1 = _multiprocessing.Connection(fd1, writable=False)
 
161
            c2 = _multiprocessing.Connection(fd2, readable=False)
 
162
 
 
163
        return c1, c2
 
164
 
 
165
else:
 
166
 
 
167
    from ._multiprocessing import win32
 
168
 
 
169
    def Pipe(duplex=True):
 
170
        '''
 
171
        Returns pair of connection objects at either end of a pipe
 
172
        '''
 
173
        address = arbitrary_address('AF_PIPE')
 
174
        if duplex:
 
175
            openmode = win32.PIPE_ACCESS_DUPLEX
 
176
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
 
177
            obsize, ibsize = BUFSIZE, BUFSIZE
 
178
        else:
 
179
            openmode = win32.PIPE_ACCESS_INBOUND
 
180
            access = win32.GENERIC_WRITE
 
181
            obsize, ibsize = 0, BUFSIZE
 
182
 
 
183
        h1 = win32.CreateNamedPipe(
 
184
            address, openmode,
 
185
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
 
186
            win32.PIPE_WAIT,
 
187
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
 
188
            )
 
189
        h2 = win32.CreateFile(
 
190
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
 
191
            )
 
192
        win32.SetNamedPipeHandleState(
 
193
            h2, win32.PIPE_READMODE_MESSAGE, None, None
 
194
            )
 
195
 
 
196
        try:
 
197
            win32.ConnectNamedPipe(h1, win32.NULL)
 
198
        except WindowsError, e:
 
199
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
 
200
                raise
 
201
 
 
202
        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
 
203
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
 
204
 
 
205
        return c1, c2
 
206
 
 
207
#
 
208
# Definitions for connections based on sockets
 
209
#
 
210
 
 
211
class SocketListener(object):
 
212
    '''
 
213
    Representation of a socket which is bound to an address and listening
 
214
    '''
 
215
    def __init__(self, address, family, backlog=1):
 
216
        self._socket = socket.socket(getattr(socket, family))
 
217
        self._socket.bind(address)
 
218
        self._socket.listen(backlog)
 
219
        self._address = self._socket.getsockname()
 
220
        self._family = family
 
221
        self._last_accepted = None
 
222
 
 
223
        if family == 'AF_UNIX':
 
224
            self._unlink = Finalize(
 
225
                self, os.unlink, args=(address,), exitpriority=0
 
226
                )
 
227
        else:
 
228
            self._unlink = None
 
229
 
 
230
    def accept(self):
 
231
        s, self._last_accepted = self._socket.accept()
 
232
        fd = duplicate(s.fileno())
 
233
        conn = _multiprocessing.Connection(fd)
 
234
        s.close()
 
235
        return conn
 
236
 
 
237
    def close(self):
 
238
        self._socket.close()
 
239
        if self._unlink is not None:
 
240
            self._unlink()
 
241
 
 
242
 
 
243
def SocketClient(address):
 
244
    '''
 
245
    Return a connection object connected to the socket given by `address`
 
246
    '''
 
247
    family = address_type(address)
 
248
    s = socket.socket( getattr(socket, family) )
 
249
 
 
250
    while 1:
 
251
        try:
 
252
            s.connect(address)
 
253
        except socket.error, e:
 
254
            if e.args[0] != errno.ECONNREFUSED: # connection refused
 
255
                debug('failed to connect to address %s', address)
 
256
                raise
 
257
            time.sleep(0.01)
 
258
        else:
 
259
            break
 
260
    else:
 
261
        raise
 
262
 
 
263
    fd = duplicate(s.fileno())
 
264
    conn = _multiprocessing.Connection(fd)
 
265
    s.close()
 
266
    return conn
 
267
 
 
268
#
 
269
# Definitions for connections based on named pipes
 
270
#
 
271
 
 
272
if sys.platform == 'win32':
 
273
 
 
274
    class PipeListener(object):
 
275
        '''
 
276
        Representation of a named pipe
 
277
        '''
 
278
        def __init__(self, address, backlog=None):
 
279
            self._address = address
 
280
            handle = win32.CreateNamedPipe(
 
281
                address, win32.PIPE_ACCESS_DUPLEX,
 
282
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
 
283
                win32.PIPE_WAIT,
 
284
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
 
285
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
 
286
                )
 
287
            self._handle_queue = [handle]
 
288
            self._last_accepted = None
 
289
 
 
290
            sub_debug('listener created with address=%r', self._address)
 
291
 
 
292
            self.close = Finalize(
 
293
                self, PipeListener._finalize_pipe_listener,
 
294
                args=(self._handle_queue, self._address), exitpriority=0
 
295
                )
 
296
 
 
297
        def accept(self):
 
298
            newhandle = win32.CreateNamedPipe(
 
299
                self._address, win32.PIPE_ACCESS_DUPLEX,
 
300
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
 
301
                win32.PIPE_WAIT,
 
302
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
 
303
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
 
304
                )
 
305
            self._handle_queue.append(newhandle)
 
306
            handle = self._handle_queue.pop(0)
 
307
            try:
 
308
                win32.ConnectNamedPipe(handle, win32.NULL)
 
309
            except WindowsError, e:
 
310
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
 
311
                    raise
 
312
            return _multiprocessing.PipeConnection(handle)
 
313
 
 
314
        @staticmethod
 
315
        def _finalize_pipe_listener(queue, address):
 
316
            sub_debug('closing listener with address=%r', address)
 
317
            for handle in queue:
 
318
                close(handle)
 
319
 
 
320
    def PipeClient(address):
 
321
        '''
 
322
        Return a connection object connected to the pipe given by `address`
 
323
        '''
 
324
        while 1:
 
325
            try:
 
326
                win32.WaitNamedPipe(address, 1000)
 
327
                h = win32.CreateFile(
 
328
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
 
329
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
 
330
                    )
 
331
            except WindowsError, e:
 
332
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
 
333
                                     win32.ERROR_PIPE_BUSY):
 
334
                    raise
 
335
            else:
 
336
                break
 
337
        else:
 
338
            raise
 
339
 
 
340
        win32.SetNamedPipeHandleState(
 
341
            h, win32.PIPE_READMODE_MESSAGE, None, None
 
342
            )
 
343
        return _multiprocessing.PipeConnection(h)
 
344
 
 
345
#
 
346
# Authentication stuff
 
347
#
 
348
 
 
349
MESSAGE_LENGTH = 20
 
350
 
 
351
CHALLENGE = b'#CHALLENGE#'
 
352
WELCOME = b'#WELCOME#'
 
353
FAILURE = b'#FAILURE#'
 
354
 
 
355
def deliver_challenge(connection, authkey):
 
356
    import hmac
 
357
    assert isinstance(authkey, bytes)
 
358
    message = os.urandom(MESSAGE_LENGTH)
 
359
    connection.send_bytes(CHALLENGE + message)
 
360
    digest = hmac.new(authkey, message).digest()
 
361
    response = connection.recv_bytes(256)        # reject large message
 
362
    if response == digest:
 
363
        connection.send_bytes(WELCOME)
 
364
    else:
 
365
        connection.send_bytes(FAILURE)
 
366
        raise AuthenticationError('digest received was wrong')
 
367
 
 
368
def answer_challenge(connection, authkey):
 
369
    import hmac
 
370
    assert isinstance(authkey, bytes)
 
371
    message = connection.recv_bytes(256)         # reject large message
 
372
    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
 
373
    message = message[len(CHALLENGE):]
 
374
    digest = hmac.new(authkey, message).digest()
 
375
    connection.send_bytes(digest)
 
376
    response = connection.recv_bytes(256)        # reject large message
 
377
    if response != WELCOME:
 
378
        raise AuthenticationError('digest sent was rejected')
 
379
 
 
380
#
 
381
# Support for using xmlrpclib for serialization
 
382
#
 
383
 
 
384
class ConnectionWrapper(object):
 
385
    def __init__(self, conn, dumps, loads):
 
386
        self._conn = conn
 
387
        self._dumps = dumps
 
388
        self._loads = loads
 
389
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
 
390
            obj = getattr(conn, attr)
 
391
            setattr(self, attr, obj)
 
392
    def send(self, obj):
 
393
        s = self._dumps(obj)
 
394
        self._conn.send_bytes(s)
 
395
    def recv(self):
 
396
        s = self._conn.recv_bytes()
 
397
        return self._loads(s)
 
398
 
 
399
def _xml_dumps(obj):
 
400
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
 
401
 
 
402
def _xml_loads(s):
 
403
    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
 
404
    return obj
 
405
 
 
406
class XmlListener(Listener):
 
407
    def accept(self):
 
408
        global xmlrpclib
 
409
        import xmlrpclib
 
410
        obj = Listener.accept(self)
 
411
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
 
412
 
 
413
def XmlClient(*args, **kwds):
 
414
    global xmlrpclib
 
415
    import xmlrpclib
 
416
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)