~ubuntu-branches/ubuntu/precise/ceph/precise-proposed

« back to all changes in this revision

Viewing changes to src/msg/SimpleMessenger.cc

  • Committer: Bazaar Package Importer
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2011-04-25 10:09:05 UTC
  • mfrom: (1.1.3 upstream) (0.1.5 sid)
  • Revision ID: james.westby@ubuntu.com-20110425100905-exm7dfvi2v5ick02
Tags: 0.27-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
#include <limits.h>
25
25
#include <sys/user.h>
26
26
 
27
 
#include "config.h"
 
27
#include "common/config.h"
28
28
 
29
29
#include "messages/MGenericMessage.h"
30
30
 
34
34
#include <fstream>
35
35
 
36
36
#include "common/Timer.h"
37
 
#include "common/signal.h"
 
37
#include "common/errno.h"
 
38
#include "common/pidfile.h"
 
39
#include "common/safe_io.h"
38
40
 
39
41
#define DOUT_SUBSYS ms
40
42
#undef dout_prefix
41
43
#define dout_prefix _prefix(messenger)
42
44
static ostream& _prefix(SimpleMessenger *messenger) {
43
 
  return *_dout << dbeginl << "-- " << messenger->ms_addr << " ";
 
45
  return *_dout << "-- " << messenger->ms_addr << " ";
44
46
}
45
47
 
46
48
 
50
52
 * Accepter
51
53
 */
52
54
 
53
 
int SimpleMessenger::Accepter::bind(int64_t force_nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
 
55
int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
54
56
{
55
57
  // bind to a socket
56
58
  dout(10) << "accepter.bind" << dendl;
67
69
  listen_sd = ::socket(family, SOCK_STREAM, 0);
68
70
  if (listen_sd < 0) {
69
71
    char buf[80];
70
 
    derr(0) << "accepter.bind unable to create socket: "
 
72
    dout(0) << "accepter.bind unable to create socket: "
71
73
            << strerror_r(errno, buf, sizeof(buf)) << dendl;
72
74
    cerr << "accepter.bind unable to create socket: "
73
75
         << strerror_r(errno, buf, sizeof(buf)) << std::endl;
89
91
    rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), sizeof(listen_addr.ss_addr()));
90
92
    if (rc < 0) {
91
93
      char buf[80];
92
 
      derr(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
 
94
      dout(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
93
95
              << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
94
96
      cerr << "accepter.bind unable to bind to " << bind_addr.ss_addr()
95
97
           << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
107
109
    }
108
110
    if (rc < 0) {
109
111
      char buf[80];
110
 
      derr(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
 
112
      dout(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
111
113
              << " on any port in range " << CEPH_PORT_START << "-" << CEPH_PORT_LAST
112
114
              << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
113
115
      cerr << "accepter.bind unable to bind to " << bind_addr.ss_addr()
128
130
  rc = ::listen(listen_sd, 128);
129
131
  if (rc < 0) {
130
132
    char buf[80];
131
 
    derr(0) << "accepter.bind unable to listen on " << bind_addr.ss_addr()
 
133
    dout(0) << "accepter.bind unable to listen on " << bind_addr.ss_addr()
132
134
            << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
133
135
    cerr << "accepter.bind unable to listen on " << bind_addr.ss_addr()
134
136
         << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
143
145
 
144
146
  if (messenger->ms_addr.get_port() == 0) {
145
147
    messenger->ms_addr = listen_addr;
146
 
    if (force_nonce >= 0)
147
 
      messenger->ms_addr.nonce = force_nonce;
148
 
    else
149
 
      messenger->ms_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
 
148
    messenger->ms_addr.nonce = nonce;
150
149
  }
151
150
 
152
151
  messenger->init_local_pipe();
467
466
          << " --> " << dest.name << " " << dest.addr
468
467
          << " -- " << *m
469
468
          << " -- ?+" << m->get_data().length()
470
 
          << " " << m 
 
469
          << " " << m
471
470
          << dendl;
472
471
 
473
472
  submit_message(m, dest.addr, dest.name.type(), true);
490
489
#undef dout_prefix
491
490
#define dout_prefix _pipe_prefix()
492
491
ostream& SimpleMessenger::Pipe::_pipe_prefix() {
493
 
  return *_dout << dbeginl
494
 
                << "-- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this
 
492
  return *_dout << "-- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this
495
493
                << " sd=" << sd
496
494
                << " pgs=" << peer_global_seq
497
495
                << " cs=" << connect_seq
533
531
  assert(pipe_lock.is_locked());
534
532
  
535
533
  list<Message *>& queue = in_q[priority];
536
 
  bool was_empty;
537
534
  
538
535
  if (halt_delivery)
539
536
    goto halt;
540
537
  
541
 
  was_empty = queue.empty();
542
 
  queue.push_back(m);
543
 
  
544
 
  //increment queue length counters
545
 
  in_qlen++;
546
 
  messenger->dispatch_queue.qlen_lock.lock();
547
 
  ++messenger->dispatch_queue.qlen;
548
 
  messenger->dispatch_queue.qlen_lock.unlock();
549
 
  
550
 
  if (was_empty) { //this pipe isn't on the endpoint queue
 
538
  if (queue.empty()) {
 
539
    // queue pipe AND message under pipe AND dispatch_queue locks.
551
540
    pipe_lock.Unlock();
552
541
    messenger->dispatch_queue.lock.Lock();
553
542
    pipe_lock.Lock();
554
 
    
 
543
 
555
544
    if (halt_delivery) {
556
545
      messenger->dispatch_queue.lock.Unlock();
557
546
      goto halt;
558
547
    }
559
 
    
560
 
    dout(20) << "queue_received queuing pipe" << dendl;
561
 
    if (!queue_items.count(priority)) 
562
 
      queue_items[priority] = new xlist<Pipe *>::item(this);
563
 
    if (messenger->dispatch_queue.queued_pipes.empty())
564
 
      messenger->dispatch_queue.cond.Signal();
565
 
    messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
 
548
 
 
549
    if (queue.empty()) {
 
550
      dout(20) << "queue_received queuing pipe" << dendl;
 
551
      if (!queue_items.count(priority)) 
 
552
        queue_items[priority] = new xlist<Pipe *>::item(this);
 
553
      if (messenger->dispatch_queue.queued_pipes.empty())
 
554
        messenger->dispatch_queue.cond.Signal();
 
555
      messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
 
556
    }
 
557
 
 
558
    queue.push_back(m);
 
559
 
566
560
    messenger->dispatch_queue.lock.Unlock();
 
561
  } else {
 
562
    // just queue message under pipe lock.
 
563
    queue.push_back(m);
567
564
  }
 
565
  
 
566
  // increment queue length counters
 
567
  in_qlen++;
 
568
  messenger->dispatch_queue.qlen_lock.lock();
 
569
  ++messenger->dispatch_queue.qlen;
 
570
  messenger->dispatch_queue.qlen_lock.unlock();
 
571
  
568
572
  return;
569
573
  
570
574
 halt:
754
758
      }
755
759
      
756
760
      if (existing->policy.lossy) {
757
 
        dout(-10) << "accept replacing existing (lossy) channel (new one lossy="
758
 
                  << policy.lossy << ")" << dendl;
 
761
        dout(0) << "accept replacing existing (lossy) channel (new one lossy="
 
762
                << policy.lossy << ")" << dendl;
759
763
        existing->was_session_reset();
760
764
        goto replace;
761
765
      }
762
766
      /*if (lossy_rx) {
763
767
        if (existing->state == STATE_STANDBY) {
764
 
          dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
 
768
          dout(0) << "accept incoming lossy connection, kicking outgoing lossless "
765
769
                    << existing << dendl;
766
770
          existing->state = STATE_CONNECTING;
767
771
          existing->cond.Signal();
768
772
        } else {
769
 
          dout(-10) << "accept incoming lossy connection, our lossless " << existing
 
773
          dout(0) << "accept incoming lossy connection, our lossless " << existing
770
774
                    << " has state " << existing->state << ", doing nothing" << dendl;
771
775
        }
772
776
        existing->lock.Unlock();
773
777
        goto fail;
774
778
        }*/
775
779
 
776
 
      dout(-10) << "accept connect_seq " << connect.connect_seq
 
780
      dout(0) << "accept connect_seq " << connect.connect_seq
777
781
                << " vs existing " << existing->connect_seq
778
782
                << " state " << existing->state << dendl;
779
783
 
780
784
      if (connect.connect_seq < existing->connect_seq) {
781
785
        if (connect.connect_seq == 0) {
782
 
          dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
 
786
          dout(0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
783
787
          existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
784
788
          goto replace;
785
789
        } else {
999
1003
  sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
1000
1004
  if (sd < 0) {
1001
1005
    char buf[80];
1002
 
    dout(-1) << "connect couldn't created socket " << strerror_r(errno, buf, sizeof(buf)) << dendl;
 
1006
    derr << "connect couldn't created socket " << strerror_r(errno, buf, sizeof(buf)) << dendl;
1003
1007
    assert(0);
1004
1008
    goto fail;
1005
1009
  }
1461
1465
 
1462
1466
void SimpleMessenger::Pipe::fail()
1463
1467
{
1464
 
  derr(10) << "fail" << dendl;
 
1468
  dout(10) << "fail" << dendl;
1465
1469
  assert(pipe_lock.is_locked());
1466
1470
 
1467
1471
  stop();
1587
1591
      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
1588
1592
      // it doesn't matter if we "got" it or not.
1589
1593
      if (m->get_seq() <= in_seq) {
1590
 
        dout(-10) << "reader got old message "
1591
 
                  << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
1592
 
                  << ", discarding" << dendl;
 
1594
        dout(0) << "reader got old message "
 
1595
                << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
 
1596
                << ", discarding" << dendl;
1593
1597
        messenger->dispatch_throttle_release(m->get_dispatch_throttle_size());
1594
1598
        m->put();
1595
1599
        continue;
1665
1669
      char tag = CEPH_MSGR_TAG_CLOSE;
1666
1670
      state = STATE_CLOSED;
1667
1671
      pipe_lock.Unlock();
1668
 
      if (sd) ::write(sd, &tag, 1);
 
1672
      if (sd) {
 
1673
        int r = ::write(sd, &tag, 1);
 
1674
        // we can ignore r, actually; we don't care if this succeeds.
 
1675
        r++; r = 0; // placate gcc
 
1676
      }
1669
1677
      pipe_lock.Lock();
1670
1678
      continue;
1671
1679
    }
1724
1732
 
1725
1733
        pipe_lock.Lock();
1726
1734
        if (rc < 0) {
1727
 
          derr(1) << "writer error sending " << m << ", "
 
1735
          dout(1) << "writer error sending " << m << ", "
1728
1736
                  << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
1729
1737
          fault();
1730
1738
        }
1764
1772
  int head = 0;
1765
1773
  if (off & ~PAGE_MASK) {
1766
1774
    // head
1767
 
    head = MIN(PAGE_SIZE - (off & ~PAGE_MASK), (unsigned)left);
 
1775
    head = MIN(PAGE_SIZE - (off & ~PAGE_MASK), left);
1768
1776
    bufferptr bp = buffer::create(head);
1769
1777
    data.push_back(bp);
1770
1778
    left -= head;
1980
1988
      assert(l == len);
1981
1989
    }
1982
1990
 
1983
 
    int r = ::sendmsg(sd, msg, more ? MSG_MORE : 0);
 
1991
    int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
1984
1992
    if (r == 0) 
1985
1993
      dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
1986
1994
    if (r < 0) { 
2031
2039
    if (len == 0) break;
2032
2040
    
2033
2041
    // hrmph.  trim r bytes off the front of our message.
2034
 
    dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl;
 
2042
    dout(20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
2035
2043
    while (r > 0) {
2036
2044
      if (msg->msg_iov[0].iov_len <= (size_t)r) {
2037
2045
        // lose this whole item
2287
2295
 
2288
2296
 
2289
2297
 
2290
 
int SimpleMessenger::bind(entity_addr_t &bind_addr, int64_t force_nonce)
 
2298
int SimpleMessenger::bind(entity_addr_t bind_addr, int64_t nonce)
2291
2299
{
2292
2300
  lock.Lock();
2293
2301
  if (started) {
2299
2307
  lock.Unlock();
2300
2308
 
2301
2309
  // bind to a socket
2302
 
  return accepter.bind(force_nonce, bind_addr);
 
2310
  return accepter.bind(nonce, bind_addr);
2303
2311
}
2304
2312
 
2305
2313
int SimpleMessenger::rebind(int avoid_port)
2309
2317
  return accepter.rebind(avoid_port);
2310
2318
}
2311
2319
 
2312
 
static void remove_pid_file(int signal = 0)
2313
 
{
2314
 
  if (!g_conf.pid_file)
2315
 
    return;
2316
 
 
2317
 
  // only remove it if it has OUR pid in it!
2318
 
  int fd = ::open(g_conf.pid_file, O_RDONLY);
2319
 
  if (fd >= 0) {
2320
 
    char buf[20];
2321
 
    ::read(fd, buf, 20);
2322
 
    ::close(fd);
2323
 
    int a = atoi(buf);
2324
 
 
2325
 
    if (a == getpid())
2326
 
      ::unlink(g_conf.pid_file);
2327
 
    else if (!signal)
2328
 
      generic_dout(0) << "strange, pid file " << g_conf.pid_file 
2329
 
              << " has " << a << ", not expected " << getpid()
2330
 
              << dendl;
2331
 
  }
2332
 
}
2333
 
 
2334
 
static void handle_signal(int sig)
2335
 
{
2336
 
  remove_pid_file(sig);
2337
 
  signal(sig, SIG_DFL);
2338
 
  kill(getpid(), sig);
2339
 
}
2340
 
 
2341
 
static void write_pid_file(int pid)
2342
 
{
2343
 
  if (!g_conf.pid_file)
2344
 
    return;
2345
 
 
2346
 
  int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
2347
 
  if (fd >= 0) {
2348
 
    char buf[20];
2349
 
    int len = snprintf(buf, sizeof(buf), "%d\n", pid);
2350
 
    ::write(fd, buf, len);
2351
 
    ::close(fd);
2352
 
 
2353
 
    signal(SIGTERM, handle_signal);
2354
 
    signal(SIGINT, handle_signal);
2355
 
  }
2356
 
}
2357
 
 
2358
 
int SimpleMessenger::start(bool nodaemon)
 
2320
int SimpleMessenger::start(bool daemonize, uint64_t nonce)
2359
2321
{
2360
2322
  // register at least one entity, first!
2361
2323
  assert(my_type >= 0); 
2366
2328
    lock.Unlock();
2367
2329
    return 0;
2368
2330
  }
2369
 
 
2370
 
  if (!did_bind)
2371
 
    ms_addr.nonce = getpid();
 
2331
  if (!did_bind) {
 
2332
    ms_addr.nonce = nonce;
 
2333
  }
2372
2334
 
2373
2335
  dout(1) << "messenger.start" << dendl;
2374
2336
  started = true;
2375
2337
  lock.Unlock();
2376
2338
 
2377
2339
  // daemonize?
2378
 
  if (g_conf.daemonize && !nodaemon) {
2379
 
    if (Thread::get_num_threads() > 0) {
2380
 
      derr(0) << "messenger.start BUG: there are " << Thread::get_num_threads()
2381
 
              << " already started that will now die!  call messenger.start() sooner." 
2382
 
              << dendl;
 
2340
  if (daemonize) {
 
2341
    int num_threads = Thread::get_num_threads();
 
2342
    if (num_threads > 1) {
 
2343
      derr << "messenger.start BUG: there are " << num_threads - 1
 
2344
           << " child threads already started that will now die!  call messenger.start() sooner."
 
2345
           << dendl;
2383
2346
    }
2384
 
    dout(1) << "messenger.start daemonizing" << dendl;
2385
2347
 
2386
 
    if (1) {
2387
 
      daemon(1, 0);
2388
 
      install_standard_sighandlers();
2389
 
      write_pid_file(getpid());
2390
 
    } else {
2391
 
      pid_t pid = fork();
2392
 
      if (pid) {
2393
 
        // i am parent
2394
 
        write_pid_file(pid);
2395
 
        ::close(0);
2396
 
        ::close(1);
2397
 
        ::close(2);
2398
 
        _exit(0);
 
2348
    int r = daemon(1, 0);
 
2349
    assert(r >= 0);
 
2350
    install_standard_sighandlers();
 
2351
    pidfile_write(&g_conf);
 
2352
 
 
2353
    if (!g_conf.chdir.empty()) {
 
2354
      if (::chdir(g_conf.chdir.c_str())) {
 
2355
        int err = errno;
 
2356
        derr << "messenger.start: failed to chdir to directory: '"
 
2357
             << g_conf.chdir << "': " << cpp_strerror(err) << dendl;
2399
2358
      }
2400
 
      install_standard_sighandlers();
2401
 
    }
2402
 
 
2403
 
    if (g_conf.chdir && g_conf.chdir[0]) {
2404
 
      ::mkdir(g_conf.chdir, 0700);
2405
 
      ::chdir(g_conf.chdir);
2406
 
    }
2407
 
 
2408
 
    dout_rename_output_file();
 
2359
    }
 
2360
    dout_handle_daemonize();
 
2361
    dout(1) << "messenger.start daemonized" << dendl;
2409
2362
  }
2410
2363
 
2411
2364
  // go!
2570
2523
        dout(20) << "submit_message " << *m << " local" << dendl;
2571
2524
        dispatch_queue.local_delivery(m, m->get_priority());
2572
2525
      } else {
2573
 
        derr(0) << "submit_message " << *m << " " << dest_addr << " local but no local endpoint, dropping." << dendl;
 
2526
        dout(0) << "submit_message " << *m << " " << dest_addr << " local but no local endpoint, dropping." << dendl;
2574
2527
        assert(0);  // hmpf, this is probably mds->mon beacon from newsyn.
2575
2528
        m->put();
2576
2529
      }
2702
2655
 
2703
2656
  dout(10) << "wait: done." << dendl;
2704
2657
  dout(1) << "shutdown complete." << dendl;
2705
 
  remove_pid_file();
 
2658
  pidfile_remove();
2706
2659
  started = false;
2707
2660
  did_bind = false;
2708
2661
  my_type = -1;
2742
2695
  lock.Unlock();
2743
2696
}
2744
2697
 
2745
 
void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me)
 
2698
void SimpleMessenger::mark_down(Connection *con)
 
2699
{
 
2700
  lock.Lock();
 
2701
  Pipe *p = (Pipe *)con->get_pipe();
 
2702
  if (p) {
 
2703
    dout(1) << "mark_down " << con << " -- " << p << dendl;
 
2704
    p->unregister_pipe();
 
2705
    p->pipe_lock.Lock();
 
2706
    p->stop();
 
2707
    p->pipe_lock.Unlock();
 
2708
    p->put();
 
2709
  } else {
 
2710
    dout(1) << "mark_down " << con << " -- pipe dne" << dendl;
 
2711
  }
 
2712
  lock.Unlock();
 
2713
}
 
2714
 
 
2715
void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
2746
2716
{
2747
2717
  lock.Lock();
2748
2718
  int port = ms_addr.get_port();