~ubuntu-branches/ubuntu/trusty/ocamlnet/trusty

« back to all changes in this revision

Viewing changes to src/rpc/rpc_server.ml

  • Committer: Bazaar Package Importer
  • Author(s): Stéphane Glondu
  • Date: 2011-09-02 14:12:33 UTC
  • mfrom: (18.2.3 sid)
  • Revision ID: james.westby@ubuntu.com-20110902141233-zbj0ygxb92u6gy4z
Tags: 3.4-1
* New upstream release
  - add a new NetcgiRequire directive to ease dependency management
    (Closes: #637147)
  - remove patches that were applied upstream:
    + Added-missing-shebang-lines-in-example-shell-scripts
    + Try-also-ocamlc-for-POSIX-threads

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
(* $Id: rpc_server.ml 1121 2007-05-06 18:20:37Z gerd $
 
1
(* $Id: rpc_server.ml 1651 2011-08-03 16:38:17Z gerd $
2
2
 * ----------------------------------------------------------------------
3
3
 *
4
4
 *)
9
9
open Uq_engines
10
10
open Rpc_common
11
11
open Rpc
 
12
open Printf
 
13
 
12
14
 
13
15
exception Connection_lost
14
16
 
45
47
    [ `Deny
46
48
    | `Drop
47
49
    | `Reject
 
50
    | `Reject_with of Rpc.server_error
48
51
    | `Accept
49
52
    | `Accept_limit_length of (int * rule)
50
53
    ]
51
54
 
52
55
type auth_result =
53
 
    Auth_positive of (string * string * string)
54
 
      (* (username, returned_verifier_flavour, returned_verifier_data) *)
 
56
    Auth_positive of
 
57
      (string * string * string * Xdr.encoder option * Xdr.decoder option)
55
58
  | Auth_negative of Rpc.server_error
 
59
  | Auth_reply of (Xdr_mstring.mstring list * string * string)
 
60
  | Auth_drop
56
61
 
57
62
 
58
63
type auth_peeker =
62
67
    ]
63
68
 
64
69
 
 
70
exception Late_drop
 
71
 
 
72
class type auth_details =
 
73
object
 
74
  method server_addr : Unix.sockaddr option
 
75
  method client_addr : Unix.sockaddr option
 
76
  method program : uint4
 
77
  method version : uint4
 
78
  method procedure : uint4
 
79
  method xid : uint4
 
80
  method credential : string * string
 
81
  method verifier : string * string
 
82
  method frame_len : int
 
83
  method message : Rpc_packer.packed_value
 
84
end
 
85
 
 
86
 
65
87
class type ['t] pre_auth_method =
66
88
object
67
89
  method name : string
73
95
  method authenticate :
74
96
    't ->
75
97
    connection_id ->
76
 
    Unix.sockaddr option ->
77
 
    Unix.sockaddr option ->
78
 
    string ->
79
 
    string ->
80
 
    string ->
81
 
    string ->
 
98
    auth_details ->
82
99
    (auth_result -> unit) ->
83
100
      unit
84
101
 
136
153
 
137
154
type t =
138
155
      { mutable main_socket_name : Rpc_transport.sockaddr;
 
156
        mutable dummy : bool;
139
157
        mutable service : (Rpc_program.t * binding Uint4Map.t) 
140
158
                            Uint4Map.t Uint4Map.t;
141
159
                (* Program nr/version nr/procedure nr *)
145
163
        mutable exception_handler : exn -> unit;
146
164
        mutable unmap_port : (unit -> unit);
147
165
        mutable onclose : (connection_id -> unit) list;
148
 
        mutable filter : (Unix.sockaddr -> connection_id -> rule);
 
166
        mutable filter : (Rpc_transport.sockaddr -> connection_id -> rule);
149
167
        mutable auth_methods : (string, t pre_auth_method) Hashtbl.t;
150
168
        mutable auth_peekers : (auth_peeker * t pre_auth_method) list;
151
169
        mutable connections : connection list;
152
170
        mutable master_acceptor : server_socket_acceptor option;
153
 
        mutable transport_timeout : float
 
171
        mutable transport_timeout : float;
 
172
        mutable nolog : bool;
 
173
        mutable get_last_proc : unit->string;
 
174
        mutable mstring_factories : Xdr_mstring.named_mstring_factories;
154
175
      }
155
176
 
156
177
and connection =
199
220
        parameter : xdr_value;     (* XV_void if not used *)
200
221
        result : Rpc_packer.packed_value;
201
222
         (* complete result; "" if not used *)
 
223
        ptrace_result :string;  (* ptrace only; "" if not used *)
202
224
        auth_method : t pre_auth_method;
203
225
        auth_user : string;
204
226
        auth_ret_flav : string;
205
227
        auth_ret_data : string;
 
228
        encoder : Xdr.encoder option;
206
229
      }
207
230
 
208
231
and connector =
210
233
    | Portmapped
211
234
    | Internet of (Unix.inet_addr * int)   (* addr, port *)
212
235
    | Unix of string                       (* path to unix dom sock *)
 
236
    | W32_pipe of string
213
237
    | Descriptor of Unix.file_descr
214
238
    | Dynamic_descriptor of (unit -> Unix.file_descr)
215
239
 
235
259
  method name = "AUTH_NONE"
236
260
  method flavors = [ "AUTH_NONE" ]
237
261
  method peek = `None
238
 
  method authenticate _ _ _ _ _ _ _ _ f = f(Auth_positive("","AUTH_NONE",""))
 
262
  method authenticate _ _ _ f = 
 
263
    f(Auth_positive("","AUTH_NONE","",None,None))
239
264
end
240
265
 
241
266
let auth_none = new auth_none
245
270
  method name = "AUTH_TOO_WEAK"
246
271
  method flavors = []
247
272
  method peek = `None
248
 
  method authenticate _ _ _ _ _ _ _ _ f = f(Auth_negative Auth_too_weak)
 
273
  method authenticate _ _ _ f = 
 
274
    f(Auth_negative Auth_too_weak)
249
275
end
250
276
 
251
277
let auth_too_weak = new auth_too_weak
259
285
      (fun mplex ->
260
286
         mplex # peer_user_name
261
287
      )
262
 
  method authenticate _ _ _ _ _ _ _ _ f = f(Auth_negative Auth_too_weak)
 
288
  method authenticate _ _ _ f = 
 
289
    f(Auth_negative Auth_too_weak)
263
290
end
264
291
 
265
292
let auth_transport = new auth_transport
266
293
 
267
294
  (*****)
268
295
 
269
 
let debug_internals_log = ref None
270
 
 
271
 
let debug_service_log = ref None
272
 
 
273
 
 
274
 
let debug_service msg =
275
 
  match !debug_service_log with
276
 
    | None -> ()
277
 
    | Some f -> f msg
278
 
 
279
 
let debug_servicef msgf =
280
 
  Printf.kprintf debug_service msgf
281
 
 
282
 
 
283
 
let debug_internals msg =
284
 
  match !debug_internals_log with
285
 
    | None -> ()
286
 
    | Some f -> f msg
287
 
 
288
 
let debug_internalsf msgf =
289
 
  Printf.kprintf debug_internals msgf
 
296
module Debug = struct
 
297
  let enable = ref false
 
298
  let enable_ctrace = ref false
 
299
  let enable_ptrace = ref false
 
300
  let ptrace_verbosity = ref `Name_abbrev_args
 
301
  let disable_for_server srv = srv.nolog <- true
 
302
end
 
303
 
 
304
let dlog0 = Netlog.Debug.mk_dlog "Rpc_server" Debug.enable
 
305
let dlogr0 = Netlog.Debug.mk_dlogr "Rpc_server" Debug.enable
 
306
 
 
307
let dlog srv m = if not srv.nolog then dlog0 m
 
308
let dlogr srv m = if not srv.nolog then dlogr0 m
 
309
 
 
310
let dlog0_ctrace = Netlog.Debug.mk_dlog "Rpc_server.Ctrace" Debug.enable_ctrace
 
311
let dlogr0_ctrace = Netlog.Debug.mk_dlogr "Rpc_server.Ctrace" Debug.enable_ctrace
 
312
 
 
313
let dlog_ctrace srv m = if not srv.nolog then dlog0_ctrace m
 
314
let dlogr_ctrace srv m = if not srv.nolog then dlogr0_ctrace m
 
315
 
 
316
 
 
317
let dlog0_ptrace = Netlog.Debug.mk_dlog "Rpc_server.Ptrace" Debug.enable_ptrace
 
318
let dlogr0_ptrace = Netlog.Debug.mk_dlogr "Rpc_server.Ptrace" Debug.enable_ptrace
 
319
 
 
320
let dlog_ptrace srv m = if not srv.nolog then dlog0_ptrace m
 
321
let dlogr_ptrace srv m = if not srv.nolog then dlogr0_ptrace m
 
322
 
 
323
 
 
324
let () =
 
325
  Netlog.Debug.register_module "Rpc_server" Debug.enable;
 
326
  Netlog.Debug.register_module "Rpc_server.Ctrace" Debug.enable_ctrace;
 
327
  Netlog.Debug.register_module "Rpc_server.Ptrace" Debug.enable_ptrace
 
328
 
 
329
  (*****)
 
330
 
 
331
let connector_of_sockaddr =
 
332
  function
 
333
    | Unix.ADDR_INET(ip,p) ->
 
334
        Internet(ip,p)
 
335
    | Unix.ADDR_UNIX s ->
 
336
        Unix s
 
337
 
 
338
let connector_of_socksymbol sym =
 
339
  connector_of_sockaddr
 
340
    (Uq_resolver.sockaddr_of_socksymbol sym)
290
341
 
291
342
 
292
343
let sockaddrname sa =
294
345
    | Unix.ADDR_INET(addr, port) ->
295
346
        Unix.string_of_inet_addr addr ^ ":" ^ string_of_int port
296
347
    | Unix.ADDR_UNIX path ->
297
 
        path
 
348
        String.escaped path
298
349
 
299
350
let portname fd =
300
351
  try 
339
390
    | Auth_too_weak            -> "Auth_too_weak"
340
391
    | Auth_invalid_resp        -> "Auth_invalid_resp"
341
392
    | Auth_failed              -> "Auth_failed"
 
393
    | RPCSEC_GSS_ctxproblem    -> "RPCSEC_GSS_ctxproblem"
 
394
    | RPCSEC_GSS_credproblem   -> "RPCSEC_GSS_credproblem"
342
395
 
343
396
  (*****)
344
397
 
357
410
    Execute_procedure
358
411
  | Reject_procedure of server_error
359
412
 
360
 
let process_incoming_message srv conn sockaddr peeraddr message reaction =
 
413
let process_incoming_message srv conn sockaddr_lz peeraddr message reaction =
361
414
 
362
415
  let sockaddr_opt =
363
 
    try Some(Lazy.force sockaddr) with _ -> None in
 
416
    try Some(Lazy.force sockaddr_lz) with _ -> None in
 
417
 
 
418
  let sockaddr =
 
419
    match sockaddr_opt with
 
420
      | Some a -> `Sockaddr a
 
421
      | None -> `Implied in
364
422
 
365
423
  let peeraddr_opt =
366
424
    match peeraddr with
373
431
             | `Sockaddr a -> a
374
432
         ) in
375
433
 
376
 
  let make_immediate_answer xid procname result =
 
434
  let make_immediate_answer xid procname result f_ptrace_result =
 
435
    srv.get_last_proc <- 
 
436
      (fun () -> 
 
437
         if procname = "" then "Unavailable" else "Response " ^ procname);
377
438
    { server = conn;
378
439
      prog = None;
379
440
      sess_conn_id = if srv.prot = Rpc.Tcp then conn.conn_id
380
 
                     else new connection_id sockaddr peeraddr_lz;
381
 
      sockaddr = sockaddr;
 
441
                     else new connection_id sockaddr_lz peeraddr_lz;
 
442
      sockaddr = sockaddr_lz;
382
443
      peeraddr = peeraddr;
383
444
      call_id = (-1);          (* not applicable *)
384
445
      client_id = xid;
389
450
      auth_user = "";
390
451
      auth_ret_flav = "AUTH_NONE";
391
452
      auth_ret_data = "";
 
453
      encoder = None;
 
454
      ptrace_result = (if !Debug.enable_ptrace then f_ptrace_result() else "")
392
455
    }
393
456
  in
394
457
 
417
480
               let xid = Rpc_packer.peek_xid message in
418
481
               let reply = Rpc_packer.pack_accepting_reply xid
419
482
                             ret_flav ret_data condition in
420
 
               let answer = make_immediate_answer xid "" reply in
421
 
               if !debug_service_log <> None then
422
 
                 debug_servicef "Rpc_server (port %s, xid %s): Error %s"
423
 
                   (mplexoptname conn.trans) (xidname answer.client_id) 
424
 
                   (errname condition);
 
483
               let answer = 
 
484
                 make_immediate_answer xid "" reply 
 
485
                   (fun () -> "Error " ^ errname condition) in
425
486
               schedule_answer answer
426
487
            )
427
 
      | Xdr.Xdr_format _
428
 
      | Xdr.Xdr_format_message_too_long _ ->          (* Convert to Garbage *)
429
 
          protect_protect
430
 
            (fun () ->
 
488
      | (Xdr.Xdr_format _
 
489
        | Xdr.Xdr_format_message_too_long _ as e
 
490
        ) ->
 
491
          (* Convert to Garbage *)
 
492
           protect_protect
 
493
             (fun () ->
 
494
                dlogr srv
 
495
                 (fun () ->
 
496
                    sprintf "Emitting Garbage after exception: %s"
 
497
                      (Netexn.to_string e)
 
498
                 );
431
499
               let xid = Rpc_packer.peek_xid message in
432
500
               let reply = Rpc_packer.pack_accepting_reply xid
433
501
                             ret_flav ret_data Garbage in
434
 
               let answer = make_immediate_answer xid "" reply in
435
 
               if !debug_service_log <> None then
436
 
                 debug_servicef "Rpc_server (port %s, xid %s): Error Garbage"
437
 
                   (mplexoptname conn.trans) (xidname answer.client_id);
 
502
               let answer = make_immediate_answer xid "" reply 
 
503
                 (fun () -> "Error Garbage") in
438
504
               schedule_answer answer
439
505
            )
440
506
      | Rpc_server condition ->
442
508
            (fun () ->
443
509
               let xid = Rpc_packer.peek_xid message in
444
510
               let reply = Rpc_packer.pack_rejecting_reply xid condition in
445
 
               let answer = make_immediate_answer xid "" reply in
446
 
               if !debug_service_log <> None then
447
 
                 debug_servicef "Rpc_server (port %s, xid %s): Error %s"
448
 
                   (mplexoptname conn.trans) (xidname answer.client_id)
449
 
                   (errname condition);
 
511
               let answer = make_immediate_answer xid "" reply 
 
512
                 (fun () -> "Error " ^ errname condition) in
450
513
               schedule_answer answer
451
514
            )
 
515
      | Late_drop ->
 
516
          Netlog.logf `Err
 
517
            "Dropping response message"
452
518
      | Abort(_,_) as x ->
453
519
          raise x
454
520
      | any ->
459
525
               let xid = Rpc_packer.peek_xid message in
460
526
               let reply = Rpc_packer.pack_accepting_reply xid
461
527
                             ret_flav ret_data System_err in
462
 
               let answer = make_immediate_answer xid "" reply in
463
 
               if !debug_service_log <> None then
464
 
                 debug_servicef "Rpc_server (port %s, xid %s): Error System_err"
465
 
                   (mplexoptname conn.trans) (xidname answer.client_id);
 
528
               let answer = make_immediate_answer xid "" reply
 
529
                 (fun () -> "Error System_err") in
466
530
               schedule_answer answer
467
531
            )
468
532
  in
477
541
               = Rpc_packer.unpack_call_frame_l message
478
542
             in
479
543
 
 
544
             dlogr_ptrace srv
 
545
               (fun () ->
 
546
                  sprintf
 
547
                    "Request (sock=%s,peer=%s,xid=%lu) for [0x%lx,0x%lx,0x%lx]"
 
548
                    (Rpc_transport.string_of_sockaddr sockaddr)
 
549
                    (Rpc_transport.string_of_sockaddr peeraddr)
 
550
                    (Rtypes.logical_int32_of_uint4 xid)
 
551
                    (Rtypes.logical_int32_of_uint4 prog_nr)
 
552
                    (Rtypes.logical_int32_of_uint4 vers_nr)
 
553
                    (Rtypes.logical_int32_of_uint4 proc_nr)
 
554
               );
 
555
             
480
556
             let sess_conn_id =
481
557
               if srv.prot = Rpc.Tcp then 
482
558
                 conn.conn_id
483
559
               else 
484
 
                 new connection_id sockaddr peeraddr_lz
 
560
                 new connection_id sockaddr_lz peeraddr_lz
485
561
             in
486
562
 
487
563
             (* First authenticate: *)
489
565
               match conn.peeked_user with
490
566
                 | Some uid ->
491
567
                     (conn.peeked_method,
492
 
                      (fun _ _ _ _ _ _ _ _ cb ->
493
 
                         cb (Auth_positive(uid, "AUTH_NONE", "")))
 
568
                      (fun _ _ _ cb ->
 
569
                         cb (Auth_positive(uid, "AUTH_NONE", "", None, None)))
494
570
                     )
495
571
                 | None ->
496
572
                     ( let m =
500
576
                     )
501
577
             in
502
578
 
 
579
             let auth_details =
 
580
               ( object
 
581
                   method server_addr = sockaddr_opt
 
582
                   method client_addr = peeraddr_opt
 
583
                   method program = prog_nr
 
584
                   method version = vers_nr
 
585
                   method procedure = proc_nr
 
586
                   method xid = xid
 
587
                   method credential = (flav_cred, data_cred)
 
588
                   method verifier = (flav_verf, data_verf)
 
589
                   method frame_len = frame_len
 
590
                   method message = message
 
591
                 end
 
592
               ) in
 
593
 
503
594
             (* The [authenticate] method will call the passed function
504
595
              * when the authentication is done. This may be at any time
505
596
              * in the future.
506
597
              *)
507
598
             authenticate
508
 
               srv sess_conn_id sockaddr_opt peeraddr_opt
509
 
               flav_cred data_cred flav_verf data_verf
510
 
               (function Auth_positive(user,ret_flav,ret_data) ->
 
599
               srv sess_conn_id auth_details
 
600
               (function 
 
601
                  Auth_positive(user,ret_flav,ret_data,enc_opt,dec_opt) ->
511
602
                  (* user: the username (method-dependent)
512
603
                   * ret_flav: flavour of verifier to return
513
604
                   * ret_data: data of verifier to return
546
637
 
547
638
                       let param =
548
639
                         Rpc_packer.unpack_call_body
 
640
                           ~mstring_factories:srv.mstring_factories
 
641
                           ?decoder:dec_opt
549
642
                           prog procname message frame_len in
550
643
 
 
644
                       srv.get_last_proc <-
 
645
                         (fun () ->
 
646
                            (* no [string_of_request] - we would keep a
 
647
                               reference to param forever!
 
648
                             *)
 
649
                            "Invoke " ^ procname ^ "()"
 
650
                         );
551
651
 
552
 
                       if !debug_service_log <> None then
553
 
                         debug_servicef "Rpc_server (port %s, xid %s): Call for %s"
554
 
                           (mplexoptname conn.trans) (xidname xid) procname;
 
652
                       dlogr_ptrace srv
 
653
                         (fun () ->
 
654
                            sprintf
 
655
                              "Invoke (sock=%s,peer=%s,xid=%lu): %s"
 
656
                              (Rpc_transport.string_of_sockaddr sockaddr)
 
657
                              (Rpc_transport.string_of_sockaddr peeraddr)
 
658
                              (Rtypes.logical_int32_of_uint4 xid)
 
659
                              (Rpc_util.string_of_request
 
660
                                 !Debug.ptrace_verbosity
 
661
                                 prog
 
662
                                 procname
 
663
                                 param
 
664
                              )
 
665
                         );
555
666
 
556
667
                       begin match proc with
557
668
                           Sync p ->
558
669
                             let result_value =
559
670
                               p.sync_proc param
560
671
                             in
561
 
                             let reply = Rpc_packer.pack_successful_reply
562
 
                                           prog p.sync_name xid
563
 
                                           ret_flav ret_data result_value in
 
672
                             (* Exceptions raised by the encoder are
 
673
                                handled by [protect]
 
674
                              *)
 
675
                             let reply = 
 
676
                               Rpc_packer.pack_successful_reply
 
677
                                 ?encoder:enc_opt
 
678
                                 prog p.sync_name xid
 
679
                                 ret_flav ret_data result_value in
564
680
                             let answer = make_immediate_answer
565
 
                               xid procname reply in
566
 
                             if !debug_service_log <> None then
567
 
                               debug_servicef "Rpc_server (port %s, xid %s): Reply from %s"
568
 
                                 (mplexoptname conn.trans) (xidname xid) 
569
 
                                 procname;
 
681
                               xid procname reply 
 
682
                               (fun () ->
 
683
                                  Rpc_util.string_of_response
 
684
                                    !Debug.ptrace_verbosity
 
685
                                    prog
 
686
                                    procname
 
687
                                    result_value
 
688
                               )
 
689
                             in
570
690
                             schedule_answer answer
571
691
                         | Async p ->
572
692
                             let u, m = match conn.peeked_user with
577
697
                               { server = conn;
578
698
                                 prog = Some prog;
579
699
                                 sess_conn_id = sess_conn_id;
580
 
                                 sockaddr = sockaddr;
 
700
                                 sockaddr = sockaddr_lz;
581
701
                                 peeraddr = peeraddr;
582
702
                                 call_id = conn.next_call_id;
583
703
                                 client_id = xid;
588
708
                                 auth_user = u;
589
709
                                 auth_ret_flav = ret_flav;
590
710
                                 auth_ret_data = ret_data;
 
711
                                 ptrace_result = "";  (* not yet known *)
 
712
                                 encoder = enc_opt;
591
713
                               } in
592
714
                             conn.next_call_id <- conn.next_call_id + 1;
593
715
                             p.async_invoke this_session param
595
717
                    )
596
718
                  | Auth_negative code ->
597
719
                      protect (fun () -> raise(Rpc_server code))
 
720
                  | Auth_reply (data, ret_flav, ret_data) ->
 
721
                      let reply = 
 
722
                        Rpc_packer.pack_successful_reply_raw
 
723
                          xid ret_flav ret_data data in
 
724
                      let answer = 
 
725
                        make_immediate_answer
 
726
                          xid "" reply 
 
727
                          (fun () -> "") in
 
728
                      schedule_answer answer
 
729
                  | Auth_drop ->
 
730
                      dlog srv "auth_drop";
 
731
                      ()
 
732
                      
598
733
               )
599
734
         | Reject_procedure reason ->
 
735
             srv.get_last_proc <-
 
736
               (fun () ->
 
737
                  "Reject " ^ Rpc.string_of_server_error reason
 
738
               );
600
739
             protect (fun () -> raise(Rpc_server reason))
601
740
    )
602
741
;;
608
747
    | None ->
609
748
        ()
610
749
    | Some mplex ->
611
 
        if !debug_service_log <> None then
612
 
          debug_servicef "Rpc_server (port %s): Closing"
613
 
            (mplexname mplex);
 
750
        dlogr_ctrace srv
 
751
          (fun () ->
 
752
             sprintf "(sock=%s,peer=%s): Closing"
 
753
               (Rpc_transport.string_of_sockaddr mplex#getsockname)
 
754
               (Rpc_transport.string_of_sockaddr mplex#getpeername));
614
755
        conn.trans <- None;
615
756
        mplex # abort_rw();
616
757
        ( try
643
784
 
644
785
  (*****)
645
786
 
646
 
let get_rule conn srv peer =
647
 
  match conn.rule with
648
 
      None ->
649
 
        let r = srv.filter peer conn.conn_id in
650
 
        conn.rule <- Some r;
651
 
          r
652
 
    | Some r -> r
653
 
;;
654
 
 
655
787
 
656
788
let rec unroll_rule r length =
657
789
  match r with
658
 
      `Accept_limit_length(limit,r') ->
 
790
    | `Accept_limit_length(limit,r') ->
659
791
        if length > limit then unroll_rule r' length else `Accept
660
 
    | _ -> r
 
792
    | (`Drop | `Reject | `Reject_with _ | `Deny | `Accept as other) -> 
 
793
        other
661
794
;;
662
795
 
663
796
 
668
801
        terminate_connection srv conn;
669
802
        raise e
670
803
 
671
 
    | `Ok(pv,trans_addr) ->
672
 
        if !debug_internals_log <> None then 
673
 
          debug_internalsf "Rpc_server: got message";
 
804
    | `Ok(in_rec,trans_addr) ->
 
805
        dlog srv "got message";
674
806
 
675
807
        if conn.close_when_empty then (
676
 
          if !debug_internals_log <> None then 
677
 
            debug_internalsf "Rpc_server: ignoring msg after shutdown";
 
808
            dlog srv "ignoring msg after shutdown";
678
809
        ) else (
679
810
 
680
811
          (* First check whether the message matches the filter rule: *)
681
 
          let rule =
682
 
            match trans_addr with
683
 
              | `Implied -> 
684
 
                  if !debug_internals_log <> None then 
685
 
                    debug_internalsf "Rpc_server: No filter 1 (implied address)";
686
 
                  `Accept  (* Don't have the information to process filters *)
687
 
              | `Sockaddr peer ->
688
 
                  if !debug_internals_log <> None then 
689
 
                    debug_internalsf "Rpc_server: Checking filter 1";
690
 
                  let rule = 
691
 
                    unroll_rule (get_rule conn srv peer)
692
 
                      (Rpc_packer.length_of_packed_value pv)
693
 
                  in
694
 
                  rule
695
 
          in
696
 
          conn.rule <- None;                 (* reset rule after usage *)
697
812
          
698
813
          let peeraddr = trans_addr in
699
 
          
700
 
          let sockaddr =
 
814
 
 
815
          let sockaddr, trans_sockaddr =
701
816
            match conn.trans with
702
817
              | None -> assert false
703
818
              | Some trans ->
704
 
                  lazy ( match trans # getsockname with
705
 
                           | `Sockaddr a -> a
706
 
                           | `Implied -> failwith "Address not available" ) in
707
 
          
708
 
          ( match rule with
709
 
              | `Accept ->
710
 
                  process_incoming_message
711
 
                    srv conn sockaddr peeraddr pv Execute_procedure
 
819
                  ( lazy ( match trans # getsockname with
 
820
                             | `Sockaddr a -> a
 
821
                             | `Implied -> failwith "Address not available" ),
 
822
                    trans#getsockname
 
823
                  ) in
 
824
                  
 
825
          ( match in_rec with
712
826
              | `Deny ->
713
 
                  terminate_connection srv conn
 
827
                  dlogr_ptrace srv
 
828
                    (fun () ->
 
829
                       sprintf
 
830
                         "Request (sock=%s,peer=%s): Deny"
 
831
                         (Rpc_transport.string_of_sockaddr trans_sockaddr)
 
832
                         (Rpc_transport.string_of_sockaddr peeraddr)
 
833
                    );
 
834
                  terminate_connection srv conn (* for safety *)
714
835
              | `Drop ->
715
836
                  (* Simply forget the message *)
 
837
                  dlogr_ptrace srv
 
838
                    (fun () ->
 
839
                       sprintf
 
840
                         "Request (sock=%s,peer=%s): Drop"
 
841
                         (Rpc_transport.string_of_sockaddr trans_sockaddr)
 
842
                         (Rpc_transport.string_of_sockaddr peeraddr)
 
843
                    );
716
844
                  ()
717
 
              | `Reject ->
 
845
              | `Accept pv ->
 
846
                  process_incoming_message
 
847
                    srv conn sockaddr peeraddr pv Execute_procedure
 
848
              | `Reject pv ->
718
849
                  process_incoming_message
719
850
                    srv conn sockaddr peeraddr pv
720
851
                    (Reject_procedure Auth_too_weak)
721
 
              | `Accept_limit_length(_,_) -> assert false
 
852
              | `Reject_with(pv,code) ->
 
853
                  process_incoming_message
 
854
                    srv conn sockaddr peeraddr pv
 
855
                    (Reject_procedure code)
722
856
          );
723
857
          next_incoming_message srv conn  (* if still connected *)
724
858
        )
725
859
 
726
860
    | `End_of_file ->
727
 
        if !debug_internals_log <> None then 
728
 
          debug_internalsf "Rpc_server: End of file";
 
861
        dlog srv "End of file";
729
862
        terminate_connection srv conn
730
863
 
731
864
 
735
868
    | Some trans -> next_incoming_message' srv conn trans
736
869
 
737
870
and next_incoming_message' srv conn trans =
 
871
  let filter_var = ref None in
738
872
  trans # start_reading
739
873
    ~peek:(fun () -> peek_credentials srv conn)
 
874
    ~before_record:(handle_before_record srv conn filter_var)
740
875
    ~when_done:(fun r -> handle_incoming_message srv conn r)
741
876
    ()
742
877
 
743
 
and handle_before_record srv conn n trans_addr =
744
 
  match trans_addr with
745
 
    | `Implied ->
746
 
        if !debug_internals_log <> None then 
747
 
          debug_internalsf "Rpc_server: No filter 2 (implied address)";
748
 
        ()     (* Don't have the information to process filters *)
749
 
    | `Sockaddr peer ->
750
 
        if !debug_internals_log <> None then 
751
 
          debug_internalsf "Rpc_server: Checking filter 2";
752
 
        ( match unroll_rule (get_rule conn srv peer) n with
753
 
            | `Accept -> ()
754
 
            | `Deny   -> terminate_connection srv conn
755
 
            | `Drop   -> 
756
 
                ( match conn.trans with
757
 
                    | None -> ()
758
 
                    | Some trans ->
759
 
                        conn.rule <- None;
760
 
                        trans # skip_message()
761
 
                )
762
 
            | `Reject -> ()
763
 
            | `Accept_limit_length(_,_) -> assert false
764
 
        )
 
878
and handle_before_record srv conn filter_var n trans_addr =
 
879
  dlog srv "Checking filter before_record";
 
880
(*
 
881
  let filter = 
 
882
    match !filter_var with
 
883
      | Some filter -> 
 
884
          filter
 
885
      | None ->
 
886
 *)
 
887
          let filter = srv.filter trans_addr conn.conn_id in
 
888
(*
 
889
          filter_var := Some filter;
 
890
          filter in
 
891
 *)
 
892
  ( match unroll_rule filter n with
 
893
      | `Accept -> `Accept
 
894
      | `Deny   -> terminate_connection srv conn; `Deny
 
895
      | `Drop   -> `Drop
 
896
      | `Reject -> `Reject
 
897
      | `Reject_with code -> `Reject_with code
 
898
  )
765
899
 
766
900
and peek_credentials srv conn =
767
901
  if not conn.peeked && srv.prot = Tcp && srv.auth_peekers <> [] then begin
817
951
        raise e
818
952
 
819
953
    | `Ok () ->
820
 
        if !debug_internals_log <> None then 
821
 
          debug_internalsf "Rpc_server: message writing finished";
 
954
        dlog srv "message writing finished";
822
955
        if conn.close_when_empty && Queue.is_empty conn.replies then (
823
 
          if !debug_internals_log <> None then 
824
 
            debug_internalsf "Rpc_server: closing connection gracefully";
 
956
          dlog srv "closing connection gracefully";
825
957
          terminate_connection srv conn
826
958
        )
827
959
        else
840
972
 
841
973
  match reply_opt with
842
974
    | Some reply ->
843
 
        if !debug_internals_log <> None then 
844
 
          debug_internalsf "Rpc_server: next reply";
 
975
        dlogr_ptrace srv
 
976
          (fun () ->
 
977
             let sockaddr =
 
978
               try `Sockaddr (Lazy.force reply.sockaddr)
 
979
               with _ -> `Implied in
 
980
             sprintf
 
981
               "Response (sock=%s,peer=%s,cid=%d,xid=%ld): %s"
 
982
               (Rpc_transport.string_of_sockaddr sockaddr)
 
983
               (Rpc_transport.string_of_sockaddr reply.peeraddr)
 
984
               reply.call_id
 
985
               (Rtypes.logical_int32_of_uint4 reply.client_id)
 
986
               reply.ptrace_result
 
987
          );
 
988
 
 
989
        dlog srv "next reply";
845
990
        trans # start_writing
846
991
          ~when_done:(fun r ->
847
992
                        handle_outgoing_message srv conn r)
849
994
          reply.peeraddr
850
995
    | None ->
851
996
        (* this was the last reply in the queue *)
852
 
        if !debug_internals_log <> None then 
853
 
          debug_internalsf "Rpc_server: last reply"
 
997
        dlog srv "last reply"
854
998
;;
855
999
 
856
1000
check_for_output := next_outgoing_message ;;
871
1015
    [ `Socket_endpoint of protocol * Unix.file_descr
872
1016
    | `Multiplexer_endpoint of Rpc_transport.rpc_multiplex_controller
873
1017
    | `Socket of protocol * connector * socket_config
 
1018
    | `Dummy of protocol
874
1019
    ]
875
1020
 
876
1021
 
877
1022
let create2_srv prot esys =
878
1023
  let default_exception_handler ex =
879
 
    prerr_endline ("RPC server exception handler: Exception " ^ Printexc.to_string ex ^ " caught")
 
1024
    Netlog.log
 
1025
      `Crit
 
1026
      ("Rpc_server exception handler: Exception " ^ Netexn.to_string ex)
880
1027
  in
881
1028
 
882
1029
  let none = Hashtbl.create 3 in
883
1030
  Hashtbl.add none "AUTH_NONE" auth_none;
884
1031
 
 
1032
  let mf = Hashtbl.create 1 in
 
1033
  Hashtbl.add mf "*" Xdr_mstring.string_based_mstrings;
 
1034
  
885
1035
  { main_socket_name = `Implied;
 
1036
    dummy = false;
886
1037
    service = Uint4Map.empty;
887
1038
    portmapped = None;
888
1039
    esys = esys;
896
1047
    connections = [];
897
1048
    master_acceptor = None;
898
1049
    transport_timeout = (-1.0);
899
 
  }
 
1050
    nolog = false;
 
1051
    get_last_proc = (fun () -> "");
 
1052
    mstring_factories = mf 
 
1053
  }  
900
1054
;;
901
1055
 
902
1056
 
928
1082
;;
929
1083
 
930
1084
 
 
1085
let track fd =
 
1086
  Netlog.Debug.track_fd
 
1087
    ~owner:"Rpc_server"
 
1088
    ~descr:(sprintf "RPC connection %s"
 
1089
              (Netsys.string_of_fd fd))
 
1090
    fd
 
1091
 
 
1092
 
 
1093
let track_server fd =
 
1094
  Netlog.Debug.track_fd
 
1095
    ~owner:"Rpc_server"
 
1096
    ~descr:(sprintf "RPC server %s" (Netsys.string_of_fd fd))
 
1097
    fd
 
1098
 
 
1099
 
931
1100
let create2_multiplexer_endpoint ?fd mplex =
932
1101
  let prot = mplex#protocol in
933
1102
  let srv  = create2_srv prot mplex#event_system in
942
1111
    (fun () ->
943
1112
       (* Try to peek credentials. This can be too early, however. *)
944
1113
       if conn.trans <> None then (
945
 
         if !debug_service_log <> None then
946
 
           debug_servicef "Rpc_server (port %s): Serving connection"
947
 
             (portoptname fd);
 
1114
         dlogr_ctrace srv
 
1115
           (fun () ->
 
1116
              sprintf "(sock=%s,peer=%s): Serving connection"
 
1117
                (Rpc_transport.string_of_sockaddr mplex#getsockname)
 
1118
                (portoptname fd));
948
1119
         if srv.transport_timeout >= 0.0 then
949
1120
           mplex # set_timeout 
950
1121
             ~notify:(on_trans_timeout srv conn) srv.transport_timeout;
958
1129
 
959
1130
 
960
1131
let mplex_of_fd ~close_inactive_descr prot fd esys =
 
1132
  let preclose() =
 
1133
    Netlog.Debug.release_fd fd in
961
1134
  match prot with
962
1135
    | Tcp ->
963
1136
        Rpc_transport.stream_rpc_multiplex_controller
964
 
          ~close_inactive_descr fd esys
 
1137
          ~close_inactive_descr ~preclose fd esys
965
1138
    | Udp ->
966
1139
        Rpc_transport.datagram_rpc_multiplex_controller
967
 
          ~close_inactive_descr fd esys 
 
1140
          ~close_inactive_descr ~preclose fd esys 
968
1141
;;
969
1142
 
970
1143
 
990
1163
 
991
1164
let create2_socket_endpoint ?(close_inactive_descr=true) 
992
1165
                            prot fd esys =
 
1166
  if close_inactive_descr then track fd;
993
1167
  let mplex = mplex_of_fd ~close_inactive_descr prot fd esys in
994
1168
  create2_multiplexer_endpoint ~fd mplex 
995
1169
;;
1001
1175
  let srv = create2_srv prot esys in
1002
1176
 
1003
1177
  let create_multiplexer_eng ?(close_inactive_descr = true) fd prot =
 
1178
    if close_inactive_descr then track fd;
1004
1179
    config # multiplexing ~close_inactive_descr prot fd esys in
1005
1180
 
1006
1181
  let rec accept_connections acc =  (* for stream sockets *)
1012
1187
                    ~is_done:(fun mplex ->
1013
1188
                                let conn = connection srv mplex in
1014
1189
                                conn.fd <- Some slave_fd;
1015
 
                                if !debug_service_log <> None then
1016
 
                                  debug_servicef "Rpc_server (port %s): Serving connection"
1017
 
                                    (portname slave_fd);
 
1190
                                dlogr_ctrace srv
 
1191
                                  (fun () ->
 
1192
                                     sprintf "(sock=%s,peer=%s): Serving connection"
 
1193
                                       (Rpc_transport.string_of_sockaddr 
 
1194
                                          mplex#getsockname)
 
1195
                                       (portname slave_fd));
1018
1196
                                if srv.transport_timeout >= 0.0 then
1019
1197
                                  mplex # set_timeout 
1020
1198
                                    ~notify:(on_trans_timeout srv conn) 
1055
1233
        (Unix.ADDR_INET (addr, port));
1056
1234
      s
1057
1235
    with
1058
 
        any -> Unix.close s; raise any
 
1236
        any -> 
 
1237
          Unix.close s; raise any
1059
1238
  in
1060
1239
 
1061
1240
  let bind_to_localhost port =
1062
1241
    bind_to_internet (Unix.inet_addr_of_string "127.0.0.1") port
1063
1242
  in
1064
1243
 
1065
 
  let (fd, close_inactive_descr) =
1066
 
    match conn with
1067
 
      | Localhost port ->
1068
 
          let s = bind_to_localhost port in
1069
 
          (s, true)
1070
 
      | Internet (addr,port) ->
1071
 
          let s = bind_to_internet addr port in
1072
 
          (s, true)
1073
 
      | Portmapped ->
1074
 
          let s = bind_to_internet Unix.inet_addr_any 0 in
1075
 
          ( try
1076
 
              let port =
1077
 
                match Unix.getsockname s with
1078
 
                  | Unix.ADDR_INET(_,port) -> port
1079
 
                  | _ -> assert false in
1080
 
              if !debug_internals_log <> None then 
1081
 
                debug_internalsf "Rpc_server: Using anonymous port %d" port;
1082
 
              srv.portmapped <- Some port;
1083
 
              (s, true)
1084
 
            with
1085
 
                any -> Unix.close s; raise any
1086
 
          )
1087
 
      | Unix path ->
1088
 
          let s =
1089
 
            Unix.socket
1090
 
              Unix.PF_UNIX
1091
 
              (if prot = Tcp then Unix.SOCK_STREAM else Unix.SOCK_DGRAM)
1092
 
              0
1093
 
          in
1094
 
          begin try
1095
 
            Unix.bind s (Unix.ADDR_UNIX path);
1096
 
            (s, true)
1097
 
          with
1098
 
            any -> Unix.close s; raise any
1099
 
          end
1100
 
      | Descriptor s -> 
1101
 
          (s, false)
1102
 
      | Dynamic_descriptor f -> 
1103
 
          let s = f() in
1104
 
          (s, true)
1105
 
  in
1106
 
  srv.main_socket_name <- `Sockaddr (Unix.getsockname fd);
 
1244
  let bind_to_w32_pipe name mode =
 
1245
    let psrv = Netsys_win32.create_local_pipe_server name mode max_int in
 
1246
    let s = Netsys_win32.pipe_server_descr psrv in
 
1247
    s
 
1248
  in
 
1249
 
 
1250
  let get_descriptor() =
 
1251
    let (fd, close_inactive_descr) =
 
1252
      match conn with
 
1253
        | Localhost port ->
 
1254
            let s = bind_to_localhost port in
 
1255
            (s, true)
 
1256
        | Internet (addr,port) ->
 
1257
            let s = bind_to_internet addr port in
 
1258
            (s, true)
 
1259
        | Portmapped ->
 
1260
            let s = bind_to_internet Unix.inet_addr_any 0 in
 
1261
            ( try
 
1262
                let port =
 
1263
                  match Unix.getsockname s with
 
1264
                    | Unix.ADDR_INET(_,port) -> port
 
1265
                    | _ -> assert false in
 
1266
                dlogr srv
 
1267
                  (fun () ->
 
1268
                     sprintf "Using anonymous port %d" port);
 
1269
                srv.portmapped <- Some port;
 
1270
                (s, true)
 
1271
              with
 
1272
                  any -> Unix.close s; raise any
 
1273
            )
 
1274
        | Unix path ->
 
1275
            ( match Sys.os_type with
 
1276
                | "Win32" ->
 
1277
                    let s =
 
1278
                      Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
 
1279
                    Unix.bind s (Unix.ADDR_INET(Unix.inet_addr_loopback, 0));
 
1280
                    ( match Unix.getsockname s with
 
1281
                        | Unix.ADDR_INET(_, port) ->
 
1282
                            let f = open_out path in
 
1283
                            output_string f (string_of_int port ^ "\n");
 
1284
                            close_out f
 
1285
                        | _ -> ()
 
1286
                    );
 
1287
                    (s, true)
 
1288
                | _ ->
 
1289
                    let s =
 
1290
                      Unix.socket
 
1291
                        Unix.PF_UNIX
 
1292
                        (if prot = Tcp then Unix.SOCK_STREAM else Unix.SOCK_DGRAM)
 
1293
                        0
 
1294
                    in
 
1295
                    begin try
 
1296
                      Unix.bind s (Unix.ADDR_UNIX path);
 
1297
                      (s, true)
 
1298
                    with
 
1299
                        any -> Unix.close s; raise any
 
1300
                    end
 
1301
            )
 
1302
        | W32_pipe path ->
 
1303
            let s = bind_to_w32_pipe path Netsys_win32.Pipe_duplex in
 
1304
            (s, true)
 
1305
        | Descriptor s -> 
 
1306
            (s, false)
 
1307
        | Dynamic_descriptor f -> 
 
1308
            let s = f() in
 
1309
            (s, true)
 
1310
    in
 
1311
    srv.main_socket_name <- ( try
 
1312
                                `Sockaddr (Unix.getsockname fd)
 
1313
                              with _ -> 
 
1314
                                `Implied 
 
1315
                            );
 
1316
    (fd, close_inactive_descr)
 
1317
  in
1107
1318
 
1108
1319
  match prot with
1109
1320
    | Udp ->
1110
 
        let mplex_eng = create_multiplexer_eng fd prot in
 
1321
        let (fd, close_inactive_descr) = get_descriptor() in
 
1322
        let mplex_eng = create_multiplexer_eng ~close_inactive_descr fd prot in
1111
1323
        when_state
1112
1324
          ~is_done:(fun mplex ->
1113
1325
                      let conn = connection srv mplex in
1114
1326
                      conn.fd <- Some fd;
1115
 
                      if !debug_service_log <> None then
1116
 
                        debug_servicef "Rpc_server (port %s): Accepting datagrams"
1117
 
                          (portname fd);
 
1327
                      dlogr_ctrace srv
 
1328
                        (fun () ->
 
1329
                           sprintf "(sock=%s,peer=%s): Accepting datagrams"
 
1330
                             (Rpc_transport.string_of_sockaddr 
 
1331
                                mplex#getsockname)
 
1332
                             (portname fd));
1118
1333
                      if srv.transport_timeout >= 0.0 then
1119
1334
                        mplex # set_timeout 
1120
1335
                          ~notify:(on_trans_timeout srv conn) 
1132
1347
        srv
1133
1348
 
1134
1349
    | Tcp ->
1135
 
        if !debug_service_log <> None then
1136
 
          debug_servicef "Rpc_server (port %s): Listening"
1137
 
            (portname fd);
 
1350
        let (fd, close_inactive_descr) = get_descriptor() in
 
1351
 
 
1352
        dlogr_ctrace srv
 
1353
          (fun () ->
 
1354
             sprintf "(sock=%s): Listening"
 
1355
               (portname fd));
1138
1356
        let backlog =
1139
1357
          match override_listen_backlog with
1140
1358
            | Some n -> n
1141
1359
            | None -> config#listen_options.lstn_backlog in
1142
 
        Unix.listen fd backlog;
1143
 
        let acc = new direct_socket_acceptor fd esys in
 
1360
        ( match conn with
 
1361
            | W32_pipe _ ->
 
1362
                let psrv = Netsys_win32.lookup_pipe_server fd in
 
1363
                Netsys_win32.pipe_listen psrv backlog
 
1364
            | _ ->
 
1365
                Unix.listen fd backlog
 
1366
        );
 
1367
        if close_inactive_descr then track_server fd;     
 
1368
        let acc = 
 
1369
          new Uq_engines.direct_acceptor 
 
1370
            ~close_on_shutdown: close_inactive_descr
 
1371
            ~preclose:(fun () -> Netlog.Debug.release_fd fd)
 
1372
            fd esys in
1144
1373
        srv.master_acceptor <- Some acc;
1145
1374
        accept_connections acc;
1146
1375
        srv
1153
1382
        create2_socket_endpoint prot fd esys
1154
1383
    | `Multiplexer_endpoint mplex ->
1155
1384
        if mplex#event_system != esys then
1156
 
          failwith "Rpc_server.create2: Multiplexer is attached to the wrong event system";
 
1385
          failwith "Rpc_server.create2: Multiplexer is attached \
 
1386
                    to the wrong event system";
1157
1387
        create2_multiplexer_endpoint mplex 
1158
1388
    | `Socket(prot,conn,config) ->
1159
1389
        create2_socket_server ~config prot conn esys
 
1390
    | `Dummy prot ->
 
1391
        let srv = create2_srv prot esys in
 
1392
        srv.dummy <- true;
 
1393
        srv
1160
1394
;;
1161
1395
 
1162
1396
 
 
1397
let is_dummy srv = srv.dummy
 
1398
 
 
1399
 
1163
1400
let bind ?program_number ?version_number prog0 procs srv =
1164
1401
  let prog = Rpc_program.update ?program_number ?version_number prog0 in
1165
1402
  let prog_nr = Rpc_program.program_number prog in
1218
1455
      ) in
1219
1456
 
1220
1457
  let pm_unset_old_port pm old_port f =
1221
 
    if !debug_internals_log <> None then 
1222
 
      debug_internalsf "Rpc_server: unregistering port: %d" old_port;
 
1458
    dlogr srv
 
1459
      (fun () ->
 
1460
         sprintf "unregistering port: %d" old_port);
1223
1461
    Rpc_portmapper_clnt.PMAP.V2.pmapproc_unset'async pm (pm_mapping old_port)
1224
1462
      (fun get_result ->
1225
1463
         try
1226
1464
           let success = get_result() in
1227
 
           if !debug_internals_log <> None then 
1228
 
             debug_internalsf "Rpc_server: portmapper reports %s"
1229
 
               (if success then "success" else "failure");
 
1465
           dlogr srv
 
1466
             (fun () ->
 
1467
                sprintf "portmapper reports %s"
 
1468
                  (if success then "success" else "failure"));
1230
1469
           if not success then
1231
1470
             failwith "Rpc_server.bind: Cannot unregister old port";
1232
1471
           f ()
1235
1474
      ) in
1236
1475
 
1237
1476
  let pm_set_new_port pm new_port f =
1238
 
    if !debug_internals_log <> None then 
1239
 
      debug_internalsf "Rpc_server: registering port: %d" new_port;
 
1477
    dlogr srv
 
1478
      (fun () ->
 
1479
         sprintf "registering port: %d" new_port);
1240
1480
    Rpc_portmapper_clnt.PMAP.V2.pmapproc_set'async pm (pm_mapping new_port)
1241
1481
      (fun get_result ->
1242
1482
         try
1243
1483
           let success = get_result() in
1244
 
           if !debug_internals_log <> None then 
1245
 
             debug_internalsf "Rpc_server: portmapper reports %s"
1246
 
               (if success then "success" else "failure");
 
1484
           dlogr srv
 
1485
             (fun () ->
 
1486
                sprintf "portmapper reports %s"
 
1487
                  (if success then "success" else "failure"));
1247
1488
           if not success then
1248
1489
             failwith "Rpc_server.bind: Cannot register port";
1249
1490
           f ()
1323
1564
    (try srv.exception_handler error with _ -> ()) in
1324
1565
 
1325
1566
  let pm_unset_port pm port f =
1326
 
    if !debug_internals_log <> None then 
1327
 
      debug_internalsf "Rpc_server: unregistering port: %d" port;
 
1567
    dlogr srv
 
1568
      (fun () ->
 
1569
         sprintf "unregistering port: %d" port);
1328
1570
    Rpc_portmapper_clnt.PMAP.V2.pmapproc_unset'async pm (pm_mapping port)
1329
1571
      (fun get_result ->
1330
1572
         try
1331
1573
           let success = get_result() in
1332
 
           if !debug_internals_log <> None then 
1333
 
             debug_internalsf "Rpc_server: portmapper reports %s"
1334
 
               (if success then "success" else "failure");
 
1574
           dlogr srv
 
1575
             (fun () ->
 
1576
                sprintf "portmapper reports %s"
 
1577
                  (if success then "success" else "failure"));
1335
1578
           if not success then
1336
1579
             failwith "Rpc_server.unbind: Cannot unregister port";
1337
1580
           f ()
1469
1712
 
1470
1713
let get_auth_method sess = sess.auth_method
1471
1714
 
 
1715
let get_last_proc_info srv = srv.get_last_proc()
 
1716
 
1472
1717
  (*****)
1473
1718
 
1474
 
let reply a_session result_value =
1475
 
    let conn = a_session.server in
1476
 
    let srv = conn.whole_server in
1477
 
    if conn.trans = None then raise Connection_lost;
1478
 
 
1479
 
    let prog =
1480
 
      match a_session.prog with
1481
 
        | None -> assert false
1482
 
        | Some p -> p in
1483
 
 
1484
 
    let reply = Rpc_packer.pack_successful_reply
1485
 
        prog a_session.procname a_session.client_id
1486
 
        a_session.auth_ret_flav a_session.auth_ret_data
1487
 
        result_value
1488
 
    in
1489
 
 
1490
 
    let reply_session =
1491
 
      { a_session with
1492
 
          parameter = XV_void;
1493
 
          result = reply
1494
 
      }
1495
 
    in
1496
 
 
1497
 
    if !debug_service_log <> None then
1498
 
      debug_servicef "Rpc_server (port %s, xid %s): Reply from %s"
1499
 
        (mplexoptname conn.trans) (xidname a_session.client_id) 
1500
 
        a_session.procname;
1501
 
 
1502
 
    Queue.add reply_session conn.replies;
1503
 
 
1504
 
    next_outgoing_message srv conn
1505
 
 
1506
 
 
1507
1719
let reply_error a_session condition =
1508
1720
    let conn = a_session.server in
1509
1721
    let srv = conn.whole_server in
1528
1740
    let reply_session =
1529
1741
      { a_session with
1530
1742
          parameter = XV_void;
1531
 
          result = reply
 
1743
          result = reply;
 
1744
          ptrace_result = (if !Debug.enable_ptrace then
 
1745
                             "Error " ^ errname condition
 
1746
                           else ""
 
1747
                          )
 
1748
 
1532
1749
      }
1533
1750
    in
1534
1751
 
1535
 
    if !debug_service_log <> None then
1536
 
      debug_servicef "Rpc_server (port %s, xid %s): Error %s"
1537
 
        (mplexoptname conn.trans) (xidname a_session.client_id)
1538
 
        (errname condition);
1539
 
 
1540
1752
    Queue.add reply_session conn.replies;
1541
1753
 
1542
1754
    next_outgoing_message srv conn
1543
1755
 
1544
1756
 
 
1757
let reply a_session result_value =
 
1758
  let conn = a_session.server in
 
1759
  let srv = conn.whole_server in
 
1760
 
 
1761
  dlogr srv
 
1762
    (fun () ->
 
1763
       sprintf "reply xid=%Ld have_encoder=%B"
 
1764
         (Rtypes.int64_of_uint4 a_session.client_id)
 
1765
         (a_session.encoder <> None)
 
1766
    );
 
1767
  
 
1768
  if conn.trans = None then raise Connection_lost;
 
1769
  
 
1770
  let prog =
 
1771
    match a_session.prog with
 
1772
      | None -> assert false
 
1773
      | Some p -> p in
 
1774
 
 
1775
  let f =
 
1776
    try
 
1777
      let reply = Rpc_packer.pack_successful_reply
 
1778
        ?encoder:a_session.encoder
 
1779
        prog a_session.procname a_session.client_id
 
1780
        a_session.auth_ret_flav a_session.auth_ret_data
 
1781
        result_value
 
1782
      in
 
1783
  
 
1784
      let reply_session =
 
1785
        { a_session with
 
1786
            parameter = XV_void;
 
1787
            result = reply;
 
1788
            ptrace_result = (if !Debug.enable_ptrace then
 
1789
                               Rpc_util.string_of_response
 
1790
                                 !Debug.ptrace_verbosity
 
1791
                                 prog
 
1792
                               a_session.procname
 
1793
                                 result_value
 
1794
                             else ""
 
1795
                            )
 
1796
        }
 
1797
      in
 
1798
      (fun () ->
 
1799
         Queue.add reply_session conn.replies;
 
1800
         next_outgoing_message srv conn
 
1801
      )
 
1802
    with (* exceptions raised by the encoder *)
 
1803
      | Late_drop ->
 
1804
          Netlog.logf `Err
 
1805
            "Dropping response message";
 
1806
          (fun () -> ())
 
1807
      | Rpc_server condition ->
 
1808
          reply_error a_session condition;
 
1809
          (fun () -> ())
 
1810
  in
 
1811
  f()
 
1812
    
 
1813
 
1545
1814
let set_exception_handler srv eh =
1546
1815
  srv.exception_handler <- eh
1547
1816
 
1571
1840
let set_timeout srv tmo =
1572
1841
  srv.transport_timeout <- tmo
1573
1842
 
 
1843
let set_mstring_factories srv fac =
 
1844
  srv.mstring_factories <- fac
 
1845
 
1574
1846
let stop_server ?(graceful = false) srv =
1575
 
  if !debug_internals_log <> None then 
1576
 
    debug_internalsf "Rpc_server: Stopping %s"
1577
 
      (if graceful then " gracefully" else "");
 
1847
  dlogr srv
 
1848
    (fun () ->
 
1849
       sprintf "Stopping %s" (if graceful then " gracefully" else ""));
1578
1850
  (* Close TCP server socket, if present: *)
1579
1851
  ( match srv.master_acceptor with
1580
1852
      | Some acc -> 
1614
1886
      | Not_found -> ()
1615
1887
  )
1616
1888
 
 
1889
let detach srv =
 
1890
  (* Detach from connections: *)
 
1891
    let l = srv.connections in
 
1892
    srv.connections <- [];
 
1893
    List.iter
 
1894
      (fun conn ->
 
1895
         conn.fd <- None;
 
1896
         match conn.trans with
 
1897
           | Some t -> 
 
1898
               t#abort_rw();
 
1899
               t#cancel_shutting_down()
 
1900
           | None -> ()
 
1901
      )
 
1902
      l
 
1903
  
 
1904
  
 
1905
 
1617
1906
let verbose b =
1618
 
  if b then (
1619
 
    debug_service_log := Some prerr_endline;
1620
 
    debug_internals_log := Some prerr_endline
1621
 
  )
1622
 
  else (
1623
 
    debug_service_log := None;
1624
 
    debug_internals_log := None
1625
 
  )
 
1907
  Debug.enable := b;
 
1908
  Debug.enable_ctrace := b