~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/mon/MonClient.cc

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
#include "auth/KeyRing.h"
30
30
 
31
31
#include "include/str_list.h"
 
32
#include "include/addr_parsing.h"
32
33
 
33
34
#include "config.h"
34
35
 
49
50
  // file?
50
51
  if (g_conf.monmap) {
51
52
    const char *monmap_fn = g_conf.monmap;
52
 
    int r = monmap.read(monmap_fn);
 
53
    int r;
 
54
    try {
 
55
      r = monmap.read(monmap_fn);
 
56
    }
 
57
    catch (const buffer::error &e) {
 
58
      r = -EINVAL;
 
59
    }
53
60
    if (r >= 0)
54
61
      return 0;
55
62
    char buf[80];
56
 
    cerr << "unable to read monmap from " << monmap_fn << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
 
63
    cerr << "unable to read/decode monmap from " << monmap_fn << ": " << strerror_r(-r, buf, sizeof(buf)) << std::endl;
 
64
    return r;
57
65
  }
58
66
 
59
67
  // -m foo?
61
69
    vector<entity_addr_t> addrs;
62
70
    if (parse_ip_port_vec(g_conf.mon_host, addrs)) {
63
71
      for (unsigned i=0; i<addrs.size(); i++) {
64
 
        entity_inst_t inst;
65
 
        inst.name = entity_name_t::MON(i);
66
 
        inst.addr = addrs[i];
67
 
        monmap.add_mon(inst);
 
72
        char n[2];
 
73
        n[0] = 'a' + i;
 
74
        n[1] = 0;
 
75
        if (addrs[i].get_port() == 0)
 
76
          addrs[i].set_port(CEPH_MON_PORT);
 
77
        monmap.add(n, addrs[i]);
68
78
      }
69
79
      return 0;
 
80
    } else { //maybe they passed us a DNS-resolvable name
 
81
      char *hosts = NULL;
 
82
      char *old_addrs = new char[strlen(g_conf.mon_host)+1];
 
83
      strcpy(old_addrs, g_conf.mon_host);
 
84
      hosts = mount_resolve_dest(old_addrs);
 
85
      delete [] old_addrs;
 
86
      bool success = parse_ip_port_vec(hosts, addrs);
 
87
      free(hosts);
 
88
      if (success) {
 
89
        for (unsigned i=0; i<addrs.size(); i++) {
 
90
          char n[2];
 
91
          n[0] = 'a' + i;
 
92
          n[1] = 0;
 
93
          if (addrs[i].get_port() == 0)
 
94
            addrs[i].set_port(CEPH_MON_PORT);
 
95
          monmap.add(n, addrs[i]);
 
96
        }
 
97
        return 0;
 
98
      } else cerr << "couldn't parse_ip_port_vec on " << hosts << std::endl;
70
99
    }
71
100
    cerr << "unable to parse addrs in '" << g_conf.mon_host << "'" << std::endl;
72
101
  }
79
108
    for (list<string>::iterator p = ls.begin(); p != ls.end(); p++) {
80
109
      ConfFile c(p->c_str());
81
110
      if (c.parse()) {
82
 
        static string monstr;
83
 
        for (int i=0; i<15; i++) {
84
 
          char monname[20];
85
 
          char *val = 0;
86
 
          snprintf(monname, sizeof(monname), "mon%d", i);
87
 
          c.read(monname, "mon addr", &val, 0);
88
 
          if (!val || !val[0])
89
 
            break;
90
 
          
91
 
          entity_inst_t inst;
92
 
          if (!inst.addr.parse(val)) {
93
 
            cerr << "unable to parse conf's addr for " << monname << " (" << val << ")" << std::endl;
94
 
            continue;
 
111
        for (list<ConfSection*>::const_iterator q = c.get_section_list().begin();
 
112
             q != c.get_section_list().end();
 
113
             q++) {
 
114
          const char *section = (*q)->get_name().c_str();
 
115
          if (strncmp(section, "mon", 3) == 0) {
 
116
            const char *name = section + 3;
 
117
            if (name[0] == '.')
 
118
              name++;
 
119
            char *val = 0;
 
120
            c.read(section, "mon addr", &val, 0);
 
121
            if (!val || !val[0]) {
 
122
              delete val;
 
123
              continue;
 
124
            }
 
125
            entity_addr_t addr;
 
126
            if (!addr.parse(val)) {
 
127
              cerr << "unable to " << *p << " mon addr for " << section << " (" << val << ")" << std::endl;
 
128
              delete val;
 
129
              continue;
 
130
            }
 
131
            monmap.add(name, addr);
95
132
          }
96
 
          inst.name = entity_name_t::MON(monmap.mon_inst.size());
97
 
          monmap.add_mon(inst);
98
133
        }
99
 
        break;
 
134
        break;
100
135
      }
101
136
    }
102
137
    if (monmap.size())
116
151
  Mutex::Locker l(monc_lock);
117
152
  
118
153
  _sub_want("monmap", 0, 0);
119
 
  if (cur_mon < 0)
 
154
  if (cur_mon.empty())
120
155
    _reopen_session();
121
156
 
122
157
  while (want_monmap)
142
177
  }
143
178
  
144
179
  int attempt = 10;
145
 
  int i = 0;
146
180
  
147
181
  dout(10) << "have " << monmap.epoch << dendl;
148
182
  
149
183
  while (monmap.epoch == 0) {
150
 
    i = rand() % monmap.mon_inst.size();
151
 
    dout(10) << "querying " << monmap.mon_inst[i] << dendl;
152
 
    messenger->send_message(new MMonGetMap, monmap.mon_inst[i]);
 
184
    cur_mon = monmap.pick_random_mon();
 
185
    dout(10) << "querying mon." << cur_mon << " " << monmap.get_inst(cur_mon) << dendl;
 
186
    messenger->send_message(new MMonGetMap, monmap.get_inst(cur_mon));
153
187
    
154
188
    if (--attempt == 0)
155
189
      break;
156
190
    
157
191
    utime_t interval(1, 0);
158
192
    map_cond.WaitInterval(monc_lock, interval);
 
193
 
 
194
    if (monmap.epoch == 0)
 
195
      messenger->mark_down(monmap.get_addr(cur_mon));  // nope, clean that connection up
159
196
  }
160
197
 
161
198
  if (temp_msgr) {
168
205
  }
169
206
 
170
207
  hunting = true;  // reset this to true!
 
208
  cur_mon.clear();
171
209
 
172
210
  if (monmap.epoch)
173
211
    return 0;
200
238
 
201
239
void MonClient::handle_monmap(MMonMap *m)
202
240
{
 
241
  Mutex::Locker lock(monc_lock);
203
242
  dout(10) << "handle_monmap " << *m << dendl;
204
 
  monc_lock.Lock();
 
243
 
 
244
  assert(!cur_mon.empty());
 
245
  entity_addr_t cur_mon_addr = monmap.get_addr(cur_mon);
205
246
 
206
247
  bufferlist::iterator p = m->monmapbl.begin();
207
248
  ::decode(monmap, p);
208
249
 
209
 
  dout(10) << " got monmap " << monmap.epoch << dendl;
 
250
  dout(10) << " got monmap " << monmap.epoch
 
251
           << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
 
252
           << " at " << monmap.get_inst(cur_mon)
 
253
           << dendl;
 
254
  dout(10) << "dump:\n";
 
255
  monmap.print(*_dout);
 
256
  *_dout << dendl;
210
257
 
211
258
  _sub_got("monmap", monmap.get_epoch());
212
259
 
213
260
  map_cond.Signal();
214
261
  want_monmap = false;
215
262
 
216
 
  if (cur_mon >= 0)
 
263
  if (!cur_mon.empty() && monmap.get_rank(cur_mon) < 0) {
 
264
    dout(10) << "mon." << cur_mon << " went away" << dendl;
 
265
    cur_mon.clear();
 
266
  }
 
267
 
 
268
  if (cur_mon.empty())
 
269
    _pick_new_mon();  // can't find the mon we were talking to (above)
 
270
  else
217
271
    _finish_hunting();
218
272
 
219
 
  monc_lock.Unlock();
220
273
  m->put();
221
274
}
222
275
 
225
278
void MonClient::init()
226
279
{
227
280
  dout(10) << "init" << dendl;
 
281
 
228
282
  messenger->add_dispatcher_head(this);
229
283
 
230
284
  entity_name = *g_conf.entity_name;
231
285
  
232
286
  Mutex::Locker l(monc_lock);
 
287
  timer.init();
233
288
  schedule_tick();
234
289
 
235
290
  // seed rng so we choose a different monitor each time
254
309
 
255
310
void MonClient::shutdown()
256
311
{
257
 
  timer.cancel_all_events();
 
312
  monc_lock.Lock();
 
313
  timer.shutdown();
 
314
  monc_lock.Unlock();
258
315
}
259
316
 
260
317
int MonClient::authenticate(double timeout)
267
324
  }
268
325
 
269
326
  _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
270
 
  if (cur_mon < 0)
 
327
  if (cur_mon.empty())
271
328
    _reopen_session();
272
329
 
273
330
  utime_t until = g_clock.now();
294
351
 
295
352
void MonClient::handle_auth(MAuthReply *m)
296
353
{
 
354
  Mutex::Locker lock(monc_lock);
 
355
 
297
356
  bufferlist::iterator p = m->result_bl.begin();
298
357
  if (state == MC_STATE_NEGOTIATING) {
299
358
    if (!auth || (int)m->protocol != auth->get_protocol()) {
351
410
 
352
411
void MonClient::_send_mon_message(Message *m, bool force)
353
412
{
354
 
  assert(cur_mon >= 0);
 
413
  assert(monc_lock.is_locked());
 
414
  assert(!cur_mon.empty());
355
415
  if (force || state == MC_STATE_HAVE_SESSION) {
356
 
    messenger->send_message(m, monmap.mon_inst[cur_mon]);
 
416
    dout(10) << "_send_mon_message to mon." << cur_mon << " at " << monmap.get_inst(cur_mon) << dendl;
 
417
    messenger->send_message(m, monmap.get_inst(cur_mon));
357
418
  } else {
358
419
    waiting_for_session.push_back(m);
359
420
  }
361
422
 
362
423
void MonClient::_pick_new_mon()
363
424
{
364
 
  if (cur_mon >= 0)
365
 
    messenger->mark_down(monmap.get_inst(cur_mon).addr);
 
425
  assert(monc_lock.is_locked());
 
426
  if (!cur_mon.empty())
 
427
    messenger->mark_down(monmap.get_addr(cur_mon));
366
428
 
367
 
  if (cur_mon >= 0 && monmap.size() > 1) {
 
429
  if (!cur_mon.empty() && monmap.size() > 1) {
368
430
    // pick a _different_ mon
369
 
    int n = rand() % (monmap.size() - 1);
370
 
    if (n >= cur_mon)
371
 
      n++;
372
 
    cur_mon = n;
 
431
    cur_mon = monmap.pick_random_mon_not(cur_mon);
373
432
  } else {
374
 
    cur_mon = rand() % monmap.size();
 
433
    cur_mon = monmap.pick_random_mon();
375
434
  }
376
 
  dout(10) << "_pick_new_mon picked mon" << cur_mon << dendl;
 
435
  dout(10) << "_pick_new_mon picked mon." << cur_mon << dendl;
377
436
}
378
437
 
 
438
 
379
439
void MonClient::_reopen_session()
380
440
{
 
441
  assert(monc_lock.is_locked());
381
442
  dout(10) << "_reopen_session" << dendl;
382
443
 
383
444
  _pick_new_mon();
410
471
  Mutex::Locker lock(monc_lock);
411
472
 
412
473
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
413
 
    if (cur_mon < 0 || con->get_peer_addr() != monmap.get_inst(cur_mon).addr) {
 
474
    if (cur_mon.empty() || con->get_peer_addr() != monmap.get_addr(cur_mon)) {
414
475
      dout(10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
415
476
      return true;
416
477
    } else {
428
489
 
429
490
void MonClient::_finish_hunting()
430
491
{
 
492
  assert(monc_lock.is_locked());
431
493
  if (hunting) {
432
 
    dout(1) << "found mon" << cur_mon << dendl;
 
494
    dout(1) << "found mon." << cur_mon << dendl; 
433
495
    hunting = false;
434
496
  }
435
497
}
443
505
  if (hunting) {
444
506
    dout(1) << "continuing hunt" << dendl;
445
507
    _reopen_session();
446
 
  } else if (cur_mon >= 0) {
 
508
  } else if (!cur_mon.empty()) {
447
509
    // just renew as needed
448
510
    utime_t now = g_clock.now();
449
511
    if (now > sub_renew_after)
450
512
      _renew_subs();
451
513
 
452
 
    messenger->send_keepalive(monmap.mon_inst[cur_mon]);
 
514
    messenger->send_keepalive(monmap.get_inst(cur_mon));
453
515
  }
454
516
 
455
517
  if (auth)
471
533
 
472
534
void MonClient::_renew_subs()
473
535
{
 
536
  assert(monc_lock.is_locked());
474
537
  if (sub_have.empty()) {
475
538
    dout(10) << "renew_subs - empty" << dendl;
476
539
    return;
477
540
  }
478
541
 
479
542
  dout(10) << "renew_subs" << dendl;
480
 
  if (cur_mon < 0)
 
543
  if (cur_mon.empty())
481
544
    _reopen_session();
482
545
  else {
483
546
    if (sub_renew_sent == utime_t())
491
554
 
492
555
void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
493
556
{
 
557
  Mutex::Locker lock(monc_lock);
 
558
  
494
559
  _finish_hunting();
495
560
 
496
561
  if (sub_renew_sent != utime_t()) {
507
572
 
508
573
int MonClient::_check_auth_tickets()
509
574
{
 
575
  assert(monc_lock.is_locked());
510
576
  if (state == MC_STATE_HAVE_SESSION && auth) {
511
577
    if (auth->need_tickets()) {
512
578
      dout(10) << "_check_auth_tickets getting new tickets!" << dendl;
523
589
 
524
590
int MonClient::_check_auth_rotating()
525
591
{
 
592
  assert(monc_lock.is_locked());
526
593
  if (!rotating_secrets ||
527
594
      !auth_principal_needs_rotating_keys(entity_name)) {
528
595
    dout(20) << "_check_auth_rotating not needed by " << entity_name << dendl;