~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/msg/SimpleMessenger.h

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
52
52
 */
53
53
 
54
54
// default feature(s) everyone gets
55
 
#define MSGR_FEATURES_SUPPORTED  CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_SUBSCRIBE2
 
55
#define MSGR_FEATURES_SUPPORTED  \
 
56
  CEPH_FEATURE_NOSRCADDR |       \
 
57
  CEPH_FEATURE_SUBSCRIBE2 |      \
 
58
  CEPH_FEATURE_MONNAMES |        \
 
59
  CEPH_FEATURE_FLOCK |           \
 
60
  CEPH_FEATURE_RECONNECT_SEQ |   \
 
61
  CEPH_FEATURE_DIRLAYOUTHASH
56
62
 
57
63
class SimpleMessenger : public Messenger {
58
64
public:
97
103
    
98
104
    void *entry();
99
105
    void stop();
100
 
    int bind(int64_t force_nonce);
 
106
    int bind(int64_t force_nonce, entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
 
107
    int rebind(int avoid_port);
101
108
    int start();
102
109
  } accepter;
103
110
 
139
146
    map<int, list<Message*> > out_q;  // priority queue for outbound msgs
140
147
    map<int, list<Message*> > in_q; // and inbound ones
141
148
    int in_qlen;
142
 
    map<int, xlist<Pipe *>::item* > queue_items; // _map_ protected by pipe_lock, *item protected by q.lock
 
149
    map<int, xlist<Pipe *>::item* > queue_items; // protected by pipe_lock AND q.lock
143
150
    list<Message*> sent;
144
151
    Cond cond;
145
152
    bool keepalive;
 
153
    bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
146
154
    
147
155
    __u32 connect_seq, peer_global_seq;
148
156
    uint64_t out_seq;
165
173
 
166
174
    void was_session_reset();
167
175
 
 
176
    /* Clean up sent list */
 
177
    void handle_ack(uint64_t seq) {
 
178
      dout(15) << "reader got ack seq " << seq << dendl;
 
179
      // trim sent list
 
180
      while (!sent.empty() &&
 
181
          sent.front()->get_seq() <= seq) {
 
182
        Message *m = sent.front();
 
183
        sent.pop_front();
 
184
        dout(10) << "reader got ack seq "
 
185
            << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
 
186
        m->put();
 
187
      }
 
188
    }
 
189
 
168
190
    // threads
169
191
    class Reader : public Thread {
170
192
      Pipe *pipe;
185
207
  public:
186
208
    Pipe(SimpleMessenger *r, int st) : 
187
209
      messenger(r),
188
 
      sd(-1), peer_type(-1),
 
210
      sd(-1),
 
211
      peer_type(-1),
189
212
      pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
190
213
      state(st), 
191
214
      connection_state(new Connection),
192
215
      reader_running(false), reader_joining(false), writer_running(false),
193
 
      in_qlen(0), keepalive(false),
 
216
      in_qlen(0), keepalive(false), halt_delivery(false),
194
217
      connect_seq(0), peer_global_seq(0),
195
218
      out_seq(0), in_seq(0), in_seq_acked(0),
196
219
      reader_thread(this), writer_thread(this) {
197
220
      connection_state->pipe = get();
 
221
      messenger->timeout = g_conf.ms_tcp_read_timeout * 1000; //convert to ms
 
222
      if (messenger->timeout == 0)
 
223
        messenger->timeout = -1;
198
224
    }
199
225
    ~Pipe() {
200
226
      for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
201
227
           i != queue_items.end();
202
228
           ++i) {
203
 
        if (i->second->is_on_list())
204
 
          i->second->remove_myself();
 
229
        assert(!i->second->is_on_list());
205
230
        delete i->second;
206
231
      }
207
232
      assert(out_q.empty());
208
233
      assert(sent.empty());
209
 
      connection_state->put();
 
234
      if (connection_state)
 
235
        connection_state->put();
210
236
    }
211
237
 
212
238
 
239
265
    static const Pipe& Server(int s);
240
266
    static const Pipe& Client(const entity_addr_t& pi);
241
267
 
242
 
    //callers make sure it's not already enqueued or you'll just
243
 
    //push it to the back of the line!
244
 
    //Also, call with pipe_lock held or bad things happen
245
 
    void enqueue_me(int priority) {
246
 
      if (!queue_items.count(priority))
247
 
        queue_items[priority] = new xlist<Pipe *>::item(this);
248
 
      pipe_lock.Unlock();
249
 
      messenger->dispatch_queue.lock.Lock();
250
 
      if (messenger->dispatch_queue.queued_pipes.empty())
251
 
        messenger->dispatch_queue.cond.Signal();
252
 
      messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
253
 
      messenger->dispatch_queue.lock.Unlock();
254
 
      pipe_lock.Lock();
255
 
    }
256
 
 
257
268
    //we have two queue_received's to allow local signal delivery
258
269
    // via Message * (that doesn't actually point to a Message)
259
 
    //Don't call while holding pipe_lock!
260
 
    void queue_received(Message *m, int priority) {
261
 
      list<Message *>& queue = in_q[priority];
262
 
 
263
 
      pipe_lock.Lock();
264
 
      bool was_empty = queue.empty();
265
 
      queue.push_back(m);
266
 
      if (was_empty) //this pipe isn't on the endpoint queue
267
 
        enqueue_me(priority);
268
 
 
269
 
      //increment queue length counters
270
 
      in_qlen++;
271
 
      messenger->dispatch_queue.qlen_lock.lock();
272
 
      ++messenger->dispatch_queue.qlen;
273
 
      messenger->dispatch_queue.qlen_lock.unlock();
274
 
 
275
 
      pipe_lock.Unlock();
276
 
    }
 
270
    void queue_received(Message *m, int priority);
277
271
    
278
272
    void queue_received(Message *m) {
279
273
      m->set_recv_stamp(g_clock.now());
344
338
      return m;
345
339
    }
346
340
 
347
 
    void requeue_sent();
 
341
    /* Remove all messages from the sent queue. Add those with seq > max_acked
 
342
     * to the highest priority outgoing queue. */
 
343
    void requeue_sent(uint64_t max_acked=0);
348
344
    void discard_queue();
349
345
 
350
 
    void force_close() {
351
 
      if (sd >= 0) ::close(sd);
 
346
    void shutdown_socket() {
 
347
      if (sd >= 0)
 
348
        ::shutdown(sd, SHUT_RDWR);
352
349
    }
353
350
  };
354
351
 
370
367
 
371
368
    Pipe *local_pipe;
372
369
    void local_delivery(Message *m, int priority) {
 
370
      local_pipe->pipe_lock.Lock();
373
371
      if ((unsigned long)m > 10)
374
372
        m->set_connection(local_pipe->connection_state->get());
375
373
      local_pipe->queue_received(m, priority);
 
374
      local_pipe->pipe_lock.Unlock();
376
375
    }
377
376
 
378
377
    int get_queue_len() {
382
381
      return l;
383
382
    }
384
383
    
385
 
    void local_delivery(Message *m) {
386
 
      if ((unsigned long)m > 10)
387
 
        m->set_connection(local_pipe->connection_state->get());
388
 
      local_pipe->queue_received(m);
389
 
    }
390
 
 
391
384
    void queue_connect(Connection *con) {
392
385
      lock.Lock();
393
386
      connect_q.push_back(con);
478
471
  const entity_addr_t &get_ms_addr() { return ms_addr; }
479
472
 
480
473
  void mark_down(const entity_addr_t& addr);
 
474
  void mark_down_all();
481
475
 
482
476
  // reaper
483
477
  class ReaperThread : public Thread {
519
513
  void prepare_dest(const entity_inst_t& inst);
520
514
  int send_message(Message *m, const entity_inst_t& dest);
521
515
  int send_message(Message *m, Connection *con);
 
516
  Connection *get_connection(const entity_inst_t& dest);
522
517
  int lazy_send_message(Message *m, const entity_inst_t& dest);
523
518
  int lazy_send_message(Message *m, Connection *con) {
524
519
    return send_message(m, con);
542
537
  void dispatch_entry();
543
538
 
544
539
  SimpleMessenger *messenger; //hack to make dout macro work, will fix
 
540
  int timeout;
545
541
 
546
542
public:
547
543
  SimpleMessenger() :
562
558
 
563
559
  //void set_listen_addr(tcpaddr_t& a);
564
560
 
565
 
  int bind(int64_t force_nonce = -1);
 
561
  int bind(entity_addr_t& bind_addr, int64_t force_nonce = -1);
 
562
  int bind(int64_t force_nonce = -1) { return bind(g_conf.public_addr, force_nonce); }
566
563
  int start(bool nodaemon = false);
567
564
  void wait();
568
565
 
 
566
  int rebind(int avoid_port);
 
567
 
569
568
  __u32 get_global_seq(__u32 old=0) {
570
569
    Mutex::Locker l(global_seq_lock);
571
570
    if (old > global_seq)