~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/connection.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
ImportĀ upstreamĀ versionĀ 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# A higher level module for using sockets (or Windows named pipes)
 
3
#
 
4
# multiprocessing/connection.py
 
5
#
 
6
# Copyright (c) 2006-2008, R Oudkerk
 
7
# Licensed to PSF under a Contributor Agreement.
 
8
#
 
9
 
 
10
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
 
11
 
 
12
import io
 
13
import os
 
14
import sys
 
15
import socket
 
16
import struct
 
17
import errno
 
18
import time
 
19
import tempfile
 
20
import itertools
 
21
 
 
22
import _multiprocessing
 
23
 
 
24
from . import reduction
 
25
from . import util
 
26
 
 
27
from . import AuthenticationError, BufferTooShort
 
28
from .reduction import ForkingPickler
 
29
 
 
30
try:
 
31
    import _winapi
 
32
    from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
 
33
except ImportError:
 
34
    if sys.platform == 'win32':
 
35
        raise
 
36
    _winapi = None
 
37
 
 
38
#
 
39
#
 
40
#
 
41
 
 
42
BUFSIZE = 8192
 
43
# A very generous timeout when it comes to local connections...
 
44
CONNECTION_TIMEOUT = 20.
 
45
 
 
46
_mmap_counter = itertools.count()
 
47
 
 
48
default_family = 'AF_INET'
 
49
families = ['AF_INET']
 
50
 
 
51
if hasattr(socket, 'AF_UNIX'):
 
52
    default_family = 'AF_UNIX'
 
53
    families += ['AF_UNIX']
 
54
 
 
55
if sys.platform == 'win32':
 
56
    default_family = 'AF_PIPE'
 
57
    families += ['AF_PIPE']
 
58
 
 
59
 
 
60
def _init_timeout(timeout=CONNECTION_TIMEOUT):
 
61
    return time.time() + timeout
 
62
 
 
63
def _check_timeout(t):
 
64
    return time.time() > t
 
65
 
 
66
#
 
67
#
 
68
#
 
69
 
 
70
def arbitrary_address(family):
 
71
    '''
 
72
    Return an arbitrary free address for the given family
 
73
    '''
 
74
    if family == 'AF_INET':
 
75
        return ('localhost', 0)
 
76
    elif family == 'AF_UNIX':
 
77
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
 
78
    elif family == 'AF_PIPE':
 
79
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
 
80
                               (os.getpid(), next(_mmap_counter)))
 
81
    else:
 
82
        raise ValueError('unrecognized family')
 
83
 
 
84
def _validate_family(family):
 
85
    '''
 
86
    Checks if the family is valid for the current environment.
 
87
    '''
 
88
    if sys.platform != 'win32' and family == 'AF_PIPE':
 
89
        raise ValueError('Family %s is not recognized.' % family)
 
90
 
 
91
    if sys.platform == 'win32' and family == 'AF_UNIX':
 
92
        # double check
 
93
        if not hasattr(socket, family):
 
94
            raise ValueError('Family %s is not recognized.' % family)
 
95
 
 
96
def address_type(address):
 
97
    '''
 
98
    Return the types of the address
 
99
 
 
100
    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
 
101
    '''
 
102
    if type(address) == tuple:
 
103
        return 'AF_INET'
 
104
    elif type(address) is str and address.startswith('\\\\'):
 
105
        return 'AF_PIPE'
 
106
    elif type(address) is str:
 
107
        return 'AF_UNIX'
 
108
    else:
 
109
        raise ValueError('address type of %r unrecognized' % address)
 
110
 
 
111
#
 
112
# Connection classes
 
113
#
 
114
 
 
115
class _ConnectionBase:
 
116
    _handle = None
 
117
 
 
118
    def __init__(self, handle, readable=True, writable=True):
 
119
        handle = handle.__index__()
 
120
        if handle < 0:
 
121
            raise ValueError("invalid handle")
 
122
        if not readable and not writable:
 
123
            raise ValueError(
 
124
                "at least one of `readable` and `writable` must be True")
 
125
        self._handle = handle
 
126
        self._readable = readable
 
127
        self._writable = writable
 
128
 
 
129
    # XXX should we use util.Finalize instead of a __del__?
 
130
 
 
131
    def __del__(self):
 
132
        if self._handle is not None:
 
133
            self._close()
 
134
 
 
135
    def _check_closed(self):
 
136
        if self._handle is None:
 
137
            raise OSError("handle is closed")
 
138
 
 
139
    def _check_readable(self):
 
140
        if not self._readable:
 
141
            raise OSError("connection is write-only")
 
142
 
 
143
    def _check_writable(self):
 
144
        if not self._writable:
 
145
            raise OSError("connection is read-only")
 
146
 
 
147
    def _bad_message_length(self):
 
148
        if self._writable:
 
149
            self._readable = False
 
150
        else:
 
151
            self.close()
 
152
        raise OSError("bad message length")
 
153
 
 
154
    @property
 
155
    def closed(self):
 
156
        """True if the connection is closed"""
 
157
        return self._handle is None
 
158
 
 
159
    @property
 
160
    def readable(self):
 
161
        """True if the connection is readable"""
 
162
        return self._readable
 
163
 
 
164
    @property
 
165
    def writable(self):
 
166
        """True if the connection is writable"""
 
167
        return self._writable
 
168
 
 
169
    def fileno(self):
 
170
        """File descriptor or handle of the connection"""
 
171
        self._check_closed()
 
172
        return self._handle
 
173
 
 
174
    def close(self):
 
175
        """Close the connection"""
 
176
        if self._handle is not None:
 
177
            try:
 
178
                self._close()
 
179
            finally:
 
180
                self._handle = None
 
181
 
 
182
    def send_bytes(self, buf, offset=0, size=None):
 
183
        """Send the bytes data from a bytes-like object"""
 
184
        self._check_closed()
 
185
        self._check_writable()
 
186
        m = memoryview(buf)
 
187
        # HACK for byte-indexing of non-bytewise buffers (e.g. array.array)
 
188
        if m.itemsize > 1:
 
189
            m = memoryview(bytes(m))
 
190
        n = len(m)
 
191
        if offset < 0:
 
192
            raise ValueError("offset is negative")
 
193
        if n < offset:
 
194
            raise ValueError("buffer length < offset")
 
195
        if size is None:
 
196
            size = n - offset
 
197
        elif size < 0:
 
198
            raise ValueError("size is negative")
 
199
        elif offset + size > n:
 
200
            raise ValueError("buffer length < offset + size")
 
201
        self._send_bytes(m[offset:offset + size])
 
202
 
 
203
    def send(self, obj):
 
204
        """Send a (picklable) object"""
 
205
        self._check_closed()
 
206
        self._check_writable()
 
207
        self._send_bytes(ForkingPickler.dumps(obj))
 
208
 
 
209
    def recv_bytes(self, maxlength=None):
 
210
        """
 
211
        Receive bytes data as a bytes object.
 
212
        """
 
213
        self._check_closed()
 
214
        self._check_readable()
 
215
        if maxlength is not None and maxlength < 0:
 
216
            raise ValueError("negative maxlength")
 
217
        buf = self._recv_bytes(maxlength)
 
218
        if buf is None:
 
219
            self._bad_message_length()
 
220
        return buf.getvalue()
 
221
 
 
222
    def recv_bytes_into(self, buf, offset=0):
 
223
        """
 
224
        Receive bytes data into a writeable buffer-like object.
 
225
        Return the number of bytes read.
 
226
        """
 
227
        self._check_closed()
 
228
        self._check_readable()
 
229
        with memoryview(buf) as m:
 
230
            # Get bytesize of arbitrary buffer
 
231
            itemsize = m.itemsize
 
232
            bytesize = itemsize * len(m)
 
233
            if offset < 0:
 
234
                raise ValueError("negative offset")
 
235
            elif offset > bytesize:
 
236
                raise ValueError("offset too large")
 
237
            result = self._recv_bytes()
 
238
            size = result.tell()
 
239
            if bytesize < offset + size:
 
240
                raise BufferTooShort(result.getvalue())
 
241
            # Message can fit in dest
 
242
            result.seek(0)
 
243
            result.readinto(m[offset // itemsize :
 
244
                              (offset + size) // itemsize])
 
245
            return size
 
246
 
 
247
    def recv(self):
 
248
        """Receive a (picklable) object"""
 
249
        self._check_closed()
 
250
        self._check_readable()
 
251
        buf = self._recv_bytes()
 
252
        return ForkingPickler.loads(buf.getbuffer())
 
253
 
 
254
    def poll(self, timeout=0.0):
 
255
        """Whether there is any input available to be read"""
 
256
        self._check_closed()
 
257
        self._check_readable()
 
258
        return self._poll(timeout)
 
259
 
 
260
    def __enter__(self):
 
261
        return self
 
262
 
 
263
    def __exit__(self, exc_type, exc_value, exc_tb):
 
264
        self.close()
 
265
 
 
266
 
 
267
if _winapi:
 
268
 
 
269
    class PipeConnection(_ConnectionBase):
 
270
        """
 
271
        Connection class based on a Windows named pipe.
 
272
        Overlapped I/O is used, so the handles must have been created
 
273
        with FILE_FLAG_OVERLAPPED.
 
274
        """
 
275
        _got_empty_message = False
 
276
 
 
277
        def _close(self, _CloseHandle=_winapi.CloseHandle):
 
278
            _CloseHandle(self._handle)
 
279
 
 
280
        def _send_bytes(self, buf):
 
281
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
 
282
            try:
 
283
                if err == _winapi.ERROR_IO_PENDING:
 
284
                    waitres = _winapi.WaitForMultipleObjects(
 
285
                        [ov.event], False, INFINITE)
 
286
                    assert waitres == WAIT_OBJECT_0
 
287
            except:
 
288
                ov.cancel()
 
289
                raise
 
290
            finally:
 
291
                nwritten, err = ov.GetOverlappedResult(True)
 
292
            assert err == 0
 
293
            assert nwritten == len(buf)
 
294
 
 
295
        def _recv_bytes(self, maxsize=None):
 
296
            if self._got_empty_message:
 
297
                self._got_empty_message = False
 
298
                return io.BytesIO()
 
299
            else:
 
300
                bsize = 128 if maxsize is None else min(maxsize, 128)
 
301
                try:
 
302
                    ov, err = _winapi.ReadFile(self._handle, bsize,
 
303
                                                overlapped=True)
 
304
                    try:
 
305
                        if err == _winapi.ERROR_IO_PENDING:
 
306
                            waitres = _winapi.WaitForMultipleObjects(
 
307
                                [ov.event], False, INFINITE)
 
308
                            assert waitres == WAIT_OBJECT_0
 
309
                    except:
 
310
                        ov.cancel()
 
311
                        raise
 
312
                    finally:
 
313
                        nread, err = ov.GetOverlappedResult(True)
 
314
                        if err == 0:
 
315
                            f = io.BytesIO()
 
316
                            f.write(ov.getbuffer())
 
317
                            return f
 
318
                        elif err == _winapi.ERROR_MORE_DATA:
 
319
                            return self._get_more_data(ov, maxsize)
 
320
                except OSError as e:
 
321
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
 
322
                        raise EOFError
 
323
                    else:
 
324
                        raise
 
325
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
 
326
 
 
327
        def _poll(self, timeout):
 
328
            if (self._got_empty_message or
 
329
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
 
330
                return True
 
331
            return bool(wait([self], timeout))
 
332
 
 
333
        def _get_more_data(self, ov, maxsize):
 
334
            buf = ov.getbuffer()
 
335
            f = io.BytesIO()
 
336
            f.write(buf)
 
337
            left = _winapi.PeekNamedPipe(self._handle)[1]
 
338
            assert left > 0
 
339
            if maxsize is not None and len(buf) + left > maxsize:
 
340
                self._bad_message_length()
 
341
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
 
342
            rbytes, err = ov.GetOverlappedResult(True)
 
343
            assert err == 0
 
344
            assert rbytes == left
 
345
            f.write(ov.getbuffer())
 
346
            return f
 
347
 
 
348
 
 
349
class Connection(_ConnectionBase):
 
350
    """
 
351
    Connection class based on an arbitrary file descriptor (Unix only), or
 
352
    a socket handle (Windows).
 
353
    """
 
354
 
 
355
    if _winapi:
 
356
        def _close(self, _close=_multiprocessing.closesocket):
 
357
            _close(self._handle)
 
358
        _write = _multiprocessing.send
 
359
        _read = _multiprocessing.recv
 
360
    else:
 
361
        def _close(self, _close=os.close):
 
362
            _close(self._handle)
 
363
        _write = os.write
 
364
        _read = os.read
 
365
 
 
366
    def _send(self, buf, write=_write):
 
367
        remaining = len(buf)
 
368
        while True:
 
369
            try:
 
370
                n = write(self._handle, buf)
 
371
            except InterruptedError:
 
372
                continue
 
373
            remaining -= n
 
374
            if remaining == 0:
 
375
                break
 
376
            buf = buf[n:]
 
377
 
 
378
    def _recv(self, size, read=_read):
 
379
        buf = io.BytesIO()
 
380
        handle = self._handle
 
381
        remaining = size
 
382
        while remaining > 0:
 
383
            try:
 
384
                chunk = read(handle, remaining)
 
385
            except InterruptedError:
 
386
                continue
 
387
            n = len(chunk)
 
388
            if n == 0:
 
389
                if remaining == size:
 
390
                    raise EOFError
 
391
                else:
 
392
                    raise OSError("got end of file during message")
 
393
            buf.write(chunk)
 
394
            remaining -= n
 
395
        return buf
 
396
 
 
397
    def _send_bytes(self, buf):
 
398
        # For wire compatibility with 3.2 and lower
 
399
        n = len(buf)
 
400
        self._send(struct.pack("!i", n))
 
401
        # The condition is necessary to avoid "broken pipe" errors
 
402
        # when sending a 0-length buffer if the other end closed the pipe.
 
403
        if n > 0:
 
404
            self._send(buf)
 
405
 
 
406
    def _recv_bytes(self, maxsize=None):
 
407
        buf = self._recv(4)
 
408
        size, = struct.unpack("!i", buf.getvalue())
 
409
        if maxsize is not None and size > maxsize:
 
410
            return None
 
411
        return self._recv(size)
 
412
 
 
413
    def _poll(self, timeout):
 
414
        r = wait([self], timeout)
 
415
        return bool(r)
 
416
 
 
417
 
 
418
#
 
419
# Public functions
 
420
#
 
421
 
 
422
class Listener(object):
 
423
    '''
 
424
    Returns a listener object.
 
425
 
 
426
    This is a wrapper for a bound socket which is 'listening' for
 
427
    connections, or for a Windows named pipe.
 
428
    '''
 
429
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
 
430
        family = family or (address and address_type(address)) \
 
431
                 or default_family
 
432
        address = address or arbitrary_address(family)
 
433
 
 
434
        _validate_family(family)
 
435
        if family == 'AF_PIPE':
 
436
            self._listener = PipeListener(address, backlog)
 
437
        else:
 
438
            self._listener = SocketListener(address, family, backlog)
 
439
 
 
440
        if authkey is not None and not isinstance(authkey, bytes):
 
441
            raise TypeError('authkey should be a byte string')
 
442
 
 
443
        self._authkey = authkey
 
444
 
 
445
    def accept(self):
 
446
        '''
 
447
        Accept a connection on the bound socket or named pipe of `self`.
 
448
 
 
449
        Returns a `Connection` object.
 
450
        '''
 
451
        if self._listener is None:
 
452
            raise OSError('listener is closed')
 
453
        c = self._listener.accept()
 
454
        if self._authkey:
 
455
            deliver_challenge(c, self._authkey)
 
456
            answer_challenge(c, self._authkey)
 
457
        return c
 
458
 
 
459
    def close(self):
 
460
        '''
 
461
        Close the bound socket or named pipe of `self`.
 
462
        '''
 
463
        if self._listener is not None:
 
464
            self._listener.close()
 
465
            self._listener = None
 
466
 
 
467
    address = property(lambda self: self._listener._address)
 
468
    last_accepted = property(lambda self: self._listener._last_accepted)
 
469
 
 
470
    def __enter__(self):
 
471
        return self
 
472
 
 
473
    def __exit__(self, exc_type, exc_value, exc_tb):
 
474
        self.close()
 
475
 
 
476
 
 
477
def Client(address, family=None, authkey=None):
 
478
    '''
 
479
    Returns a connection to the address of a `Listener`
 
480
    '''
 
481
    family = family or address_type(address)
 
482
    _validate_family(family)
 
483
    if family == 'AF_PIPE':
 
484
        c = PipeClient(address)
 
485
    else:
 
486
        c = SocketClient(address)
 
487
 
 
488
    if authkey is not None and not isinstance(authkey, bytes):
 
489
        raise TypeError('authkey should be a byte string')
 
490
 
 
491
    if authkey is not None:
 
492
        answer_challenge(c, authkey)
 
493
        deliver_challenge(c, authkey)
 
494
 
 
495
    return c
 
496
 
 
497
 
 
498
if sys.platform != 'win32':
 
499
 
 
500
    def Pipe(duplex=True):
 
501
        '''
 
502
        Returns pair of connection objects at either end of a pipe
 
503
        '''
 
504
        if duplex:
 
505
            s1, s2 = socket.socketpair()
 
506
            s1.setblocking(True)
 
507
            s2.setblocking(True)
 
508
            c1 = Connection(s1.detach())
 
509
            c2 = Connection(s2.detach())
 
510
        else:
 
511
            fd1, fd2 = os.pipe()
 
512
            c1 = Connection(fd1, writable=False)
 
513
            c2 = Connection(fd2, readable=False)
 
514
 
 
515
        return c1, c2
 
516
 
 
517
else:
 
518
 
 
519
    def Pipe(duplex=True):
 
520
        '''
 
521
        Returns pair of connection objects at either end of a pipe
 
522
        '''
 
523
        address = arbitrary_address('AF_PIPE')
 
524
        if duplex:
 
525
            openmode = _winapi.PIPE_ACCESS_DUPLEX
 
526
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
 
527
            obsize, ibsize = BUFSIZE, BUFSIZE
 
528
        else:
 
529
            openmode = _winapi.PIPE_ACCESS_INBOUND
 
530
            access = _winapi.GENERIC_WRITE
 
531
            obsize, ibsize = 0, BUFSIZE
 
532
 
 
533
        h1 = _winapi.CreateNamedPipe(
 
534
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
 
535
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
 
536
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
 
537
            _winapi.PIPE_WAIT,
 
538
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
 
539
            # default security descriptor: the handle cannot be inherited
 
540
            _winapi.NULL
 
541
            )
 
542
        h2 = _winapi.CreateFile(
 
543
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
 
544
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
 
545
            )
 
546
        _winapi.SetNamedPipeHandleState(
 
547
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
 
548
            )
 
549
 
 
550
        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
 
551
        _, err = overlapped.GetOverlappedResult(True)
 
552
        assert err == 0
 
553
 
 
554
        c1 = PipeConnection(h1, writable=duplex)
 
555
        c2 = PipeConnection(h2, readable=duplex)
 
556
 
 
557
        return c1, c2
 
558
 
 
559
#
 
560
# Definitions for connections based on sockets
 
561
#
 
562
 
 
563
class SocketListener(object):
 
564
    '''
 
565
    Representation of a socket which is bound to an address and listening
 
566
    '''
 
567
    def __init__(self, address, family, backlog=1):
 
568
        self._socket = socket.socket(getattr(socket, family))
 
569
        try:
 
570
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
 
571
            if os.name == 'posix':
 
572
                self._socket.setsockopt(socket.SOL_SOCKET,
 
573
                                        socket.SO_REUSEADDR, 1)
 
574
            self._socket.setblocking(True)
 
575
            self._socket.bind(address)
 
576
            self._socket.listen(backlog)
 
577
            self._address = self._socket.getsockname()
 
578
        except OSError:
 
579
            self._socket.close()
 
580
            raise
 
581
        self._family = family
 
582
        self._last_accepted = None
 
583
 
 
584
        if family == 'AF_UNIX':
 
585
            self._unlink = util.Finalize(
 
586
                self, os.unlink, args=(address,), exitpriority=0
 
587
                )
 
588
        else:
 
589
            self._unlink = None
 
590
 
 
591
    def accept(self):
 
592
        while True:
 
593
            try:
 
594
                s, self._last_accepted = self._socket.accept()
 
595
            except InterruptedError:
 
596
                pass
 
597
            else:
 
598
                break
 
599
        s.setblocking(True)
 
600
        return Connection(s.detach())
 
601
 
 
602
    def close(self):
 
603
        self._socket.close()
 
604
        if self._unlink is not None:
 
605
            self._unlink()
 
606
 
 
607
 
 
608
def SocketClient(address):
 
609
    '''
 
610
    Return a connection object connected to the socket given by `address`
 
611
    '''
 
612
    family = address_type(address)
 
613
    with socket.socket( getattr(socket, family) ) as s:
 
614
        s.setblocking(True)
 
615
        s.connect(address)
 
616
        return Connection(s.detach())
 
617
 
 
618
#
 
619
# Definitions for connections based on named pipes
 
620
#
 
621
 
 
622
if sys.platform == 'win32':
 
623
 
 
624
    class PipeListener(object):
 
625
        '''
 
626
        Representation of a named pipe
 
627
        '''
 
628
        def __init__(self, address, backlog=None):
 
629
            self._address = address
 
630
            self._handle_queue = [self._new_handle(first=True)]
 
631
 
 
632
            self._last_accepted = None
 
633
            util.sub_debug('listener created with address=%r', self._address)
 
634
            self.close = util.Finalize(
 
635
                self, PipeListener._finalize_pipe_listener,
 
636
                args=(self._handle_queue, self._address), exitpriority=0
 
637
                )
 
638
 
 
639
        def _new_handle(self, first=False):
 
640
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
 
641
            if first:
 
642
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
 
643
            return _winapi.CreateNamedPipe(
 
644
                self._address, flags,
 
645
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
 
646
                _winapi.PIPE_WAIT,
 
647
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
 
648
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
 
649
                )
 
650
 
 
651
        def accept(self):
 
652
            self._handle_queue.append(self._new_handle())
 
653
            handle = self._handle_queue.pop(0)
 
654
            try:
 
655
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
 
656
            except OSError as e:
 
657
                if e.winerror != _winapi.ERROR_NO_DATA:
 
658
                    raise
 
659
                # ERROR_NO_DATA can occur if a client has already connected,
 
660
                # written data and then disconnected -- see Issue 14725.
 
661
            else:
 
662
                try:
 
663
                    res = _winapi.WaitForMultipleObjects(
 
664
                        [ov.event], False, INFINITE)
 
665
                except:
 
666
                    ov.cancel()
 
667
                    _winapi.CloseHandle(handle)
 
668
                    raise
 
669
                finally:
 
670
                    _, err = ov.GetOverlappedResult(True)
 
671
                    assert err == 0
 
672
            return PipeConnection(handle)
 
673
 
 
674
        @staticmethod
 
675
        def _finalize_pipe_listener(queue, address):
 
676
            util.sub_debug('closing listener with address=%r', address)
 
677
            for handle in queue:
 
678
                _winapi.CloseHandle(handle)
 
679
 
 
680
    def PipeClient(address):
 
681
        '''
 
682
        Return a connection object connected to the pipe given by `address`
 
683
        '''
 
684
        t = _init_timeout()
 
685
        while 1:
 
686
            try:
 
687
                _winapi.WaitNamedPipe(address, 1000)
 
688
                h = _winapi.CreateFile(
 
689
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
 
690
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
 
691
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
 
692
                    )
 
693
            except OSError as e:
 
694
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
 
695
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
 
696
                    raise
 
697
            else:
 
698
                break
 
699
        else:
 
700
            raise
 
701
 
 
702
        _winapi.SetNamedPipeHandleState(
 
703
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
 
704
            )
 
705
        return PipeConnection(h)
 
706
 
 
707
#
 
708
# Authentication stuff
 
709
#
 
710
 
 
711
MESSAGE_LENGTH = 20
 
712
 
 
713
CHALLENGE = b'#CHALLENGE#'
 
714
WELCOME = b'#WELCOME#'
 
715
FAILURE = b'#FAILURE#'
 
716
 
 
717
def deliver_challenge(connection, authkey):
 
718
    import hmac
 
719
    assert isinstance(authkey, bytes)
 
720
    message = os.urandom(MESSAGE_LENGTH)
 
721
    connection.send_bytes(CHALLENGE + message)
 
722
    digest = hmac.new(authkey, message, 'md5').digest()
 
723
    response = connection.recv_bytes(256)        # reject large message
 
724
    if response == digest:
 
725
        connection.send_bytes(WELCOME)
 
726
    else:
 
727
        connection.send_bytes(FAILURE)
 
728
        raise AuthenticationError('digest received was wrong')
 
729
 
 
730
def answer_challenge(connection, authkey):
 
731
    import hmac
 
732
    assert isinstance(authkey, bytes)
 
733
    message = connection.recv_bytes(256)         # reject large message
 
734
    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
 
735
    message = message[len(CHALLENGE):]
 
736
    digest = hmac.new(authkey, message, 'md5').digest()
 
737
    connection.send_bytes(digest)
 
738
    response = connection.recv_bytes(256)        # reject large message
 
739
    if response != WELCOME:
 
740
        raise AuthenticationError('digest sent was rejected')
 
741
 
 
742
#
 
743
# Support for using xmlrpclib for serialization
 
744
#
 
745
 
 
746
class ConnectionWrapper(object):
 
747
    def __init__(self, conn, dumps, loads):
 
748
        self._conn = conn
 
749
        self._dumps = dumps
 
750
        self._loads = loads
 
751
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
 
752
            obj = getattr(conn, attr)
 
753
            setattr(self, attr, obj)
 
754
    def send(self, obj):
 
755
        s = self._dumps(obj)
 
756
        self._conn.send_bytes(s)
 
757
    def recv(self):
 
758
        s = self._conn.recv_bytes()
 
759
        return self._loads(s)
 
760
 
 
761
def _xml_dumps(obj):
 
762
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
 
763
 
 
764
def _xml_loads(s):
 
765
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
 
766
    return obj
 
767
 
 
768
class XmlListener(Listener):
 
769
    def accept(self):
 
770
        global xmlrpclib
 
771
        import xmlrpc.client as xmlrpclib
 
772
        obj = Listener.accept(self)
 
773
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
 
774
 
 
775
def XmlClient(*args, **kwds):
 
776
    global xmlrpclib
 
777
    import xmlrpc.client as xmlrpclib
 
778
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
 
779
 
 
780
#
 
781
# Wait
 
782
#
 
783
 
 
784
if sys.platform == 'win32':
 
785
 
 
786
    def _exhaustive_wait(handles, timeout):
 
787
        # Return ALL handles which are currently signalled.  (Only
 
788
        # returning the first signalled might create starvation issues.)
 
789
        L = list(handles)
 
790
        ready = []
 
791
        while L:
 
792
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
 
793
            if res == WAIT_TIMEOUT:
 
794
                break
 
795
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
 
796
                res -= WAIT_OBJECT_0
 
797
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
 
798
                res -= WAIT_ABANDONED_0
 
799
            else:
 
800
                raise RuntimeError('Should not get here')
 
801
            ready.append(L[res])
 
802
            L = L[res+1:]
 
803
            timeout = 0
 
804
        return ready
 
805
 
 
806
    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
 
807
 
 
808
    def wait(object_list, timeout=None):
 
809
        '''
 
810
        Wait till an object in object_list is ready/readable.
 
811
 
 
812
        Returns list of those objects in object_list which are ready/readable.
 
813
        '''
 
814
        if timeout is None:
 
815
            timeout = INFINITE
 
816
        elif timeout < 0:
 
817
            timeout = 0
 
818
        else:
 
819
            timeout = int(timeout * 1000 + 0.5)
 
820
 
 
821
        object_list = list(object_list)
 
822
        waithandle_to_obj = {}
 
823
        ov_list = []
 
824
        ready_objects = set()
 
825
        ready_handles = set()
 
826
 
 
827
        try:
 
828
            for o in object_list:
 
829
                try:
 
830
                    fileno = getattr(o, 'fileno')
 
831
                except AttributeError:
 
832
                    waithandle_to_obj[o.__index__()] = o
 
833
                else:
 
834
                    # start an overlapped read of length zero
 
835
                    try:
 
836
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
 
837
                    except OSError as e:
 
838
                        err = e.winerror
 
839
                        if err not in _ready_errors:
 
840
                            raise
 
841
                    if err == _winapi.ERROR_IO_PENDING:
 
842
                        ov_list.append(ov)
 
843
                        waithandle_to_obj[ov.event] = o
 
844
                    else:
 
845
                        # If o.fileno() is an overlapped pipe handle and
 
846
                        # err == 0 then there is a zero length message
 
847
                        # in the pipe, but it HAS NOT been consumed.
 
848
                        ready_objects.add(o)
 
849
                        timeout = 0
 
850
 
 
851
            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
 
852
        finally:
 
853
            # request that overlapped reads stop
 
854
            for ov in ov_list:
 
855
                ov.cancel()
 
856
 
 
857
            # wait for all overlapped reads to stop
 
858
            for ov in ov_list:
 
859
                try:
 
860
                    _, err = ov.GetOverlappedResult(True)
 
861
                except OSError as e:
 
862
                    err = e.winerror
 
863
                    if err not in _ready_errors:
 
864
                        raise
 
865
                if err != _winapi.ERROR_OPERATION_ABORTED:
 
866
                    o = waithandle_to_obj[ov.event]
 
867
                    ready_objects.add(o)
 
868
                    if err == 0:
 
869
                        # If o.fileno() is an overlapped pipe handle then
 
870
                        # a zero length message HAS been consumed.
 
871
                        if hasattr(o, '_got_empty_message'):
 
872
                            o._got_empty_message = True
 
873
 
 
874
        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
 
875
        return [o for o in object_list if o in ready_objects]
 
876
 
 
877
else:
 
878
 
 
879
    import selectors
 
880
 
 
881
    # poll/select have the advantage of not requiring any extra file
 
882
    # descriptor, contrarily to epoll/kqueue (also, they require a single
 
883
    # syscall).
 
884
    if hasattr(selectors, 'PollSelector'):
 
885
        _WaitSelector = selectors.PollSelector
 
886
    else:
 
887
        _WaitSelector = selectors.SelectSelector
 
888
 
 
889
    def wait(object_list, timeout=None):
 
890
        '''
 
891
        Wait till an object in object_list is ready/readable.
 
892
 
 
893
        Returns list of those objects in object_list which are ready/readable.
 
894
        '''
 
895
        with _WaitSelector() as selector:
 
896
            for obj in object_list:
 
897
                selector.register(obj, selectors.EVENT_READ)
 
898
 
 
899
            if timeout is not None:
 
900
                deadline = time.time() + timeout
 
901
 
 
902
            while True:
 
903
                ready = selector.select(timeout)
 
904
                if ready:
 
905
                    return [key.fileobj for (key, events) in ready]
 
906
                else:
 
907
                    if timeout is not None:
 
908
                        timeout = deadline - time.time()
 
909
                        if timeout < 0:
 
910
                            return ready
 
911
 
 
912
#
 
913
# Make connection and socket objects sharable if possible
 
914
#
 
915
 
 
916
if sys.platform == 'win32':
 
917
    def reduce_connection(conn):
 
918
        handle = conn.fileno()
 
919
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
 
920
            from . import resource_sharer
 
921
            ds = resource_sharer.DupSocket(s)
 
922
            return rebuild_connection, (ds, conn.readable, conn.writable)
 
923
    def rebuild_connection(ds, readable, writable):
 
924
        sock = ds.detach()
 
925
        return Connection(sock.detach(), readable, writable)
 
926
    reduction.register(Connection, reduce_connection)
 
927
 
 
928
    def reduce_pipe_connection(conn):
 
929
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
 
930
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
 
931
        dh = reduction.DupHandle(conn.fileno(), access)
 
932
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
 
933
    def rebuild_pipe_connection(dh, readable, writable):
 
934
        handle = dh.detach()
 
935
        return PipeConnection(handle, readable, writable)
 
936
    reduction.register(PipeConnection, reduce_pipe_connection)
 
937
 
 
938
else:
 
939
    def reduce_connection(conn):
 
940
        df = reduction.DupFd(conn.fileno())
 
941
        return rebuild_connection, (df, conn.readable, conn.writable)
 
942
    def rebuild_connection(df, readable, writable):
 
943
        fd = df.detach()
 
944
        return Connection(fd, readable, writable)
 
945
    reduction.register(Connection, reduce_connection)