159
160
"Remove SOCKET from the table of rpc connections."
160
161
(let ((server (socket-peer-address socket))
161
162
(port (socket-peer-port socket)))
162
(when (eq (server-socket server port) socket)
163
(let ((data (socket-data socket)))
163
(when (eq (table-ref socket-cache (cons server port)) socket)
164
(table-unset socket-cache (cons server port)))
165
(let ((data (socket-data socket)))
167
(close-socket socket)
164
168
(when (socket-closable-p data)
165
169
(close-socket socket))
166
(table-unset socket-cache (cons server port))
167
(table-unset socket-data-table socket)))))
170
(table-unset socket-data-table socket)
171
;; fail-out any pending calls on this socket
173
(dispatch-pending-call
175
(list 'rpc-error "Lost connection" server port)))
176
(socket-pending-calls data))))))
169
178
;; Return the data structure associated with SOCKET
170
179
(define (socket-data socket) (table-ref socket-data-table socket))
183
;; maps from ID -> (CALLBACK ERROR? VALUE)
184
(define pending-calls (make-table eq-hash eq))
186
;; XXX make this unspoofable
190
(setq counter (1+ counter)))))
192
(define (record-pending-call socket id callback)
193
(table-set pending-calls id callback)
194
(let ((data (socket-data socket)))
195
(socket-pending-calls-set! data (cons id (socket-pending-calls data)))))
197
(define (dispatch-pending-call socket id succeeded value)
198
(let ((data (socket-data socket)))
199
(socket-pending-calls-set! data (delq id (socket-pending-calls data))))
200
(let ((callback (table-ref pending-calls id)))
202
(table-unset pending-calls id)
203
(callback succeeded value))))
174
205
(define (rpc-socket-listener master-socket)
175
206
"The function that should be used to listen for connections on rpc
197
228
(define (rpc-output-handler socket output)
198
229
"The function used to handle any OUTPUT from SOCKET."
199
(debug "Read: %S\n" output)
200
230
(let ((sock-data (socket-data socket)))
201
(when (socket-pending-data sock-data)
202
(setq output (concat (socket-pending-data sock-data) output))
203
(socket-pending-data-set! sock-data nil))
204
(let ((stream (make-string-input-stream output))
231
(socket-pending-data-set!
232
sock-data (concat (socket-pending-data sock-data) output))
233
;;(debug "Input: %S\n" (socket-pending-data sock-data))
236
(let ((stream (make-string-input-stream
237
(socket-pending-data sock-data)))
209
239
(condition-case nil
210
240
(setq form (read stream))
211
241
((premature-end-of-stream end-of-stream)
213
243
((invalid-read-syntax)
214
(error "Can't parse rpc message: %S" (substring output point))))
244
(error "Can't parse rpc message: %S"
245
(socket-pending-data sock-data))))
216
247
(debug "Parsed: %S\n" form)
249
;; this function may be called reentrantly, so make sure the
250
;; state is always consistent..
251
(socket-pending-data-set!
252
;; stream is (STRING . POINT)
253
sock-data (substring (cdr stream) (car stream)))
220
(unless (socket-result-pending sock-data)
221
(error "Spurious result on %s" socket))
222
((socket-result-pending sock-data) form))
225
(let ((send-result t))
227
;; vectors denote async requests
228
(setq send-result nil)
229
(setq form (vector->list form)))
257
;; (result CALL-ID RETURNED? VALUE-OR-EXCEPTION)
258
(let ((id (nth 1 form))
259
(succeeded (nth 2 form))
260
(value (nth 3 form)))
261
(dispatch-pending-call socket id succeeded value)))
264
;; (call CALL-ID SERVANT-ID ARGS...)
265
(let ((id (nth 1 form))
266
(servant-id (nth 2 form))
267
(args (nthcdr 3 form)))
230
268
(let ((result (call-with-exception-handler
232
(let ((impl (servant-ref (car form)))
270
(let ((impl (servant-ref servant-id)))
235
(error "No such RPC servant: %s"
273
"No such RPC servant: %s" servant-id))
237
274
(let-fluids ((active-socket socket))
238
(cons '#t (apply impl args)))))
275
(list t (apply impl args)))))
242
(debug "Wrote: %S\n" result)
243
(write socket (prin1-to-string result)))))))
244
(setq point (car stream))))
245
(when (< point (length output))
246
(socket-pending-data-set! sock-data (substring output point))))))
248
;; Wait for an rpc response on SOCKET. Parse it and either return the
249
;; value or raise the exception
250
(define (wait-for-reponse socket)
251
(let ((old-vector (socket-result-pending (socket-data socket)))
253
(define (result-callback value)
254
(debug "Result: %S\n" value)
256
(socket-result-pending-set! (socket-data socket) result-callback)
259
(accept-process-output 60))
260
(socket-result-pending-set! (socket-data socket) old-vector))
261
(if (eq (car result) '#t)
265
(raise-exception (cdr result)))))
279
(let ((response (list* 'result id result)))
280
(debug "Wrote: %S\n" response)
281
(write socket (prin1-to-string response)))))))))))))
283
(define (invoke-method socket id callback servant-id args)
284
(record-pending-call socket id callback)
285
(let ((request (list* 'call id servant-id args)))
286
(debug "Wrote: %S\n" request)
287
(write socket (prin1-to-string request))))
289
(define (invoke-oneway-method socket servant-id args)
290
(let ((request (list* 'call nil servant-id args)))
291
(debug "Wrote: %S\n" request)
292
(write socket (prin1-to-string request))))
294
(define (synchronous-method-call socket servant-id args)
295
(let ((id (make-call-id))
298
(invoke-method socket id
305
(accept-process-output 60))
308
(raise-exception value))))
310
(define (asynchronous-method-call socket callback servant-id args)
311
(invoke-method socket (make-call-id) callback servant-id args))
313
(define (oneway-method-call socket servant-id args)
314
(invoke-oneway-method socket servant-id args))
267
316
(define (rpc-create-server)
268
317
"Start listening for rpc connections on the current machine"
313
362
;; magic object used to get information from proxies
314
363
(define proxy-token (cons))
316
;; XXX shouldn't keep consing new proxies..
365
;; table mapping GLOBAL-ID -> PROXY-WEAK-REF
366
(define proxy-table (make-table string-hash string=))
317
368
(define (make-proxy server port servant-id)
318
369
(let ((global-id (make-global-id server port servant-id)))
320
(if (eq (car args) proxy-token)
321
;; when called like this, do special things
323
((global-id) global-id)
325
((servant-id) servant-id)
328
;; async request - no result required
329
(let ((socket (server-socket server port)))
330
(debug "Wrote: %S\n" (cons servant-id (cddr args)))
331
(write socket (prin1-to-string
332
;; cheap hack, vectors mean async
333
(apply vector (cons servant-id
336
;; otherwise, just forward to the server
337
(let ((socket (server-socket server port)))
338
(debug "Wrote: %S\n" (cons servant-id args))
339
(write socket (prin1-to-string (cons servant-id args)))
340
(wait-for-reponse socket))))))
342
(define (async-rpc-call proxy . args)
373
(if (eq (car args) proxy-token)
374
;; when called like this, do special things
376
((global-id) global-id)
378
((servant-id) servant-id)
381
;; async request - no result required
383
(server-socket server port) servant-id (cddr args)))
386
(asynchronous-method-call
387
(server-socket server port)
388
(caddr args) servant-id (cdddr args))))
390
;; otherwise, just forward to the server
391
(synchronous-method-call
392
(server-socket server port) servant-id args))))
394
;; Avoid consing a new proxy each time..
395
(let ((ref (table-ref proxy-table global-id)))
402
(table-set proxy-table global-id (make-weak-ref p))
405
(define (async-rpc-call proxy #!key callback . args)
343
406
"Call the rpc proxy function PROXY with arguments ARGS. It will be called
344
asynchronously - no result will be returned from the remote function."
345
(apply proxy proxy-token 'async args))
407
asynchronously. No result will be returned from the remote function
408
unless CALLBACK is given, in which case (CALLBACK STATUS VALUE) will be
409
called at some point in the future."
411
(apply proxy proxy-token 'async callback args)
412
(apply proxy proxy-token 'oneway args))
347
415
(define (rpc-proxy->global-id proxy)
348
416
"Return the globally-valid servant-id (a string) that can be used to