4
Python WebSocket library with support for "wss://" encryption.
5
Copyright 2011 Joel Martin
6
Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3)
8
Supports following protocol versions:
9
- http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75
10
- http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
11
- http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10
13
You can make a cert/key with openssl using:
14
openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem
15
as taken from http://docs.python.org/dev/library/ssl.html#certificates
19
import os, sys, time, errno, signal, socket, traceback, select
21
from base64 import b64encode, b64decode
23
# Imports that vary by python version
25
# python 3.0 differences
26
if sys.hexversion > 0x3000000:
27
b2s = lambda buf: buf.decode('latin_1')
28
s2b = lambda s: s.encode('latin_1')
31
b2s = lambda buf: buf # No-op
32
s2b = lambda s: s # No-op
33
s2a = lambda s: [ord(c) for c in s]
34
try: from io import StringIO
35
except: from cStringIO import StringIO
36
try: from http.server import SimpleHTTPRequestHandler
37
except: from SimpleHTTPServer import SimpleHTTPRequestHandler
39
# python 2.6 differences
40
try: from hashlib import md5, sha1
41
except: from md5 import md5; from sha import sha as sha1
43
# python 2.5 differences
45
from struct import pack, unpack_from
47
from struct import pack
48
def unpack_from(fmt, buf, offset=0):
49
slice = buffer(buf, offset, struct.calcsize(fmt))
50
return struct.unpack(fmt, slice)
52
# Degraded functionality if these imports are missing
53
for mod, sup in [('numpy', 'HyBi protocol'), ('ssl', 'TLS/SSL/wss'),
54
('multiprocessing', 'Multi-Processing'),
55
('resource', 'daemonizing')]:
57
globals()[mod] = __import__(mod)
60
print("WARNING: no '%s' module, %s is slower or disabled" % (
62
if multiprocessing and sys.platform == 'win32':
63
# make sockets pickle-able/inheritable
64
import multiprocessing.reduction
67
class WebSocketServer(object):
69
WebSockets server class.
70
Must be sub-classed with new_client method definition.
76
server_handshake_hixie = """HTTP/1.1 101 Web Socket Protocol Handshake\r
79
%sWebSocket-Origin: %s\r
80
%sWebSocket-Location: %s://%s%s\r
83
server_handshake_hybi = """HTTP/1.1 101 Switching Protocols\r
86
Sec-WebSocket-Accept: %s\r
89
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
91
policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""
93
# An exception before the WebSocket connection was established
94
class EClose(Exception):
97
# An exception while the WebSocket client was connected
98
class CClose(Exception):
101
def __init__(self, listen_host='', listen_port=None, source_is_ipv6=False,
102
verbose=False, cert='', key='', ssl_only=None,
103
daemon=False, record='', web='',
104
run_once=False, timeout=0, idle_timeout=0):
107
self.verbose = verbose
108
self.listen_host = listen_host
109
self.listen_port = listen_port
110
self.prefer_ipv6 = source_is_ipv6
111
self.ssl_only = ssl_only
113
self.run_once = run_once
114
self.timeout = timeout
115
self.idle_timeout = idle_timeout
117
self.launch_time = time.time()
118
self.ws_connection = False
121
# Make paths settings absolute
122
self.cert = os.path.abspath(cert)
123
self.key = self.web = self.record = ''
125
self.key = os.path.abspath(key)
127
self.web = os.path.abspath(web)
129
self.record = os.path.abspath(record)
135
if not ssl and self.ssl_only:
136
raise Exception("No 'ssl' module and SSL-only specified")
137
if self.daemon and not resource:
138
raise Exception("Module 'resource' required to daemonize")
141
print("WebSocket server settings:")
142
print(" - Listen on %s:%s" % (
143
self.listen_host, self.listen_port))
144
print(" - Flash security policy server")
146
print(" - Web server. Web root: %s" % self.web)
148
if os.path.exists(self.cert):
149
print(" - SSL/TLS support")
151
print(" - Deny non-SSL/TLS connections")
153
print(" - No SSL/TLS support (no cert file)")
155
print(" - No SSL/TLS support (no 'ssl' module)")
157
print(" - Backgrounding (daemon)")
159
print(" - Recording to '%s.*'" % self.record)
162
# WebSocketServer static methods
166
def socket(host, port=None, connect=False, prefer_ipv6=False, unix_socket=None, use_ssl=False):
167
""" Resolve a host (and optional port) to an IPv4 or IPv6
168
address. Create a socket. Bind to it if listen is set,
169
otherwise connect to it. Return the socket.
174
if connect and not (port or unix_socket):
175
raise Exception("Connect mode requires a port")
176
if use_ssl and not ssl:
177
raise Exception("SSL socket requested but Python SSL module not loaded.");
178
if not connect and use_ssl:
179
raise Exception("SSL only supported in connect mode (for now)")
181
flags = flags | socket.AI_PASSIVE
184
addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM,
185
socket.IPPROTO_TCP, flags)
187
raise Exception("Could not resolve host '%s'" % host)
188
addrs.sort(key=lambda x: x[0])
191
sock = socket.socket(addrs[0][0], addrs[0][1])
193
sock.connect(addrs[0][4])
195
sock = ssl.wrap_socket(sock)
197
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
198
sock.bind(addrs[0][4])
201
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
202
sock.connect(unix_socket)
207
def daemonize(keepfd=None, chdir='/'):
213
os.setgid(os.getgid()) # relinquish elevations
214
os.setuid(os.getuid()) # relinquish elevations
216
# Double fork to daemonize
217
if os.fork() > 0: os._exit(0) # Parent exits
218
os.setsid() # Obtain new process group
219
if os.fork() > 0: os._exit(0) # Parent exits
222
def terminate(a,b): os._exit(0)
223
signal.signal(signal.SIGTERM, terminate)
224
signal.signal(signal.SIGINT, signal.SIG_IGN)
227
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
228
if maxfd == resource.RLIM_INFINITY: maxfd = 256
229
for fd in reversed(range(maxfd)):
234
_, exc, _ = sys.exc_info()
235
if exc.errno != errno.EBADF: raise
237
# Redirect I/O to /dev/null
238
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
239
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
240
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
243
def unmask(buf, hlen, plen):
249
mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
250
offset=hlen, count=1)
251
data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
252
offset=pstart, count=int(plen / 4))
253
#b = numpy.bitwise_xor(data, mask).data
254
b = numpy.bitwise_xor(data, mask).tostring()
257
#print("Partial unmask")
258
mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
259
offset=hlen, count=(plen % 4))
260
data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
261
offset=pend - (plen % 4),
263
c = numpy.bitwise_xor(data, mask).tostring()
267
mask = buf[hlen:hlen+4]
268
data = array.array('B')
270
data.fromstring(buf[pstart:pend])
271
for i in range(len(data)):
272
data[i] ^= mask[i % 4]
273
return data.tostring()
276
def encode_hybi(buf, opcode, base64=False):
277
""" Encode a HyBi style WebSocket frame.
280
0x1 - text frame (base64 encode buf)
281
0x2 - binary frame (use raw buf)
282
0x8 - connection close
289
b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
290
payload_len = len(buf)
291
if payload_len <= 125:
292
header = pack('>BB', b1, payload_len)
293
elif payload_len > 125 and payload_len < 65536:
294
header = pack('>BBH', b1, 126, payload_len)
295
elif payload_len >= 65536:
296
header = pack('>BBQ', b1, 127, payload_len)
298
#print("Encoded: %s" % repr(header + buf))
300
return header + buf, len(header), 0
303
def decode_hybi(buf, base64=False):
304
""" Decode HyBi style WebSocket packets.
309
'hlen' : header_bytes_number,
310
'length' : payload_bytes_number,
311
'payload' : decoded_buffer,
312
'left' : bytes_left_number,
313
'close_code' : number,
314
'close_reason' : string}
331
return f # Incomplete frame header
333
b1, b2 = unpack_from(">BB", buf)
334
f['opcode'] = b1 & 0x0f
335
f['fin'] = (b1 & 0x80) >> 7
336
f['masked'] = (b2 & 0x80) >> 7
338
f['length'] = b2 & 0x7f
340
if f['length'] == 126:
343
return f # Incomplete frame header
344
(f['length'],) = unpack_from('>xxH', buf)
345
elif f['length'] == 127:
348
return f # Incomplete frame header
349
(f['length'],) = unpack_from('>xxQ', buf)
351
full_len = f['hlen'] + f['masked'] * 4 + f['length']
353
if blen < full_len: # Incomplete frame
354
return f # Incomplete frame header
356
# Number of bytes that are part of the next frame(s)
357
f['left'] = blen - full_len
362
f['payload'] = WebSocketServer.unmask(buf, f['hlen'],
365
print("Unmasked frame: %s" % repr(buf))
366
f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len]
368
if base64 and f['opcode'] in [1, 2]:
370
f['payload'] = b64decode(f['payload'])
372
print("Exception while b64decoding buffer: %s" %
376
if f['opcode'] == 0x08:
378
f['close_code'] = unpack_from(">H", f['payload'])[0]
380
f['close_reason'] = f['payload'][2:]
385
def encode_hixie(buf):
386
return s2b("\x00" + b2s(b64encode(buf)) + "\xff"), 1, 1
389
def decode_hixie(buf):
390
end = buf.find(s2b('\xff'))
391
return {'payload': b64decode(buf[1:end]),
395
'left': len(buf) - (end + 1)}
400
""" Generate hash value for WebSockets hixie-76. """
401
key1 = keys['Sec-WebSocket-Key1']
402
key2 = keys['Sec-WebSocket-Key2']
404
spaces1 = key1.count(" ")
405
spaces2 = key2.count(" ")
406
num1 = int("".join([c for c in key1 if c.isdigit()])) / spaces1
407
num2 = int("".join([c for c in key2 if c.isdigit()])) / spaces2
409
return b2s(md5(pack('>II8s',
410
int(num1), int(num2), key3)).digest())
413
# WebSocketServer logging/output functions
416
def traffic(self, token="."):
417
""" Show traffic flow in verbose mode. """
418
if self.verbose and not self.daemon:
419
sys.stdout.write(token)
423
""" Output message with handler_id prefix. """
425
print("% 3d: %s" % (self.handler_id, msg))
428
""" Same as msg() but only if verbose. """
433
# Main WebSocketServer methods
435
def send_frames(self, bufs=None):
436
""" Encode and send WebSocket frames. Any frames already
437
queued will be sent first. If buf is not set then only queued
438
frames will be sent. Returns the number of pending frames that
439
could not be fully sent. If returned pending frames is greater
440
than 0, then the caller should call again when the socket is
443
tdelta = int(time.time()*1000) - self.start_time
447
if self.version.startswith("hybi"):
449
encbuf, lenhead, lentail = self.encode_hybi(
450
buf, opcode=1, base64=True)
452
encbuf, lenhead, lentail = self.encode_hybi(
453
buf, opcode=2, base64=False)
456
encbuf, lenhead, lentail = self.encode_hixie(buf)
459
self.rec.write("%s,\n" %
461
+ encbuf[lenhead:len(encbuf)-lentail]))
463
self.send_parts.append(encbuf)
465
while self.send_parts:
466
# Send pending frames
467
buf = self.send_parts.pop(0)
468
sent = self.client.send(buf)
474
self.send_parts.insert(0, buf[sent:])
477
return len(self.send_parts)
479
def recv_frames(self):
480
""" Receive and decode WebSocket frames.
483
(bufs_list, closed_string)
488
tdelta = int(time.time()*1000) - self.start_time
490
buf = self.client.recv(self.buffer_size)
492
closed = {'code': 1000, 'reason': "Client closed abruptly"}
496
# Add partially received frames to current read buffer
497
buf = self.recv_part + buf
498
self.recv_part = None
501
if self.version.startswith("hybi"):
503
frame = self.decode_hybi(buf, base64=self.base64)
504
#print("Received buf: %s, frame: %s" % (repr(buf), frame))
506
if frame['payload'] == None:
507
# Incomplete/partial frame
509
if frame['left'] > 0:
510
self.recv_part = buf[-frame['left']:]
513
if frame['opcode'] == 0x8: # connection close
514
closed = {'code': frame['close_code'],
515
'reason': frame['close_reason']}
519
if buf[0:2] == s2b('\xff\x00'):
520
closed = {'code': 1000,
521
'reason': "Client sent orderly close frame"}
524
elif buf[0:2] == s2b('\x00\xff'):
528
elif buf.count(s2b('\xff')) == 0:
534
frame = self.decode_hixie(buf)
539
start = frame['hlen']
540
end = frame['hlen'] + frame['length']
542
recbuf = WebSocketServer.unmask(buf, frame['hlen'],
545
recbuf = buf[frame['hlen']:frame['hlen'] +
547
self.rec.write("%s,\n" %
548
repr("}%s}" % tdelta + recbuf))
551
bufs.append(frame['payload'])
554
buf = buf[-frame['left']:]
560
def send_close(self, code=1000, reason=''):
561
""" Send a WebSocket orderly close frame. """
563
if self.version.startswith("hybi"):
564
msg = pack(">H%ds" % len(reason), code, reason)
566
buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
567
self.client.send(buf)
569
elif self.version == "hixie-76":
570
buf = s2b('\xff\x00')
571
self.client.send(buf)
573
# No orderly close for 75
575
def do_websocket_handshake(self, headers, path):
576
h = self.headers = headers
579
prot = 'WebSocket-Protocol'
580
protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',')
582
ver = h.get('Sec-WebSocket-Version')
584
# HyBi/IETF version of the protocol
586
# HyBi-07 report version 7
587
# HyBi-08 - HyBi-12 report version 8
588
# HyBi-13 reports version 13
589
if ver in ['7', '8', '13']:
590
self.version = "hybi-%02d" % int(ver)
592
raise self.EClose('Unsupported protocol version %s' % ver)
594
key = h['Sec-WebSocket-Key']
596
# Choose binary if client supports it
597
if 'binary' in protocols:
599
elif 'base64' in protocols:
602
raise self.EClose("Client must support 'binary' or 'base64' protocol")
604
# Generate the hash value for the accept header
605
accept = b64encode(sha1(s2b(key + self.GUID)).digest())
607
response = self.server_handshake_hybi % b2s(accept)
609
response += "Sec-WebSocket-Protocol: base64\r\n"
611
response += "Sec-WebSocket-Protocol: binary\r\n"
615
# Hixie version of the protocol (75 or 76)
618
trailer = self.gen_md5(h)
620
self.version = "hixie-76"
624
self.version = "hixie-75"
626
# We only support base64 in Hixie era
629
response = self.server_handshake_hixie % (pre,
630
h['Origin'], pre, self.scheme, h['Host'], path)
632
if 'base64' in protocols:
633
response += "%sWebSocket-Protocol: base64\r\n" % pre
635
self.msg("Warning: client does not report 'base64' protocol support")
636
response += "\r\n" + trailer
641
def do_handshake(self, sock, address):
643
do_handshake does the following:
644
- Peek at the first few bytes from the socket.
645
- If the connection is Flash policy request then answer it,
646
close the socket and return.
647
- If the connection is an HTTPS/SSL/TLS connection then SSL
649
- Read from the (possibly wrapped) socket.
650
- If we have received a HTTP GET request and the webserver
651
functionality is enabled, answer it, close the socket and
653
- Assume we have a WebSockets connection, parse the client
655
- Send a WebSockets handshake server response.
656
- Return the socket for this WebSocket client.
659
ready = select.select([sock], [], [], 3)[0]
663
raise self.EClose("ignoring socket not ready")
664
# Peek, but do not read the data so that we have a opportunity
665
# to SSL wrap the socket first
666
handshake = sock.recv(1024, socket.MSG_PEEK)
667
#self.msg("Handshake [%s]" % handshake)
670
raise self.EClose("ignoring empty handshake")
672
elif handshake.startswith(s2b("<policy-file-request/>")):
673
# Answer Flash policy request
674
handshake = sock.recv(1024)
675
sock.send(s2b(self.policy_response))
676
raise self.EClose("Sending flash policy response")
678
elif handshake[0] in ("\x16", "\x80", 22, 128):
679
# SSL wrap the connection
681
raise self.EClose("SSL connection but no 'ssl' module")
682
if not os.path.exists(self.cert):
683
raise self.EClose("SSL connection but '%s' not found"
687
retsock = ssl.wrap_socket(
693
_, x, _ = sys.exc_info()
694
if x.args[0] == ssl.SSL_ERROR_EOF:
696
raise self.EClose(x.args[1])
698
raise self.EClose("Got SSL_ERROR_EOF")
703
stype = "SSL/TLS (wss://)"
706
raise self.EClose("non-SSL connection received but disallowed")
711
stype = "Plain non-SSL (ws://)"
713
wsh = WSRequestHandler(retsock, address, not self.web)
714
if wsh.last_code == 101:
715
# Continue on to handle WebSocket upgrade
717
elif wsh.last_code == 405:
718
raise self.EClose("Normal web request received but disallowed")
719
elif wsh.last_code < 200 or wsh.last_code >= 300:
720
raise self.EClose(wsh.last_message)
722
raise self.EClose(wsh.last_message)
724
raise self.EClose("")
726
response = self.do_websocket_handshake(wsh.headers, wsh.path)
728
self.msg("%s: %s WebSocket connection" % (address[0], stype))
729
self.msg("%s: Version %s, base64: '%s'" % (address[0],
730
self.version, self.base64))
732
self.msg("%s: Path: '%s'" % (address[0], self.path))
735
# Send server WebSockets handshake response
736
#self.msg("sending response [%s]" % response)
737
retsock.send(s2b(response))
739
# Return the WebSockets socket which may be SSL wrapped
744
# Events that can/should be overridden in sub-classes
747
""" Called after WebSockets startup """
748
self.vmsg("WebSockets server started")
751
""" Run periodically while waiting for connections. """
752
#self.vmsg("Running poll()")
755
def fallback_SIGCHLD(self, sig, stack):
756
# Reap zombies when using os.fork() (python 2.4)
757
self.vmsg("Got SIGCHLD, reaping zombies")
759
result = os.waitpid(-1, os.WNOHANG)
761
self.vmsg("Reaped child process %s" % result[0])
762
result = os.waitpid(-1, os.WNOHANG)
766
def do_SIGINT(self, sig, stack):
767
self.msg("Got SIGINT, exiting")
770
def top_new_client(self, startsock, address):
771
""" Do something with a WebSockets client connection. """
772
# Initialize per client settings
774
self.recv_part = None
777
self.start_time = int(time.time()*1000)
782
self.client = self.do_handshake(startsock, address)
785
# Record raw frame data as JavaScript array
786
fname = "%s.%s" % (self.record,
788
self.msg("opening record file: %s" % fname)
789
self.rec = open(fname, 'w+')
791
if self.base64: encoding = "base64"
792
self.rec.write("var VNC_frame_encoding = '%s';\n"
794
self.rec.write("var VNC_frame_data = [\n")
796
self.ws_connection = True
800
_, exc, _ = sys.exc_info()
802
self.send_close(exc.args[0], exc.args[1])
804
_, exc, _ = sys.exc_info()
805
# Connection was not a WebSockets connection
807
self.msg("%s: %s" % (address[0], exc.args[0]))
809
_, exc, _ = sys.exc_info()
810
self.msg("handler exception: %s" % str(exc))
812
self.msg(traceback.format_exc())
815
self.rec.write("'EOF'];\n")
818
if self.client and self.client != startsock:
819
# Close the SSL wrapped socket
820
# Original socket closed by caller
823
def new_client(self):
824
""" Do something with a WebSockets client connection. """
825
raise("WebSocketServer.new_client() must be overloaded")
827
def start_server(self):
829
Daemonize if requested. Listen for for connections. Run
830
do_handshake() method for each connection. If the connection
831
is a WebSockets client then call new_client() method (which must
832
be overridden) for each new client connection.
834
lsock = self.socket(self.listen_host, self.listen_port, False, self.prefer_ipv6)
837
self.daemonize(keepfd=lsock.fileno(), chdir=self.web)
839
self.started() # Some things need to happen after daemonizing
841
# Allow override of SIGINT
842
signal.signal(signal.SIGINT, self.do_SIGINT)
843
if not multiprocessing:
844
# os.fork() (python 2.4) child reaper
845
signal.signal(signal.SIGCHLD, self.fallback_SIGCHLD)
847
last_active_time = self.launch_time
856
if multiprocessing and self.idle_timeout:
857
child_count = len(multiprocessing.active_children())
859
time_elapsed = time.time() - self.launch_time
860
if self.timeout and time_elapsed > self.timeout:
861
self.msg('listener exit due to --timeout %s'
865
if self.idle_timeout:
868
idle_time = time.time() - last_active_time
871
last_active_time = time.time()
873
if idle_time > self.idle_timeout and child_count == 0:
874
self.msg('listener exit due to --idle-timeout %s'
881
ready = select.select([lsock], [], [], 1)[0]
883
startsock, address = lsock.accept()
887
_, exc, _ = sys.exc_info()
888
if hasattr(exc, 'errno'):
890
elif hasattr(exc, 'args'):
894
if err == errno.EINTR:
895
self.vmsg("Ignoring interrupted syscall")
901
# Run in same process if run_once
902
self.top_new_client(startsock, address)
903
if self.ws_connection :
904
self.msg('%s: exiting due to --run-once'
907
elif multiprocessing:
908
self.vmsg('%s: new handler Process' % address[0])
909
p = multiprocessing.Process(
910
target=self.top_new_client,
911
args=(startsock, address))
913
# child will not return
916
self.vmsg('%s: forking handler' % address[0])
919
# child handler process
920
self.top_new_client(startsock, address)
921
break # child process exits
926
except KeyboardInterrupt:
927
_, exc, _ = sys.exc_info()
928
print("In KeyboardInterrupt")
931
_, exc, _ = sys.exc_info()
932
print("In SystemExit")
935
_, exc, _ = sys.exc_info()
936
self.msg("handler exception: %s" % str(exc))
938
self.msg(traceback.format_exc())
945
self.vmsg("Closing socket listening at %s:%s"
946
% (self.listen_host, self.listen_port))
950
# HTTP handler with WebSocket upgrade support
951
class WSRequestHandler(SimpleHTTPRequestHandler):
952
def __init__(self, req, addr, only_upgrade=False):
953
self.only_upgrade = only_upgrade # only allow upgrades
954
SimpleHTTPRequestHandler.__init__(self, req, addr, object())
957
if (self.headers.get('upgrade') and
958
self.headers.get('upgrade').lower() == 'websocket'):
960
if (self.headers.get('sec-websocket-key1') or
961
self.headers.get('websocket-key1')):
962
# For Hixie-76 read out the key hash
963
self.headers.__setitem__('key3', self.rfile.read(8))
965
# Just indicate that an WebSocket upgrade is needed
967
self.last_message = "101 Switching Protocols"
968
elif self.only_upgrade:
969
# Normal web request responses are disabled
971
self.last_message = "405 Method Not Allowed"
973
SimpleHTTPRequestHandler.do_GET(self)
975
def send_response(self, code, message=None):
976
# Save the status code
977
self.last_code = code
978
SimpleHTTPRequestHandler.send_response(self, code, message)
980
def log_message(self, f, *args):
981
# Save instead of printing
982
self.last_message = f % args