36
36
#include "common/Timer.h"
37
#include "common/signal.h"
37
#include "common/errno.h"
38
#include "common/pidfile.h"
39
#include "common/safe_io.h"
39
41
#define DOUT_SUBSYS ms
41
43
#define dout_prefix _prefix(messenger)
42
44
static ostream& _prefix(SimpleMessenger *messenger) {
43
return *_dout << dbeginl << "-- " << messenger->ms_addr << " ";
45
return *_dout << "-- " << messenger->ms_addr << " ";
53
int SimpleMessenger::Accepter::bind(int64_t force_nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
55
int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
55
57
// bind to a socket
56
58
dout(10) << "accepter.bind" << dendl;
67
69
listen_sd = ::socket(family, SOCK_STREAM, 0);
68
70
if (listen_sd < 0) {
70
derr(0) << "accepter.bind unable to create socket: "
72
dout(0) << "accepter.bind unable to create socket: "
71
73
<< strerror_r(errno, buf, sizeof(buf)) << dendl;
72
74
cerr << "accepter.bind unable to create socket: "
73
75
<< strerror_r(errno, buf, sizeof(buf)) << std::endl;
89
91
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), sizeof(listen_addr.ss_addr()));
92
derr(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
94
dout(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
93
95
<< ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
94
96
cerr << "accepter.bind unable to bind to " << bind_addr.ss_addr()
95
97
<< ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
110
derr(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
112
dout(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr()
111
113
<< " on any port in range " << CEPH_PORT_START << "-" << CEPH_PORT_LAST
112
114
<< ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
113
115
cerr << "accepter.bind unable to bind to " << bind_addr.ss_addr()
128
130
rc = ::listen(listen_sd, 128);
131
derr(0) << "accepter.bind unable to listen on " << bind_addr.ss_addr()
133
dout(0) << "accepter.bind unable to listen on " << bind_addr.ss_addr()
132
134
<< ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
133
135
cerr << "accepter.bind unable to listen on " << bind_addr.ss_addr()
134
136
<< ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
144
146
if (messenger->ms_addr.get_port() == 0) {
145
147
messenger->ms_addr = listen_addr;
146
if (force_nonce >= 0)
147
messenger->ms_addr.nonce = force_nonce;
149
messenger->ms_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
148
messenger->ms_addr.nonce = nonce;
152
151
messenger->init_local_pipe();
467
466
<< " --> " << dest.name << " " << dest.addr
469
468
<< " -- ?+" << m->get_data().length()
473
472
submit_message(m, dest.addr, dest.name.type(), true);
490
489
#undef dout_prefix
491
490
#define dout_prefix _pipe_prefix()
492
491
ostream& SimpleMessenger::Pipe::_pipe_prefix() {
493
return *_dout << dbeginl
494
<< "-- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this
492
return *_dout << "-- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this
496
494
<< " pgs=" << peer_global_seq
497
495
<< " cs=" << connect_seq
533
531
assert(pipe_lock.is_locked());
535
533
list<Message *>& queue = in_q[priority];
538
535
if (halt_delivery)
541
was_empty = queue.empty();
544
//increment queue length counters
546
messenger->dispatch_queue.qlen_lock.lock();
547
++messenger->dispatch_queue.qlen;
548
messenger->dispatch_queue.qlen_lock.unlock();
550
if (was_empty) { //this pipe isn't on the endpoint queue
539
// queue pipe AND message under pipe AND dispatch_queue locks.
551
540
pipe_lock.Unlock();
552
541
messenger->dispatch_queue.lock.Lock();
553
542
pipe_lock.Lock();
555
544
if (halt_delivery) {
556
545
messenger->dispatch_queue.lock.Unlock();
560
dout(20) << "queue_received queuing pipe" << dendl;
561
if (!queue_items.count(priority))
562
queue_items[priority] = new xlist<Pipe *>::item(this);
563
if (messenger->dispatch_queue.queued_pipes.empty())
564
messenger->dispatch_queue.cond.Signal();
565
messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
550
dout(20) << "queue_received queuing pipe" << dendl;
551
if (!queue_items.count(priority))
552
queue_items[priority] = new xlist<Pipe *>::item(this);
553
if (messenger->dispatch_queue.queued_pipes.empty())
554
messenger->dispatch_queue.cond.Signal();
555
messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
566
560
messenger->dispatch_queue.lock.Unlock();
562
// just queue message under pipe lock.
566
// increment queue length counters
568
messenger->dispatch_queue.qlen_lock.lock();
569
++messenger->dispatch_queue.qlen;
570
messenger->dispatch_queue.qlen_lock.unlock();
756
760
if (existing->policy.lossy) {
757
dout(-10) << "accept replacing existing (lossy) channel (new one lossy="
758
<< policy.lossy << ")" << dendl;
761
dout(0) << "accept replacing existing (lossy) channel (new one lossy="
762
<< policy.lossy << ")" << dendl;
759
763
existing->was_session_reset();
762
766
/*if (lossy_rx) {
763
767
if (existing->state == STATE_STANDBY) {
764
dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
768
dout(0) << "accept incoming lossy connection, kicking outgoing lossless "
765
769
<< existing << dendl;
766
770
existing->state = STATE_CONNECTING;
767
771
existing->cond.Signal();
769
dout(-10) << "accept incoming lossy connection, our lossless " << existing
773
dout(0) << "accept incoming lossy connection, our lossless " << existing
770
774
<< " has state " << existing->state << ", doing nothing" << dendl;
772
776
existing->lock.Unlock();
776
dout(-10) << "accept connect_seq " << connect.connect_seq
780
dout(0) << "accept connect_seq " << connect.connect_seq
777
781
<< " vs existing " << existing->connect_seq
778
782
<< " state " << existing->state << dendl;
780
784
if (connect.connect_seq < existing->connect_seq) {
781
785
if (connect.connect_seq == 0) {
782
dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
786
dout(0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
783
787
existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
999
1003
sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
1002
dout(-1) << "connect couldn't created socket " << strerror_r(errno, buf, sizeof(buf)) << dendl;
1006
derr << "connect couldn't created socket " << strerror_r(errno, buf, sizeof(buf)) << dendl;
1587
1591
// occasionally pull a message out of the sent queue to send elsewhere. in that case
1588
1592
// it doesn't matter if we "got" it or not.
1589
1593
if (m->get_seq() <= in_seq) {
1590
dout(-10) << "reader got old message "
1591
<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
1592
<< ", discarding" << dendl;
1594
dout(0) << "reader got old message "
1595
<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
1596
<< ", discarding" << dendl;
1593
1597
messenger->dispatch_throttle_release(m->get_dispatch_throttle_size());
1665
1669
char tag = CEPH_MSGR_TAG_CLOSE;
1666
1670
state = STATE_CLOSED;
1667
1671
pipe_lock.Unlock();
1668
if (sd) ::write(sd, &tag, 1);
1673
int r = ::write(sd, &tag, 1);
1674
// we can ignore r, actually; we don't care if this succeeds.
1675
r++; r = 0; // placate gcc
1669
1677
pipe_lock.Lock();
1725
1733
pipe_lock.Lock();
1727
derr(1) << "writer error sending " << m << ", "
1735
dout(1) << "writer error sending " << m << ", "
1728
1736
<< errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
1980
1988
assert(l == len);
1983
int r = ::sendmsg(sd, msg, more ? MSG_MORE : 0);
1991
int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
1985
1993
dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
2031
2039
if (len == 0) break;
2033
2041
// hrmph. trim r bytes off the front of our message.
2034
dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl;
2042
dout(20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
2035
2043
while (r > 0) {
2036
2044
if (msg->msg_iov[0].iov_len <= (size_t)r) {
2037
2045
// lose this whole item
2309
2317
return accepter.rebind(avoid_port);
2312
static void remove_pid_file(int signal = 0)
2314
if (!g_conf.pid_file)
2317
// only remove it if it has OUR pid in it!
2318
int fd = ::open(g_conf.pid_file, O_RDONLY);
2321
::read(fd, buf, 20);
2326
::unlink(g_conf.pid_file);
2328
generic_dout(0) << "strange, pid file " << g_conf.pid_file
2329
<< " has " << a << ", not expected " << getpid()
2334
static void handle_signal(int sig)
2336
remove_pid_file(sig);
2337
signal(sig, SIG_DFL);
2338
kill(getpid(), sig);
2341
static void write_pid_file(int pid)
2343
if (!g_conf.pid_file)
2346
int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
2349
int len = snprintf(buf, sizeof(buf), "%d\n", pid);
2350
::write(fd, buf, len);
2353
signal(SIGTERM, handle_signal);
2354
signal(SIGINT, handle_signal);
2358
int SimpleMessenger::start(bool nodaemon)
2320
int SimpleMessenger::start(bool daemonize, uint64_t nonce)
2360
2322
// register at least one entity, first!
2361
2323
assert(my_type >= 0);
2371
ms_addr.nonce = getpid();
2332
ms_addr.nonce = nonce;
2373
2335
dout(1) << "messenger.start" << dendl;
2374
2336
started = true;
2378
if (g_conf.daemonize && !nodaemon) {
2379
if (Thread::get_num_threads() > 0) {
2380
derr(0) << "messenger.start BUG: there are " << Thread::get_num_threads()
2381
<< " already started that will now die! call messenger.start() sooner."
2341
int num_threads = Thread::get_num_threads();
2342
if (num_threads > 1) {
2343
derr << "messenger.start BUG: there are " << num_threads - 1
2344
<< " child threads already started that will now die! call messenger.start() sooner."
2384
dout(1) << "messenger.start daemonizing" << dendl;
2388
install_standard_sighandlers();
2389
write_pid_file(getpid());
2394
write_pid_file(pid);
2348
int r = daemon(1, 0);
2350
install_standard_sighandlers();
2351
pidfile_write(&g_conf);
2353
if (!g_conf.chdir.empty()) {
2354
if (::chdir(g_conf.chdir.c_str())) {
2356
derr << "messenger.start: failed to chdir to directory: '"
2357
<< g_conf.chdir << "': " << cpp_strerror(err) << dendl;
2400
install_standard_sighandlers();
2403
if (g_conf.chdir && g_conf.chdir[0]) {
2404
::mkdir(g_conf.chdir, 0700);
2405
::chdir(g_conf.chdir);
2408
dout_rename_output_file();
2360
dout_handle_daemonize();
2361
dout(1) << "messenger.start daemonized" << dendl;
2570
2523
dout(20) << "submit_message " << *m << " local" << dendl;
2571
2524
dispatch_queue.local_delivery(m, m->get_priority());
2573
derr(0) << "submit_message " << *m << " " << dest_addr << " local but no local endpoint, dropping." << dendl;
2526
dout(0) << "submit_message " << *m << " " << dest_addr << " local but no local endpoint, dropping." << dendl;
2574
2527
assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
2745
void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me)
2698
void SimpleMessenger::mark_down(Connection *con)
2701
Pipe *p = (Pipe *)con->get_pipe();
2703
dout(1) << "mark_down " << con << " -- " << p << dendl;
2704
p->unregister_pipe();
2705
p->pipe_lock.Lock();
2707
p->pipe_lock.Unlock();
2710
dout(1) << "mark_down " << con << " -- pipe dne" << dendl;
2715
void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
2748
2718
int port = ms_addr.get_port();