2
# A higher level module for using sockets (or Windows named pipes)
4
# multiprocessing/connection.py
6
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
9
__all__ = [ 'Client', 'Listener', 'Pipe' ]
19
import _multiprocessing
20
from multiprocessing import current_process, AuthenticationError
21
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
22
from multiprocessing.forking import duplicate, close
31
# Counter consumed by arbitrary_address() to build distinct 'AF_PIPE' names.
_mmap_counter = itertools.count()

# `families` lists the address families usable on this platform and
# `default_family` is the one used when the caller does not choose.
# Each later check overrides the default chosen by the previous one.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']
48
def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` must be 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError('unrecognized family').
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        # NOTE(review): tempfile.mktemp() only picks a name; another process
        # can claim it before it is used.  Kept because callers need a path
        # that does not exist yet.
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    elif family == 'AF_PIPE':
        # pid + per-process counter keep pipe names distinct; the next()
        # builtin replaces the Python-2-only iterator .next() method.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)))
    else:
        raise ValueError('unrecognized family')
63
def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'

    A tuple is an 'AF_INET' address, a string starting with two
    backslashes is an 'AF_PIPE' address and any other string is an
    'AF_UNIX' address.  Anything else raises ValueError.
    '''
    if isinstance(address, tuple):
        return 'AF_INET'
    elif isinstance(address, str) and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif isinstance(address, str):
        return 'AF_UNIX'
    else:
        # Wrap in a 1-tuple so the value is always formatted as one item.
        raise ValueError('address type of %r unrecognized' % (address,))
82
class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Create the underlying listener.

        `family` defaults to the type of `address` when one is given and
        to `default_family` otherwise; `address` defaults to an arbitrary
        address of that family.  Raises TypeError if `authkey` is given
        and is not a byte string.
        '''
        # Validate first so a bad authkey cannot leave a bound socket or a
        # created pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        c = self._listener.accept()
        # Authenticate in both directions when a key was configured.
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)
126
def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` defaults to the type of `address`.  When `authkey` is given
    the connection answers the listener's challenge and then challenges
    the listener in turn.  Raises TypeError if `authkey` is given and is
    not a byte string.
    '''
    # Validate before connecting so a bad authkey does not leave an open
    # connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c
146
if sys.platform != 'win32':
148
# Pipe() variant defined under the `sys.platform != 'win32'` check above.
# NOTE(review): this listing has lost its indentation, and the bare integers
# (apparently original line numbers) skip values, so some lines of this
# function -- including whatever selects between the two paths below -- are
# not visible here.
def Pipe(duplex=True):
150
Returns pair of connection objects at either end of a pipe
153
# Socket-pair path: each Connection wraps a dup() of one socket's descriptor.
s1, s2 = socket.socketpair()
154
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
155
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
160
# One-way path: c1 is created non-writable and c2 non-readable.
# fd1/fd2 are assigned on a line that is not part of this listing.
c1 = _multiprocessing.Connection(fd1, writable=False)
161
c2 = _multiprocessing.Connection(fd2, readable=False)
167
from ._multiprocessing import win32
169
# Pipe() variant that follows the win32 import above; it builds both ends
# from a named pipe.
# NOTE(review): indentation and some lines are missing from this listing
# (the bare integers skip values), so the branch conditions, `try:` line and
# a few call arguments are not visible here.
def Pipe(duplex=True):
171
Returns pair of connection objects at either end of a pipe
173
address = arbitrary_address('AF_PIPE')
175
# First settings group: duplex pipe, read/write access, both buffers BUFSIZE.
openmode = win32.PIPE_ACCESS_DUPLEX
176
access = win32.GENERIC_READ | win32.GENERIC_WRITE
177
obsize, ibsize = BUFSIZE, BUFSIZE
179
# Second settings group: inbound-only pipe, write-only access for the other
# end, no output buffer.  Presumably chosen when `duplex` is false -- the
# selecting condition is not shown; verify against the full file.
openmode = win32.PIPE_ACCESS_INBOUND
180
access = win32.GENERIC_WRITE
181
obsize, ibsize = 0, BUFSIZE
183
# h1 is the handle returned by CreateNamedPipe.
h1 = win32.CreateNamedPipe(
185
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
187
1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
189
# h2 is obtained by opening the same address with CreateFile.
h2 = win32.CreateFile(
190
address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
192
# Put h2 into message read mode.
win32.SetNamedPipeHandleState(
193
h2, win32.PIPE_READMODE_MESSAGE, None, None
197
win32.ConnectNamedPipe(h1, win32.NULL)
198
# ERROR_PIPE_CONNECTED is singled out from other WindowsError codes; what is
# done for the other codes is on a line not shown here.
except WindowsError, e:
199
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
202
# c1 is writable and c2 is readable only when `duplex` is true.
c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
203
c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
208
# Definitions for connections based on sockets
211
# Listener implementation backed by a socket.
# NOTE(review): indentation and some lines are missing from this listing
# (the bare integers skip values); in particular the method headers that
# should precede the accept and unlink code below are not visible.
class SocketListener(object):
213
Representation of a socket which is bound to an address and listening
215
def __init__(self, address, family, backlog=1):
216
# `family` is the name of a socket-module constant, looked up with getattr.
self._socket = socket.socket(getattr(socket, family))
217
self._socket.bind(address)
218
self._socket.listen(backlog)
219
# Store the address as reported by the socket itself, not the one passed in.
self._address = self._socket.getsockname()
220
self._family = family
221
self._last_accepted = None
223
# For AF_UNIX a Finalize is created with os.unlink(address) as its callback.
# The value of self._unlink for other families is not shown in this listing.
if family == 'AF_UNIX':
224
self._unlink = Finalize(
225
self, os.unlink, args=(address,), exitpriority=0
231
# Accept path: remember the peer address, then wrap a duplicate of the
# accepted socket's descriptor in a Connection.
s, self._last_accepted = self._socket.accept()
232
fd = duplicate(s.fileno())
233
conn = _multiprocessing.Connection(fd)
239
# The action guarded by this check is on a line not shown in this listing.
if self._unlink is not None:
243
# Client-side counterpart for socket addresses.
# NOTE(review): indentation and some lines are missing from this listing
# (the bare integers skip values, e.g. 248 -> 253 and 255 -> 263), so the
# connect call, the `try:` line and the end of the function are not visible.
def SocketClient(address):
245
Return a connection object connected to the socket given by `address`
247
# The family name comes from the shape of `address` and is looked up as a
# socket-module constant.
family = address_type(address)
248
s = socket.socket( getattr(socket, family) )
253
# Errors other than ECONNREFUSED are logged via debug(); what follows the
# log call, and how ECONNREFUSED itself is handled, is not shown here.
except socket.error, e:
254
if e.args[0] != errno.ECONNREFUSED: # connection refused
255
debug('failed to connect to address %s', address)
263
# Wrap a duplicate of the socket's descriptor in a Connection.
fd = duplicate(s.fileno())
264
conn = _multiprocessing.Connection(fd)
269
# Definitions for connections based on named pipes
272
if sys.platform == 'win32':
274
# Listener implementation backed by a Windows named pipe (defined under the
# `sys.platform == 'win32'` check above).
# NOTE(review): indentation and some lines are missing from this listing
# (the bare integers skip values), so method headers, `try:` lines, closing
# parentheses and one argument line of each CreateNamedPipe call are not
# visible here.
class PipeListener(object):
276
Representation of a named pipe
278
# `backlog` is accepted but not used by any line visible in this listing.
def __init__(self, address, backlog=None):
279
self._address = address
280
# Create the first pipe instance for `address`.
handle = win32.CreateNamedPipe(
281
address, win32.PIPE_ACCESS_DUPLEX,
282
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
284
win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
285
win32.NMPWAIT_WAIT_FOREVER, win32.NULL
287
# Handles waiting to be handed out are kept in a list used as a FIFO queue.
self._handle_queue = [handle]
288
self._last_accepted = None
290
sub_debug('listener created with address=%r', self._address)
292
# `close` is an instance attribute: a Finalize whose callback is
# _finalize_pipe_listener(queue, address).
self.close = Finalize(
293
self, PipeListener._finalize_pipe_listener,
294
args=(self._handle_queue, self._address), exitpriority=0
298
# Accept path: create a fresh pipe instance and queue it, then take the
# oldest queued handle and connect on that one.
newhandle = win32.CreateNamedPipe(
299
self._address, win32.PIPE_ACCESS_DUPLEX,
300
win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
302
win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
303
win32.NMPWAIT_WAIT_FOREVER, win32.NULL
305
self._handle_queue.append(newhandle)
306
handle = self._handle_queue.pop(0)
308
win32.ConnectNamedPipe(handle, win32.NULL)
309
# ERROR_PIPE_CONNECTED is singled out from other WindowsError codes; what is
# done for the other codes is on a line not shown here.
except WindowsError, e:
310
if e.args[0] != win32.ERROR_PIPE_CONNECTED:
312
return _multiprocessing.PipeConnection(handle)
315
# Finalizer callback; takes no `self`.  Only its log call is visible here.
def _finalize_pipe_listener(queue, address):
316
sub_debug('closing listener with address=%r', address)
320
# Client-side counterpart for named-pipe addresses.
# NOTE(review): indentation and some lines are missing from this listing
# (the bare integers skip values, e.g. 322 -> 326 and 333 -> 340), so any
# loop or `try:` line around the calls below is not visible.
def PipeClient(address):
322
Return a connection object connected to the pipe given by `address`
326
# Wait on the pipe with a timeout argument of 1000, then open it for
# reading and writing.
win32.WaitNamedPipe(address, 1000)
327
h = win32.CreateFile(
328
address, win32.GENERIC_READ | win32.GENERIC_WRITE,
329
0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
331
# ERROR_SEM_TIMEOUT and ERROR_PIPE_BUSY are treated differently from other
# WindowsError codes; the statements for either case are not shown here.
except WindowsError, e:
332
if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
333
win32.ERROR_PIPE_BUSY):
340
# Put the handle into message read mode before wrapping it.
win32.SetNamedPipeHandleState(
341
h, win32.PIPE_READMODE_MESSAGE, None, None
343
return _multiprocessing.PipeConnection(h)
346
# Authentication stuff
351
# Fixed byte tokens of the authentication handshake: CHALLENGE prefixes the
# random message sent by deliver_challenge(); WELCOME / FAILURE report the
# verdict on the peer's digest.
CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'
355
def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`.

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes, reads the
    peer's reply and compares it with the HMAC of the random bytes keyed
    by `authkey`.  Sends WELCOME on a match; otherwise sends FAILURE and
    raises AuthenticationError.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 was hmac's implicit default digest; naming it keeps the bytes on
    # the wire unchanged and also works where the argument is mandatory.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Prefer a constant-time comparison so response timing does not leak
    # how much of the digest matched; fall back to == where unavailable.
    compare = getattr(hmac, 'compare_digest', None)
    if compare is not None:
        matched = compare(response, digest)
    else:
        matched = response == digest
    if matched:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')
368
def answer_challenge(connection, authkey):
    '''
    Answer a challenge received on `connection` using `authkey`.

    Reads a message that must start with CHALLENGE, replies with the HMAC
    of the remainder keyed by `authkey`, then reads the verdict.  Raises
    AuthenticationError unless the verdict is WELCOME.
    '''
    import hashlib
    import hmac
    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
    message = message[len(CHALLENGE):]
    # MD5 was hmac's implicit default digest; naming it keeps the bytes on
    # the wire unchanged and also works where the argument is mandatory.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')
381
# Support for using xmlrpclib for serialization
384
class ConnectionWrapper(object):
    '''
    Wrap a connection so objects are sent and received through a custom
    pair of serializer functions.

    `dumps` turns an object into the bytes handed to `conn.send_bytes`;
    `loads` turns the bytes from `conn.recv_bytes` back into an object.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the raw connection methods directly on the wrapper.
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)

    def send(self, obj):
        '''Serialize `obj` with `dumps` and send the resulting bytes.'''
        s = self._dumps(obj)
        self._conn.send_bytes(s)

    def recv(self):
        '''Receive one message and return it decoded with `loads`.'''
        s = self._conn.recv_bytes()
        return self._loads(s)
400
return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
403
(obj,), method = xmlrpclib.loads(s.decode('utf8'))
406
class XmlListener(Listener):
    '''
    Listener whose accepted connections exchange objects as XML-RPC data.
    '''
    def accept(self):
        '''
        Accept a connection and return it wrapped in a ConnectionWrapper
        that serializes with _xml_dumps / _xml_loads.
        '''
        # Bind the module-global name the XML helpers refer to; Python 3
        # provides the same API under xmlrpc.client.
        global xmlrpclib
        try:
            import xmlrpclib
        except ImportError:
            import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
413
def XmlClient(*args, **kwds):
    '''
    Return Client(*args, **kwds) wrapped in a ConnectionWrapper that
    serializes with _xml_dumps / _xml_loads.
    '''
    # Bind the module-global name the XML helpers refer to; Python 3
    # provides the same API under xmlrpc.client.
    global xmlrpclib
    try:
        import xmlrpclib
    except ImportError:
        import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)