~ubuntu-branches/ubuntu/trusty/librep/trusty

« back to all changes in this revision

Viewing changes to lisp/rep/net/rpc.jl

  • Committer: Bazaar Package Importer
  • Author(s): Christian Marillat
  • Date: 2005-01-14 14:18:11 UTC
  • mfrom: (2.1.2 hoary)
  • Revision ID: james.westby@ubuntu.com-20050114141811-k2x3wczuc17qai2v
Tags: 0.17-7
Build with -Oo for amd64

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#| rep.net.rpc -- simple RPC mechanisms for inter-host communication
2
2
 
3
 
   $Id: rpc.jl,v 1.10 2001/08/10 02:38:24 jsh Exp $
 
3
   $Id: rpc.jl,v 1.13 2002/03/31 03:43:30 jsh Exp $
4
4
 
5
5
   Copyright (C) 2001 John Harper <jsh@pixelslut.com>
6
6
 
112
112
          rep.data.tables
113
113
          rep.data.records)
114
114
 
 
115
  (define debug-rpc nil)
 
116
 
115
117
  (define (debug fmt . args)
116
 
    (when nil
 
118
    (when debug-rpc
117
119
      (let ((print-escape t))
118
120
        (apply format standard-error fmt args))))
119
121
 
121
123
    (make-socket-data closable)
122
124
    ;; no predicate
123
125
    (pending-data socket-pending-data socket-pending-data-set!)
124
 
    (result-pending socket-result-pending socket-result-pending-set!)
125
 
    (closable socket-closable-p))
 
126
    (closable socket-closable-p)
 
127
    (pending-calls socket-pending-calls socket-pending-calls-set!))
126
128
 
127
129
  ;; The socket used to listen for connections to this server (or false)
128
130
  (define listener-socket nil)
140
142
  (define socket-data-table (make-weak-table eq-hash eq))
141
143
 
142
144
  ;; Return the socket associated with SERVER:PORT. If there isn't one,
143
 
  ;; try to connect to the server
 
145
  ;; try to connect to the server. Signals an error on failure
144
146
  (define (server-socket server port)
145
147
    (or (table-ref socket-cache (cons server port))
146
 
        (open-server server port)
147
 
        (error "No connection with server %s:%d" server port)))
 
148
        (open-server server port)))
148
149
 
149
150
  (define (register-rpc-server socket #!key closable)
150
151
    "Add the connection SOCKET to the table of known rpc connections. If
159
160
    "Remove SOCKET from the table of rpc connections."
160
161
    (let ((server (socket-peer-address socket))
161
162
          (port (socket-peer-port socket)))
162
 
      (when (eq (server-socket server port) socket)
163
 
        (let ((data (socket-data socket)))
 
163
      (when (eq (table-ref socket-cache (cons server port)) socket)
 
164
        (table-unset socket-cache (cons server port)))
 
165
      (let ((data (socket-data socket)))
 
166
        (if (not data)
 
167
            (close-socket socket)
164
168
          (when (socket-closable-p data)
165
169
            (close-socket socket))
166
 
          (table-unset socket-cache (cons server port))
167
 
          (table-unset socket-data-table socket)))))
 
170
          (table-unset socket-data-table socket)
 
171
          ;; fail-out any pending calls on this socket
 
172
          (mapc (lambda (id)
 
173
                  (dispatch-pending-call
 
174
                   socket id nil
 
175
                   (list 'rpc-error "Lost connection" server port)))
 
176
                (socket-pending-calls data))))))
168
177
 
169
178
  ;; Return the data structure associated with SOCKET
170
179
  (define (socket-data socket) (table-ref socket-data-table socket))
171
180
 
172
181
;;; socket I/O
173
182
 
 
183
  ;; maps from ID -> (CALLBACK ERROR? VALUE)
 
184
  (define pending-calls (make-table eq-hash eq))
 
185
 
 
186
  ;; XXX make this unspoofable
 
187
  (define make-call-id
 
188
    (let ((counter 0))
 
189
      (lambda ()
 
190
        (setq counter (1+ counter)))))
 
191
 
 
192
  (define (record-pending-call socket id callback)
 
193
    (table-set pending-calls id callback)
 
194
    (let ((data (socket-data socket)))
 
195
      (socket-pending-calls-set! data (cons id (socket-pending-calls data)))))
 
196
 
 
197
  (define (dispatch-pending-call socket id succeeded value)
 
198
    (let ((data (socket-data socket)))
 
199
      (socket-pending-calls-set! data (delq id (socket-pending-calls data))))
 
200
    (let ((callback (table-ref pending-calls id)))
 
201
      (when callback
 
202
        (table-unset pending-calls id)
 
203
        (callback succeeded value))))
 
204
 
174
205
  (define (rpc-socket-listener master-socket)
175
206
    "The function that should be used to listen for connections on rpc
176
207
server sockets."
183
214
      (register-rpc-server socket #:closable nil)
184
215
      socket))
185
216
 
186
 
  ;; Open an rpc connection to HOST:PORT
 
217
  ;; Open an rpc connection to HOST:PORT; signals an error on failure
187
218
  (define (open-server host port)
188
219
    (let (socket)
189
220
      (setq socket (socket-client host port
196
227
 
197
228
  (define (rpc-output-handler socket output)
198
229
    "The function used to handle any OUTPUT from SOCKET."
199
 
    (debug "Read: %S\n" output)
200
230
    (let ((sock-data (socket-data socket)))
201
 
      (when (socket-pending-data sock-data)
202
 
        (setq output (concat (socket-pending-data sock-data) output))
203
 
        (socket-pending-data-set! sock-data nil))
204
 
      (let ((stream (make-string-input-stream output))
205
 
            (point 0)
206
 
            form)
207
 
        (catch 'out
208
 
          (while t
 
231
      (socket-pending-data-set!
 
232
       sock-data (concat (socket-pending-data sock-data) output))
 
233
      ;;(debug "Input: %S\n" (socket-pending-data sock-data))
 
234
      (catch 'out
 
235
        (while t
 
236
          (let ((stream (make-string-input-stream
 
237
                         (socket-pending-data sock-data)))
 
238
                form)
209
239
            (condition-case nil
210
240
                (setq form (read stream))
211
241
              ((premature-end-of-stream end-of-stream)
212
242
               (throw 'out))
213
243
              ((invalid-read-syntax)
214
 
               (error "Can't parse rpc message: %S" (substring output point))))
 
244
               (error "Can't parse rpc message: %S"
 
245
                      (socket-pending-data sock-data))))
215
246
 
216
247
            (debug "Parsed: %S\n" form)
 
248
 
 
249
            ;; this function may be called reentrantly, so make sure the
 
250
            ;; state is always consistent..
 
251
            (socket-pending-data-set!
 
252
             ;; stream is (STRING . POINT)
 
253
             sock-data (substring (cdr stream) (car stream)))
 
254
 
217
255
            (case (car form)
218
 
              ((#t #f)
219
 
               ;; Response
220
 
               (unless (socket-result-pending sock-data)
221
 
                 (error "Spurious result on %s" socket))
222
 
               ((socket-result-pending sock-data) form))
223
 
                
224
 
              (t ;; Request
225
 
               (let ((send-result t))
226
 
                 (when (vectorp form)
227
 
                   ;; vectors denote async requests
228
 
                   (setq send-result nil)
229
 
                   (setq form (vector->list form)))
 
256
              ((result)
 
257
               ;; (result CALL-ID RETURNED? VALUE-OR-EXCEPTION)
 
258
               (let ((id (nth 1 form))
 
259
                     (succeeded (nth 2 form))
 
260
                     (value (nth 3 form)))
 
261
                 (dispatch-pending-call socket id succeeded value)))
 
262
 
 
263
              ((call)
 
264
               ;; (call CALL-ID SERVANT-ID ARGS...)
 
265
               (let ((id (nth 1 form))
 
266
                     (servant-id (nth 2 form))
 
267
                     (args (nthcdr 3 form)))
230
268
                 (let ((result (call-with-exception-handler
231
269
                                (lambda ()
232
 
                                  (let ((impl (servant-ref (car form)))
233
 
                                        (args (cdr form)))
 
270
                                  (let ((impl (servant-ref servant-id)))
234
271
                                    (unless impl
235
 
                                      (error "No such RPC servant: %s"
236
 
                                             (car form)))
 
272
                                      (error
 
273
                                       "No such RPC servant: %s" servant-id))
237
274
                                    (let-fluids ((active-socket socket))
238
 
                                      (cons '#t (apply impl args)))))
 
275
                                      (list t (apply impl args)))))
239
276
                                (lambda (data)
240
 
                                  (cons '#f data)))))
241
 
                   (when send-result
242
 
                     (debug "Wrote: %S\n" result)
243
 
                     (write socket (prin1-to-string result)))))))
244
 
            (setq point (car stream))))
245
 
        (when (< point (length output))
246
 
          (socket-pending-data-set! sock-data (substring output point))))))
247
 
 
248
 
  ;; Wait for an rpc response on SOCKET. Parse it and either return the
249
 
  ;; value or raise the exception
250
 
  (define (wait-for-reponse socket)
251
 
    (let ((old-vector (socket-result-pending (socket-data socket)))
252
 
          (result '()))
253
 
      (define (result-callback value)
254
 
        (debug "Result: %S\n" value)
255
 
        (setq result value))
256
 
      (socket-result-pending-set! (socket-data socket) result-callback)
257
 
      (unwind-protect
258
 
          (while (not result)
259
 
            (accept-process-output 60))
260
 
        (socket-result-pending-set! (socket-data socket) old-vector))
261
 
      (if (eq (car result) '#t)
262
 
          ;; success
263
 
          (cdr result)
264
 
        ;; exception raised
265
 
        (raise-exception (cdr result)))))
 
277
                                  (list nil data)))))
 
278
                   (when id
 
279
                     (let ((response (list* 'result id result)))
 
280
                       (debug "Wrote: %S\n" response)
 
281
                       (write socket (prin1-to-string response)))))))))))))
 
282
 
 
283
  (define (invoke-method socket id callback servant-id args)
 
284
    (record-pending-call socket id callback)
 
285
    (let ((request (list* 'call id servant-id args)))
 
286
      (debug "Wrote: %S\n" request)
 
287
      (write socket (prin1-to-string request))))
 
288
 
 
289
  (define (invoke-oneway-method socket servant-id args)
 
290
    (let ((request (list* 'call nil servant-id args)))
 
291
      (debug "Wrote: %S\n" request)
 
292
      (write socket (prin1-to-string request))))
 
293
 
 
294
  (define (synchronous-method-call socket servant-id args)
 
295
    (let ((id (make-call-id))
 
296
          (done nil)
 
297
          succeeded value)
 
298
      (invoke-method socket id
 
299
                     (lambda (a b)
 
300
                       (setq done t)
 
301
                       (setq succeeded a)
 
302
                       (setq value b))
 
303
                     servant-id args)
 
304
      (while (not done)
 
305
        (accept-process-output 60))
 
306
      (if succeeded
 
307
          value
 
308
        (raise-exception value))))
 
309
 
 
310
  (define (asynchronous-method-call socket callback servant-id args)
 
311
    (invoke-method socket (make-call-id) callback servant-id args))
 
312
 
 
313
  (define (oneway-method-call socket servant-id args)
 
314
    (invoke-oneway-method socket servant-id args))
266
315
 
267
316
  (define (rpc-create-server)
268
317
    "Start listening for rpc connections on the current machine"
313
362
  ;; magic object used to get information from proxies
314
363
  (define proxy-token (cons))
315
364
 
316
 
  ;; XXX shouldn't keep consing new proxies..
 
365
  ;; table mapping GLOBAL-ID -> PROXY-WEAK-REF
 
366
  (define proxy-table (make-table string-hash string=))
 
367
 
317
368
  (define (make-proxy server port servant-id)
318
369
    (let ((global-id (make-global-id server port servant-id)))
319
 
      (lambda args
320
 
        (if (eq (car args) proxy-token)
321
 
            ;; when called like this, do special things
322
 
            (case (cadr args)
323
 
              ((global-id) global-id)
324
 
 
325
 
              ((servant-id) servant-id)
326
 
 
327
 
              ((async)
328
 
               ;; async request - no result required
329
 
               (let ((socket (server-socket server port)))
330
 
                 (debug "Wrote: %S\n" (cons servant-id (cddr args)))
331
 
                 (write socket (prin1-to-string
332
 
                                ;; cheap hack, vectors mean async
333
 
                                (apply vector (cons servant-id
334
 
                                                    (cddr args))))))))
335
 
 
336
 
          ;; otherwise, just forward to the server
337
 
          (let ((socket (server-socket server port)))
338
 
            (debug "Wrote: %S\n" (cons servant-id args))
339
 
            (write socket (prin1-to-string (cons servant-id args)))
340
 
            (wait-for-reponse socket))))))
341
 
 
342
 
  (define (async-rpc-call proxy . args)
 
370
 
 
371
      (define (proxy)
 
372
        (lambda args
 
373
          (if (eq (car args) proxy-token)
 
374
              ;; when called like this, do special things
 
375
              (case (cadr args)
 
376
                ((global-id) global-id)
 
377
 
 
378
                ((servant-id) servant-id)
 
379
 
 
380
                ((oneway)
 
381
                 ;; async request - no result required
 
382
                 (oneway-method-call
 
383
                  (server-socket server port) servant-id (cddr args)))
 
384
 
 
385
                ((async)
 
386
                 (asynchronous-method-call
 
387
                  (server-socket server port)
 
388
                  (caddr args) servant-id (cdddr args))))
 
389
 
 
390
            ;; otherwise, just forward to the server
 
391
            (synchronous-method-call
 
392
             (server-socket server port) servant-id args))))
 
393
 
 
394
      ;; Avoid consing a new proxy each time..
 
395
      (let ((ref (table-ref proxy-table global-id)))
 
396
        (if ref
 
397
            (or (weak-ref ref)
 
398
                (let ((p (proxy)))
 
399
                  (weak-ref-set ref p)
 
400
                  p))
 
401
          (let ((p (proxy)))
 
402
            (table-set proxy-table global-id (make-weak-ref p))
 
403
            p)))))
 
404
 
 
405
  (define (async-rpc-call proxy #!key callback . args)
343
406
    "Call the rpc proxy function PROXY with arguments ARGS. It will be called
344
 
asynchronously - no result will be returned from the remote function."
345
 
    (apply proxy proxy-token 'async args))
 
407
asynchronously. No result will be returned from the remote function
 
408
unless CALLBACK is given, in which case (CALLBACK STATUS VALUE) will be
 
409
called at some point in the future."
 
410
    (if callback
 
411
        (apply proxy proxy-token 'async callback args)
 
412
      (apply proxy proxy-token 'oneway args))
 
413
    #undefined)
346
414
 
347
415
  (define (rpc-proxy->global-id proxy)
348
416
    "Return the globally-valid servant-id (a string) that can be used to
363
431
  (define (servant-id->global-id id)
364
432
    "Return the globally referenceable RPC servant id for local servant id ID."
365
433
    (unless listener-socket
366
 
      (error "Need an opened RPC server"))
 
434
      (error "Need an active local RPC server"))
367
435
    (make-global-id (socket-address listener-socket)
368
436
                    (socket-port listener-socket) id))
369
437