~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/osd/OSD.cc

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
58
58
#include "messages/MOSDPGInfo.h"
59
59
#include "messages/MOSDPGCreate.h"
60
60
#include "messages/MOSDPGTrim.h"
 
61
#include "messages/MOSDPGMissing.h"
61
62
 
62
63
#include "messages/MOSDAlive.h"
63
64
 
100
101
  return out << dbeginl << "osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " ";
101
102
}
102
103
 
103
 
 
104
 
const coll_t meta_coll(coll_t::TYPE_META);
105
 
const coll_t temp_coll(coll_t::TYPE_TEMP);
106
 
 
 
104
const coll_t coll_t::META_COLL("meta");
 
105
const coll_t coll_t::TEMP_COLL("temp");
107
106
 
108
107
const struct CompatSet::Feature ceph_osd_feature_compat[] = {
109
108
  END_FEATURE
110
109
};
111
110
const struct CompatSet::Feature ceph_osd_feature_incompat[] = {
112
111
  CEPH_OSD_FEATURE_INCOMPAT_BASE,
 
112
  CEPH_OSD_FEATURE_INCOMPAT_PGINFO,
 
113
  CEPH_OSD_FEATURE_INCOMPAT_OLOC,
 
114
  CEPH_OSD_FEATURE_INCOMPAT_LEC,
113
115
  END_FEATURE
114
116
};
115
117
const struct CompatSet::Feature ceph_osd_feature_ro_compat[] = {
180
182
    object_t oid("disk_bw_test");
181
183
    for (int i=0; i<1000; i++) {
182
184
      ObjectStore::Transaction *t = new ObjectStore::Transaction;
183
 
      t->write(meta_coll, sobject_t(oid, 0), i*bl.length(), bl.length(), bl);
 
185
      t->write(coll_t::META_COLL, sobject_t(oid, 0), i*bl.length(), bl.length(), bl);
184
186
      store->queue_transaction(NULL, t);
185
187
    }
186
188
    store->sync();
188
190
    end -= start;
189
191
    cout << "measured " << (1000.0 / (double)end) << " mb/sec" << std::endl;
190
192
    ObjectStore::Transaction tr;
191
 
    tr.remove(meta_coll, sobject_t(oid, 0));
 
193
    tr.remove(coll_t::META_COLL, sobject_t(oid, 0));
192
194
    store->apply_transaction(tr);
193
195
    
194
196
    // set osd weight
199
201
  ::encode(sb, bl);
200
202
 
201
203
  ObjectStore::Transaction t;
202
 
  t.create_collection(meta_coll);
203
 
  t.write(meta_coll, OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
204
 
  t.create_collection(temp_coll);
 
204
  t.create_collection(coll_t::META_COLL);
 
205
  t.write(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
 
206
  t.create_collection(coll_t::TEMP_COLL);
205
207
  int r = store->apply_transaction(t);
206
208
  store->umount();
207
209
  delete store;
238
240
 
239
241
  snprintf(fn, sizeof(fn), "%s/%s", base, file);
240
242
  fd = ::open(fn, O_WRONLY|O_CREAT|O_TRUNC, 0644);
241
 
  ::write(fd, val, vallen);
 
243
  if (fd < 0)
 
244
    return -errno;
 
245
  int r = ::write(fd, val, vallen);
 
246
  if (r < 0)
 
247
    return -errno;
242
248
  ::close(fd);
243
 
 
244
249
  return 0;
245
250
}
246
251
 
328
333
 
329
334
// cons/des
330
335
 
331
 
OSD::OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev, const char *jdev) : 
 
336
OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, Messenger *hbm, MonClient *mc, const char *dev, const char *jdev) :
332
337
  osd_lock("OSD::osd_lock"),
333
338
  timer(osd_lock),
334
 
  messenger(m),
 
339
  cluster_messenger(internal_messenger),
 
340
  client_messenger(external_messenger),
335
341
  monc(mc),
336
342
  logger(NULL), logger_started(false),
337
343
  store(NULL),
338
 
  logclient(messenger, &mc->monmap, mc),
 
344
  map_in_progress(false),
 
345
  logclient(client_messenger, &mc->monmap, mc),
339
346
  whoami(id),
340
347
  dev_path(dev), journal_path(jdev),
341
348
  dispatch_running(false),
373
380
  remove_list_lock("OSD::remove_list_lock"),
374
381
  replay_queue_lock("OSD::replay_queue_lock"),
375
382
  snap_trim_wq(this, &disk_tp),
 
383
  sched_scrub_lock("OSD::sched_scrub_lock"),
 
384
  scrubs_pending(0),
 
385
  scrubs_active(0),
376
386
  scrub_wq(this, &disk_tp),
377
387
  remove_wq(this, &disk_tp)
378
388
{
379
 
  monc->set_messenger(messenger);
 
389
  monc->set_messenger(client_messenger);
 
390
 
 
391
  map_in_progress_cond = new Cond();
380
392
  
381
393
  osdmap = 0;
382
394
 
395
407
 
396
408
OSD::~OSD()
397
409
{
 
410
  delete map_in_progress_cond;
398
411
  delete class_handler;
399
412
  delete osdmap;
400
413
  logger_remove(logger);
440
453
{
441
454
  Mutex::Locker lock(osd_lock);
442
455
 
 
456
  timer.init();
 
457
 
443
458
  // mount.
444
459
  dout(2) << "mounting " << dev_path << " " << (journal_path ? journal_path : "(no journal)") << dendl;
445
460
  assert(store);  // call pre_init() first!
482
497
  clear_temp();
483
498
 
484
499
  // make sure (newish) temp dir exists
485
 
  if (!store->collection_exists(coll_t(coll_t::TYPE_TEMP))) {
 
500
  if (!store->collection_exists(coll_t::TEMP_COLL)) {
486
501
    dout(10) << "creating temp pg dir" << dendl;
487
502
    ObjectStore::Transaction t;
488
 
    t.create_collection(coll_t(coll_t::TYPE_TEMP));
 
503
    t.create_collection(coll_t::TEMP_COLL);
489
504
    store->apply_transaction(t);
490
505
  }
491
506
 
502
517
  open_logger();
503
518
    
504
519
  // i'm ready!
505
 
  messenger->add_dispatcher_head(this);
506
 
  messenger->add_dispatcher_head(&logclient);
 
520
  client_messenger->add_dispatcher_head(this);
 
521
  client_messenger->add_dispatcher_head(&logclient);
 
522
  cluster_messenger->add_dispatcher_head(this);
507
523
 
508
524
  heartbeat_messenger->add_dispatcher_head(&heartbeat_dispatcher);
509
525
 
626
642
  g_conf.debug_ebofs = 100;
627
643
  g_conf.debug_ms = 100;
628
644
  
629
 
  dout(1) << "shutdown" << dendl;
 
645
  dout(1) << "shutdown." << dendl;
630
646
 
631
647
  state = STATE_STOPPING;
632
648
 
633
 
  // cancel timers
634
 
  timer.cancel_all();
635
 
  timer.join();
 
649
  timer.shutdown();
636
650
 
637
651
  heartbeat_lock.Lock();
638
652
  heartbeat_stop = true;
720
734
  }
721
735
  pg_map.clear();
722
736
 
723
 
  messenger->shutdown();
 
737
  client_messenger->shutdown();
 
738
  cluster_messenger->shutdown();
724
739
  if (heartbeat_messenger)
725
740
    heartbeat_messenger->shutdown();
726
741
 
740
755
 
741
756
  bufferlist bl;
742
757
  ::encode(superblock, bl);
743
 
  t.write(meta_coll, OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
 
758
  t.write(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
744
759
}
745
760
 
746
761
int OSD::read_superblock()
747
762
{
748
763
  bufferlist bl;
749
 
  int r = store->read(meta_coll, OSD_SUPERBLOCK_POBJECT, 0, 0, bl);
 
764
  int r = store->read(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, 0, bl);
750
765
  if (r < 0)
751
766
    return r;
752
767
 
787
802
  dout(10) << "clear_temp" << dendl;
788
803
 
789
804
  vector<sobject_t> objects;
790
 
  store->collection_list(temp_coll, objects);
 
805
  store->collection_list(coll_t::TEMP_COLL, objects);
791
806
 
792
807
  dout(10) << objects.size() << " objects" << dendl;
793
808
  if (objects.empty())
798
813
  for (vector<sobject_t>::iterator p = objects.begin();
799
814
       p != objects.end();
800
815
       p++)
801
 
    t->collection_remove(temp_coll, *p);
 
816
    t->collection_remove(coll_t::TEMP_COLL, *p);
802
817
  int r = store->queue_transaction(NULL, t);
803
818
  assert(r == 0);
804
819
  store->sync_and_flush();
820
835
{
821
836
  PGPool *p = _lookup_pool(id);
822
837
  if (!p) {
 
838
    if (!osdmap->have_pg_pool(id)) {
 
839
      dout(5) << __func__ << ": the OSDmap does not contain a PG pool with id = "
 
840
              << id << dendl;
 
841
      return NULL;
 
842
    }
 
843
 
823
844
    p = new PGPool(id, osdmap->get_pool_name(id),
824
845
                   osdmap->get_pg_pool(id)->v.auid );
825
846
    pool_map[id] = p;
854
875
 
855
876
  dout(10) << "_open_lock_pg " << pgid << dendl;
856
877
  PGPool *pool = _get_pool(pgid.pool());
 
878
  assert(pool);
857
879
 
858
880
  // create
859
881
  PG *pg;
860
882
  sobject_t logoid = make_pg_log_oid(pgid);
 
883
  sobject_t infooid = make_pg_biginfo_oid(pgid);
861
884
  if (osdmap->get_pg_type(pgid) == CEPH_PG_TYPE_REP)
862
 
    pg = new ReplicatedPG(this, pool, pgid, logoid);
 
885
    pg = new ReplicatedPG(this, pool, pgid, logoid, infooid);
863
886
  //else if (pgid.is_raid4())
864
887
  //pg = new RAID4PG(this, pgid);
865
888
  else 
886
909
  PG *pg = _open_lock_pg(pgid);
887
910
 
888
911
  // create collection
889
 
  assert(!store->collection_exists(coll_t::build_pg_coll(pgid)));
890
 
  t.create_collection(coll_t::build_pg_coll(pgid));
 
912
  assert(!store->collection_exists(coll_t(pgid)));
 
913
  t.create_collection(coll_t(pgid));
891
914
 
892
915
  pg->write_info(t);
893
916
  pg->write_log(t);
904
927
 
905
928
  PG *pg = _open_lock_pg(pgid, true);
906
929
 
907
 
  assert(!store->collection_exists(coll_t::build_pg_coll(pgid)));
908
 
  t.create_collection(coll_t::build_pg_coll(pgid));
 
930
  assert(!store->collection_exists(coll_t(pgid)));
 
931
  t.create_collection(coll_t(pgid));
909
932
 
910
933
  pg->set_role(0);
911
934
  pg->acting.swap(acting);
951
974
  for (vector<coll_t>::iterator it = ls.begin();
952
975
       it != ls.end();
953
976
       it++) {
954
 
    if (*it == meta_coll)
955
 
      continue;
956
 
    if (it->snap != CEPH_NOSNAP)
957
 
      continue;
958
 
    pg_t pgid = it->pgid;
 
977
    pg_t pgid;
 
978
    snapid_t snap;
 
979
    if (!it->is_pg(pgid, snap)) {
 
980
      dout(10) << "load_pgs skipping non-pg " << *it << dendl;
 
981
      continue;
 
982
    }
 
983
    if (snap != CEPH_NOSNAP) {
 
984
      dout(10) << "load_pgs skipping snapped dir " << *it
 
985
               << " (pg " << pgid << " snap " << snap << ")" << dendl;
 
986
      continue;
 
987
    }
 
988
 
 
989
    if (!osdmap->have_pg_pool(pgid.pool())) {
 
990
      dout(10) << __func__ << ": skipping PG " << pgid << " because we don't have pool "
 
991
               << pgid.pool() << dendl;
 
992
      continue;
 
993
    }
 
994
 
959
995
    PG *pg = _open_lock_pg(pgid);
960
996
 
961
997
    // read pg state, log
969
1005
    dout(10) << "load_pgs loaded " << *pg << " " << pg->log << dendl;
970
1006
    pg->unlock();
971
1007
  }
 
1008
  dout(10) << "load_pgs done" << dendl;
972
1009
}
973
1010
 
974
1011
 
1156
1193
  assert(osd_lock.is_locked());
1157
1194
  heartbeat_lock.Lock();
1158
1195
 
 
1196
  /*
 
1197
  for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
 
1198
    if (heartbeat_inst.count(p->first) == 0)
 
1199
      dout(0) << " no inst for _to " << p->first << dendl;
 
1200
  for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
 
1201
    if (heartbeat_inst.count(p->first) == 0)
 
1202
      dout(0) << " no inst for _from " << p->first << dendl;
 
1203
  */
 
1204
 
1159
1205
  // filter heartbeat_from_stamp to only include osds that remain in
1160
1206
  // heartbeat_from.
1161
1207
  map<int, utime_t> old_from_stamp;
1167
1213
  old_from.swap(heartbeat_from);
1168
1214
  old_inst.swap(heartbeat_inst);
1169
1215
 
 
1216
  utime_t now = g_clock.now();
 
1217
 
 
1218
  heartbeat_epoch = osdmap->get_epoch();
 
1219
 
1170
1220
  // build heartbeat to/from set
1171
1221
  for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
1172
1222
       i != pg_map.end();
1177
1227
    if (pg->get_role() > 0) {
1178
1228
      assert(pg->acting.size() > 1);
1179
1229
      int p = pg->acting[0];
 
1230
      if (heartbeat_to.count(p))
 
1231
        continue;
1180
1232
      heartbeat_to[p] = osdmap->get_epoch();
1181
1233
      heartbeat_inst[p] = osdmap->get_hb_inst(p);
1182
1234
      if (old_to.count(p) == 0 || old_inst[p] != heartbeat_inst[p])
1187
1239
      for (unsigned i=1; i<pg->acting.size(); i++) {
1188
1240
        int p = pg->acting[i]; // peer
1189
1241
        assert(p != whoami);
 
1242
        if (heartbeat_from.count(p))
 
1243
          continue;
1190
1244
        heartbeat_from[p] = osdmap->get_epoch();
1191
1245
        heartbeat_inst[p] = osdmap->get_hb_inst(p);
1192
1246
        if (old_from_stamp.count(p) && old_from.count(p) &&
1195
1249
          heartbeat_from_stamp[p] = old_from_stamp[p];
1196
1250
        } else {
1197
1251
          dout(10) << "update_heartbeat_peers: new _from osd" << p << " " << heartbeat_inst[p] << dendl;
 
1252
          heartbeat_from_stamp[p] = now;  
 
1253
          MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat, true); // request hb
 
1254
          m->set_priority(CEPH_MSG_PRIO_HIGH);
 
1255
          heartbeat_messenger->send_message(m, heartbeat_inst[p]);
1198
1256
        }
1199
1257
      }
1200
1258
    }
1202
1260
  for (map<int,epoch_t>::iterator p = old_to.begin();
1203
1261
       p != old_to.end();
1204
1262
       p++) {
 
1263
    assert(old_inst.count(p->first));
 
1264
    if (heartbeat_to.count(p->first))
 
1265
      continue;
1205
1266
    if (p->second > osdmap->get_epoch()) {
1206
1267
      dout(10) << "update_heartbeat_peers: keeping newer _to peer " << old_inst[p->first]
1207
1268
               << " as of " << p->second << dendl;
1208
1269
      heartbeat_to[p->first] = p->second;
1209
1270
      heartbeat_inst[p->first] = old_inst[p->first];
1210
 
    } else if (p->second < osdmap->get_epoch() &&
1211
 
               (!osdmap->is_up(p->first) ||
1212
 
                osdmap->get_hb_inst(p->first) != old_inst[p->first])) {
1213
 
      dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first] 
1214
 
               << " as of " << p->second << dendl;      
1215
 
      heartbeat_messenger->mark_down(old_inst[p->first].addr);
 
1271
    } else {
 
1272
      if (heartbeat_from.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) {
 
1273
        dout(10) << "update_heartbeat_peers: old _to peer " << old_inst[p->first] 
 
1274
                 << " is still a _from peer, not marking down" << dendl;
 
1275
      } else {
 
1276
        dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first] 
 
1277
                 << " as of " << p->second << dendl;
 
1278
        heartbeat_messenger->mark_down(old_inst[p->first].addr);
 
1279
      }
 
1280
 
 
1281
      if (!osdmap->is_down(p->first) &&
 
1282
          osdmap->get_hb_inst(p->first) == old_inst[p->first]) {
 
1283
        dout(10) << "update_heartbeat_peers: sharing map with old _to peer " << old_inst[p->first] 
 
1284
                 << " as of " << p->second << dendl;
 
1285
        // share latest map with this peer (via the cluster link, NOT
 
1286
        // the heartbeat link), so they know not to expect heartbeats
 
1287
        // from us.  otherwise they may mark us down!
 
1288
        _share_map_outgoing(osdmap->get_cluster_inst(p->first));
 
1289
      }
1216
1290
    }
1217
1291
  }
1218
1292
  for (map<int,epoch_t>::iterator p = old_from.begin();
1219
1293
       p != old_from.end();
1220
1294
       p++) {
1221
 
    if (heartbeat_to.count(p->first) == 0 ||
1222
 
        heartbeat_inst[p->first] != old_inst[p->first])
1223
 
      dout(10) << "update_heartbeat_peers: dropped old _from osd" << p->first 
1224
 
               << " " << old_inst[p->first] << dendl;
 
1295
    if (heartbeat_from.count(p->first) == 0 ||
 
1296
        heartbeat_inst[p->first] != old_inst[p->first]) {
 
1297
      if (heartbeat_to.count(p->first) == 0) {
 
1298
        dout(10) << "update_heartbeat_peers: marking down old _from peer " << old_inst[p->first] 
 
1299
                 << " as of " << p->second << dendl;
 
1300
        heartbeat_messenger->mark_down(old_inst[p->first].addr);
 
1301
      } else {
 
1302
        dout(10) << "update_heartbeat_peers: old _from peer " << old_inst[p->first]
 
1303
                 << " is still a _to peer, not marking down" << dendl;
 
1304
      }
 
1305
    }
1225
1306
  }
1226
1307
 
1227
 
  heartbeat_epoch = osdmap->get_epoch();
1228
 
 
1229
1308
  dout(10) << "update_heartbeat_peers: hb   to: " << heartbeat_to << dendl;
1230
1309
  dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
1231
1310
 
 
1311
  /*
 
1312
  for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
 
1313
    if (heartbeat_inst.count(p->first) == 0)
 
1314
      dout(0) << " no inst for _to " << p->first << dendl;
 
1315
  for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
 
1316
    if (heartbeat_inst.count(p->first) == 0)
 
1317
      dout(0) << " no inst for _from " << p->first << dendl;
 
1318
  */
 
1319
 
1232
1320
  heartbeat_lock.Unlock();
1233
1321
}
1234
1322
 
1240
1328
  heartbeat_from.clear();
1241
1329
  heartbeat_from_stamp.clear();
1242
1330
  heartbeat_inst.clear();
1243
 
  heartbeat_lock.Unlock();
1244
 
 
1245
1331
  failure_queue.clear();
 
1332
  heartbeat_lock.Unlock();
 
1333
 
1246
1334
}
1247
1335
 
1248
1336
void OSD::handle_osd_ping(MOSDPing *m)
1272
1360
    heartbeat_to[from] = m->peer_as_of_epoch;
1273
1361
    heartbeat_inst[from] = m->get_source_inst();
1274
1362
 
1275
 
    if (locked && m->map_epoch)
 
1363
    if (locked && m->map_epoch && !is_booting())
1276
1364
      _share_map_incoming(m->get_source_inst(), m->map_epoch,
1277
1365
                          (Session*) m->get_connection()->get_priv());
1278
1366
  }
1282
1370
 
1283
1371
    // only take peer stat or share map now if map_lock is uncontended
1284
1372
    if (locked) {
1285
 
      if (m->map_epoch)
 
1373
      if (m->map_epoch && !is_booting())
1286
1374
        _share_map_incoming(m->get_source_inst(), m->map_epoch,
1287
1375
                            (Session*) m->get_connection()->get_priv());
1288
1376
      take_peer_stat(from, m->peer_stat);  // only with map_lock held!
1289
1377
    }
1290
1378
 
1291
1379
    heartbeat_from_stamp[from] = g_clock.now();  // don't let _my_ lag interfere.
 
1380
    // remove from failure lists if needed
 
1381
    if (failure_pending.count(from)) {
 
1382
      send_still_alive(from);
 
1383
      failure_pending.erase(from);
 
1384
    }
 
1385
    failure_queue.erase(from);
1292
1386
  } else {
1293
1387
    dout(10) << " ignoring " << m->get_source_inst() << dendl;
1294
1388
  }
1331
1425
              << " since " << heartbeat_from_stamp[p->first]
1332
1426
              << " (cutoff " << grace << ")" << dendl;
1333
1427
      queue_failure(p->first);
 
1428
 
1334
1429
    }
1335
1430
  }
1336
1431
}
1343
1438
    dout(0) << "got SIGTERM, shutting down" << dendl;
1344
1439
    Message *m = new MGenericMessage(CEPH_MSG_SHUTDOWN);
1345
1440
    m->set_priority(CEPH_MSG_PRIO_HIGHEST);
1346
 
    messenger->send_message(m, messenger->get_myinst());
 
1441
    cluster_messenger->send_message(m, cluster_messenger->get_myinst());
1347
1442
    return;
1348
1443
  }
1349
1444
 
1357
1452
      in.close();
1358
1453
    }
1359
1454
  }
1360
 
  catch (ios::failure f) {
 
1455
  catch (const ios::failure &f) {
1361
1456
    dout(0) << "heartbeat: failed to read /proc/loadavg" << dendl;
1362
1457
  }
1363
1458
 
1389
1484
    }
1390
1485
  }
1391
1486
 
1392
 
  // request heartbeats?
1393
 
  for (map<int, epoch_t>::iterator p = heartbeat_from.begin();
1394
 
       p != heartbeat_from.end();
1395
 
       p++) {
1396
 
    if (heartbeat_from_stamp.count(p->first) == 0) {
1397
 
      // fake initial stamp.  and send them a ping so they know we expect it.
1398
 
      dout(10) << "requesting heartbeats from " << heartbeat_inst[p->first] << dendl;
1399
 
      heartbeat_from_stamp[p->first] = now;  
1400
 
      Message *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat, true);  // request ack
1401
 
      m->set_priority(CEPH_MSG_PRIO_HIGH);
1402
 
      heartbeat_messenger->send_message(m, heartbeat_inst[p->first]);
1403
 
    }
1404
 
  }
1405
 
 
1406
1487
  if (map_locked)
1407
1488
    heartbeat_check();
1408
1489
 
1432
1513
  assert(osd_lock.is_locked());
1433
1514
  dout(5) << "tick" << dendl;
1434
1515
 
 
1516
  logger->set(l_osd_buf, buffer_total_alloc.read());
 
1517
 
1435
1518
  _dout_check_log();
1436
1519
 
1437
1520
  if (got_sigterm) {
1438
1521
    dout(0) << "got SIGTERM, shutting down" << dendl;
1439
 
    messenger->send_message(new MGenericMessage(CEPH_MSG_SHUTDOWN),
1440
 
                            messenger->get_myinst());
 
1522
    cluster_messenger->send_message(new MGenericMessage(CEPH_MSG_SHUTDOWN),
 
1523
                            cluster_messenger->get_myinst());
1441
1524
    return;
1442
1525
  }
1443
1526
 
1446
1529
  
1447
1530
  map_lock.get_read();
1448
1531
 
 
1532
  if (scrub_should_schedule()) {
 
1533
    sched_scrub();
 
1534
  }
 
1535
 
1449
1536
  heartbeat_lock.Lock();
1450
1537
  heartbeat_check();
1451
1538
  heartbeat_lock.Unlock();
1467
1554
         q++) {
1468
1555
      if (osdmap->is_up(q->first)) {
1469
1556
        MOSDPGRemove *m = new MOSDPGRemove(p->first, q->second);
1470
 
        messenger->send_message(m, osdmap->get_inst(q->first));
 
1557
        cluster_messenger->send_message(m, osdmap->get_cluster_inst(q->first));
1471
1558
      }
1472
1559
    }
1473
1560
  remove_list.clear();
1521
1608
void OSD::send_boot()
1522
1609
{
1523
1610
  dout(10) << "send_boot" << dendl;
 
1611
  entity_addr_t cluster_addr = cluster_messenger->get_myaddr();
 
1612
  if (cluster_addr.is_blank_addr()) {
 
1613
    int port = cluster_addr.get_port();
 
1614
    cluster_addr = client_messenger->get_myaddr();
 
1615
    cluster_addr.set_port(port);
 
1616
    dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
 
1617
  }
1524
1618
  entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
1525
1619
  if (hb_addr.is_blank_addr()) {
1526
1620
    int port = hb_addr.get_port();
1527
 
    hb_addr = messenger->get_myaddr();
 
1621
    hb_addr = cluster_addr;
1528
1622
    hb_addr.set_port(port);
 
1623
    dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl;
1529
1624
  }
1530
 
  monc->send_mon_message(new MOSDBoot(superblock, hb_addr));
 
1625
  MOSDBoot *mboot = new MOSDBoot(superblock, hb_addr, cluster_addr);
 
1626
  dout(10) << " client_addr " << client_messenger->get_myaddr()
 
1627
           << ", cluster_addr " << cluster_addr
 
1628
           << ", hb addr " << hb_addr
 
1629
           << dendl;
 
1630
  monc->send_mon_message(mboot);
1531
1631
}
1532
1632
 
1533
1633
void OSD::queue_want_up_thru(epoch_t want)
1579
1679
 
1580
1680
void OSD::send_failures()
1581
1681
{
 
1682
  bool locked = false;
 
1683
  if (!failure_queue.empty()) {
 
1684
    heartbeat_lock.Lock();
 
1685
    locked = true;
 
1686
  }
1582
1687
  while (!failure_queue.empty()) {
1583
1688
    int osd = *failure_queue.begin();
1584
1689
    monc->send_mon_message(new MOSDFailure(monc->get_fsid(), osdmap->get_inst(osd), osdmap->get_epoch()));
1585
1690
    failure_queue.erase(osd);
 
1691
    failure_pending.insert(osd);
1586
1692
  }
 
1693
  if (locked) heartbeat_lock.Unlock();
 
1694
}
 
1695
 
 
1696
void OSD::send_still_alive(int osd)
 
1697
{
 
1698
  MOSDFailure *m = new MOSDFailure(monc->get_fsid(), osdmap->get_inst(osd),
 
1699
                                   osdmap->get_epoch());
 
1700
  m->is_failed = false;
 
1701
  monc->send_mon_message(m);
1587
1702
}
1588
1703
 
1589
1704
void OSD::send_pg_stats()
1601
1716
  if (osd_stat_updated || !pg_stat_queue.empty()) {
1602
1717
    osd_stat_updated = false;
1603
1718
    
1604
 
    dout(1) << "send_pg_stats - " << pg_stat_queue.size() << " pgs updated" << dendl;
 
1719
    dout(10) << "send_pg_stats - " << pg_stat_queue.size() << " pgs updated" << dendl;
1605
1720
    
1606
1721
    utime_t had_for = g_clock.now();
1607
1722
    had_for -= had_map_since;
1704
1819
    utime_t start = g_clock.now();
1705
1820
    for (uint64_t pos = 0; pos < count; pos += bsize) {
1706
1821
      char nm[30];
1707
 
      sprintf(nm, "disk_bw_test_%lld", (long long)pos);
 
1822
      snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
1708
1823
      object_t oid(nm);
1709
1824
      sobject_t soid(oid, 0);
1710
1825
      ObjectStore::Transaction *t = new ObjectStore::Transaction;
1711
 
      t->write(meta_coll, soid, 0, bsize, bl);
 
1826
      t->write(coll_t::META_COLL, soid, 0, bsize, bl);
1712
1827
      store->queue_transaction(NULL, t);
1713
 
      cleanupt->remove(meta_coll, soid);
 
1828
      cleanupt->remove(coll_t::META_COLL, soid);
1714
1829
    }
1715
1830
    store->sync_and_flush();
1716
1831
    utime_t end = g_clock.now();
1729
1844
    logger_reset_all();
1730
1845
  } else if (m->cmd.size() == 2 && m->cmd[0] == "logger" && m->cmd[1] == "reopen") {
1731
1846
    logger_reopen_all();
1732
 
  } else
1733
 
    dout(0) << "unrecognized command! " << m->cmd << dendl;
 
1847
  } else if (m->cmd.size() == 1 && m->cmd[0] == "heapdump") {
 
1848
    stringstream ss;
 
1849
    if (g_conf.tcmalloc_have) {
 
1850
      if (!g_conf.profiler_running()) {
 
1851
        ss << "can't dump heap: profiler not running";
 
1852
      } else {
 
1853
        ss << g_conf.name << "dumping heap profile now";
 
1854
        g_conf.profiler_dump("admin request");
 
1855
      }
 
1856
    } else {
 
1857
      ss << g_conf.name << " does not have tcmalloc, can't use profiler";
 
1858
    }
 
1859
    logclient.log(LOG_INFO, ss);
 
1860
  } else if (m->cmd.size() == 1 && m->cmd[0] == "enable_profiler_options") {
 
1861
    char val[sizeof(int)*8+1];
 
1862
    snprintf(val, sizeof(val), "%i", g_conf.profiler_allocation_interval);
 
1863
    setenv("HEAP_PROFILE_ALLOCATION_INTERVAL", val, g_conf.profiler_allocation_interval);
 
1864
    snprintf(val, sizeof(val), "%i", g_conf.profiler_highwater_interval);
 
1865
    setenv("HEAP_PROFILE_INUSE_INTERVAL", val, g_conf.profiler_highwater_interval);
 
1866
    stringstream ss;
 
1867
    ss << g_conf.name << " set heap variables from current config";
 
1868
    logclient.log(LOG_INFO, ss);
 
1869
  } else if (m->cmd.size() == 1 && m->cmd[0] == "start_profiler") {
 
1870
    char location[PATH_MAX];
 
1871
    snprintf(location, sizeof(location),
 
1872
             "%s/%s", g_conf.log_dir, g_conf.name);
 
1873
    g_conf.profiler_start(location);
 
1874
    stringstream ss;
 
1875
    ss << g_conf.name << " started profiler with output " << location;
 
1876
    logclient.log(LOG_INFO, ss);
 
1877
  } else if (m->cmd.size() == 1 && m->cmd[0] == "stop_profiler") {
 
1878
    g_conf.profiler_stop();
 
1879
    stringstream ss;
 
1880
    ss << g_conf.name << " stopped profiler";
 
1881
    logclient.log(LOG_INFO, ss);
 
1882
  }
 
1883
  else if (m->cmd.size() > 1 && m->cmd[0] == "debug") {
 
1884
    if (m->cmd.size() == 3 && m->cmd[1] == "dump_missing") {
 
1885
      const string &file_name(m->cmd[2]);
 
1886
      std::ofstream fout(file_name.c_str());
 
1887
      if (!fout.is_open()) {
 
1888
        stringstream ss;
 
1889
        ss << "failed to open file '" << file_name << "'";
 
1890
        logclient.log(LOG_INFO, ss);
 
1891
        goto done;
 
1892
      }
 
1893
 
 
1894
      std::set <pg_t> keys;
 
1895
      for (hash_map<pg_t, PG*>::const_iterator pg_map_e = pg_map.begin();
 
1896
           pg_map_e != pg_map.end(); ++pg_map_e) {
 
1897
        keys.insert(pg_map_e->first);
 
1898
      }
 
1899
 
 
1900
      fout << "*** osd " << whoami << ": dump_missing ***" << std::endl;
 
1901
      for (std::set <pg_t>::iterator p = keys.begin();
 
1902
           p != keys.end(); ++p) {
 
1903
        hash_map<pg_t, PG*>::iterator q = pg_map.find(*p);
 
1904
        assert(q != pg_map.end());
 
1905
        PG *pg = q->second;
 
1906
        pg->lock();
 
1907
 
 
1908
        fout << *pg << std::endl;
 
1909
        std::map<sobject_t, PG::Missing::item>::iterator mend = pg->missing.missing.end();
 
1910
        std::map<sobject_t, PG::Missing::item>::iterator m = pg->missing.missing.begin();
 
1911
        for (; m != mend; ++m) {
 
1912
          fout << m->first << " -> " << m->second << std::endl;
 
1913
          map<sobject_t, set<int> >::const_iterator mli =
 
1914
            pg->missing_loc.find(m->first);
 
1915
          if (mli == pg->missing_loc.end())
 
1916
            continue;
 
1917
          const set<int> &mls(mli->second);
 
1918
          if (mls.empty())
 
1919
            continue;
 
1920
          fout << "missing_loc: " << mls << std::endl;
 
1921
        }
 
1922
        pg->unlock();
 
1923
        fout << std::endl;
 
1924
      }
 
1925
 
 
1926
      fout.close();
 
1927
    }
 
1928
    else if (m->cmd.size() == 3 && m->cmd[1] == "kick_recovery_wq") {
 
1929
      g_conf.osd_recovery_delay_start = atoi(m->cmd[2].c_str());
 
1930
      stringstream ss;
 
1931
      ss << "kicking recovery queue. set osd_recovery_delay_start to "
 
1932
         << g_conf.osd_recovery_delay_start;
 
1933
      logclient.log(LOG_INFO, ss);
 
1934
 
 
1935
      defer_recovery_until = g_clock.now();
 
1936
      defer_recovery_until += g_conf.osd_recovery_delay_start;
 
1937
      recovery_wq._kick();
 
1938
    }
 
1939
  }
 
1940
  else dout(0) << "unrecognized command! " << m->cmd << dendl;
 
1941
 
 
1942
done:
1734
1943
  m->put();
1735
1944
}
1736
1945
 
1747
1956
  dout(20) << "_share_map_incoming " << inst << " " << epoch << dendl;
1748
1957
  //assert(osd_lock.is_locked());
1749
1958
 
 
1959
  assert(!is_booting());
 
1960
 
1750
1961
  // does client have old map?
1751
1962
  if (inst.name.is_client()) {
1752
1963
    bool sendmap = epoch < osdmap->get_epoch();
1769
1980
  // does peer have old map?
1770
1981
  if (inst.name.is_osd() &&
1771
1982
      osdmap->is_up(inst.name.num()) &&
1772
 
      (osdmap->get_inst(inst.name.num()) == inst ||
 
1983
      (osdmap->get_cluster_inst(inst.name.num()) == inst ||
1773
1984
       osdmap->get_hb_inst(inst.name.num()) == inst)) {
1774
1985
    // remember
1775
1986
    if (peer_map_epoch[inst.name] < epoch) {
1781
1992
    if (peer_map_epoch[inst.name] < osdmap->get_epoch()) {
1782
1993
      dout(10) << inst.name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
1783
1994
      peer_map_epoch[inst.name] = osdmap->get_epoch();  // so we don't send it again.
1784
 
      send_incremental_map(epoch, osdmap->get_inst(inst.name.num()));
 
1995
      send_incremental_map(epoch, osdmap->get_cluster_inst(inst.name.num()));
1785
1996
      shared = true;
1786
1997
    }
1787
1998
  }
1837
2048
{
1838
2049
  // lock!
1839
2050
  osd_lock.Lock();
1840
 
  dispatch_running = true;
 
2051
  ++dispatch_running;
1841
2052
  _dispatch(m);
1842
 
  dispatch_running = false;
 
2053
  --dispatch_running;
1843
2054
  do_waiters();
1844
2055
  osd_lock.Unlock();
1845
2056
  return true;
1935
2146
  assert(osd_lock.is_locked());
1936
2147
  dout(20) << "_dispatch " << m << " " << *m << dendl;
1937
2148
  Session *session = NULL;
 
2149
 
 
2150
  if (map_in_progress_cond) { //can't dispatch while map is being updated!
 
2151
    if (map_in_progress) {
 
2152
      dout(25) << "waiting for handle_osd_map to complete before dispatching" << dendl;
 
2153
      while (map_in_progress)
 
2154
        map_in_progress_cond->Wait(osd_lock);
 
2155
    }
 
2156
  }
 
2157
 
 
2158
  logger->set(l_osd_buf, buffer_total_alloc.read());
 
2159
 
1938
2160
  switch (m->get_type()) {
1939
2161
 
1940
2162
    // -- don't need lock -- 
2015
2237
      case MSG_OSD_PG_TRIM:
2016
2238
        handle_pg_trim((MOSDPGTrim*)m);
2017
2239
        break;
 
2240
      case MSG_OSD_PG_MISSING:
 
2241
        handle_pg_missing((MOSDPGMissing*)m);
 
2242
        break;
2018
2243
 
2019
2244
        // client ops
2020
2245
      case CEPH_MSG_OSD_OP:
2028
2253
      case MSG_OSD_SUBOPREPLY:
2029
2254
        handle_sub_op_reply((MOSDSubOpReply*)m);
2030
2255
        break;
2031
 
        
2032
2256
      }
2033
2257
    }
2034
2258
  }
 
2259
 
 
2260
  logger->set(l_osd_buf, buffer_total_alloc.read());
 
2261
 
2035
2262
}
2036
2263
 
2037
2264
 
2080
2307
  m->put();
2081
2308
}
2082
2309
 
2083
 
 
 
2310
bool OSD::scrub_should_schedule()
 
2311
{
 
2312
  double loadavgs[1];
 
2313
  if (getloadavg(loadavgs, 1) != 1) {
 
2314
    dout(10) << "scrub_should_schedule couldn't read loadavgs\n" << dendl;
 
2315
    return false;
 
2316
  }
 
2317
 
 
2318
  if (loadavgs[0] >= g_conf.osd_scrub_load_threshold) {
 
2319
    dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
 
2320
             << " >= max " << g_conf.osd_scrub_load_threshold
 
2321
             << " = no, load too high" << dendl;
 
2322
    return false;
 
2323
  }
 
2324
 
 
2325
  bool coin_flip = (rand() % 3) == whoami % 3;
 
2326
  if (!coin_flip) {
 
2327
    dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
 
2328
             << " < max " << g_conf.osd_scrub_load_threshold
 
2329
             << " = no, randomly backing off"
 
2330
             << dendl;
 
2331
    return false;
 
2332
  }
 
2333
  
 
2334
  dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
 
2335
           << " < max " << g_conf.osd_scrub_load_threshold
 
2336
           << " = yes" << dendl;
 
2337
  return loadavgs[0] < g_conf.osd_scrub_load_threshold;
 
2338
}
 
2339
 
 
2340
void OSD::sched_scrub()
 
2341
{
 
2342
  assert(osd_lock.is_locked());
 
2343
 
 
2344
  dout(20) << "sched_scrub" << dendl;
 
2345
 
 
2346
  pair<utime_t,pg_t> pos;
 
2347
  utime_t max = g_clock.now();
 
2348
  max -= g_conf.osd_scrub_max_interval;
 
2349
  
 
2350
  sched_scrub_lock.Lock();
 
2351
 
 
2352
  //dout(20) << " " << last_scrub_pg << dendl;
 
2353
 
 
2354
  set< pair<utime_t,pg_t> >::iterator p = last_scrub_pg.begin();
 
2355
  while (p != last_scrub_pg.end()) {
 
2356
    //dout(10) << "pos is " << *p << dendl;
 
2357
    pos = *p;
 
2358
    utime_t t = pos.first;
 
2359
    pg_t pgid = pos.second;
 
2360
 
 
2361
    if (t > max) {
 
2362
      dout(10) << " " << pgid << " at " << t
 
2363
               << " > " << max << " (" << g_conf.osd_scrub_max_interval << " seconds ago)" << dendl;
 
2364
      break;
 
2365
    }
 
2366
 
 
2367
    dout(10) << " on " << t << " " << pgid << dendl;
 
2368
    sched_scrub_lock.Unlock();
 
2369
    PG *pg = _lookup_lock_pg(pgid);
 
2370
    if (pg) {
 
2371
      if (pg->is_active() && !pg->sched_scrub()) {
 
2372
        pg->unlock();
 
2373
        sched_scrub_lock.Lock();
 
2374
        break;
 
2375
      }
 
2376
      pg->unlock();
 
2377
    }
 
2378
    sched_scrub_lock.Lock();
 
2379
 
 
2380
    // next!
 
2381
    p = last_scrub_pg.lower_bound(pos);
 
2382
    //dout(10) << "lb is " << *p << dendl;
 
2383
    if (p != last_scrub_pg.end())
 
2384
      p++;
 
2385
  }    
 
2386
  sched_scrub_lock.Unlock();
 
2387
 
 
2388
  dout(20) << "sched_scrub done" << dendl;
 
2389
}
 
2390
 
 
2391
bool OSD::inc_scrubs_pending()
 
2392
{
 
2393
  bool result = false;
 
2394
 
 
2395
  sched_scrub_lock.Lock();
 
2396
  if (scrubs_pending + scrubs_active < g_conf.osd_max_scrubs) {
 
2397
    dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
 
2398
             << " (max " << g_conf.osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
 
2399
    result = true;
 
2400
    ++scrubs_pending;
 
2401
  } else {
 
2402
    dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << g_conf.osd_max_scrubs << dendl;
 
2403
  }
 
2404
  sched_scrub_lock.Unlock();
 
2405
 
 
2406
  return result;
 
2407
}
 
2408
 
 
2409
void OSD::dec_scrubs_pending()
 
2410
{
 
2411
  sched_scrub_lock.Lock();
 
2412
  dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
 
2413
           << " (max " << g_conf.osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
 
2414
  --scrubs_pending;
 
2415
  assert(scrubs_pending >= 0);
 
2416
  sched_scrub_lock.Unlock();
 
2417
}
 
2418
 
 
2419
void OSD::dec_scrubs_active()
 
2420
{
 
2421
  sched_scrub_lock.Lock();
 
2422
  dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
 
2423
           << " (max " << g_conf.osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
 
2424
  --scrubs_active;
 
2425
  sched_scrub_lock.Unlock();
 
2426
}
2084
2427
 
2085
2428
// =====================================================
2086
2429
// MAP
2103
2446
 
2104
2447
void OSD::note_down_osd(int osd)
2105
2448
{
2106
 
  messenger->mark_down(osdmap->get_addr(osd));
 
2449
  cluster_messenger->mark_down(osdmap->get_cluster_addr(osd));
2107
2450
 
2108
2451
  heartbeat_lock.Lock();
2109
2452
 
2110
 
  heartbeat_messenger->mark_down(osdmap->get_hb_addr(osd));
2111
 
 
2112
 
  if (heartbeat_inst.count(osd)) {
2113
 
    if (heartbeat_inst[osd] == osdmap->get_hb_inst(osd)) {
2114
 
      dout(10) << "note_down_osd removing heartbeat_inst " << heartbeat_inst[osd] << dendl;
2115
 
      heartbeat_inst.erase(osd);
2116
 
    } else {
2117
 
      dout(10) << "note_down_osd leaving heartbeat_inst " << heartbeat_inst[osd]
2118
 
               << " != " << osdmap->get_hb_inst(osd) << dendl;
2119
 
    }
2120
 
  } else
2121
 
    dout(10) << "note_down_osd no heartbeat_inst for osd" << osd << dendl;
 
2453
  // note: update_heartbeat_peers will mark down the heartbeat connection.
2122
2454
 
2123
2455
  peer_map_epoch.erase(entity_name_t::OSD(osd));
2124
2456
  failure_queue.erase(osd);
2125
2457
  failure_pending.erase(osd);
2126
 
  heartbeat_from_stamp.erase(osd);
2127
2458
 
2128
2459
  heartbeat_lock.Unlock();
2129
2460
}
2164
2495
    osdmap = new OSDMap;
2165
2496
  }
2166
2497
 
2167
 
  state = STATE_ACTIVE;
 
2498
 
 
2499
  // make sure there is something new, here, before we bother flushing the queues and such
 
2500
  if (m->get_last() <= osdmap->get_epoch()) {
 
2501
    dout(10) << " no new maps here, dropping" << dendl;
 
2502
    m->put();
 
2503
    return;
 
2504
  }
2168
2505
 
2169
2506
  // pause, requeue op queue
2170
2507
  //wait_for_no_ops();
2171
 
  
 
2508
 
 
2509
  if (map_in_progress_cond) {
 
2510
    if (map_in_progress) {
 
2511
      dout(15) << "waiting for prior handle_osd_map to complete" << dendl;
 
2512
      while (map_in_progress) {
 
2513
        map_in_progress_cond->Wait(osd_lock);
 
2514
      }
 
2515
    }
 
2516
    dout(10) << "locking handle_osd_map permissions" << dendl;
 
2517
    map_in_progress = true;
 
2518
  }
 
2519
 
2172
2520
  osd_lock.Unlock();
 
2521
 
2173
2522
  op_tp.pause();
2174
2523
  op_wq.lock();
2175
2524
  list<Message*> rq;
2179
2528
    pending_ops--;
2180
2529
    logger->set(l_osd_opq, pending_ops);
2181
2530
 
2182
 
    Message *m = pg->op_queue.back();
 
2531
    Message *mess = pg->op_queue.back();
2183
2532
    pg->op_queue.pop_back();
2184
2533
    pg->put();
2185
 
    dout(15) << " will requeue " << *m << dendl;
2186
 
    rq.push_front(m);
 
2534
    dout(15) << " will requeue " << *mess << dendl;
 
2535
    rq.push_front(mess);
2187
2536
  }
2188
2537
  op_wq.unlock();
2189
2538
  push_waiters(rq);
2196
2545
  store->flush();
2197
2546
  osd_lock.Lock();
2198
2547
 
2199
 
  map_lock.get_write();
2200
 
 
2201
2548
  assert(osd_lock.is_locked());
2202
2549
 
2203
2550
  ObjectStore::Transaction t;
2210
2557
       p != m->maps.end();
2211
2558
       p++) {
2212
2559
    sobject_t poid = get_osdmap_pobject_name(p->first);
2213
 
    if (store->exists(meta_coll, poid)) {
 
2560
    if (store->exists(coll_t::META_COLL, poid)) {
2214
2561
      dout(10) << "handle_osd_map already had full map epoch " << p->first << dendl;
2215
2562
      logger->inc(l_osd_mapfdup);
2216
2563
      bufferlist bl;
2221
2568
 
2222
2569
    dout(10) << "handle_osd_map got full map epoch " << p->first << dendl;
2223
2570
    ObjectStore::Transaction *ft = new ObjectStore::Transaction;
2224
 
    ft->write(meta_coll, poid, 0, p->second.length(), p->second);  // store _outside_ transaction; activate_map reads it.
 
2571
    ft->write(coll_t::META_COLL, poid, 0, p->second.length(), p->second);  // store _outside_ transaction; activate_map reads it.
2225
2572
    int r = store->queue_transaction(NULL, ft);
2226
2573
    assert(r == 0);
2227
2574
 
2237
2584
       p != m->incremental_maps.end();
2238
2585
       p++) {
2239
2586
    sobject_t poid = get_inc_osdmap_pobject_name(p->first);
2240
 
    if (store->exists(meta_coll, poid)) {
 
2587
    if (store->exists(coll_t::META_COLL, poid)) {
2241
2588
      dout(10) << "handle_osd_map already had incremental map epoch " << p->first << dendl;
2242
2589
      logger->inc(l_osd_mapidup);
2243
2590
      bufferlist bl;
2248
2595
 
2249
2596
    dout(10) << "handle_osd_map got incremental map epoch " << p->first << dendl;
2250
2597
    ObjectStore::Transaction *ft = new ObjectStore::Transaction;
2251
 
    ft->write(meta_coll, poid, 0, p->second.length(), p->second);  // store _outside_ transaction; activate_map reads it.
 
2598
    ft->write(coll_t::META_COLL, poid, 0, p->second.length(), p->second);  // store _outside_ transaction; activate_map reads it.
2252
2599
    int r = store->queue_transaction(NULL, ft);
2253
2600
    assert(r == 0);
2254
2601
 
2262
2609
  }
2263
2610
 
2264
2611
  // flush new maps (so they are readable)
 
2612
  osd_lock.Unlock();
2265
2613
  store->flush();
 
2614
  osd_lock.Lock();
 
2615
 
 
2616
  // finally, take map_lock _after_ we do this flush, to avoid deadlock
 
2617
  map_lock.get_write();
2266
2618
 
2267
2619
  // advance if we can
2268
2620
  bool advanced = false;
2274
2626
    OSDMap::Incremental inc;
2275
2627
 
2276
2628
    if (m->incremental_maps.count(cur+1) ||
2277
 
        store->exists(meta_coll, get_inc_osdmap_pobject_name(cur+1))) {
 
2629
        store->exists(coll_t::META_COLL, get_inc_osdmap_pobject_name(cur+1))) {
2278
2630
      dout(10) << "handle_osd_map decoding inc map epoch " << cur+1 << dendl;
2279
2631
      
2280
2632
      bufferlist bl;
2300
2652
      bl.clear();
2301
2653
      osdmap->encode(bl);
2302
2654
      ObjectStore::Transaction ft;
2303
 
      ft.write(meta_coll, get_osdmap_pobject_name(cur+1), 0, bl.length(), bl);
 
2655
      ft.write(coll_t::META_COLL, get_osdmap_pobject_name(cur+1), 0, bl.length(), bl);
2304
2656
      int r = store->apply_transaction(ft);
2305
2657
      assert(r == 0);
2306
2658
 
2312
2664
        if (osd == whoami) continue;
2313
2665
        note_down_osd(i->first);
2314
2666
      }
2315
 
      for (map<int32_t,entity_addr_t>::iterator i = inc.new_up.begin();
2316
 
           i != inc.new_up.end();
 
2667
      for (map<int32_t,entity_addr_t>::iterator i = inc.new_up_client.begin();
 
2668
           i != inc.new_up_client.end();
2317
2669
           i++) {
2318
2670
        if (i->first == whoami) continue;
2319
2671
        note_up_osd(i->first);
2320
2672
      }
2321
2673
    }
2322
2674
    else if (m->maps.count(cur+1) ||
2323
 
             store->exists(meta_coll, get_osdmap_pobject_name(cur+1))) {
 
2675
             store->exists(coll_t::META_COLL, get_osdmap_pobject_name(cur+1))) {
2324
2676
      dout(10) << "handle_osd_map decoding full map epoch " << cur+1 << dendl;
2325
2677
      bufferlist bl;
2326
2678
      if (m->maps.count(cur+1))
2361
2713
  // all the way?
2362
2714
  if (advanced && cur == superblock.newest_map) {
2363
2715
    if (osdmap->is_up(whoami) &&
2364
 
        osdmap->get_addr(whoami) == messenger->get_myaddr()) {
 
2716
        osdmap->get_addr(whoami) == client_messenger->get_myaddr()) {
 
2717
 
 
2718
      if (is_booting()) {
 
2719
        dout(1) << "state: booting -> active" << dendl;
 
2720
        state = STATE_ACTIVE;
 
2721
      }
 
2722
      
2365
2723
      // yay!
2366
2724
      activate_map(t, fin->contexts);
2367
2725
 
2381
2739
      pg->write_log(t);
2382
2740
  }
2383
2741
 
 
2742
  bool do_shutdown = false;
2384
2743
  if (osdmap->get_epoch() > 0 &&
2385
 
      state != STATE_BOOTING &&
2386
 
      (!osdmap->exists(whoami) || 
2387
 
       (!osdmap->is_up(whoami) && osdmap->get_addr(whoami) == messenger->get_myaddr()))) {
2388
 
    dout(0) << "map says i am down.  switching to boot state." << dendl;
2389
 
    //shutdown();
2390
 
 
2391
 
    stringstream ss;
2392
 
    ss << "map e" << osdmap->get_epoch() << " wrongly marked me down";
2393
 
    logclient.log(LOG_WARN, ss);
2394
 
 
2395
 
    state = STATE_BOOTING;
2396
 
    up_epoch = 0;
2397
 
 
2398
 
    reset_heartbeat_peers();
 
2744
      state == STATE_ACTIVE) {
 
2745
    if (!osdmap->exists(whoami)) {
 
2746
      dout(0) << "map says i do not exist.  shutting down." << dendl;
 
2747
      do_shutdown = true;   // don't call shutdown() while we have everything paused
 
2748
    } else if (!osdmap->is_up(whoami) ||
 
2749
               osdmap->get_addr(whoami) != client_messenger->get_myaddr()) {
 
2750
      stringstream ss;
 
2751
      ss << "map e" << osdmap->get_epoch() << " wrongly marked me down";
 
2752
      logclient.log(LOG_WARN, ss);
 
2753
      
 
2754
      state = STATE_BOOTING;
 
2755
      up_epoch = 0;
 
2756
 
 
2757
      int cport = cluster_messenger->get_myaddr().get_port();
 
2758
      int hbport = heartbeat_messenger->get_myaddr().get_port();
 
2759
 
 
2760
      int r = cluster_messenger->rebind(hbport);
 
2761
      if (r != 0)
 
2762
        do_shutdown = true;  // FIXME: do_restart?
 
2763
 
 
2764
      r = heartbeat_messenger->rebind(cport);
 
2765
      if (r != 0)
 
2766
        do_shutdown = true;  // FIXME: do_restart?
 
2767
 
 
2768
      reset_heartbeat_peers();
 
2769
    }
2399
2770
  }
2400
2771
 
2401
2772
  // note in the superblock that we were clean thru the prior epoch
2411
2782
    map_lock.put_write();
2412
2783
    char buf[80];
2413
2784
    dout(0) << "error writing map: " << r << " " << strerror_r(-r, buf, sizeof(buf)) << dendl;
 
2785
    m->put();
2414
2786
    shutdown();
2415
2787
    return;
2416
2788
  }
2433
2805
 
2434
2806
  if (is_booting())
2435
2807
    send_boot();
 
2808
  if (do_shutdown)
 
2809
    shutdown();
 
2810
 
 
2811
  if (map_in_progress_cond) {
 
2812
    map_in_progress = false;
 
2813
    dout(15) << "unlocking map_in_progress" << dendl;
 
2814
    map_in_progress_cond->Signal();
 
2815
  }
2436
2816
}
2437
2817
 
2438
2818
 
2450
2830
 
2451
2831
  if (!up_epoch &&
2452
2832
      osdmap->is_up(whoami) &&
2453
 
      osdmap->get_inst(whoami) == messenger->get_myinst()) {
 
2833
      osdmap->get_inst(whoami) == client_messenger->get_myinst()) {
2454
2834
    up_epoch = osdmap->get_epoch();
2455
2835
    dout(10) << "up_epoch is " << up_epoch << dendl;
2456
2836
    if (!boot_epoch) {
2616
2996
      if (osdmap->is_down(oldacting[i]))
2617
2997
        pg->on_osd_failure(oldacting[i]);
2618
2998
    pg->on_change();
 
2999
 
 
3000
    if (pg->deleting) {
 
3001
      dout(10) << *pg << " canceling deletion!" << dendl;
 
3002
      pg->deleting = false;
 
3003
      remove_wq.dequeue(pg);
 
3004
    }
2619
3005
    
2620
3006
    if (role != oldrole) {
2621
 
      // old stray?
2622
 
      if (oldrole < 0 && pg->deleting) {
2623
 
        dout(10) << *pg << " canceling deletion!" << dendl;
2624
 
        pg->deleting = false;
2625
 
      }
2626
 
 
2627
3007
      // old primary?
2628
3008
      if (oldrole == 0) {
2629
3009
        pg->state_clear(PG_STATE_CLEAN);
2721
3101
    if (g_conf.osd_check_for_log_corruption)
2722
3102
      pg->check_log_for_corruption(store);
2723
3103
 
 
3104
    if (pg->is_active() && pg->is_primary() &&
 
3105
        (pg->missing.num_missing() > pg->missing_loc.size())) {
 
3106
      if (pg->all_unfound_are_lost(osdmap)) {
 
3107
        pg->mark_all_unfound_as_lost(t);
 
3108
      }
 
3109
    }
 
3110
 
2724
3111
    if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
2725
3112
      //pool is deleted!
2726
3113
      queue_pg_for_deletion(pg);
2730
3117
    if (pg->is_active()) {
2731
3118
      // i am active
2732
3119
      if (pg->is_primary() &&
2733
 
          !pg->snap_trimq.empty())
 
3120
          !pg->snap_trimq.empty() &&
 
3121
          pg->is_clean())
2734
3122
        pg->queue_snap_trim();
2735
3123
    }
2736
3124
    else if (pg->is_primary() &&
2738
3126
      // i am (inactive) primary
2739
3127
      if (!pg->is_peering() || 
2740
3128
          (pg->need_up_thru && up_thru >= pg->info.history.same_acting_since))
2741
 
        pg->peer(t, tfin, query_map, &info_map);
 
3129
        pg->do_peer(t, tfin, query_map, &info_map);
2742
3130
    }
2743
3131
    else if (pg->is_stray() &&
2744
3132
             pg->get_primary() >= 0) {
2790
3178
      assert(0);  // we should have all maps.
2791
3179
    }
2792
3180
  }
2793
 
 
 
3181
  Messenger *msgr = client_messenger;
 
3182
  if (entity_name_t::TYPE_OSD == inst.name._type)
 
3183
    msgr = cluster_messenger;
2794
3184
  if (lazy)
2795
 
    messenger->lazy_send_message(m, inst);  // only if we already have an open connection
 
3185
    msgr->lazy_send_message(m, inst);  // only if we already have an open connection
2796
3186
  else
2797
 
    messenger->send_message(m, inst);
 
3187
    msgr->send_message(m, inst);
2798
3188
}
2799
3189
 
2800
3190
bool OSD::get_map_bl(epoch_t e, bufferlist& bl)
2801
3191
{
2802
 
  return store->read(meta_coll, get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
 
3192
  return store->read(coll_t::META_COLL, get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
2803
3193
}
2804
3194
 
2805
3195
bool OSD::get_inc_map_bl(epoch_t e, bufferlist& bl)
2806
3196
{
2807
 
  return store->read(meta_coll, get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
 
3197
  return store->read(coll_t::META_COLL, get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
2808
3198
}
2809
3199
 
2810
3200
OSDMap *OSD::get_map(epoch_t epoch)
2938
3328
  if (m->get_source().is_osd()) {
2939
3329
    int from = m->get_source().num();
2940
3330
    if (!osdmap->have_inst(from) ||
2941
 
        osdmap->get_addr(from) != m->get_source_inst().addr) {
 
3331
        osdmap->get_cluster_addr(from) != m->get_source_inst().addr) {
2942
3332
      dout(-7) << "from dead osd" << from << ", dropping, sharing map" << dendl;
2943
3333
      send_incremental_map(epoch, m->get_source_inst(), true);
2944
3334
      m->put();
2946
3336
    }
2947
3337
  }
2948
3338
 
 
3339
  // ok, we have at least as new a map as they do.  are we (re)booting?
 
3340
  if (is_booting()) {
 
3341
    dout(7) << "still in boot state, dropping message " << *m << dendl;
 
3342
    m->put();
 
3343
    return false;
 
3344
  }
 
3345
 
2949
3346
  return true;
2950
3347
}
2951
3348
 
3040
3437
 
3041
3438
      wake_pg_waiters(pg->info.pgid);
3042
3439
 
3043
 
      pg->peer(*t, fin->contexts, query_map, &info_map);
 
3440
      pg->do_peer(*t, fin->contexts, query_map, &info_map);
3044
3441
      pg->update_stats();
3045
3442
      pg->unlock();
3046
3443
      created++;
3067
3464
 
3068
3465
  // split objects
3069
3466
  vector<sobject_t> olist;
3070
 
  store->collection_list(coll_t::build_pg_coll(parent->info.pgid), olist);  
 
3467
  store->collection_list(coll_t(parent->info.pgid), olist);
3071
3468
 
3072
3469
  for (vector<sobject_t>::iterator p = olist.begin(); p != olist.end(); p++) {
3073
3470
    sobject_t poid = *p;
3080
3477
      bufferlist bv;
3081
3478
 
3082
3479
      struct stat st;
3083
 
      store->stat(coll_t::build_pg_coll(parentid), poid, &st);
3084
 
      store->getattr(coll_t::build_pg_coll(parentid), poid, OI_ATTR, bv);
 
3480
      store->stat(coll_t(parentid), poid, &st);
 
3481
      store->getattr(coll_t(parentid), poid, OI_ATTR, bv);
3085
3482
      object_info_t oi(bv);
3086
3483
 
3087
 
      t.collection_add(coll_t::build_pg_coll(pgid), coll_t::build_pg_coll(parentid), poid);
3088
 
      t.collection_remove(coll_t::build_pg_coll(parentid), poid);
 
3484
      t.collection_add(coll_t(pgid), coll_t(parentid), poid);
 
3485
      t.collection_remove(coll_t(parentid), poid);
3089
3486
      if (oi.snaps.size()) {
3090
3487
        snapid_t first = oi.snaps[0];
3091
 
        t.collection_add(coll_t::build_snap_pg_coll(pgid, first), coll_t::build_pg_coll(parentid), poid);
3092
 
        t.collection_remove(coll_t::build_snap_pg_coll(parentid, first), poid);
 
3488
        t.collection_add(coll_t(pgid, first), coll_t(parentid), poid);
 
3489
        t.collection_remove(coll_t(parentid, first), poid);
3093
3490
        if (oi.snaps.size() > 1) {
3094
3491
          snapid_t last = oi.snaps[oi.snaps.size()-1];
3095
 
          t.collection_add(coll_t::build_snap_pg_coll(pgid, last), coll_t::build_pg_coll(parentid), poid);
3096
 
          t.collection_remove(coll_t::build_snap_pg_coll(parentid, last), poid);
 
3492
          t.collection_add(coll_t(pgid, last), coll_t(parentid), poid);
 
3493
          t.collection_remove(coll_t(parentid, last), poid);
3097
3494
        }
3098
3495
      }
3099
3496
 
3256
3653
      creating_pgs.erase(pgid);
3257
3654
 
3258
3655
      wake_pg_waiters(pg->info.pgid);
3259
 
      pg->peer(*t, fin->contexts, query_map, &info_map);
 
3656
      pg->do_peer(*t, fin->contexts, query_map, &info_map);
3260
3657
      pg->update_stats();
3261
3658
 
3262
3659
      int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
3296
3693
    }
3297
3694
    dout(7) << "do_notify osd" << it->first << " on " << it->second.size() << " PGs" << dendl;
3298
3695
    MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second);
3299
 
    _share_map_outgoing(osdmap->get_inst(it->first));
3300
 
    messenger->send_message(m, osdmap->get_inst(it->first));
 
3696
    _share_map_outgoing(osdmap->get_cluster_inst(it->first));
 
3697
    cluster_messenger->send_message(m, osdmap->get_cluster_inst(it->first));
3301
3698
  }
3302
3699
}
3303
3700
 
3314
3711
    dout(7) << "do_queries querying osd" << who
3315
3712
            << " on " << pit->second.size() << " PGs" << dendl;
3316
3713
    MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(), pit->second);
3317
 
    _share_map_outgoing(osdmap->get_inst(who));
3318
 
    messenger->send_message(m, osdmap->get_inst(who));
 
3714
    _share_map_outgoing(osdmap->get_cluster_inst(who));
 
3715
    cluster_messenger->send_message(m, osdmap->get_cluster_inst(who));
3319
3716
  }
3320
3717
}
3321
3718
 
3325
3722
  for (map<int,MOSDPGInfo*>::iterator p = info_map.begin();
3326
3723
       p != info_map.end();
3327
3724
       ++p) 
3328
 
    messenger->send_message(p->second, osdmap->get_inst(p->first));
 
3725
    cluster_messenger->send_message(p->second, osdmap->get_cluster_inst(p->first));
3329
3726
  info_map.clear();
3330
3727
}
3331
3728
 
3405
3802
        pg->up.swap(up);
3406
3803
        pg->set_role(role);
3407
3804
        pg->info.history = history;
 
3805
        reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3408
3806
        pg->clear_primary_state();  // yep, notably, set hml=false
3409
3807
        pg->write_info(*t);
3410
3808
        pg->write_log(*t);
3440
3838
    } else {
3441
3839
      dout(10) << *pg << " got osd" << from << " info " << *it << dendl;
3442
3840
      pg->peer_info[from] = *it;
 
3841
 
 
3842
      unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3443
3843
      pg->info.history.merge(it->history);
 
3844
      reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3444
3845
 
3445
3846
      // stray?
3446
3847
      if (!pg->is_acting(from)) {
3449
3850
        pg->state_clear(PG_STATE_CLEAN);
3450
3851
      }
3451
3852
      
3452
 
      pg->peer(*t, fin->contexts, query_map, &info_map);
 
3853
      pg->do_peer(*t, fin->contexts, query_map, &info_map);
3453
3854
      pg->update_stats();
3454
3855
    }
 
3856
 
 
3857
    if (pg->is_active() && pg->have_unfound()) {
 
3858
      // Make sure we've requested MISSING information from every OSD
 
3859
      // we know about.
 
3860
      map< int, map<pg_t,PG::Query> > query_map;
 
3861
      pg->discover_all_missing(query_map);
 
3862
      do_queries(query_map);
 
3863
    }
 
3864
 
3455
3865
    int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
3456
3866
    assert(tr == 0);
3457
3867
    pg->unlock();
3481
3891
void OSD::_process_pg_info(epoch_t epoch, int from,
3482
3892
                           PG::Info &info, 
3483
3893
                           PG::Log &log, 
3484
 
                           PG::Missing &missing,
 
3894
                           PG::Missing *missing,
3485
3895
                           map<int, MOSDPGInfo*>* info_map,
3486
3896
                           int& created)
3487
3897
{
3509
3919
    pg->up.swap(up);
3510
3920
    pg->set_role(role);
3511
3921
    pg->info.history = info.history;
 
3922
    reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3512
3923
    pg->write_info(*t);
3513
3924
    pg->write_log(*t);
3514
3925
    created++;
3515
3926
  } else {
3516
3927
    pg = _lookup_lock_pg(info.pgid);
3517
3928
    if (epoch < pg->info.history.same_acting_since) {
 
3929
      // The peering stuff resets when the acting set changes, so ignore any messges sent
 
3930
      // before that.
3518
3931
      dout(10) << *pg << " got old info " << info << ", ignoring" << dendl;
3519
3932
      pg->unlock();
3520
3933
      return;
3522
3935
  }
3523
3936
  assert(pg);
3524
3937
 
3525
 
  dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
 
3938
  dout(10) << *pg << ": " << __func__ << " info: " << info << ", ";
 
3939
  if (log.empty())
 
3940
    *_dout << "(log omitted)";
 
3941
  else
 
3942
    *_dout << "log: " << log;
 
3943
  *_dout << ", ";
 
3944
  if (!missing)
 
3945
    *_dout << "(missing omitted)";
 
3946
  else
 
3947
    *_dout << "missing: " << *missing;
 
3948
  *_dout << dendl;
 
3949
 
 
3950
  unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3526
3951
  pg->info.history.merge(info.history);
 
3952
  reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3527
3953
 
3528
3954
  // dump log
3529
3955
  dout(15) << *pg << " my log = ";
3530
3956
  pg->log.print(*_dout);
3531
 
  *_dout << dendl;
3532
 
  dout(15) << *pg << " osd" << from << " log = ";
3533
 
  log.print(*_dout);
 
3957
  *_dout << std::endl;
 
3958
  if (!log.empty()) {
 
3959
    *_dout << *pg << " osd" << from << " log = ";
 
3960
    log.print(*_dout);
 
3961
  }
3534
3962
  *_dout << dendl;
3535
3963
 
3536
3964
  if (pg->is_primary()) {
3537
3965
    // i am PRIMARY
3538
 
    if (pg->peer_log_requested.count(from) ||
3539
 
        pg->peer_summary_requested.count(from)) {
3540
 
      if (!pg->is_active()) {
3541
 
        pg->proc_replica_log(*t, info, log, missing, from);
3542
 
        
3543
 
        // peer
 
3966
    if (pg->is_active())  {
 
3967
      // PG is ACTIVE
 
3968
      dout(10) << *pg << " searching osd" << from << " log for unfound items." << dendl;
 
3969
      pg->search_for_missing(info, missing, from);
 
3970
 
 
3971
      if (pg->have_unfound()) {
 
3972
        // Make sure we've requested MISSING information from every OSD
 
3973
        // we know about.
3544
3974
        map< int, map<pg_t,PG::Query> > query_map;
3545
 
        pg->peer(*t, fin->contexts, query_map, info_map);
3546
 
        pg->update_stats();
 
3975
        pg->discover_all_missing(query_map);
3547
3976
        do_queries(query_map);
3548
 
      } else {
 
3977
      }
 
3978
      else {
3549
3979
        dout(10) << *pg << " ignoring osd" << from << " log, pg is already active" << dendl;
3550
3980
      }
 
3981
    }
 
3982
    else if ((!log.empty()) && missing) {
 
3983
      // PG is INACTIVE
 
3984
      pg->proc_replica_log(*t, info, log, *missing, from);
 
3985
      
 
3986
      // peer
 
3987
      map< int, map<pg_t,PG::Query> > query_map;
 
3988
      pg->do_peer(*t, fin->contexts, query_map, info_map);
 
3989
      pg->update_stats();
 
3990
      do_queries(query_map);
 
3991
    }
 
3992
  } else if (!pg->info.dne()) {
 
3993
    if (!pg->is_active()) {
 
3994
      // INACTIVE REPLICA
 
3995
      assert(from == pg->acting[0]);
 
3996
      pg->merge_log(*t, info, log, from);
 
3997
 
 
3998
      // We should have the right logs before activating.
 
3999
      assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
 
4000
      assert(pg->log.head == pg->info.last_update);
 
4001
 
 
4002
      pg->activate(*t, fin->contexts, info_map);
3551
4003
    } else {
3552
 
      dout(10) << *pg << " ignoring osd" << from << " log, i didn't ask for it (recently)" << dendl;
3553
 
    }
3554
 
  } else {
3555
 
    if (!pg->info.dne()) {
3556
 
      // i am REPLICA
3557
 
      if (!pg->is_active()) {
3558
 
        pg->merge_log(*t, info, log, missing, from);
3559
 
        pg->activate(*t, fin->contexts, info_map);
3560
 
      } else {
3561
 
        // just update our stats
3562
 
        dout(10) << *pg << " writing updated stats" << dendl;
3563
 
        pg->info.stats = info.stats;
3564
 
 
3565
 
        // did a snap just get purged?
3566
 
        if (info.purged_snaps.size() < pg->info.purged_snaps.size()) {
3567
 
          stringstream ss;
3568
 
          ss << "pg " << pg->info.pgid << " replica got purged_snaps " << info.purged_snaps
3569
 
             << " had " << pg->info.purged_snaps;
3570
 
          logclient.log(LOG_WARN, ss);
3571
 
          pg->info.purged_snaps = info.purged_snaps;
3572
 
        } else {
3573
 
          interval_set<snapid_t> p = info.purged_snaps;
3574
 
          p.subtract(pg->info.purged_snaps);
3575
 
          if (!p.empty()) {
3576
 
            dout(10) << " purged_snaps " << pg->info.purged_snaps
3577
 
                     << " -> " << info.purged_snaps
3578
 
                     << " removed " << p << dendl;
3579
 
            snapid_t sn = p.start();
3580
 
            coll_t c = coll_t::build_snap_pg_coll(info.pgid, sn);
3581
 
            t->remove_collection(c);
3582
 
            
3583
 
            pg->info.purged_snaps = info.purged_snaps;
3584
 
          }
3585
 
        }
3586
 
 
3587
 
        pg->write_info(*t);
 
4004
      // ACTIVE REPLICA
 
4005
      assert(pg->is_replica());
 
4006
 
 
4007
      // just update our stats
 
4008
      dout(10) << *pg << " writing updated stats" << dendl;
 
4009
      pg->info.stats = info.stats;
 
4010
 
 
4011
      // Handle changes to purged_snaps
 
4012
      interval_set<snapid_t> p;
 
4013
      p.union_of(info.purged_snaps, pg->info.purged_snaps);
 
4014
      p.subtract(pg->info.purged_snaps);
 
4015
      pg->info.purged_snaps = info.purged_snaps;
 
4016
      if (!p.empty()) {
 
4017
        dout(10) << " purged_snaps " << pg->info.purged_snaps
 
4018
                 << " -> " << info.purged_snaps
 
4019
                 << " removed " << p << dendl;
 
4020
        pg->adjust_local_snaps(*t, p);
3588
4021
      }
3589
4022
    }
 
4023
    
 
4024
    pg->write_info(*t);
 
4025
    
 
4026
    if (!log.empty()) {
 
4027
      dout(10) << *pg << ": inactive replica merging new PG log entries" << dendl;
 
4028
      pg->merge_log(*t, info, log, from);
 
4029
    }
3590
4030
  }
3591
4031
 
3592
4032
  int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
3608
4048
  if (!require_same_or_newer_map(m, m->get_epoch())) return;
3609
4049
 
3610
4050
  _process_pg_info(m->get_epoch(), from, 
3611
 
                   m->info, m->log, m->missing, 0,
 
4051
                   m->info, m->log, &m->missing, 0,
3612
4052
                   created);
3613
4053
  if (created)
3614
4054
    update_heartbeat_peers();
3627
4067
  if (!require_same_or_newer_map(m, m->get_epoch())) return;
3628
4068
 
3629
4069
  PG::Log empty_log;
3630
 
  PG::Missing empty_missing;
3631
4070
  map<int,MOSDPGInfo*> info_map;
3632
4071
  int created = 0;
3633
4072
 
3634
4073
  for (vector<PG::Info>::iterator p = m->pg_info.begin();
3635
4074
       p != m->pg_info.end();
3636
4075
       ++p) 
3637
 
    _process_pg_info(m->get_epoch(), from, *p, empty_log, empty_missing, &info_map, created);
 
4076
    _process_pg_info(m->get_epoch(), from, *p, empty_log, NULL, &info_map, created);
3638
4077
 
3639
4078
  do_infos(info_map);
3640
4079
  if (created)
3687
4126
  m->put();
3688
4127
}
3689
4128
 
 
4129
void OSD::handle_pg_missing(MOSDPGMissing *m)
 
4130
{
 
4131
  dout(7) << __func__  << " " << *m << " from " << m->get_source() << dendl;
 
4132
 
 
4133
  if (!require_osd_peer(m))
 
4134
    return;
 
4135
 
 
4136
  int from = m->get_source().num();
 
4137
  if (!require_same_or_newer_map(m, m->get_epoch()))
 
4138
    return;
 
4139
 
 
4140
  PG::Log empty_log;
 
4141
  int created = 0;
 
4142
  _process_pg_info(m->get_epoch(), from, m->info,
 
4143
                   empty_log, &m->missing, NULL, created);
 
4144
  if (created)
 
4145
    update_heartbeat_peers();
 
4146
 
 
4147
  m->put();
 
4148
}
3690
4149
 
3691
4150
/** PGQuery
3692
4151
 * from primary to replica | stray
3750
4209
      pg->up.swap( up );
3751
4210
      pg->set_role(role);
3752
4211
      pg->info.history = history;
 
4212
      reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3753
4213
      pg->write_info(*t);
3754
4214
      pg->write_log(*t);
3755
4215
      int tr = store->queue_transaction(&pg->osr, t);
3768
4228
      }
3769
4229
    }
3770
4230
 
 
4231
    if (pg->deleting) {
 
4232
      /*
 
4233
       * We cancel deletion on pg change.  And the primary will never
 
4234
       * query anything it already asked us to delete.  So the only
 
4235
       * reason we would ever get a query on a deleting pg is when we
 
4236
       * get an old query from an old primary.. which we can safely
 
4237
       * ignore.
 
4238
       */
 
4239
      dout(10) << *pg << " query on deleting pg; ignoring" << dendl;
 
4240
      pg->unlock();
 
4241
      continue;
 
4242
    }
 
4243
 
 
4244
    unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3771
4245
    pg->info.history.merge(it->second.history);
 
4246
    reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3772
4247
 
3773
4248
    // ok, process query!
3774
4249
    assert(!pg->acting.empty());
3807
4282
        dout(10) << *pg << " sending " << mlog->log << " " << mlog->missing << dendl;
3808
4283
        //m->log.print(cout);
3809
4284
        
3810
 
        _share_map_outgoing(osdmap->get_inst(from));
3811
 
        messenger->send_message(mlog, m->get_connection());
 
4285
        _share_map_outgoing(osdmap->get_cluster_inst(from));
 
4286
        cluster_messenger->send_message(mlog, m->get_connection());
3812
4287
      }
3813
4288
    }    
3814
4289
 
3868
4343
void OSD::queue_pg_for_deletion(PG *pg)
3869
4344
{
3870
4345
  dout(10) << *pg << " removing." << dendl;
 
4346
  pg->assert_locked();
3871
4347
  assert(pg->get_role() == -1);
3872
 
  pg->deleting = true;
3873
 
  remove_wq.queue(pg);
 
4348
  if (!pg->deleting) {
 
4349
    pg->deleting = true;
 
4350
    remove_wq.queue(pg);
 
4351
  }
3874
4352
}
3875
4353
 
3876
4354
void OSD::_remove_pg(PG *pg)
3892
4370
  {
3893
4371
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
3894
4372
    pg->write_info(*t);
3895
 
    t->remove(meta_coll, pg->log_oid);
 
4373
    pg->write_log(*t);
3896
4374
    int tr = store->queue_transaction(&pg->osr, t);
3897
4375
    assert(tr == 0);
3898
4376
  }
3902
4380
  ObjectStore::Transaction *rmt = new ObjectStore::Transaction;
3903
4381
 
3904
4382
  // snap collections
3905
 
  for (set<snapid_t>::iterator p = pg->snap_collections.begin();
 
4383
  for (interval_set<snapid_t>::iterator p = pg->snap_collections.begin();
3906
4384
       p != pg->snap_collections.end();
3907
4385
       p++) {
3908
 
    vector<sobject_t> olist;      
3909
 
    store->collection_list(coll_t::build_snap_pg_coll(pgid, *p), olist);
3910
 
    dout(10) << "_remove_pg " << pgid << " snap " << *p << " " << olist.size() << " objects" << dendl;
3911
 
    for (vector<sobject_t>::iterator q = olist.begin();
3912
 
         q != olist.end();
3913
 
         q++) {
3914
 
      ObjectStore::Transaction *t = new ObjectStore::Transaction;
3915
 
      t->remove(coll_t::build_snap_pg_coll(pgid, *p), *q);
3916
 
      t->remove(coll_t::build_pg_coll(pgid), *q);          // we may hit this twice, but it's harmless
3917
 
      int tr = store->queue_transaction(&pg->osr, t);
3918
 
      assert(tr == 0);
3919
 
 
3920
 
      if ((++n & 0xff) == 0) {
3921
 
        pg->unlock();
3922
 
        pg->lock();
3923
 
        if (!pg->deleting) {
3924
 
          dout(10) << "_remove_pg aborted on " << *pg << dendl;
 
4386
    for (snapid_t cur = p.get_start();
 
4387
         cur < p.get_start() + p.get_len();
 
4388
         ++cur) {
 
4389
      vector<sobject_t> olist;      
 
4390
      store->collection_list(coll_t(pgid, cur), olist);
 
4391
      dout(10) << "_remove_pg " << pgid << " snap " << cur << " " << olist.size() << " objects" << dendl;
 
4392
      for (vector<sobject_t>::iterator q = olist.begin();
 
4393
           q != olist.end();
 
4394
           q++) {
 
4395
        ObjectStore::Transaction *t = new ObjectStore::Transaction;
 
4396
        t->remove(coll_t(pgid, cur), *q);
 
4397
        t->remove(coll_t(pgid), *q);          // we may hit this twice, but it's harmless
 
4398
        int tr = store->queue_transaction(&pg->osr, t);
 
4399
        assert(tr == 0);
 
4400
        
 
4401
        if ((++n & 0xff) == 0) {
3925
4402
          pg->unlock();
3926
 
          return;
 
4403
          pg->lock();
 
4404
          if (!pg->deleting) {
 
4405
            dout(10) << "_remove_pg aborted on " << *pg << dendl;
 
4406
            pg->unlock();
 
4407
            return;
 
4408
          }
3927
4409
        }
3928
4410
      }
 
4411
      rmt->remove_collection(coll_t(pgid, cur));
3929
4412
    }
3930
 
    rmt->remove_collection(coll_t::build_snap_pg_coll(pgid, *p));
3931
4413
  }
3932
4414
 
3933
4415
  // (what remains of the) main collection
3934
4416
  vector<sobject_t> olist;
3935
 
  store->collection_list(coll_t::build_pg_coll(pgid), olist);
 
4417
  store->collection_list(coll_t(pgid), olist);
3936
4418
  dout(10) << "_remove_pg " << pgid << " " << olist.size() << " objects" << dendl;
3937
4419
  for (vector<sobject_t>::iterator p = olist.begin();
3938
4420
       p != olist.end();
3939
4421
       p++) {
3940
4422
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
3941
 
    t->remove(coll_t::build_pg_coll(pgid), *p);
 
4423
    t->remove(coll_t(pgid), *p);
3942
4424
    int tr = store->queue_transaction(&pg->osr, t);
3943
4425
    assert(tr == 0);
3944
4426
 
3971
4453
  dout(10) << "_remove_pg " << pgid << " removing final" << dendl;
3972
4454
 
3973
4455
  {
3974
 
    rmt->remove_collection(coll_t::build_pg_coll(pgid));
 
4456
    rmt->remove(coll_t::META_COLL, pg->log_oid);
 
4457
    rmt->remove(coll_t::META_COLL, pg->biginfo_oid);
 
4458
    rmt->remove_collection(coll_t(pgid));
3975
4459
    int tr = store->queue_transaction(NULL, rmt);
3976
4460
    assert(tr == 0);
3977
4461
  }
3978
4462
  
3979
4463
  // remove from map
3980
4464
  pg_map.erase(pgid);
 
4465
  unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3981
4466
 
3982
4467
  _put_pool(pgid.pool());
3983
4468
 
4062
4547
    MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
4063
4548
    m->missing = pg->missing;
4064
4549
    m->log = pg->log;
4065
 
    _share_map_outgoing(osdmap->get_inst(pg->get_primary()));
4066
 
    messenger->send_message(m, osdmap->get_inst(pg->get_primary()));
 
4550
    _share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary()));
 
4551
    cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary()));
4067
4552
  } else {
4068
4553
    dout(10) << *pg << "  generated backlog, peering" << dendl;
4069
4554
 
4070
4555
    map< int, map<pg_t,PG::Query> > query_map;    // peer -> PG -> get_summary_since
4071
4556
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
4072
4557
    C_Contexts *fin = new C_Contexts;
4073
 
    pg->peer(*t, fin->contexts, query_map, NULL);
 
4558
    pg->do_peer(*t, fin->contexts, query_map, NULL);
4074
4559
    do_queries(query_map);
4075
4560
    if (pg->dirty_info)
4076
4561
      pg->write_info(*t);
4176
4661
             << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
4177
4662
             << *pg << dendl;
4178
4663
#ifdef DEBUG_RECOVERY_OIDS
4179
 
    dout(20) << "  active was " << recovery_oids << dendl;
 
4664
    dout(20) << "  active was " << recovery_oids[pg->info.pgid] << dendl;
4180
4665
#endif
4181
4666
    
4182
4667
    int started = pg->start_recovery_ops(max);
4203
4688
  recovery_ops_active++;
4204
4689
 
4205
4690
#ifdef DEBUG_RECOVERY_OIDS
4206
 
  dout(20) << "  active was " << recovery_oids << dendl;
4207
 
  assert(recovery_oids.count(soid) == 0);
4208
 
  recovery_oids.insert(soid);
4209
 
  assert((int)recovery_oids.size() == recovery_ops_active);
 
4691
  dout(20) << "  active was " << recovery_oids[pg->info.pgid] << dendl;
 
4692
  assert(recovery_oids[pg->info.pgid].count(soid) == 0);
 
4693
  recovery_oids[pg->info.pgid].insert(soid);
4210
4694
#endif
4211
4695
 
4212
4696
  recovery_wq.unlock();
4225
4709
  assert(recovery_ops_active >= 0);
4226
4710
 
4227
4711
#ifdef DEBUG_RECOVERY_OIDS
4228
 
  dout(20) << "  active oids was " << recovery_oids << dendl;
4229
 
  assert(recovery_oids.count(soid));
4230
 
  recovery_oids.erase(soid);
4231
 
  assert((int)recovery_oids.size() == recovery_ops_active);
 
4712
  dout(20) << "  active oids was " << recovery_oids[pg->info.pgid] << dendl;
 
4713
  assert(recovery_oids[pg->info.pgid].count(soid));
 
4714
  recovery_oids[pg->info.pgid].erase(soid);
4232
4715
#endif
4233
4716
 
4234
4717
  if (dequeue)
4269
4752
    flags = CEPH_OSD_FLAG_ACK;
4270
4753
 
4271
4754
  MOSDOpReply *reply = new MOSDOpReply(op, err, osdmap->get_epoch(), flags);
4272
 
  messenger->send_message(reply, op->get_connection());
 
4755
  Messenger *msgr = client_messenger;
 
4756
  if (op->get_source().is_osd())
 
4757
    msgr = cluster_messenger;
 
4758
  msgr->send_message(reply, op->get_connection());
4273
4759
  op->put();
4274
4760
}
4275
4761
 
4320
4806
  // ...
4321
4807
  throttle_op_queue();
4322
4808
 
4323
 
  logger->set(l_osd_buf, buffer_total_alloc.read());
4324
 
 
4325
4809
  utime_t now = g_clock.now();
4326
4810
 
4327
4811
  init_op_flags(op);
4353
4837
 
4354
4838
  int err = -EPERM;
4355
4839
  if (op->may_read() && !(perm & OSD_POOL_CAP_R)) {
4356
 
    dout(0) << "no READ permission to access pool " << pg->pool->name << dendl;
 
4840
    dout(10) << "no READ permission to access pool " << pg->pool->name << dendl;
4357
4841
  } else if (op->may_write() && !(perm & OSD_POOL_CAP_W)) {
4358
 
    dout(0) << "no WRITE permission to access pool " << pg->pool->name << dendl;
 
4842
    dout(10) << "no WRITE permission to access pool " << pg->pool->name << dendl;
4359
4843
  } else if (op->require_exec_caps() && !(perm & OSD_POOL_CAP_X)) {
4360
 
    dout(0) << "no EXEC permission to access pool " << pg->pool->name << dendl;
 
4844
    dout(10) << "no EXEC permission to access pool " << pg->pool->name << dendl;
4361
4845
  } else {
4362
4846
    err = 0;
4363
4847
  }
4424
4908
      return;
4425
4909
    }
4426
4910
    
4427
 
    // scrubbing?
4428
 
    if (pg->is_scrubbing()) {
4429
 
      dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
4430
 
      pg->waiting_for_active.push_back(op);
4431
 
      pg->unlock();
4432
 
      return;
4433
 
    }
4434
4911
  } else {
4435
4912
    // read
4436
4913
    if (!pg->same_for_read_since(op->get_map_epoch())) {
4438
4915
      pg->unlock();
4439
4916
      return;
4440
4917
    }
4441
 
 
4442
 
    if (op->get_snapid() > 0) {
4443
 
      // snap read.  hrm.
4444
 
      // are we missing a revision that we might need?
4445
 
      // let's get them all.
4446
 
      sobject_t soid(op->get_oid(), CEPH_NOSNAP);
4447
 
      for (int i=-2; i<(int)op->get_snaps().size(); i++) {
4448
 
        if (i >= 0)
4449
 
          soid.snap = op->get_snaps()[i];
4450
 
        else if (i == -1)
4451
 
          soid.snap = CEPH_NOSNAP;
4452
 
        else
4453
 
          soid.snap = CEPH_SNAPDIR;
4454
 
        if (pg->is_missing_object(soid)) {
4455
 
          dout(10) << "handle_op _may_ need missing rev " << soid << ", pulling" << dendl;
4456
 
          pg->wait_for_missing_object(soid, op);
4457
 
          pg->unlock();
4458
 
          return;
4459
 
        }
4460
 
      }
4461
 
    }
4462
4918
  }
4463
4919
 
4464
4920
  if ((op->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) {
4480
4936
  
4481
4937
  dout(10) << "handle_op " << *op << " in " << *pg << dendl;
4482
4938
 
4483
 
  /* turn this off for now.
4484
 
  // proprocess op? 
4485
 
  if (pg->preprocess_op(op, now)) {
4486
 
    pg->unlock();
4487
 
    return;
4488
 
  }
4489
 
  */
4490
 
 
4491
4939
  if (!op->may_write()) {
4492
4940
    Mutex::Locker lock(peer_stat_lock);
4493
4941
    stat_rd_ops_in_queue++;
4644
5092
    //  do this preemptively while we hold osd_lock and pg->lock
4645
5093
    //  to avoid lock ordering issues later.
4646
5094
    for (unsigned i=1; i<pg->acting.size(); i++) 
4647
 
      _share_map_outgoing( osdmap->get_inst(pg->acting[i]) ); 
 
5095
      _share_map_outgoing( osdmap->get_cluster_inst(pg->acting[i]) );
4648
5096
  }
4649
5097
  osd_lock.Unlock();
4650
5098
 
4711
5159
 
4712
5160
// --------------------------------
4713
5161
 
4714
 
int OSD::get_class(const string& cname, ClassVersion& version, pg_t pgid, Message *m, ClassHandler::ClassData **cls)
 
5162
int OSD::get_class(const string& cname, ClassVersion& version, pg_t pgid, Message *m,
 
5163
                   ClassHandler::ClassData **pcls)
4715
5164
{
4716
5165
  Mutex::Locker l(class_lock);
4717
 
  dout(10) << "wait_for_missing_class '" << cname << "' by " << pgid << dendl;
4718
 
 
4719
 
 
4720
 
  *cls = class_handler->get_class(cname, version);
4721
 
  if (*cls) {
4722
 
    switch ((*cls)->status) {
 
5166
  dout(10) << "get_class '" << cname << "' by " << m << dendl;
 
5167
 
 
5168
  ClassHandler::ClassData *cls = class_handler->get_class(cname, version);
 
5169
  if (cls) {
 
5170
    switch (cls->status) {
4723
5171
    case ClassHandler::ClassData::CLASS_LOADED:
 
5172
      *pcls = cls;
4724
5173
      return 0;
4725
5174
    case ClassHandler::ClassData::CLASS_INVALID:
4726
 
      dout(0) << "class not supported" << dendl;
 
5175
      dout(1) << "class " << cname << " not supported" << dendl;
4727
5176
      return -EOPNOTSUPP;
 
5177
    case ClassHandler::ClassData::CLASS_ERROR:
 
5178
      dout(0) << "error loading class!" << dendl;
 
5179
      return -EIO;
4728
5180
    default:
4729
5181
      assert(0);
4730
5182
    }
4731
5183
  }
4732
5184
 
4733
 
  waiting_for_missing_class[cname][pgid].push_back(m);
 
5185
  dout(10) << "get_class '" << cname << "' by " << m << " waiting for missing class" << dendl;
 
5186
  waiting_for_missing_class[cname].push_back(m);
4734
5187
  return -EAGAIN;
4735
5188
}
4736
5189
 
4740
5193
  dout(10) << "got_class '" << cname << "'" << dendl;
4741
5194
 
4742
5195
  if (waiting_for_missing_class.count(cname)) {
4743
 
    map<pg_t,list<Message*> >& w = waiting_for_missing_class[cname];
4744
 
    for (map<pg_t,list<Message*> >::iterator p = w.begin(); p != w.end(); p++)
4745
 
      take_waiters(p->second);
 
5196
    take_waiters(waiting_for_missing_class[cname]);
4746
5197
    waiting_for_missing_class.erase(cname);
4747
5198
  }
4748
5199
}
4753
5204
    return;
4754
5205
 
4755
5206
  Mutex::Locker l(class_lock);
4756
 
  dout(0) << "handle_class action=" << m->action << dendl;
 
5207
  dout(10) << "handle_class action=" << m->action << dendl;
4757
5208
 
4758
5209
  switch (m->action) {
4759
5210
  case CLASS_RESPONSE:
4810
5261
        is_write = flags & CLS_METHOD_WR;
4811
5262
        is_public = flags & CLS_METHOD_PUBLIC;
4812
5263
 
4813
 
        dout(0) << "class " << cname << " method " << mname
 
5264
        dout(10) << "class " << cname << " method " << mname
4814
5265
                << " flags=" << (is_read ? "r" : "") << (is_write ? "w" : "") << dendl;
4815
5266
        if (is_read)
4816
5267
          op->rmw_flags |= CEPH_OSD_FLAG_READ;