~ubuntu-branches/ubuntu/raring/ceph/raring

« back to all changes in this revision

Viewing changes to src/osdc/Objecter.cc

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2012-02-05 10:07:38 UTC
  • mfrom: (1.1.7) (0.1.11 sid)
  • Revision ID: package-import@ubuntu.com-20120205100738-00s0bxx93mamy8tk
Tags: 0.41-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
199
199
    cct->get_perfcounters_collection()->add(logger);
200
200
  }
201
201
 
202
 
  timer.add_event_after(cct->_conf->objecter_tick_interval, new C_Tick(this));
 
202
  schedule_tick();
203
203
  maybe_request_map();
204
204
}
205
205
 
211
211
    close_session(p->second);
212
212
  }
213
213
 
 
214
  if (tick_event) {
 
215
    timer.cancel_event(tick_event);
 
216
    tick_event = NULL;
 
217
  }
 
218
 
214
219
  if (logger) {
215
220
    cct->get_perfcounters_collection()->remove(logger);
216
221
    delete logger;
343
348
 
344
349
void Objecter::handle_osd_map(MOSDMap *m)
345
350
{
 
351
  assert(client_lock.is_locked());
346
352
  assert(osdmap); 
347
353
 
348
 
  if (ceph_fsid_compare(&m->fsid, &monc->get_fsid())) {
 
354
  if (m->fsid != monc->get_fsid()) {
349
355
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
350
356
    m->put();
351
357
    return;
389
395
          logger->inc(l_osdc_map_full);
390
396
        }
391
397
        else {
392
 
          if (m->get_first() > m->get_oldest() || e == m->get_first()) {
 
398
          if (e && e > m->get_oldest()) {
393
399
            ldout(cct, 3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
394
400
            maybe_request_map();
395
401
            break;
397
403
          ldout(cct, 3) << "handle_osd_map missing epoch " << osdmap->get_epoch()+1
398
404
                        << ", jumping to " << m->get_oldest() << dendl;
399
405
          e = m->get_oldest() - 1;
 
406
          skipped_map = true;
400
407
          continue;
401
408
        }
402
409
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());
711
718
  }
712
719
}
713
720
 
 
721
void Objecter::schedule_tick()
 
722
{
 
723
  assert(tick_event == NULL);
 
724
  tick_event = new C_Tick(this);
 
725
  timer.add_event_after(cct->_conf->objecter_tick_interval, tick_event);
 
726
}
714
727
 
715
728
void Objecter::tick()
716
729
{
717
730
  ldout(cct, 10) << "tick" << dendl;
 
731
  assert(client_lock.is_locked());
 
732
 
 
733
  // we are only called by C_Tick
 
734
  assert(tick_event);
 
735
  tick_event = NULL;
718
736
 
719
737
  set<OSDSession*> toping;
720
738
 
745
763
    for (set<OSDSession*>::iterator i = toping.begin();
746
764
         i != toping.end();
747
765
         i++)
748
 
      messenger->send_message(new MPing, osdmap->get_inst((*i)->osd));
 
766
      messenger->send_message(new MPing, (*i)->con);
749
767
  }
750
768
    
751
769
  // reschedule
752
 
  timer.add_event_after(cct->_conf->objecter_tick_interval, new C_Tick(this));
 
770
  schedule_tick();
753
771
}
754
772
 
755
773
void Objecter::resend_mon_ops()
786
804
 
787
805
tid_t Objecter::op_submit(Op *op, OSDSession *s)
788
806
{
 
807
  assert(client_lock.is_locked());
 
808
 
 
809
  assert(op->ops.size() == op->out_bl.size());
 
810
  assert(op->ops.size() == op->out_rval.size());
 
811
  assert(op->ops.size() == op->out_handler.size());
 
812
 
789
813
  // throttle.  before we look at any state, because
790
814
  // take_op_budget() may drop our lock while it blocks.
791
815
  take_op_budget(op);
807
831
  }
808
832
 
809
833
  // add to gather set(s)
810
 
  int flags = op->flags;
811
834
  if (op->onack) {
812
 
    flags |= CEPH_OSD_FLAG_ACK;
813
835
    ++num_unacked;
814
836
  } else {
815
837
    ldout(cct, 20) << " note: not requesting ack" << dendl;
816
838
  }
817
839
  if (op->oncommit) {
818
 
    flags |= CEPH_OSD_FLAG_ONDISK;
819
840
    ++num_uncommitted;
820
841
  } else {
821
842
    ldout(cct, 20) << " note: not requesting commit" << dendl;
949
970
        ldout(cct, 10) << " chose random osd." << osd << " of " << acting << dendl;
950
971
      } else if (read && (op->flags & CEPH_OSD_FLAG_LOCALIZE_READS)) {
951
972
        // look for a local replica
952
 
        unsigned i;
953
 
        for (i = acting.size()-1; i > 0; i++)
954
 
          if (osdmap->get_addr(i).is_same_host(messenger->get_myaddr())) {
 
973
        int i;
 
974
        /* loop through the OSD replicas and see if any are local to read from.
 
975
         * We don't need to check the primary since we default to it. (Be
 
976
         * careful to preserve that default, which is why we iterate in reverse
 
977
         * order.) */
 
978
        for (i = acting.size()-1; i > 0; --i) {
 
979
          if (osdmap->get_addr(acting[i]).is_same_host(messenger->get_myaddr())) {
955
980
            op->used_replica = true;
956
981
            ldout(cct, 10) << " chose local osd." << acting[i] << " of " << acting << dendl;
957
982
            break;
958
983
          }
 
984
        }
959
985
        osd = acting[i];
960
986
      } else
961
987
        osd = acting[0];
1051
1077
  if (op->priority)
1052
1078
    m->set_priority(op->priority);
1053
1079
 
1054
 
  messenger->send_message(m, op->session->con);
1055
 
 
1056
1080
  logger->inc(l_osdc_op_send);
1057
1081
  logger->inc(l_osdc_op_send_bytes, m->get_data().length());
 
1082
 
 
1083
  messenger->send_message(m, op->session->con);
1058
1084
}
1059
1085
 
1060
1086
int Objecter::calc_op_budget(Op *op)
1064
1090
       i != op->ops.end();
1065
1091
       ++i) {
1066
1092
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
1067
 
      op_budget += i->data.length();
 
1093
      op_budget += i->indata.length();
1068
1094
    } else if (i->op.op & CEPH_OSD_OP_MODE_RD) {
1069
1095
      if (ceph_osd_op_type_data(i->op.op)) {
1070
1096
        if ((int64_t)i->op.extent.length > 0)
1091
1117
/* This function DOES put the passed message before returning */
1092
1118
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
1093
1119
{
 
1120
  assert(client_lock.is_locked());
1094
1121
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
 
1122
 
1095
1123
  // get pio
1096
1124
  tid_t tid = m->get_tid();
1097
1125
 
1104
1132
  }
1105
1133
 
1106
1134
  ldout(cct, 7) << "handle_osd_op_reply " << tid
1107
 
          << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
1108
 
          << " v " << m->get_version() << " in " << m->get_pg()
1109
 
          << dendl;
 
1135
                << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
 
1136
                << " v " << m->get_version() << " in " << m->get_pg()
 
1137
                << " attempt " << m->get_retry_attempt()
 
1138
                << dendl;
1110
1139
  Op *op = ops[tid];
1111
1140
 
1112
 
  if (op->session->con != m->get_connection()) {
1113
 
    ldout(cct, 7) << " ignoring reply from " << m->get_source_inst()
1114
 
            << ", i last sent to " << op->session->con->get_peer_addr() << dendl;
1115
 
    m->put();
1116
 
    return;
 
1141
  if (m->get_retry_attempt() >= 0) {
 
1142
    if (m->get_retry_attempt() != (op->attempts - 1)) {
 
1143
      ldout(cct, 7) << " ignoring reply from attempt " << m->get_retry_attempt()
 
1144
                    << " from " << m->get_source_inst()
 
1145
                    << "; last attempt " << (op->attempts - 1) << " sent to "
 
1146
                    << op->session->con->get_peer_addr() << dendl;
 
1147
      m->put();
 
1148
      return;
 
1149
    }
 
1150
  } else {
 
1151
    // we don't know the request attempt because the server is old, so
 
1152
    // just accept this one.  we may do ACK callbacks we shouldn't
 
1153
    // have, but that is better than doing callbacks out of order.
1117
1154
  }
1118
1155
 
1119
1156
  Context *onack = 0;
1137
1174
  if (op->reply_epoch)
1138
1175
    *op->reply_epoch = m->get_map_epoch();
1139
1176
 
1140
 
  // got data?
1141
 
  if (op->outbl) {
1142
 
    if (op->con)
1143
 
      op->con->revoke_rx_buffer(op->tid);
1144
 
    m->claim_data(*op->outbl);
1145
 
    op->outbl = 0;
 
1177
  // per-op result demuxing
 
1178
  vector<OSDOp> out_ops;
 
1179
  m->claim_ops(out_ops);
 
1180
  
 
1181
  if (out_ops.size() != op->ops.size())
 
1182
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
 
1183
                  << " != request ops " << op->ops
 
1184
                  << " from " << m->get_source_inst() << dendl;
 
1185
 
 
1186
  vector<bufferlist*>::iterator pb = op->out_bl.begin();
 
1187
  vector<int*>::iterator pr = op->out_rval.begin();
 
1188
  vector<Context*>::iterator ph = op->out_handler.begin();
 
1189
  assert(op->out_bl.size() == op->out_rval.size());
 
1190
  assert(op->out_bl.size() == op->out_handler.size());
 
1191
  vector<OSDOp>::iterator p = out_ops.begin();
 
1192
  for (unsigned i = 0;
 
1193
       p != out_ops.end() && pb != op->out_bl.end();
 
1194
       ++i, ++p, ++pb, ++pr, ++ph) {
 
1195
    ldout(cct, 10) << " op " << i << " rval " << p->rval
 
1196
                   << " len " << p->outdata.length() << dendl;
 
1197
    if (*pb)
 
1198
      **pb = p->outdata;
 
1199
    if (*pr)
 
1200
      **pr = p->rval;
 
1201
    if (*ph) {
 
1202
      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
 
1203
      (*ph)->complete(p->rval);
 
1204
    }
1146
1205
  }
1147
1206
 
1148
1207
  // ack|commit -> ack
1154
1213
    num_unacked--;
1155
1214
    logger->inc(l_osdc_op_ack);
1156
1215
  }
1157
 
  if (op->oncommit && m->is_ondisk()) {
 
1216
  if (op->oncommit && (m->is_ondisk() || rc)) {
1158
1217
    ldout(cct, 15) << "handle_osd_op_reply safe" << dendl;
1159
1218
    oncommit = op->oncommit;
1160
1219
    op->oncommit = 0;
1162
1221
    logger->inc(l_osdc_op_commit);
1163
1222
  }
1164
1223
 
 
1224
  // got data?
 
1225
  if (op->outbl) {
 
1226
    if (op->con)
 
1227
      op->con->revoke_rx_buffer(op->tid);
 
1228
    m->claim_data(*op->outbl);
 
1229
    op->outbl = 0;
 
1230
  }
 
1231
 
1165
1232
  // done with this tid?
1166
1233
  if (!op->onack && !op->oncommit) {
1167
1234
    op->session_item.remove_myself();
1178
1245
 
1179
1246
  // do callbacks
1180
1247
  if (onack) {
1181
 
    ldout(cct, 20) << "Calling onack->finish with rc " << rc << dendl;
1182
1248
    onack->finish(rc);
1183
 
    ldout(cct, 20) << "Finished onack-finish" << dendl;
1184
1249
    delete onack;
1185
1250
  }
1186
1251
  if (oncommit) {
1483
1548
 */
1484
1549
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
1485
1550
{
 
1551
  assert(client_lock.is_locked());
1486
1552
  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
1487
1553
  tid_t tid = m->get_tid();
1488
1554
  if (pool_ops.count(tid)) {
1544
1610
 
1545
1611
void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
1546
1612
{
 
1613
  assert(client_lock.is_locked());
1547
1614
  ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
1548
1615
  tid_t tid = m->get_tid();
1549
1616
 
1594
1661
 
1595
1662
void Objecter::handle_fs_stats_reply(MStatfsReply *m)
1596
1663
{
 
1664
  assert(client_lock.is_locked());
1597
1665
  ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
1598
1666
  tid_t tid = m->get_tid();
1599
1667