2
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3
# Author: Sam Rushing <rushing@nightmare.com>
5
# ======================================================================
6
# Copyright 1996 by Sam Rushing
10
# Permission to use, copy, modify, and distribute this software and
11
# its documentation for any purpose and without fee is hereby
12
# granted, provided that the above copyright notice appear in all
13
# copies and that both that copyright notice and this permission
14
# notice appear in supporting documentation, and that the name of Sam
15
# Rushing not be used in advertising or publicity pertaining to
16
# distribution of the software without specific, written prior
19
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26
# ======================================================================
28
"""Basic infrastructure for asynchronous socket service clients and servers.
30
There are only two ways to have a program on a single processor do "more
31
than one thing at a time". Multi-threaded programming is the simplest and
32
most popular way to do it, but there is another very different technique,
33
that lets you have nearly all the advantages of multi-threading, without
34
actually using multiple threads. It's really only practical if your program
35
is largely I/O bound. If your program is CPU bound, then pre-emptive
36
scheduled threads are probably what you really need. Network servers are
37
rarely CPU-bound, however.
39
If your operating system supports the select() system call in its I/O
40
library (and nearly all do), then you can use it to juggle multiple
41
communication channels at once; doing other work while your I/O is taking
42
place in the "background." Although this strategy can seem strange and
43
complex, especially at first, it is in many ways easier to understand and
44
control than multi-threaded programming. The module documented here solves
45
many of the difficult problems for you, making the task of building
46
sophisticated high-performance network servers and clients a snap.
54
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
55
ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode
63
res = os.strerror(err)
64
if res == 'Unknown error':
68
class ExitNow(Exception):
73
obj.handle_read_event()
74
except (ExitNow, KeyboardInterrupt, SystemExit):
81
obj.handle_write_event()
82
except (ExitNow, KeyboardInterrupt, SystemExit):
89
obj.handle_expt_event()
90
except (ExitNow, KeyboardInterrupt, SystemExit):
95
def readwrite(obj, flags):
97
if flags & (select.POLLIN | select.POLLPRI):
98
obj.handle_read_event()
99
if flags & select.POLLOUT:
100
obj.handle_write_event()
101
if flags & (select.POLLERR | select.POLLNVAL):
102
obj.handle_expt_event()
103
if flags & select.POLLHUP:
105
except (ExitNow, KeyboardInterrupt, SystemExit):
110
def poll(timeout=0.0, map=None):
114
r = []; w = []; e = []
115
for fd, obj in list(map.items()):
116
is_r = obj.readable()
117
is_w = obj.writable()
124
if [] == r == w == e:
129
r, w, e = select.select(r, w, e, timeout)
130
except select.error as err:
131
if err.args[0] != EINTR:
154
def poll2(timeout=0.0, map=None):
155
# Use the poll() support added to the select module in Python 2.0
158
if timeout is not None:
159
# timeout is in milliseconds
160
timeout = int(timeout*1000)
161
pollster = select.poll()
163
for fd, obj in list(map.items()):
166
flags |= select.POLLIN | select.POLLPRI
168
flags |= select.POLLOUT
170
# Only check for exceptions if object was either readable
# or writable.
172
flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
173
pollster.register(fd, flags)
175
r = pollster.poll(timeout)
176
except select.error as err:
177
if err.args[0] != EINTR:
184
readwrite(obj, flags)
186
poll3 = poll2 # Alias for backward compatibility
188
def loop(timeout=30.0, use_poll=False, map=None, count=None):
192
if use_poll and hasattr(select, 'poll'):
199
poll_fun(timeout, map)
202
while map and count > 0:
203
poll_fun(timeout, map)
214
def __init__(self, sock=None, map=None):
216
self._map = socket_map
223
# Set to nonblocking just to make sure for cases where we
224
# get a socket from a blocking source.
226
self.set_socket(sock, map)
227
self.connected = True
228
# The constructor no longer requires that the socket
229
# passed be connected.
231
self.addr = sock.getpeername()
232
except socket.error as err:
233
if err.args[0] == ENOTCONN:
234
# To handle the case where we got an unconnected
# socket.
236
self.connected = False
238
# The socket is broken in some unknown way, alert
239
# the user and remove it from the map (to prevent
240
# polling of broken sockets).
241
self.del_channel(map)
247
status = [self.__class__.__module__+"."+self.__class__.__name__]
248
if self.accepting and self.addr:
249
status.append('listening')
251
status.append('connected')
252
if self.addr is not None:
254
status.append('%s:%d' % self.addr)
256
status.append(repr(self.addr))
257
return '<%s at %#x>' % (' '.join(status), id(self))
259
def add_channel(self, map=None):
260
#self.log_info('adding channel %s' % self)
263
map[self._fileno] = self
265
def del_channel(self, map=None):
270
#self.log_info('closing channel %d:%s' % (fd, self))
274
def create_socket(self, family, type):
275
self.family_and_type = family, type
276
sock = socket.socket(family, type)
278
self.set_socket(sock)
280
def set_socket(self, sock, map=None):
282
## self.__dict__['socket'] = sock
283
self._fileno = sock.fileno()
284
self.add_channel(map)
286
def set_reuse_addr(self):
287
# try to re-use a server port if possible
289
self.socket.setsockopt(
290
socket.SOL_SOCKET, socket.SO_REUSEADDR,
291
self.socket.getsockopt(socket.SOL_SOCKET,
292
socket.SO_REUSEADDR) | 1
297
# ==================================================
298
# predicates for select()
299
# these are used as filters for the lists of sockets
300
# to pass to select().
301
# ==================================================
309
# ==================================================
310
# socket object methods.
311
# ==================================================
313
def listen(self, num):
314
self.accepting = True
315
if os.name == 'nt' and num > 5:
317
return self.socket.listen(num)
319
def bind(self, addr):
321
return self.socket.bind(addr)
323
def connect(self, address):
324
self.connected = False
325
err = self.socket.connect_ex(address)
326
# XXX Should interpret Winsock return values
327
if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
329
if err in (0, EISCONN):
331
self.handle_connect_event()
333
raise socket.error(err, errorcode[err])
336
# XXX can return either an address pair or None
338
conn, addr = self.socket.accept()
340
except socket.error as why:
341
if why.args[0] == EWOULDBLOCK:
346
def send(self, data):
348
result = self.socket.send(data)
350
except socket.error as why:
351
if why.args[0] == EWOULDBLOCK:
353
elif why.args[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
359
def recv(self, buffer_size):
361
data = self.socket.recv(buffer_size)
363
# a closed connection is indicated by signaling
364
# a read condition, and having recv() return 0.
369
except socket.error as why:
370
# winsock sometimes throws ENOTCONN
371
if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]:
378
self.connected = False
379
self.accepting = False
383
except socket.error as why:
384
if why.args[0] not in (ENOTCONN, EBADF):
387
# cheap inheritance, used to pass all other attribute
388
# references to the underlying socket object.
389
# Attribute-delegation hook: Python only calls __getattr__ after normal
# attribute lookup on the instance has failed, so any name this object
# does not define itself is looked up on the object stored in self.socket.
# Whatever getattr() raises for an unknown name propagates to the caller.
# NOTE(review): if self.socket itself has not been assigned yet, the
# self.socket access below re-enters this hook -- confirm every
# construction path sets it first.
def __getattr__(self, attr):
390
return getattr(self.socket, attr)
392
# log and log_info may be overridden to provide more sophisticated
393
# logging and warning methods. In general, log is for 'hit' logging
394
# and 'log_info' is for informational, warning and error logging.
396
def log(self, message):
    """Write a 'hit' log line for *message* to standard error.

    The message is converted with str() and emitted as
    ``log: <message>`` followed by a newline.  Nothing is returned.
    """
    line = 'log: %s\n' % str(message)
    sys.stderr.write(line)
399
def log_info(self, message, type='info'):
    """Print an informational, warning or error message to standard output.

    *message* is the text to report and *type* its category label; the
    line printed is ``<type>: <message>``.  Messages of type 'info' are
    suppressed when the interpreter runs with assertions disabled
    (``__debug__`` false); every other type is always printed.
    """
    if not __debug__ and type == 'info':
        # Optimised mode: drop purely informational chatter.
        return
    print('%s: %s' % (type, message))
403
def handle_read_event(self):
405
# accepting sockets are never connected, they "spawn" new
406
# sockets that are connected
408
elif not self.connected:
409
self.handle_connect_event()
414
def handle_connect_event(self):
    """Mark the channel as connected, then run the connect callback.

    self.connected is set to True *before* self.handle_connect() is
    invoked, so the callback already observes the connected state.
    """
    self.connected = True
    self.handle_connect()
418
def handle_write_event(self):
420
# Accepting sockets shouldn't get a write event.
421
# We will pretend it didn't happen.
424
if not self.connected:
426
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
428
raise socket.error(err, _strerror(err))
430
self.handle_connect_event()
433
def handle_expt_event(self):
434
# if the handle_expt is the same default worthless method,
435
# we'll not even bother calling it, we'll instead generate
# a useful error
439
y1 = self.handle_expt.__func__
440
y2 = dispatcher.handle_expt
442
except AttributeError:
446
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
449
raise socket.error(err, msg)
453
def handle_error(self):
454
nil, t, v, tbinfo = compact_traceback()
456
# sometimes a user repr method will crash.
458
self_repr = repr(self)
460
self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
463
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
473
def handle_expt(self):
    """Default exceptional-condition handler: only report the event.

    Logs 'unhandled exception' through self.log_info() with type
    'warning' and does nothing else.
    """
    self.log_info('unhandled exception', 'warning')
476
def handle_read(self):
    """Default read handler: only report that the event went unhandled.

    Logs 'unhandled read event' through self.log_info() with type
    'warning' and does nothing else.
    """
    self.log_info('unhandled read event', 'warning')
479
def handle_write(self):
    """Default write handler: only report that the event went unhandled.

    Logs 'unhandled write event' through self.log_info() with type
    'warning' and does nothing else.
    """
    self.log_info('unhandled write event', 'warning')
482
def handle_connect(self):
    """Default connect handler: only report that the event went unhandled.

    Logs 'unhandled connect event' through self.log_info() with type
    'warning' and does nothing else.
    """
    self.log_info('unhandled connect event', 'warning')
485
def handle_accept(self):
    """Default accept handler: only report that the event went unhandled.

    Logs 'unhandled accept event' through self.log_info() with type
    'warning' and does nothing else.
    """
    self.log_info('unhandled accept event', 'warning')
488
def handle_close(self):
    """Default close handler: only report that the event went unhandled.

    Logs 'unhandled close event' through self.log_info() with type
    'warning' and does nothing else.
    """
    self.log_info('unhandled close event', 'warning')
492
# ---------------------------------------------------------------------------
493
# adds simple buffered output capability, useful for simple clients.
494
# [for more sophisticated usage use asynchat.async_chat]
495
# ---------------------------------------------------------------------------
497
class dispatcher_with_send(dispatcher):
499
def __init__(self, sock=None, map=None):
    """Set up the base dispatcher and start with an empty output buffer.

    *sock* and *map* are handed unchanged to dispatcher.__init__();
    afterwards self.out_buffer is initialised to an empty bytes object.
    """
    dispatcher.__init__(self, sock, map)
    self.out_buffer = b''
503
def initiate_send(self):
505
num_sent = dispatcher.send(self, self.out_buffer[:512])
506
self.out_buffer = self.out_buffer[num_sent:]
508
def handle_write(self):
512
return (not self.connected) or len(self.out_buffer)
514
def send(self, data):
516
self.log_info('sending %s' % repr(data))
517
self.out_buffer = self.out_buffer + data
520
# ---------------------------------------------------------------------------
521
# used for debugging.
522
# ---------------------------------------------------------------------------
524
def compact_traceback():
525
t, v, tb = sys.exc_info()
527
if not tb: # Must have a traceback
528
raise AssertionError("traceback does not exist")
531
tb.tb_frame.f_code.co_filename,
532
tb.tb_frame.f_code.co_name,
540
file, function, line = tbinfo[-1]
541
info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
542
return (file, function, line), t, v, info
544
def close_all(map=None, ignore_all=False):
547
for x in list(map.values()):
551
if x.args[0] == EBADF:
555
except (ExitNow, KeyboardInterrupt, SystemExit):
562
# Asynchronous File I/O:
564
# After a little research (reading man pages on various unixen, and
565
# digging through the linux kernel), I've determined that select()
566
# isn't meant for doing asynchronous file i/o.
567
# Heartening, though - reading linux/mm/filemap.c shows that linux
568
# supports asynchronous read-ahead. So _MOST_ of the time, the data
569
# will be sitting in memory for us already when we go to read it.
571
# What other OS's (besides NT) support async file i/o? [VMS?]
573
# Regardless, this is useful for pipes, and stdin/stdout...
575
if os.name == 'posix':
579
# Here we override just enough to make a file
580
# look like a socket for the purposes of asyncore.
581
# The passed fd is automatically os.dup()'d
583
def __init__(self, fd):
586
def recv(self, *args):
    """Read from the wrapped file descriptor.

    All positional arguments are forwarded to os.read() after self.fd,
    and its result is returned unchanged.
    """
    return os.read(self.fd, *args)
589
def send(self, *args):
    """Write to the wrapped file descriptor.

    All positional arguments are forwarded to os.write() after self.fd,
    and its result (the number of bytes written) is returned unchanged.
    """
    return os.write(self.fd, *args)
601
class file_dispatcher(dispatcher):
603
def __init__(self, fd, map=None):
604
dispatcher.__init__(self, None, map)
605
self.connected = True
608
except AttributeError:
611
# set it to non-blocking mode
612
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
613
flags = flags | os.O_NONBLOCK
614
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
616
def set_file(self, fd):
617
self.socket = file_wrapper(fd)
618
self._fileno = self.socket.fileno()