~ubuntu-branches/ubuntu/precise/ceph/precise

« back to all changes in this revision

Viewing changes to src/osdc/Objecter.cc

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
49
49
 
50
50
void Objecter::init()
51
51
{
52
 
  assert(client_lock.is_locked());  // otherwise event cancellation is unsafe
 
52
  assert(client_lock.is_locked());
53
53
  timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
54
54
  maybe_request_map();
55
55
}
56
56
 
57
57
void Objecter::shutdown() 
58
58
{
59
 
  assert(client_lock.is_locked());  // otherwise event cancellation is unsafe
60
 
  timer.cancel_all();
61
59
}
62
60
 
63
61
 
121
119
 
122
120
        bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
123
121
        bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
124
 
 
125
 
        if (was_pauserd || was_pausewr)
126
 
          maybe_request_map();
127
 
    
 
122
        bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL);
 
123
  
128
124
        if (m->incremental_maps.count(e)) {
129
125
          dout(3) << "handle_osd_map decoding incremental epoch " << e << dendl;
130
126
          OSDMap::Incremental inc(m->incremental_maps[e]);
143
139
        }
144
140
        else {
145
141
          dout(3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
146
 
          monc->sub_want("osdmap", osdmap->get_epoch() + 1, CEPH_SUBSCRIBE_ONETIME);
147
 
          monc->renew_subs();
 
142
          maybe_request_map();
148
143
          break;
149
144
        }
 
145
 
 
146
        if (was_pauserd || was_pausewr || was_full)
 
147
          maybe_request_map();
150
148
        
151
149
        // scan pgs for changes
152
150
        scan_pgs(changed_pgs);
153
151
 
154
152
        // kick paused
155
153
        if ((was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) ||
156
 
            (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR))) {
 
154
            (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) ||
 
155
            (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL))) {
157
156
          for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
158
157
               p != op_osd.end();
159
158
               p++) {
209
208
  monc->sub_got("osdmap", osdmap->get_epoch());
210
209
}
211
210
 
 
211
void Objecter::wait_for_osd_map()
 
212
{
 
213
  if (osdmap->get_epoch()) return;
 
214
  Mutex lock("");
 
215
  Cond cond;
 
216
  bool done;
 
217
  lock.Lock();
 
218
  C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
 
219
  waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
 
220
  while (!done)
 
221
    cond.Wait(lock);
 
222
  lock.Unlock();
 
223
}
 
224
 
212
225
 
213
226
void Objecter::maybe_request_map()
214
227
{
215
 
  dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
216
 
  if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, CEPH_SUBSCRIBE_ONETIME))
 
228
  int flag = 0;
 
229
  if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
 
230
    dout(10) << "maybe_request_map subscribing (continuous) to next osd map (FULL flag is set)" << dendl;
 
231
  } else {
 
232
    dout(10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
 
233
    flag = CEPH_SUBSCRIBE_ONETIME;
 
234
  }
 
235
  if (monc->sub_want("osdmap", osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0, flag))
217
236
    monc->renew_subs();
218
237
}
219
238
 
302
321
      hash_map<tid_t, Op*>::iterator p = op_osd.find(tid);
303
322
      if (p != op_osd.end()) {
304
323
        Op *op = p->second;
 
324
        put_op_budget(op);
305
325
        op_osd.erase(p);
306
326
 
307
327
        if (op->onack)
387
407
 
388
408
tid_t Objecter::op_submit(Op *op)
389
409
{
 
410
 
 
411
  // throttle.  before we look at any state, because
 
412
  // take_op_budget() may drop our lock while it blocks.
 
413
  take_op_budget(op);
 
414
 
 
415
  if (op->oid.name.length())
 
416
    op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
 
417
 
390
418
  // find
391
 
  PG &pg = get_pg( pg_t(op->layout.ol_pgid) );
 
419
  PG &pg = get_pg(op->pgid);
392
420
    
393
421
  // pick tid
394
422
  if (!op->tid)
415
443
 
416
444
  // send?
417
445
  dout(10) << "op_submit oid " << op->oid
 
446
           << " " << op->oloc 
418
447
           << " " << op->ops << " tid " << op->tid
419
 
           << " " << op->layout 
420
448
           << " osd" << pg.primary()
421
449
           << dendl;
422
450
 
 
451
  assert(op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
 
452
 
423
453
  if ((op->flags & CEPH_OSD_FLAG_WRITE) &&
424
454
      osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
425
455
    dout(10) << " paused modify " << op << " tid " << last_tid << dendl;
430
460
    dout(10) << " paused read " << op << " tid " << last_tid << dendl;
431
461
    op->paused = true;
432
462
    maybe_request_map();
 
463
 } else if ((op->flags & CEPH_OSD_FLAG_WRITE) &&
 
464
            osdmap->test_flag(CEPH_OSDMAP_FULL)) {
 
465
    dout(10) << " FULL, paused modify " << op << " tid " << last_tid << dendl;
 
466
    op->paused = true;
 
467
    maybe_request_map();
433
468
  } else if (pg.primary() >= 0) {
434
469
    int flags = op->flags;
435
470
    if (op->oncommit)
437
472
    if (op->onack)
438
473
      flags |= CEPH_OSD_FLAG_ACK;
439
474
 
 
475
    if (op->con) {
 
476
      if (op->outbl && op->outbl->length()) {
 
477
        dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
 
478
        op->con->revoke_rx_buffer(op->tid);
 
479
      }
 
480
      op->con->put();
 
481
    }
 
482
    op->con = messenger->get_connection(osdmap->get_inst(pg.primary()));
 
483
    assert(op->con);
 
484
    if (op->outbl && op->outbl->length()) {
 
485
      dout(20) << " posting rx buffer for " << op->tid << " on " << op->con << dendl;
 
486
      op->con->post_rx_buffer(op->tid, *op->outbl);
 
487
    }
 
488
 
 
489
    ceph_object_layout ol;
 
490
    ol.ol_pgid = op->pgid.v;
 
491
    ol.ol_stripe_unit = 0;
 
492
 
440
493
    MOSDOp *m = new MOSDOp(client_inc, op->tid,
441
 
                           op->oid, op->layout, osdmap->get_epoch(),
 
494
                           op->oid, ol, osdmap->get_epoch(),
442
495
                           flags);
443
496
 
444
497
    m->set_snapid(op->snapid);
455
508
    if (op->priority)
456
509
      m->set_priority(op->priority);
457
510
 
458
 
    messenger->send_message(m, osdmap->get_inst(pg.primary()));
 
511
    messenger->send_message(m, op->con);
459
512
  } else 
460
513
    maybe_request_map();
461
514
  
464
517
  return op->tid;
465
518
}
466
519
 
 
520
int Objecter::calc_op_budget(Op *op)
 
521
{
 
522
  int op_budget = 0;
 
523
  for (vector<OSDOp>::iterator i = op->ops.begin();
 
524
       i != op->ops.end();
 
525
       ++i) {
 
526
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
 
527
      op_budget += i->data.length();
 
528
    } else if (i->op.op & CEPH_OSD_OP_MODE_RD) {
 
529
      if (i->op.op & CEPH_OSD_OP_TYPE_DATA) {
 
530
        if ((int64_t)i->op.extent.length > 0)
 
531
          op_budget += (int64_t)i->op.extent.length;
 
532
      } else if (i->op.op & CEPH_OSD_OP_TYPE_ATTR) {
 
533
        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
 
534
      }
 
535
    }
 
536
  }
 
537
  return op_budget;
 
538
}
 
539
 
 
540
void Objecter::throttle_op(Op *op, int op_budget)
 
541
{
 
542
  if (!op_budget)
 
543
    op_budget = calc_op_budget(op);
 
544
  if (!op_throttler.get_or_fail(op_budget)) { //couldn't take right now
 
545
    client_lock.Unlock();
 
546
    op_throttler.get(op_budget);
 
547
    client_lock.Lock();
 
548
  }
 
549
}
 
550
 
 
551
/* This function DOES put the passed message before returning */
467
552
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
468
553
{
469
554
  dout(10) << "in handle_osd_op_reply" << dendl;
484
569
          << dendl;
485
570
  Op *op = op_osd[ tid ];
486
571
 
 
572
  if (op->con != m->get_connection()) {
 
573
    dout(7) << " ignoring reply from " << m->get_source_inst()
 
574
            << ", i last sent to " << op->con->get_peer_addr() << dendl;
 
575
    m->put();
 
576
    return;
 
577
  }
 
578
 
487
579
  Context *onack = 0;
488
580
  Context *oncommit = 0;
489
581
 
490
 
  PG &pg = get_pg( m->get_pg() );
491
 
 
492
 
  // ignore?
493
 
  if (pg.acker() != m->get_source().num()) {
494
 
    dout(7) << " ignoring ack|commit from non-acker" << dendl;
495
 
    m->put();
496
 
    return;
497
 
  }
498
 
  
499
582
  int rc = m->get_result();
500
583
 
501
584
  if (rc == -EAGAIN) {
511
594
 
512
595
  // got data?
513
596
  if (op->outbl) {
 
597
    if (op->outbl->length())
 
598
      op->con->revoke_rx_buffer(op->tid);
514
599
    m->claim_data(*op->outbl);
515
600
    op->outbl = 0;
516
601
  }
532
617
 
533
618
  // done with this tid?
534
619
  if (!op->onack && !op->oncommit) {
 
620
    PG &pg = get_pg( m->get_pg() );
535
621
    assert(pg.active_tids.count(tid));
536
622
    pg.active_tids.erase(tid);
537
623
    dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg()
538
624
             << " still has " << pg.active_tids << dendl;
539
625
    if (pg.active_tids.empty()) 
540
626
      close_pg( m->get_pg() );
 
627
    put_op_budget(op);
541
628
    op_osd.erase( tid );
 
629
    if (op->con)
 
630
      op->con->put();
542
631
    delete op;
543
632
  }
544
633
  
577
666
    return;
578
667
  }
579
668
 
580
 
  ceph_object_layout layout;
581
 
  object_t oid;
 
669
  const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
 
670
  int pg_num = pool->get_pg_num();
582
671
 
583
 
  int pg_num = osdmap->get_pg_layout(list_context->pool_id, list_context->current_pg, layout);
584
672
  if (list_context->starting_pg_num == 0) {     // there can't be zero pgs!
585
673
    list_context->starting_pg_num = pg_num;
586
674
    dout(20) << pg_num << " placement groups" << dendl;
591
679
    list_context->current_pg = 0;
592
680
    list_context->cookie = 0;
593
681
    list_context->starting_pg_num = pg_num;
594
 
    osdmap->get_pg_layout(list_context->pool_id, list_context->current_pg, layout);
595
682
  }
596
683
  if (list_context->current_pg == pg_num){ //this context got all the way through
597
684
    onfinish->finish(0);
598
685
    delete onfinish;
 
686
    return;
599
687
  }
600
688
 
601
689
  ObjectOperation op;
603
691
 
604
692
  bufferlist *bl = new bufferlist();
605
693
  C_List *onack = new C_List(list_context, onfinish, bl, this);
606
 
  read(oid, layout, op, list_context->pool_snap_seq, bl, 0, onack);
 
694
 
 
695
  object_t oid;
 
696
  object_locator_t oloc(list_context->pool_id);
 
697
 
 
698
  // 
 
699
  Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL);
 
700
  o->priority = op.priority;
 
701
  o->snapid = list_context->pool_snap_seq;
 
702
  o->outbl = bl;
 
703
 
 
704
  o->pgid = pg_t(list_context->current_pg, list_context->pool_id, -1);
 
705
 
 
706
  op_submit(o);
607
707
}
608
708
 
609
709
void Objecter::_list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish)
1061
1161
 
1062
1162
void Objecter::ms_handle_reset(Connection *con)
1063
1163
{
1064
 
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)
1065
 
    maybe_request_map();
 
1164
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
 
1165
    //
 
1166
    int osd = osdmap->identify_osd(con->get_peer_addr());
 
1167
    if (osd >= 0) {
 
1168
      dout(1) << "ms_handle_reset on osd" << osd << dendl;
 
1169
      set<pg_t> changed_pgs;
 
1170
      scan_pgs_for(changed_pgs, osd);
 
1171
      kick_requests(changed_pgs);
 
1172
      maybe_request_map();
 
1173
    } else {
 
1174
      dout(10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl;
 
1175
    }
 
1176
  }
1066
1177
}
1067
1178
 
1068
1179
void Objecter::ms_handle_remote_reset(Connection *con)