~ubuntu-branches/ubuntu/quantal/ruby1.9.1/quantal

« back to all changes in this revision

Viewing changes to lib/drb/drb.rb

  • Committer: Bazaar Package Importer
  • Author(s): Lucas Nussbaum
  • Date: 2011-09-24 19:16:17 UTC
  • mfrom: (1.1.8 upstream) (13.1.7 experimental)
  • Revision ID: james.westby@ubuntu.com-20110924191617-o1qz4rcmqjot8zuy
Tags: 1.9.3~rc1-1
* New upstream release: 1.9.3 RC1.
  + Includes load.c fixes. Closes: #639959.
* Upload to unstable.

Show diffs side-by-side

added added

removed removed

Lines of Context:
466
466
    def initialize(err, buf)
467
467
      case err.to_s
468
468
      when /uninitialized constant (\S+)/
469
 
        @name = $1
 
469
        @name = $1
470
470
      when /undefined class\/module (\S+)/
471
 
        @name = $1
 
471
        @name = $1
472
472
      else
473
 
        @name = nil
 
473
        @name = nil
474
474
      end
475
475
      @buf = buf
476
476
    end
486
486
 
487
487
    def self._load(s) # :nodoc:
488
488
      begin
489
 
        Marshal::load(s)
 
489
        Marshal::load(s)
490
490
      rescue NameError, ArgumentError
491
 
        DRbUnknown.new($!, s)
 
491
        DRbUnknown.new($!, s)
492
492
      end
493
493
    end
494
494
 
514
514
  class DRbArray
515
515
    def initialize(ary)
516
516
      @ary = ary.collect { |obj|
517
 
        if obj.kind_of? DRbUndumped
518
 
          DRbObject.new(obj)
519
 
        else
520
 
          begin
521
 
            Marshal.dump(obj)
522
 
            obj
523
 
          rescue
524
 
            DRbObject.new(obj)
525
 
          end
526
 
        end
 
517
        if obj.kind_of? DRbUndumped
 
518
          DRbObject.new(obj)
 
519
        else
 
520
          begin
 
521
            Marshal.dump(obj)
 
522
            obj
 
523
          rescue
 
524
            DRbObject.new(obj)
 
525
          end
 
526
        end
527
527
      }
528
528
    end
529
529
 
554
554
    def dump(obj, error=false)  # :nodoc:
555
555
      obj = make_proxy(obj, error) if obj.kind_of? DRbUndumped
556
556
      begin
557
 
        str = Marshal::dump(obj)
 
557
        str = Marshal::dump(obj)
558
558
      rescue
559
 
        str = Marshal::dump(make_proxy(obj, error))
 
559
        str = Marshal::dump(make_proxy(obj, error))
560
560
      end
561
561
      [str.size].pack('N') + str
562
562
    end
563
563
 
564
564
    def load(soc)  # :nodoc:
565
565
      begin
566
 
        sz = soc.read(4)        # sizeof (N)
 
566
        sz = soc.read(4)        # sizeof (N)
567
567
      rescue
568
568
        raise(DRbConnError, $!.message, $!.backtrace)
569
569
      end
600
600
      ary.push(dump(msg_id.id2name))
601
601
      ary.push(dump(arg.length))
602
602
      arg.each do |e|
603
 
        ary.push(dump(e))
 
603
        ary.push(dump(e))
604
604
      end
605
605
      ary.push(dump(b))
606
606
      stream.write(ary.join(''))
616
616
      raise(DRbConnError, "too many arguments") if @argc_limit < argc
617
617
      argv = Array.new(argc, nil)
618
618
      argc.times do |n|
619
 
        argv[n] = load(stream)
 
619
        argv[n] = load(stream)
620
620
      end
621
621
      block = load(stream)
622
622
      return ro, msg, argv, block
727
727
    # URI, but an error occurs in opening it, a DRbConnError is raised.
728
728
    def open(uri, config, first=true)
729
729
      @protocol.each do |prot|
730
 
        begin
731
 
          return prot.open(uri, config)
732
 
        rescue DRbBadScheme
733
 
        rescue DRbConnError
734
 
          raise($!)
735
 
        rescue
736
 
          raise(DRbConnError, "#{uri} - #{$!.inspect}")
737
 
        end
 
730
        begin
 
731
          return prot.open(uri, config)
 
732
        rescue DRbBadScheme
 
733
        rescue DRbConnError
 
734
          raise($!)
 
735
        rescue
 
736
          raise(DRbConnError, "#{uri} - #{$!.inspect}")
 
737
        end
738
738
      end
739
739
      if first && (config[:auto_load] != false)
740
 
        auto_load(uri, config)
741
 
        return open(uri, config, false)
 
740
        auto_load(uri, config)
 
741
        return open(uri, config, false)
742
742
      end
743
743
      raise DRbBadURI, 'can\'t parse uri:' + uri
744
744
    end
755
755
    # error is passed on to the caller.
756
756
    def open_server(uri, config, first=true)
757
757
      @protocol.each do |prot|
758
 
        begin
759
 
          return prot.open_server(uri, config)
760
 
        rescue DRbBadScheme
761
 
        end
 
758
        begin
 
759
          return prot.open_server(uri, config)
 
760
        rescue DRbBadScheme
 
761
        end
762
762
      end
763
763
      if first && (config[:auto_load] != false)
764
 
        auto_load(uri, config)
765
 
        return open_server(uri, config, false)
 
764
        auto_load(uri, config)
 
765
        return open_server(uri, config, false)
766
766
      end
767
767
      raise DRbBadURI, 'can\'t parse uri:' + uri
768
768
    end
776
776
    # URI, then a DRbBadURI error is raised.
777
777
    def uri_option(uri, config, first=true)
778
778
      @protocol.each do |prot|
779
 
        begin
780
 
          uri, opt = prot.uri_option(uri, config)
781
 
          # opt = nil if opt == ''
782
 
          return uri, opt
783
 
        rescue DRbBadScheme
784
 
        end
 
779
        begin
 
780
          uri, opt = prot.uri_option(uri, config)
 
781
          # opt = nil if opt == ''
 
782
          return uri, opt
 
783
        rescue DRbBadScheme
 
784
        end
785
785
      end
786
786
      if first && (config[:auto_load] != false)
787
 
        auto_load(uri, config)
 
787
        auto_load(uri, config)
788
788
        return uri_option(uri, config, false)
789
789
      end
790
790
      raise DRbBadURI, 'can\'t parse uri:' + uri
793
793
 
794
794
    def auto_load(uri, config)  # :nodoc:
795
795
      if uri =~ /^drb([a-z0-9]+):/
796
 
        require("drb/#{$1}") rescue nil
 
796
        require("drb/#{$1}") rescue nil
797
797
      end
798
798
    end
799
799
    module_function :auto_load
806
806
    private
807
807
    def self.parse_uri(uri)
808
808
      if uri =~ /^druby:\/\/(.*?):(\d+)(\?(.*))?$/
809
 
        host = $1
810
 
        port = $2.to_i
811
 
        option = $4
812
 
        [host, port, option]
 
809
        host = $1
 
810
        port = $2.to_i
 
811
        option = $4
 
812
        [host, port, option]
813
813
      else
814
 
        raise(DRbBadScheme, uri) unless uri =~ /^druby:/
815
 
        raise(DRbBadURI, 'can\'t parse uri:' + uri)
 
814
        raise(DRbBadScheme, uri) unless uri =~ /^druby:/
 
815
        raise(DRbBadURI, 'can\'t parse uri:' + uri)
816
816
      end
817
817
    end
818
818
 
820
820
 
821
821
    # Open a client connection to +uri+ using configuration +config+.
822
822
    def self.open(uri, config)
823
 
      host, port, option = parse_uri(uri)
 
823
      host, port, = parse_uri(uri)
824
824
      host.untaint
825
825
      port.untaint
826
826
      soc = TCPSocket.open(host, port)
852
852
    # configuration +config+.
853
853
    def self.open_server(uri, config)
854
854
      uri = 'druby://:0' unless uri
855
 
      host, port, opt = parse_uri(uri)
 
855
      host, port, _ = parse_uri(uri)
856
856
      config = {:tcp_original_host => host}.update(config)
857
857
      if host.size == 0
858
858
        host = getservername
859
859
        soc = open_server_inaddr_any(host, port)
860
860
      else
861
 
        soc = TCPServer.open(host, port)
 
861
        soc = TCPServer.open(host, port)
862
862
      end
863
863
      port = soc.addr[1] if port == 0
864
864
      config[:tcp_port] = port
928
928
    # client-server session.
929
929
    def close
930
930
      if @socket
931
 
        @socket.close
932
 
        @socket = nil
 
931
        @socket.close
 
932
        @socket = nil
933
933
      end
934
934
    end
935
935
 
938
938
    # the server's side of this client-server session.
939
939
    def accept
940
940
      while true
941
 
        s = @socket.accept
942
 
        break if (@acl ? @acl.allow_socket?(s) : true)
943
 
        s.close
 
941
        s = @socket.accept
 
942
        break if (@acl ? @acl.allow_socket?(s) : true)
 
943
        s.close
944
944
      end
945
945
      if @config[:tcp_original_host].to_s.size == 0
946
946
        uri = "druby://#{s.addr[3]}:#{@config[:tcp_port]}"
954
954
    def alive?
955
955
      return false unless @socket
956
956
      if IO.select([@socket], nil, nil, 0)
957
 
        close
958
 
        return false
 
957
        close
 
958
        return false
959
959
      end
960
960
      true
961
961
    end
1004
1004
      uri, ref = Marshal.load(s)
1005
1005
 
1006
1006
      if DRb.here?(uri)
1007
 
        obj = DRb.to_obj(ref)
 
1007
        obj = DRb.to_obj(ref)
1008
1008
        if ((! obj.tainted?) && Thread.current[:drb_untaint])
1009
1009
          Thread.current[:drb_untaint].push(obj)
1010
1010
        end
1042
1042
      @uri = nil
1043
1043
      @ref = nil
1044
1044
      if obj.nil?
1045
 
        return if uri.nil?
1046
 
        @uri, option = DRbProtocol.uri_option(uri, DRb.config)
1047
 
        @ref = DRbURIOption.new(option) unless option.nil?
 
1045
        return if uri.nil?
 
1046
        @uri, option = DRbProtocol.uri_option(uri, DRb.config)
 
1047
        @ref = DRbURIOption.new(option) unless option.nil?
1048
1048
      else
1049
 
        @uri = uri ? uri : (DRb.uri rescue nil)
1050
 
        @ref = obj ? DRb.to_id(obj) : nil
 
1049
        @uri = uri ? uri : (DRb.uri rescue nil)
 
1050
        @ref = obj ? DRb.to_id(obj) : nil
1051
1051
      end
1052
1052
    end
1053
1053
 
1078
1078
    # Routes method calls to the referenced object.
1079
1079
    def method_missing(msg_id, *a, &b)
1080
1080
      if DRb.here?(@uri)
1081
 
        obj = DRb.to_obj(@ref)
1082
 
        DRb.current_server.check_insecure_method(obj, msg_id)
1083
 
        return obj.__send__(msg_id, *a, &b)
 
1081
        obj = DRb.to_obj(@ref)
 
1082
        DRb.current_server.check_insecure_method(obj, msg_id)
 
1083
        return obj.__send__(msg_id, *a, &b)
1084
1084
      end
1085
1085
 
1086
1086
      succ, result = self.class.with_friend(@uri) do
1095
1095
        raise result
1096
1096
      else
1097
1097
        bt = self.class.prepare_backtrace(@uri, result)
1098
 
        result.set_backtrace(bt + caller)
 
1098
        result.set_backtrace(bt + caller)
1099
1099
        raise result
1100
1100
      end
1101
1101
    end
1153
1153
 
1154
1154
    def self.open(remote_uri)  # :nodoc:
1155
1155
      begin
1156
 
        conn = nil
1157
 
 
1158
 
        @mutex.synchronize do
1159
 
          #FIXME
1160
 
          new_pool = []
1161
 
          @pool.each do |c|
1162
 
            if conn.nil? and c.uri == remote_uri
1163
 
              conn = c if c.alive?
1164
 
            else
1165
 
              new_pool.push c
1166
 
            end
1167
 
          end
1168
 
          @pool = new_pool
1169
 
        end
1170
 
 
1171
 
        conn = self.new(remote_uri) unless conn
1172
 
        succ, result = yield(conn)
1173
 
        return succ, result
 
1156
        conn = nil
 
1157
 
 
1158
        @mutex.synchronize do
 
1159
          #FIXME
 
1160
          new_pool = []
 
1161
          @pool.each do |c|
 
1162
            if conn.nil? and c.uri == remote_uri
 
1163
              conn = c if c.alive?
 
1164
            else
 
1165
              new_pool.push c
 
1166
            end
 
1167
          end
 
1168
          @pool = new_pool
 
1169
        end
 
1170
 
 
1171
        conn = self.new(remote_uri) unless conn
 
1172
        succ, result = yield(conn)
 
1173
        return succ, result
1174
1174
 
1175
1175
      ensure
1176
 
        if conn
1177
 
          if succ
1178
 
            @mutex.synchronize do
1179
 
              @pool.unshift(conn)
1180
 
              @pool.pop.close while @pool.size > POOL_SIZE
1181
 
            end
1182
 
          else
1183
 
            conn.close
1184
 
          end
1185
 
        end
 
1176
        if conn
 
1177
          if succ
 
1178
            @mutex.synchronize do
 
1179
              @pool.unshift(conn)
 
1180
              @pool.pop.close while @pool.size > POOL_SIZE
 
1181
            end
 
1182
          else
 
1183
            conn.close
 
1184
          end
 
1185
        end
1186
1186
      end
1187
1187
    end
1188
1188
 
1274
1274
 
1275
1275
    def self.make_config(hash={})  # :nodoc:
1276
1276
      default_config = {
1277
 
        :idconv => @@idconv,
1278
 
        :verbose => @@verbose,
1279
 
        :tcp_acl => @@acl,
1280
 
        :load_limit => @@load_limit,
1281
 
        :argc_limit => @@argc_limit,
 
1277
        :idconv => @@idconv,
 
1278
        :verbose => @@verbose,
 
1279
        :tcp_acl => @@acl,
 
1280
        :load_limit => @@load_limit,
 
1281
        :argc_limit => @@argc_limit,
1282
1282
        :safe_level => @@safe_level
1283
1283
      }
1284
1284
      default_config.update(hash)
1329
1329
    # The server will immediately start running in its own thread.
1330
1330
    def initialize(uri=nil, front=nil, config_or_acl=nil)
1331
1331
      if Hash === config_or_acl
1332
 
        config = config_or_acl.dup
 
1332
        config = config_or_acl.dup
1333
1333
      else
1334
 
        acl = config_or_acl || @@acl
1335
 
        config = {
1336
 
          :tcp_acl => acl
1337
 
        }
 
1334
        acl = config_or_acl || @@acl
 
1335
        config = {
 
1336
          :tcp_acl => acl
 
1337
        }
1338
1338
      end
1339
1339
 
1340
1340
      @config = self.class.make_config(config)
1341
1341
 
1342
1342
      @protocol = DRbProtocol.open_server(uri, @config)
1343
1343
      @uri = @protocol.uri
 
1344
      @exported_uri = [@uri]
1344
1345
 
1345
1346
      @front = front
1346
1347
      @idconv = @config[:idconv]
1387
1388
    def alive?
1388
1389
      @thread.alive?
1389
1390
    end
 
1391
    
 
1392
    def here?(uri)
 
1393
      @exported_uri.include?(uri)
 
1394
    end
1390
1395
 
1391
1396
    # Stop this server.
1392
1397
    def stop_service
1412
1417
    end
1413
1418
 
1414
1419
    private
1415
 
    def kill_sub_thread
1416
 
      Thread.new do
1417
 
        grp = ThreadGroup.new
1418
 
        grp.add(Thread.current)
1419
 
        list = @grp.list
1420
 
        while list.size > 0
1421
 
          list.each do |th|
1422
 
            th.kill if th.alive?
1423
 
          end
1424
 
          list = @grp.list
1425
 
        end
1426
 
      end
1427
 
    end
1428
 
 
1429
1420
    def run
1430
1421
      Thread.start do
1431
 
        begin
1432
 
          while true
1433
 
            main_loop
1434
 
          end
1435
 
        ensure
1436
 
          @protocol.close if @protocol
1437
 
          kill_sub_thread
1438
 
        end
 
1422
        begin
 
1423
          while true
 
1424
            main_loop
 
1425
          end
 
1426
        ensure
 
1427
          @protocol.close if @protocol
 
1428
        end
1439
1429
      end
1440
1430
    end
1441
1431
 
1473
1463
      raise(SecurityError, "insecure method `#{msg_id}'") if insecure_method?(msg_id)
1474
1464
 
1475
1465
      if obj.private_methods.include?(msg_id)
1476
 
        desc = any_to_s(obj)
 
1466
        desc = any_to_s(obj)
1477
1467
        raise NoMethodError, "private method `#{msg_id}' called for #{desc}"
1478
1468
      elsif obj.protected_methods.include?(msg_id)
1479
 
        desc = any_to_s(obj)
 
1469
        desc = any_to_s(obj)
1480
1470
        raise NoMethodError, "protected method `#{msg_id}' called for #{desc}"
1481
1471
      else
1482
1472
        true
1486
1476
 
1487
1477
    class InvokeMethod  # :nodoc:
1488
1478
      def initialize(drb_server, client)
1489
 
        @drb_server = drb_server
 
1479
        @drb_server = drb_server
1490
1480
        @safe_level = drb_server.safe_level
1491
 
        @client = client
 
1481
        @client = client
1492
1482
      end
1493
1483
 
1494
1484
      def perform
1495
 
        @result = nil
1496
 
        @succ = false
1497
 
        setup_message
 
1485
        @result = nil
 
1486
        @succ = false
 
1487
        setup_message
1498
1488
 
1499
1489
        if $SAFE < @safe_level
1500
1490
          info = Thread.current['DRb']
1518
1508
            @result = perform_without_block
1519
1509
          end
1520
1510
        end
1521
 
        @succ = true
1522
 
        if @msg_id == :to_ary && @result.class == Array
1523
 
          @result = DRbArray.new(@result)
1524
 
        end
1525
 
        return @succ, @result
 
1511
        @succ = true
 
1512
        if @msg_id == :to_ary && @result.class == Array
 
1513
          @result = DRbArray.new(@result)
 
1514
        end
 
1515
        return @succ, @result
1526
1516
      rescue StandardError, ScriptError, Interrupt
1527
 
        @result = $!
1528
 
        return @succ, @result
 
1517
        @result = $!
 
1518
        return @succ, @result
1529
1519
      end
1530
1520
 
1531
1521
      private
1532
1522
      def init_with_client
1533
 
        obj, msg, argv, block = @client.recv_request
 
1523
        obj, msg, argv, block = @client.recv_request
1534
1524
        @obj = obj
1535
1525
        @msg_id = msg.intern
1536
1526
        @argv = argv
1542
1532
      end
1543
1533
 
1544
1534
      def setup_message
1545
 
        init_with_client
1546
 
        check_insecure_method
 
1535
        init_with_client
 
1536
        check_insecure_method
1547
1537
      end
1548
1538
 
1549
1539
      def perform_without_block
1550
 
        if Proc === @obj && @msg_id == :__drb_yield
 
1540
        if Proc === @obj && @msg_id == :__drb_yield
1551
1541
          if @argv.size == 1
1552
 
            ary = @argv
1553
 
          else
1554
 
            ary = [@argv]
1555
 
          end
1556
 
          ary.collect(&@obj)[0]
1557
 
        else
1558
 
          @obj.__send__(@msg_id, *@argv)
1559
 
        end
 
1542
            ary = @argv
 
1543
          else
 
1544
            ary = [@argv]
 
1545
          end
 
1546
          ary.collect(&@obj)[0]
 
1547
        else
 
1548
          @obj.__send__(@msg_id, *@argv)
 
1549
        end
1560
1550
      end
1561
1551
 
1562
1552
    end
1582
1572
    # or a local method call fails.
1583
1573
    def main_loop
1584
1574
      Thread.start(@protocol.accept) do |client|
1585
 
        @grp.add Thread.current
1586
 
        Thread.current['DRb'] = { 'client' => client ,
1587
 
                                  'server' => self }
1588
 
        loop do
1589
 
          begin
1590
 
            succ = false
1591
 
            invoke_method = InvokeMethod.new(self, client)
1592
 
            succ, result = invoke_method.perform
1593
 
            if !succ && verbose
1594
 
              p result
1595
 
              result.backtrace.each do |x|
1596
 
                puts x
1597
 
              end
1598
 
            end
1599
 
            client.send_reply(succ, result) rescue nil
1600
 
          ensure
 
1575
        @grp.add Thread.current
 
1576
        Thread.current['DRb'] = { 'client' => client ,
 
1577
                                  'server' => self }
 
1578
        DRb.mutex.synchronize do
 
1579
          client_uri = client.uri
 
1580
          @exported_uri << client_uri unless @exported_uri.include?(client_uri)
 
1581
        end
 
1582
        loop do
 
1583
          begin
 
1584
            succ = false
 
1585
            invoke_method = InvokeMethod.new(self, client)
 
1586
            succ, result = invoke_method.perform
 
1587
            if !succ && verbose
 
1588
              p result
 
1589
              result.backtrace.each do |x|
 
1590
                puts x
 
1591
              end
 
1592
            end
 
1593
            client.send_reply(succ, result) rescue nil
 
1594
          ensure
1601
1595
            client.close unless succ
1602
1596
            if Thread.current['DRb']['stop_service']
1603
1597
              Thread.new { stop_service }
1604
1598
            end
1605
1599
            break unless succ
1606
 
          end
1607
 
        end
 
1600
          end
 
1601
        end
1608
1602
      end
1609
1603
    end
1610
1604
  end
1681
1675
 
1682
1676
  # Is +uri+ the URI for the current local server?
1683
1677
  def here?(uri)
1684
 
    (current_server.uri rescue nil) == uri
 
1678
    current_server.here?(uri) rescue false
 
1679
    # (current_server.uri rescue nil) == uri
1685
1680
  end
1686
1681
  module_function :here?
1687
1682
 
1773
1768
  module_function :fetch_server
1774
1769
end
1775
1770
 
 
1771
# :stopdoc:
1776
1772
DRbObject = DRb::DRbObject
1777
1773
DRbUndumped = DRb::DRbUndumped
1778
1774
DRbIdConv = DRb::DRbIdConv