54
54
// default feature(s) everyone gets
55
// Default feature bits OR'ed into one mask.  The replacement list is
// parenthesized so the macro always expands as a single operand; without
// the parentheses, `x & MSGR_FEATURES_SUPPORTED` would parse as
// `(x & CEPH_FEATURE_NOSRCADDR) | CEPH_FEATURE_SUBSCRIBE2`.
#define MSGR_FEATURES_SUPPORTED (CEPH_FEATURE_NOSRCADDR | CEPH_FEATURE_SUBSCRIBE2)
55
// Default feature bits OR'ed into one mask.  The whole replacement list is
// parenthesized so the macro expands as a single operand (for example in
// `x & MSGR_FEATURES_SUPPORTED`), and every continuation line ends in a
// backslash so the definition is not cut short.
#define MSGR_FEATURES_SUPPORTED  \
  (CEPH_FEATURE_NOSRCADDR |      \
   CEPH_FEATURE_SUBSCRIBE2 |     \
   CEPH_FEATURE_MONNAMES |       \
   CEPH_FEATURE_FLOCK |          \
   CEPH_FEATURE_RECONNECT_SEQ |  \
   CEPH_FEATURE_DIRLAYOUTHASH)
57
63
class SimpleMessenger : public Messenger {
100
int bind(int64_t force_nonce);
106
int bind(int64_t force_nonce, entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
107
int rebind(int avoid_port);
139
146
map<int, list<Message*> > out_q; // priority queue for outbound msgs
140
147
map<int, list<Message*> > in_q; // and inbound ones
142
map<int, xlist<Pipe *>::item* > queue_items; // _map_ protected by pipe_lock, *item protected by q.lock
149
map<int, xlist<Pipe *>::item* > queue_items; // protected by pipe_lock AND q.lock
143
150
list<Message*> sent;
153
bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
147
155
__u32 connect_seq, peer_global_seq;
148
156
uint64_t out_seq;
166
174
void was_session_reset();
176
/* Clean up sent list */
177
void handle_ack(uint64_t seq) {
178
dout(15) << "reader got ack seq " << seq << dendl;
180
while (!sent.empty() &&
181
sent.front()->get_seq() <= seq) {
182
Message *m = sent.front();
184
dout(10) << "reader got ack seq "
185
<< seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
169
191
class Reader : public Thread {
186
208
Pipe(SimpleMessenger *r, int st) :
188
sd(-1), peer_type(-1),
189
212
pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
191
214
connection_state(new Connection),
192
215
reader_running(false), reader_joining(false), writer_running(false),
193
in_qlen(0), keepalive(false),
216
in_qlen(0), keepalive(false), halt_delivery(false),
194
217
connect_seq(0), peer_global_seq(0),
195
218
out_seq(0), in_seq(0), in_seq_acked(0),
196
219
reader_thread(this), writer_thread(this) {
197
220
connection_state->pipe = get();
221
messenger->timeout = g_conf.ms_tcp_read_timeout * 1000; //convert to ms
222
if (messenger->timeout == 0)
223
messenger->timeout = -1;
200
226
for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
201
227
i != queue_items.end();
203
if (i->second->is_on_list())
204
i->second->remove_myself();
229
assert(!i->second->is_on_list());
205
230
delete i->second;
207
232
assert(out_q.empty());
208
233
assert(sent.empty());
209
connection_state->put();
234
if (connection_state)
235
connection_state->put();
239
265
static const Pipe& Server(int s);
240
266
static const Pipe& Client(const entity_addr_t& pi);
242
//callers make sure it's not already enqueued or you'll just
243
//push it to the back of the line!
244
//Also, call with pipe_lock held or bad things happen
245
void enqueue_me(int priority) {
246
if (!queue_items.count(priority))
247
queue_items[priority] = new xlist<Pipe *>::item(this);
249
messenger->dispatch_queue.lock.Lock();
250
if (messenger->dispatch_queue.queued_pipes.empty())
251
messenger->dispatch_queue.cond.Signal();
252
messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
253
messenger->dispatch_queue.lock.Unlock();
257
268
//we have two queue_received's to allow local signal delivery
258
269
// via Message * (that doesn't actually point to a Message)
259
//Don't call while holding pipe_lock!
260
void queue_received(Message *m, int priority) {
261
list<Message *>& queue = in_q[priority];
264
bool was_empty = queue.empty();
266
if (was_empty) //this pipe isn't on the endpoint queue
267
enqueue_me(priority);
269
//increment queue length counters
271
messenger->dispatch_queue.qlen_lock.lock();
272
++messenger->dispatch_queue.qlen;
273
messenger->dispatch_queue.qlen_lock.unlock();
270
void queue_received(Message *m, int priority);
278
272
void queue_received(Message *m) {
279
273
m->set_recv_stamp(g_clock.now());
341
/* Remove all messages from the sent queue. Add those with seq > max_acked
342
* to the highest priority outgoing queue. */
343
void requeue_sent(uint64_t max_acked=0);
348
344
void discard_queue();
351
if (sd >= 0) ::close(sd);
346
void shutdown_socket() {
348
::shutdown(sd, SHUT_RDWR);
371
368
Pipe *local_pipe;
372
369
void local_delivery(Message *m, int priority) {
370
local_pipe->pipe_lock.Lock();
373
371
if ((unsigned long)m > 10)
374
372
m->set_connection(local_pipe->connection_state->get());
375
373
local_pipe->queue_received(m, priority);
374
local_pipe->pipe_lock.Unlock();
378
377
int get_queue_len() {
385
void local_delivery(Message *m) {
386
if ((unsigned long)m > 10)
387
m->set_connection(local_pipe->connection_state->get());
388
local_pipe->queue_received(m);
391
384
void queue_connect(Connection *con) {
393
386
connect_q.push_back(con);
519
513
void prepare_dest(const entity_inst_t& inst);
520
514
int send_message(Message *m, const entity_inst_t& dest);
521
515
int send_message(Message *m, Connection *con);
516
Connection *get_connection(const entity_inst_t& dest);
522
517
int lazy_send_message(Message *m, const entity_inst_t& dest);
523
518
int lazy_send_message(Message *m, Connection *con) {
524
519
return send_message(m, con);
563
559
//void set_listen_addr(tcpaddr_t& a);
565
int bind(int64_t force_nonce = -1);
561
int bind(entity_addr_t& bind_addr, int64_t force_nonce = -1);
562
// Convenience overload: forwards to the two-argument bind(), passing
// g_conf.public_addr as the address and force_nonce through unchanged.
int bind(int64_t force_nonce = -1)
{
  return bind(g_conf.public_addr, force_nonce);
}
566
563
int start(bool nodaemon = false);
566
int rebind(int avoid_port);
569
568
__u32 get_global_seq(__u32 old=0) {
570
569
Mutex::Locker l(global_seq_lock);
571
570
if (old > global_seq)