~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/asyncore.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
ImportĀ upstreamĀ versionĀ 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- Mode: Python -*-
 
2
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
 
3
#   Author: Sam Rushing <rushing@nightmare.com>
 
4
 
 
5
# ======================================================================
 
6
# Copyright 1996 by Sam Rushing
 
7
#
 
8
#                         All Rights Reserved
 
9
#
 
10
# Permission to use, copy, modify, and distribute this software and
 
11
# its documentation for any purpose and without fee is hereby
 
12
# granted, provided that the above copyright notice appear in all
 
13
# copies and that both that copyright notice and this permission
 
14
# notice appear in supporting documentation, and that the name of Sam
 
15
# Rushing not be used in advertising or publicity pertaining to
 
16
# distribution of the software without specific, written prior
 
17
# permission.
 
18
#
 
19
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
 
20
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
 
21
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
 
22
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
 
23
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
 
24
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
 
25
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 
26
# ======================================================================
 
27
 
 
28
"""Basic infrastructure for asynchronous socket service clients and servers.
 
29
 
 
30
There are only two ways to have a program on a single processor do "more
 
31
than one thing at a time".  Multi-threaded programming is the simplest and
 
32
most popular way to do it, but there is another very different technique,
 
33
that lets you have nearly all the advantages of multi-threading, without
 
34
actually using multiple threads. it's really only practical if your program
 
35
is largely I/O bound. If your program is CPU bound, then pre-emptive
 
36
scheduled threads are probably what you really need. Network servers are
 
37
rarely CPU-bound, however.
 
38
 
 
39
If your operating system supports the select() system call in its I/O
 
40
library (and nearly all do), then you can use it to juggle multiple
 
41
communication channels at once; doing other work while your I/O is taking
 
42
place in the "background."  Although this strategy can seem strange and
 
43
complex, especially at first, it is in many ways easier to understand and
 
44
control than multi-threaded programming. The module documented here solves
 
45
many of the difficult problems for you, making the task of building
 
46
sophisticated high-performance network servers and clients a snap.
 
47
"""
 
48
 
 
49
import select
 
50
import socket
 
51
import sys
 
52
import time
 
53
import os
 
54
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
 
55
     ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
 
56
 
 
57
try:
 
58
    socket_map
 
59
except NameError:
 
60
    socket_map = {}
 
61
 
 
62
def _strerror(err):
 
63
    res = os.strerror(err)
 
64
    if res == 'Unknown error':
 
65
        res = errorcode[err]
 
66
    return res
 
67
 
 
68
class ExitNow(Exception):
 
69
    pass
 
70
 
 
71
def read(obj):
 
72
    try:
 
73
        obj.handle_read_event()
 
74
    except (ExitNow, KeyboardInterrupt, SystemExit):
 
75
        raise
 
76
    except:
 
77
        obj.handle_error()
 
78
 
 
79
def write(obj):
 
80
    try:
 
81
        obj.handle_write_event()
 
82
    except (ExitNow, KeyboardInterrupt, SystemExit):
 
83
        raise
 
84
    except:
 
85
        obj.handle_error()
 
86
 
 
87
def _exception(obj):
 
88
    try:
 
89
        obj.handle_expt_event()
 
90
    except (ExitNow, KeyboardInterrupt, SystemExit):
 
91
        raise
 
92
    except:
 
93
        obj.handle_error()
 
94
 
 
95
def readwrite(obj, flags):
 
96
    try:
 
97
        if flags & (select.POLLIN | select.POLLPRI):
 
98
            obj.handle_read_event()
 
99
        if flags & select.POLLOUT:
 
100
            obj.handle_write_event()
 
101
        if flags & (select.POLLERR | select.POLLNVAL):
 
102
            obj.handle_expt_event()
 
103
        if flags & select.POLLHUP:
 
104
            obj.handle_close()
 
105
    except (ExitNow, KeyboardInterrupt, SystemExit):
 
106
        raise
 
107
    except:
 
108
        obj.handle_error()
 
109
 
 
110
def poll(timeout=0.0, map=None):
 
111
    if map is None:
 
112
        map = socket_map
 
113
    if map:
 
114
        r = []; w = []; e = []
 
115
        for fd, obj in list(map.items()):
 
116
            is_r = obj.readable()
 
117
            is_w = obj.writable()
 
118
            if is_r:
 
119
                r.append(fd)
 
120
            if is_w:
 
121
                w.append(fd)
 
122
            if is_r or is_w:
 
123
                e.append(fd)
 
124
        if [] == r == w == e:
 
125
            time.sleep(timeout)
 
126
            return
 
127
 
 
128
        try:
 
129
            r, w, e = select.select(r, w, e, timeout)
 
130
        except select.error as err:
 
131
            if err.args[0] != EINTR:
 
132
                raise
 
133
            else:
 
134
                return
 
135
 
 
136
        for fd in r:
 
137
            obj = map.get(fd)
 
138
            if obj is None:
 
139
                continue
 
140
            read(obj)
 
141
 
 
142
        for fd in w:
 
143
            obj = map.get(fd)
 
144
            if obj is None:
 
145
                continue
 
146
            write(obj)
 
147
 
 
148
        for fd in e:
 
149
            obj = map.get(fd)
 
150
            if obj is None:
 
151
                continue
 
152
            _exception(obj)
 
153
 
 
154
def poll2(timeout=0.0, map=None):
 
155
    # Use the poll() support added to the select module in Python 2.0
 
156
    if map is None:
 
157
        map = socket_map
 
158
    if timeout is not None:
 
159
        # timeout is in milliseconds
 
160
        timeout = int(timeout*1000)
 
161
    pollster = select.poll()
 
162
    if map:
 
163
        for fd, obj in list(map.items()):
 
164
            flags = 0
 
165
            if obj.readable():
 
166
                flags |= select.POLLIN | select.POLLPRI
 
167
            if obj.writable():
 
168
                flags |= select.POLLOUT
 
169
            if flags:
 
170
                # Only check for exceptions if object was either readable
 
171
                # or writable.
 
172
                flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
 
173
                pollster.register(fd, flags)
 
174
        try:
 
175
            r = pollster.poll(timeout)
 
176
        except select.error as err:
 
177
            if err.args[0] != EINTR:
 
178
                raise
 
179
            r = []
 
180
        for fd, flags in r:
 
181
            obj = map.get(fd)
 
182
            if obj is None:
 
183
                continue
 
184
            readwrite(obj, flags)
 
185
 
 
186
poll3 = poll2                           # Alias for backward compatibility
 
187
 
 
188
def loop(timeout=30.0, use_poll=False, map=None, count=None):
 
189
    if map is None:
 
190
        map = socket_map
 
191
 
 
192
    if use_poll and hasattr(select, 'poll'):
 
193
        poll_fun = poll2
 
194
    else:
 
195
        poll_fun = poll
 
196
 
 
197
    if count is None:
 
198
        while map:
 
199
            poll_fun(timeout, map)
 
200
 
 
201
    else:
 
202
        while map and count > 0:
 
203
            poll_fun(timeout, map)
 
204
            count = count - 1
 
205
 
 
206
class dispatcher:
 
207
 
 
208
    debug = False
 
209
    connected = False
 
210
    accepting = False
 
211
    closing = False
 
212
    addr = None
 
213
 
 
214
    def __init__(self, sock=None, map=None):
 
215
        if map is None:
 
216
            self._map = socket_map
 
217
        else:
 
218
            self._map = map
 
219
 
 
220
        self._fileno = None
 
221
 
 
222
        if sock:
 
223
            # Set to nonblocking just to make sure for cases where we
 
224
            # get a socket from a blocking source.
 
225
            sock.setblocking(0)
 
226
            self.set_socket(sock, map)
 
227
            self.connected = True
 
228
            # The constructor no longer requires that the socket
 
229
            # passed be connected.
 
230
            try:
 
231
                self.addr = sock.getpeername()
 
232
            except socket.error as err:
 
233
                if err.args[0] == ENOTCONN:
 
234
                    # To handle the case where we got an unconnected
 
235
                    # socket.
 
236
                    self.connected = False
 
237
                else:
 
238
                    # The socket is broken in some unknown way, alert
 
239
                    # the user and remove it from the map (to prevent
 
240
                    # polling of broken sockets).
 
241
                    self.del_channel(map)
 
242
                    raise
 
243
        else:
 
244
            self.socket = None
 
245
 
 
246
    def __repr__(self):
 
247
        status = [self.__class__.__module__+"."+self.__class__.__name__]
 
248
        if self.accepting and self.addr:
 
249
            status.append('listening')
 
250
        elif self.connected:
 
251
            status.append('connected')
 
252
        if self.addr is not None:
 
253
            try:
 
254
                status.append('%s:%d' % self.addr)
 
255
            except TypeError:
 
256
                status.append(repr(self.addr))
 
257
        return '<%s at %#x>' % (' '.join(status), id(self))
 
258
 
 
259
    def add_channel(self, map=None):
 
260
        #self.log_info('adding channel %s' % self)
 
261
        if map is None:
 
262
            map = self._map
 
263
        map[self._fileno] = self
 
264
 
 
265
    def del_channel(self, map=None):
 
266
        fd = self._fileno
 
267
        if map is None:
 
268
            map = self._map
 
269
        if fd in map:
 
270
            #self.log_info('closing channel %d:%s' % (fd, self))
 
271
            del map[fd]
 
272
        self._fileno = None
 
273
 
 
274
    def create_socket(self, family, type):
 
275
        self.family_and_type = family, type
 
276
        sock = socket.socket(family, type)
 
277
        sock.setblocking(0)
 
278
        self.set_socket(sock)
 
279
 
 
280
    def set_socket(self, sock, map=None):
 
281
        self.socket = sock
 
282
##        self.__dict__['socket'] = sock
 
283
        self._fileno = sock.fileno()
 
284
        self.add_channel(map)
 
285
 
 
286
    def set_reuse_addr(self):
 
287
        # try to re-use a server port if possible
 
288
        try:
 
289
            self.socket.setsockopt(
 
290
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
 
291
                self.socket.getsockopt(socket.SOL_SOCKET,
 
292
                                       socket.SO_REUSEADDR) | 1
 
293
                )
 
294
        except socket.error:
 
295
            pass
 
296
 
 
297
    # ==================================================
 
298
    # predicates for select()
 
299
    # these are used as filters for the lists of sockets
 
300
    # to pass to select().
 
301
    # ==================================================
 
302
 
 
303
    def readable(self):
 
304
        return True
 
305
 
 
306
    def writable(self):
 
307
        return True
 
308
 
 
309
    # ==================================================
 
310
    # socket object methods.
 
311
    # ==================================================
 
312
 
 
313
    def listen(self, num):
 
314
        self.accepting = True
 
315
        if os.name == 'nt' and num > 5:
 
316
            num = 5
 
317
        return self.socket.listen(num)
 
318
 
 
319
    def bind(self, addr):
 
320
        self.addr = addr
 
321
        return self.socket.bind(addr)
 
322
 
 
323
    def connect(self, address):
 
324
        self.connected = False
 
325
        err = self.socket.connect_ex(address)
 
326
        # XXX Should interpret Winsock return values
 
327
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
 
328
            return
 
329
        if err in (0, EISCONN):
 
330
            self.addr = address
 
331
            self.handle_connect_event()
 
332
        else:
 
333
            raise socket.error(err, errorcode[err])
 
334
 
 
335
    def accept(self):
 
336
        # XXX can return either an address pair or None
 
337
        try:
 
338
            conn, addr = self.socket.accept()
 
339
            return conn, addr
 
340
        except socket.error as why:
 
341
            if why.args[0] == EWOULDBLOCK:
 
342
                pass
 
343
            else:
 
344
                raise
 
345
 
 
346
    def send(self, data):
 
347
        try:
 
348
            result = self.socket.send(data)
 
349
            return result
 
350
        except socket.error as why:
 
351
            if why.args[0] == EWOULDBLOCK:
 
352
                return 0
 
353
            elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
 
354
                self.handle_close()
 
355
                return 0
 
356
            else:
 
357
                raise
 
358
 
 
359
    def recv(self, buffer_size):
 
360
        try:
 
361
            data = self.socket.recv(buffer_size)
 
362
            if not data:
 
363
                # a closed connection is indicated by signaling
 
364
                # a read condition, and having recv() return 0.
 
365
                self.handle_close()
 
366
                return b''
 
367
            else:
 
368
                return data
 
369
        except socket.error as why:
 
370
            # winsock sometimes throws ENOTCONN
 
371
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
 
372
                self.handle_close()
 
373
                return b''
 
374
            else:
 
375
                raise
 
376
 
 
377
    def close(self):
 
378
        self.connected = False
 
379
        self.accepting = False
 
380
        self.del_channel()
 
381
        try:
 
382
            self.socket.close()
 
383
        except socket.error as why:
 
384
            if why.args[0] not in (ENOTCONN, EBADF):
 
385
                raise
 
386
 
 
387
    # cheap inheritance, used to pass all other attribute
 
388
    # references to the underlying socket object.
 
389
    def __getattr__(self, attr):
 
390
        return getattr(self.socket, attr)
 
391
 
 
392
    # log and log_info may be overridden to provide more sophisticated
 
393
    # logging and warning methods. In general, log is for 'hit' logging
 
394
    # and 'log_info' is for informational, warning and error logging.
 
395
 
 
396
    def log(self, message):
 
397
        sys.stderr.write('log: %s\n' % str(message))
 
398
 
 
399
    def log_info(self, message, type='info'):
 
400
        if __debug__ or type != 'info':
 
401
            print('%s: %s' % (type, message))
 
402
 
 
403
    def handle_read_event(self):
 
404
        if self.accepting:
 
405
            # accepting sockets are never connected, they "spawn" new
 
406
            # sockets that are connected
 
407
            self.handle_accept()
 
408
        elif not self.connected:
 
409
            self.handle_connect_event()
 
410
            self.handle_read()
 
411
        else:
 
412
            self.handle_read()
 
413
 
 
414
    def handle_connect_event(self):
 
415
        self.connected = True
 
416
        self.handle_connect()
 
417
 
 
418
    def handle_write_event(self):
 
419
        if self.accepting:
 
420
            # Accepting sockets shouldn't get a write event.
 
421
            # We will pretend it didn't happen.
 
422
            return
 
423
 
 
424
        if not self.connected:
 
425
            #check for errors
 
426
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
 
427
            if err != 0:
 
428
                raise socket.error(err, _strerror(err))
 
429
 
 
430
            self.handle_connect_event()
 
431
        self.handle_write()
 
432
 
 
433
    def handle_expt_event(self):
 
434
        # if the handle_expt is the same default worthless method,
 
435
        # we'll not even bother calling it, we'll instead generate
 
436
        # a useful error
 
437
        x = True
 
438
        try:
 
439
            y1 = self.handle_expt.__func__
 
440
            y2 = dispatcher.handle_expt
 
441
            x = y1 is y2
 
442
        except AttributeError:
 
443
            pass
 
444
 
 
445
        if x:
 
446
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
 
447
            msg = _strerror(err)
 
448
 
 
449
            raise socket.error(err, msg)
 
450
        else:
 
451
            self.handle_expt()
 
452
 
 
453
    def handle_error(self):
 
454
        nil, t, v, tbinfo = compact_traceback()
 
455
 
 
456
        # sometimes a user repr method will crash.
 
457
        try:
 
458
            self_repr = repr(self)
 
459
        except:
 
460
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
 
461
 
 
462
        self.log_info(
 
463
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
 
464
                self_repr,
 
465
                t,
 
466
                v,
 
467
                tbinfo
 
468
                ),
 
469
            'error'
 
470
            )
 
471
        self.handle_close()
 
472
 
 
473
    def handle_expt(self):
 
474
        self.log_info('unhandled exception', 'warning')
 
475
 
 
476
    def handle_read(self):
 
477
        self.log_info('unhandled read event', 'warning')
 
478
 
 
479
    def handle_write(self):
 
480
        self.log_info('unhandled write event', 'warning')
 
481
 
 
482
    def handle_connect(self):
 
483
        self.log_info('unhandled connect event', 'warning')
 
484
 
 
485
    def handle_accept(self):
 
486
        self.log_info('unhandled accept event', 'warning')
 
487
 
 
488
    def handle_close(self):
 
489
        self.log_info('unhandled close event', 'warning')
 
490
        self.close()
 
491
 
 
492
# ---------------------------------------------------------------------------
 
493
# adds simple buffered output capability, useful for simple clients.
 
494
# [for more sophisticated usage use asynchat.async_chat]
 
495
# ---------------------------------------------------------------------------
 
496
 
 
497
class dispatcher_with_send(dispatcher):
 
498
 
 
499
    def __init__(self, sock=None, map=None):
 
500
        dispatcher.__init__(self, sock, map)
 
501
        self.out_buffer = b''
 
502
 
 
503
    def initiate_send(self):
 
504
        num_sent = 0
 
505
        num_sent = dispatcher.send(self, self.out_buffer[:512])
 
506
        self.out_buffer = self.out_buffer[num_sent:]
 
507
 
 
508
    def handle_write(self):
 
509
        self.initiate_send()
 
510
 
 
511
    def writable(self):
 
512
        return (not self.connected) or len(self.out_buffer)
 
513
 
 
514
    def send(self, data):
 
515
        if self.debug:
 
516
            self.log_info('sending %s' % repr(data))
 
517
        self.out_buffer = self.out_buffer + data
 
518
        self.initiate_send()
 
519
 
 
520
# ---------------------------------------------------------------------------
 
521
# used for debugging.
 
522
# ---------------------------------------------------------------------------
 
523
 
 
524
def compact_traceback():
 
525
    t, v, tb = sys.exc_info()
 
526
    tbinfo = []
 
527
    if not tb: # Must have a traceback
 
528
        raise AssertionError("traceback does not exist")
 
529
    while tb:
 
530
        tbinfo.append((
 
531
            tb.tb_frame.f_code.co_filename,
 
532
            tb.tb_frame.f_code.co_name,
 
533
            str(tb.tb_lineno)
 
534
            ))
 
535
        tb = tb.tb_next
 
536
 
 
537
    # just to be safe
 
538
    del tb
 
539
 
 
540
    file, function, line = tbinfo[-1]
 
541
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
 
542
    return (file, function, line), t, v, info
 
543
 
 
544
def close_all(map=None, ignore_all=False):
 
545
    if map is None:
 
546
        map = socket_map
 
547
    for x in list(map.values()):
 
548
        try:
 
549
            x.close()
 
550
        except OSError as x:
 
551
            if x.args[0] == EBADF:
 
552
                pass
 
553
            elif not ignore_all:
 
554
                raise
 
555
        except (ExitNow, KeyboardInterrupt, SystemExit):
 
556
            raise
 
557
        except:
 
558
            if not ignore_all:
 
559
                raise
 
560
    map.clear()
 
561
 
 
562
# Asynchronous File I/O:
 
563
#
 
564
# After a little research (reading man pages on various unixen, and
 
565
# digging through the linux kernel), I've determined that select()
 
566
# isn't meant for doing asynchronous file i/o.
 
567
# Heartening, though - reading linux/mm/filemap.c shows that linux
 
568
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
 
569
# will be sitting in memory for us already when we go to read it.
 
570
#
 
571
# What other OS's (besides NT) support async file i/o?  [VMS?]
 
572
#
 
573
# Regardless, this is useful for pipes, and stdin/stdout...
 
574
 
 
575
if os.name == 'posix':
 
576
    import fcntl
 
577
 
 
578
    class file_wrapper:
 
579
        # Here we override just enough to make a file
 
580
        # look like a socket for the purposes of asyncore.
 
581
        # The passed fd is automatically os.dup()'d
 
582
 
 
583
        def __init__(self, fd):
 
584
            self.fd = os.dup(fd)
 
585
 
 
586
        def recv(self, *args):
 
587
            return os.read(self.fd, *args)
 
588
 
 
589
        def send(self, *args):
 
590
            return os.write(self.fd, *args)
 
591
 
 
592
        read = recv
 
593
        write = send
 
594
 
 
595
        def close(self):
 
596
            os.close(self.fd)
 
597
 
 
598
        def fileno(self):
 
599
            return self.fd
 
600
 
 
601
    class file_dispatcher(dispatcher):
 
602
 
 
603
        def __init__(self, fd, map=None):
 
604
            dispatcher.__init__(self, None, map)
 
605
            self.connected = True
 
606
            try:
 
607
                fd = fd.fileno()
 
608
            except AttributeError:
 
609
                pass
 
610
            self.set_file(fd)
 
611
            # set it to non-blocking mode
 
612
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
 
613
            flags = flags | os.O_NONBLOCK
 
614
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
 
615
 
 
616
        def set_file(self, fd):
 
617
            self.socket = file_wrapper(fd)
 
618
            self._fileno = self.socket.fileno()
 
619
            self.add_channel()