1
mergeInto(LibraryManager.library, {
2
$SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });',
3
$SOCKFS__deps: ['$FS'],
5
mount: function(mount) {
6
return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0);
9
if (!SOCKFS.nextname.current) {
10
SOCKFS.nextname.current = 0;
12
return 'socket[' + (SOCKFS.nextname.current++) + ']';
14
createSocket: function(family, type, protocol) {
15
var streaming = type == {{{ cDefine('SOCK_STREAM') }}};
17
assert(streaming == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp
20
// create our internal socket structure
31
sock_ops: SOCKFS.websocket_sock_ops
35
// create the filesystem node to store the socket structure
36
var name = SOCKFS.nextname();
37
var node = FS.createNode(SOCKFS.root, name, {{{ cDefine('S_IFSOCK') }}}, 0);
40
// and the wrapping stream that enables library functions such
41
// as read and write to indirectly interact with the socket
42
var stream = FS.createStream({
45
flags: FS.modeStringToFlags('r+'),
47
stream_ops: SOCKFS.stream_ops
50
// map the new stream to the socket structure (sockets have a 1:1
51
// relationship with a stream)
56
getSocket: function(fd) {
57
var stream = FS.getStream(fd);
58
if (!stream || !FS.isSocket(stream.node.mode)) {
61
return stream.node.sock;
63
// node and stream ops are backend agnostic
65
poll: function(stream) {
66
var sock = stream.node.sock;
67
return sock.sock_ops.poll(sock);
69
ioctl: function(stream, request, varargs) {
70
var sock = stream.node.sock;
71
return sock.sock_ops.ioctl(sock, request, varargs);
73
read: function(stream, buffer, offset, length, position /* ignored */) {
74
var sock = stream.node.sock;
75
var msg = sock.sock_ops.recvmsg(sock, length);
80
#if USE_TYPED_ARRAYS == 2
81
buffer.set(msg.buffer, offset);
83
for (var i = 0; i < size; i++) {
84
buffer[offset + i] = msg.buffer[i];
87
return msg.buffer.length;
89
write: function(stream, buffer, offset, length, position /* ignored */) {
90
var sock = stream.node.sock;
91
return sock.sock_ops.sendmsg(sock, buffer, offset, length);
93
close: function(stream) {
94
var sock = stream.node.sock;
95
sock.sock_ops.close(sock);
98
// backend-specific stream ops
101
// peers are a small wrapper around a WebSocket to help in
102
// emulating dgram sockets
104
// these functions aren't actually sock_ops members, but we're
105
// abusing the namespace to organize them
107
createPeer: function(sock, addr, port) {
110
if (typeof addr === 'object') {
117
// for sockets that've already connected (e.g. we're the server)
118
// we can inspect the _socket property for the address
120
addr = ws._socket.remoteAddress;
121
port = ws._socket.remotePort;
123
// if we're just now initializing a connection to the remote,
124
// inspect the url property
126
var result = /ws[s]?:\/\/([^:]+):(\d+)/.exec(ws.url);
128
throw new Error('WebSocket URL must be in the format ws(s)://address:port');
131
port = parseInt(result[2], 10);
134
// create the actual websocket object and connect
136
var url = 'ws://' + addr + ':' + port;
138
console.log('connect: ' + url);
140
// the node ws library API is slightly different than the browser's
141
var opts = ENVIRONMENT_IS_NODE ? {} : ['binary'];
142
ws = new WebSocket(url, opts);
143
ws.binaryType = 'arraybuffer';
145
throw new FS.ErrnoError(ERRNO_CODES.EHOSTUNREACH);
150
Module.print('websocket adding peer: ' + addr + ':' + port);
160
SOCKFS.websocket_sock_ops.addPeer(sock, peer);
161
SOCKFS.websocket_sock_ops.handlePeerEvents(sock, peer);
163
// if this is a bound dgram socket, send the port number first to allow
164
// us to override the ephemeral port reported to us by remotePort on the
166
if (sock.type === {{{ cDefine('SOCK_DGRAM') }}} && typeof sock.sport !== 'undefined') {
168
Module.print('websocket queuing port message (port ' + sock.sport + ')');
170
peer.dgram_send_queue.push(new Uint8Array([
172
'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0),
173
((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff)
179
getPeer: function(sock, addr, port) {
180
return sock.peers[addr + ':' + port];
182
addPeer: function(sock, peer) {
183
sock.peers[peer.addr + ':' + peer.port] = peer;
185
removePeer: function(sock, peer) {
186
delete sock.peers[peer.addr + ':' + peer.port];
188
handlePeerEvents: function(sock, peer) {
191
var handleOpen = function () {
193
Module.print('websocket handle open');
196
var queued = peer.dgram_send_queue.shift();
199
Module.print('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]);
201
peer.socket.send(queued);
202
queued = peer.dgram_send_queue.shift();
205
// not much we can do here in the way of proper error handling as we've already
206
// lied and said this data was sent. shut it down.
211
var handleMessage = function(data) {
212
assert(typeof data !== 'string' && data.byteLength !== undefined); // must receive an ArrayBuffer
213
data = new Uint8Array(data); // make a typed array view on the array buffer
216
Module.print('websocket handle message (' + data.byteLength + ' bytes): ' + [Array.prototype.slice.call(data)]);
219
// if this is the port message, override the peer's port with it
220
var wasfirst = first;
223
data.length === 10 &&
224
data[0] === 255 && data[1] === 255 && data[2] === 255 && data[3] === 255 &&
225
data[4] === 'p'.charCodeAt(0) && data[5] === 'o'.charCodeAt(0) && data[6] === 'r'.charCodeAt(0) && data[7] === 't'.charCodeAt(0)) {
226
// update the peer's port and it's key in the peer map
227
var newport = ((data[8] << 8) | data[9]);
228
SOCKFS.websocket_sock_ops.removePeer(sock, peer);
230
SOCKFS.websocket_sock_ops.addPeer(sock, peer);
234
sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data });
237
if (ENVIRONMENT_IS_NODE) {
238
peer.socket.on('open', handleOpen);
239
peer.socket.on('message', function(data, flags) {
243
handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer
245
peer.socket.on('error', function() {
249
peer.socket.onopen = handleOpen;
250
peer.socket.onmessage = function(event) {
251
handleMessage(event.data);
259
poll: function(sock) {
260
if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) {
261
// listen sockets should only say they're available for reading
262
// if there are pending clients.
263
return sock.pending.length ? ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}) : 0;
267
var dest = sock.type === {{{ cDefine('SOCK_STREAM') }}} ? // we only care about the socket state for connection-based sockets
268
SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) :
271
if (sock.recv_queue.length ||
272
!dest || // connection-less sockets are always ready to read
273
(dest && dest.socket.readyState === dest.socket.CLOSING) ||
274
(dest && dest.socket.readyState === dest.socket.CLOSED)) { // let recv return 0 once closed
275
mask |= ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}});
278
if (!dest || // connection-less sockets are always ready to write
279
(dest && dest.socket.readyState === dest.socket.OPEN)) {
280
mask |= {{{ cDefine('POLLOUT') }}};
283
if ((dest && dest.socket.readyState === dest.socket.CLOSING) ||
284
(dest && dest.socket.readyState === dest.socket.CLOSED)) {
285
mask |= {{{ cDefine('POLLHUP') }}};
290
ioctl: function(sock, request, arg) {
292
case {{{ cDefine('FIONREAD') }}}:
294
if (sock.recv_queue.length) {
295
bytes = sock.recv_queue[0].data.length;
297
{{{ makeSetValue('arg', '0', 'bytes', 'i32') }}};
300
return ERRNO_CODES.EINVAL;
303
close: function(sock) {
304
// if we've spawned a listen server, close it
312
// close any peer connections
313
var peers = Object.keys(sock.peers);
314
for (var i = 0; i < peers.length; i++) {
315
var peer = sock.peers[peers[i]];
320
SOCKFS.websocket_sock_ops.removePeer(sock, peer);
324
bind: function(sock, addr, port) {
325
if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') {
326
throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already bound
329
sock.sport = port || _mkport();
330
// in order to emulate dgram sockets, we need to launch a listen server when
331
// binding on a connection-less socket
332
// note: this is only required on the server side
333
if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
334
// close the existing server if it exists
339
// swallow error operation not supported error that occurs when binding in the
340
// browser where this isn't supported
342
sock.sock_ops.listen(sock, 0);
344
if (!(e instanceof FS.ErrnoError)) throw e;
345
if (e.errno !== ERRNO_CODES.EOPNOTSUPP) throw e;
349
connect: function(sock, addr, port) {
351
throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP);
355
// if (!sock.addr && sock.type == {{{ cDefine('SOCK_DGRAM') }}}) {
358
// early out if we're already connected / in the middle of connecting
359
if (typeof sock.daddr !== 'undefined' && typeof sock.dport !== 'undefined') {
360
var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport);
362
if (dest.socket.readyState === dest.socket.CONNECTING) {
363
throw new FS.ErrnoError(ERRNO_CODES.EALREADY);
365
throw new FS.ErrnoError(ERRNO_CODES.EISCONN);
370
// add the socket to our peer list and set our
371
// destination address / port to match
372
var peer = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
373
sock.daddr = peer.addr;
374
sock.dport = peer.port;
376
// always "fail" in non-blocking mode
377
throw new FS.ErrnoError(ERRNO_CODES.EINPROGRESS);
379
listen: function(sock, backlog) {
380
if (!ENVIRONMENT_IS_NODE) {
381
throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP);
384
throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already listening
386
var WebSocketServer = require('ws').Server;
387
var host = sock.saddr;
389
console.log('listen: ' + host + ':' + sock.sport);
391
sock.server = new WebSocketServer({
394
// TODO support backlog
397
sock.server.on('connection', function(ws) {
399
console.log('received connection from: ' + ws._socket.remoteAddress + ':' + ws._socket.remotePort);
401
if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
402
var newsock = SOCKFS.createSocket(sock.family, sock.type, sock.protocol);
404
// create a peer on the new socket
405
var peer = SOCKFS.websocket_sock_ops.createPeer(newsock, ws);
406
newsock.daddr = peer.addr;
407
newsock.dport = peer.port;
409
// push to queue for accept to pick up
410
sock.pending.push(newsock);
412
// create a peer on the listen socket so calling sendto
413
// with the listen socket and an address will resolve
414
// to the correct client
415
SOCKFS.websocket_sock_ops.createPeer(sock, ws);
418
sock.server.on('closed', function() {
421
sock.server.on('error', function() {
425
accept: function(listensock) {
426
if (!listensock.server) {
427
throw new FS.ErrnoError(ERRNO_CODES.EINVAL);
429
var newsock = listensock.pending.shift();
430
newsock.stream.flags = listensock.stream.flags;
433
getname: function(sock, peer) {
436
if (sock.daddr === undefined || sock.dport === undefined) {
437
throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
442
// TODO saddr and sport will be set for bind()'d UDP sockets, but what
443
// should we be returning for TCP sockets that've been connect()'d?
444
addr = sock.saddr || 0;
445
port = sock.sport || 0;
447
return { addr: addr, port: port };
449
sendmsg: function(sock, buffer, offset, length, addr, port) {
450
if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
451
// connection-less sockets will honor the message address,
452
// and otherwise fall back to the bound destination address
453
if (addr === undefined || port === undefined) {
457
// if there was no address to fall back to, error out
458
if (addr === undefined || port === undefined) {
459
throw new FS.ErrnoError(ERRNO_CODES.EDESTADDRREQ);
462
// connection-based sockets will only use the bound
467
// find the peer for the destination address
468
var dest = SOCKFS.websocket_sock_ops.getPeer(sock, addr, port);
470
// early out if not connected with a connection-based socket
471
if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
472
if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
473
throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
474
} else if (dest.socket.readyState === dest.socket.CONNECTING) {
475
throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
479
// create a copy of the incoming data to send, as the WebSocket API
480
// doesn't work entirely with an ArrayBufferView, it'll just send
481
// the entire underlying buffer
483
if (buffer instanceof Array || buffer instanceof ArrayBuffer) {
484
data = buffer.slice(offset, offset + length);
485
} else { // ArrayBufferView
486
data = buffer.buffer.slice(buffer.byteOffset + offset, buffer.byteOffset + offset + length);
489
// if we're emulating a connection-less dgram socket and don't have
490
// a cached connection, queue the buffer to send upon connect and
491
// lie, saying the data was sent now.
492
if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
493
if (!dest || dest.socket.readyState !== dest.socket.OPEN) {
494
// if we're not connected, open a new connection
495
if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
496
dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
499
Module.print('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
501
dest.dgram_send_queue.push(data);
508
Module.print('websocket send (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
510
// send the actual data
511
dest.socket.send(data);
514
throw new FS.ErrnoError(ERRNO_CODES.EINVAL);
517
recvmsg: function(sock, length) {
518
// http://pubs.opengroup.org/onlinepubs/7908799/xns/recvmsg.html
519
if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) {
520
// tcp servers should not be recv()'ing on the listen socket
521
throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
524
var queued = sock.recv_queue.shift();
526
if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
527
var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport);
530
// if we have a destination address but are not connected, error out
531
throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
533
else if (dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
534
// return null if the socket has closed
538
// else, our socket is in a valid state but truly has nothing available
539
throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
542
throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
546
// queued.data will be an ArrayBuffer if it's unadulterated, but if it's
547
// requeued TCP data it'll be an ArrayBufferView
548
var queuedLength = queued.data.byteLength || queued.data.length;
549
var queuedOffset = queued.data.byteOffset || 0;
550
var queuedBuffer = queued.data.buffer || queued.data;
551
var bytesRead = Math.min(length, queuedLength);
553
buffer: new Uint8Array(queuedBuffer, queuedOffset, bytesRead),
559
Module.print('websocket read (' + bytesRead + ' bytes): ' + [Array.prototype.slice.call(res.buffer)]);
562
// push back any unread data for TCP connections
563
if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && bytesRead < queuedLength) {
564
var bytesRemaining = queuedLength - bytesRead;
566
Module.print('websocket read: put back ' + bytesRemaining + ' bytes');
568
queued.data = new Uint8Array(queuedBuffer, queuedOffset + bytesRead, bytesRemaining);
569
sock.recv_queue.unshift(queued);
b'\\ No newline at end of file'