331
OSD::OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev, const char *jdev) :
336
OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, Messenger *hbm, MonClient *mc, const char *dev, const char *jdev) :
332
337
osd_lock("OSD::osd_lock"),
339
cluster_messenger(internal_messenger),
340
client_messenger(external_messenger),
336
342
logger(NULL), logger_started(false),
338
logclient(messenger, &mc->monmap, mc),
344
map_in_progress(false),
345
logclient(client_messenger, &mc->monmap, mc),
340
347
dev_path(dev), journal_path(jdev),
341
348
dispatch_running(false),
1202
1260
for (map<int,epoch_t>::iterator p = old_to.begin();
1203
1261
p != old_to.end();
1263
assert(old_inst.count(p->first));
1264
if (heartbeat_to.count(p->first))
1205
1266
if (p->second > osdmap->get_epoch()) {
1206
1267
dout(10) << "update_heartbeat_peers: keeping newer _to peer " << old_inst[p->first]
1207
1268
<< " as of " << p->second << dendl;
1208
1269
heartbeat_to[p->first] = p->second;
1209
1270
heartbeat_inst[p->first] = old_inst[p->first];
1210
} else if (p->second < osdmap->get_epoch() &&
1211
(!osdmap->is_up(p->first) ||
1212
osdmap->get_hb_inst(p->first) != old_inst[p->first])) {
1213
dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first]
1214
<< " as of " << p->second << dendl;
1215
heartbeat_messenger->mark_down(old_inst[p->first].addr);
1272
if (heartbeat_from.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) {
1273
dout(10) << "update_heartbeat_peers: old _to peer " << old_inst[p->first]
1274
<< " is still a _from peer, not marking down" << dendl;
1276
dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first]
1277
<< " as of " << p->second << dendl;
1278
heartbeat_messenger->mark_down(old_inst[p->first].addr);
1281
if (!osdmap->is_down(p->first) &&
1282
osdmap->get_hb_inst(p->first) == old_inst[p->first]) {
1283
dout(10) << "update_heartbeat_peers: sharing map with old _to peer " << old_inst[p->first]
1284
<< " as of " << p->second << dendl;
1285
// share latest map with this peer (via the cluster link, NOT
1286
// the heartbeat link), so they know not to expect heartbeats
1287
// from us. otherwise they may mark us down!
1288
_share_map_outgoing(osdmap->get_cluster_inst(p->first));
1218
1292
for (map<int,epoch_t>::iterator p = old_from.begin();
1219
1293
p != old_from.end();
1221
if (heartbeat_to.count(p->first) == 0 ||
1222
heartbeat_inst[p->first] != old_inst[p->first])
1223
dout(10) << "update_heartbeat_peers: dropped old _from osd" << p->first
1224
<< " " << old_inst[p->first] << dendl;
1295
if (heartbeat_from.count(p->first) == 0 ||
1296
heartbeat_inst[p->first] != old_inst[p->first]) {
1297
if (heartbeat_to.count(p->first) == 0) {
1298
dout(10) << "update_heartbeat_peers: marking down old _from peer " << old_inst[p->first]
1299
<< " as of " << p->second << dendl;
1300
heartbeat_messenger->mark_down(old_inst[p->first].addr);
1302
dout(10) << "update_heartbeat_peers: old _from peer " << old_inst[p->first]
1303
<< " is still a _to peer, not marking down" << dendl;
1227
heartbeat_epoch = osdmap->get_epoch();
1229
1308
dout(10) << "update_heartbeat_peers: hb to: " << heartbeat_to << dendl;
1230
1309
dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
1312
for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
1313
if (heartbeat_inst.count(p->first) == 0)
1314
dout(0) << " no inst for _to " << p->first << dendl;
1315
for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
1316
if (heartbeat_inst.count(p->first) == 0)
1317
dout(0) << " no inst for _from " << p->first << dendl;
1232
1320
heartbeat_lock.Unlock();
1521
1608
void OSD::send_boot()
1523
1610
dout(10) << "send_boot" << dendl;
1611
entity_addr_t cluster_addr = cluster_messenger->get_myaddr();
1612
if (cluster_addr.is_blank_addr()) {
1613
int port = cluster_addr.get_port();
1614
cluster_addr = client_messenger->get_myaddr();
1615
cluster_addr.set_port(port);
1616
dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
1524
1618
entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
1525
1619
if (hb_addr.is_blank_addr()) {
1526
1620
int port = hb_addr.get_port();
1527
hb_addr = messenger->get_myaddr();
1621
hb_addr = cluster_addr;
1528
1622
hb_addr.set_port(port);
1623
dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl;
1530
monc->send_mon_message(new MOSDBoot(superblock, hb_addr));
1625
MOSDBoot *mboot = new MOSDBoot(superblock, hb_addr, cluster_addr);
1626
dout(10) << " client_addr " << client_messenger->get_myaddr()
1627
<< ", cluster_addr " << cluster_addr
1628
<< ", hb addr " << hb_addr
1630
monc->send_mon_message(mboot);
1533
1633
void OSD::queue_want_up_thru(epoch_t want)
1704
1819
utime_t start = g_clock.now();
1705
1820
for (uint64_t pos = 0; pos < count; pos += bsize) {
1707
sprintf(nm, "disk_bw_test_%lld", (long long)pos);
1822
snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
1708
1823
object_t oid(nm);
1709
1824
sobject_t soid(oid, 0);
1710
1825
ObjectStore::Transaction *t = new ObjectStore::Transaction;
1711
t->write(meta_coll, soid, 0, bsize, bl);
1826
t->write(coll_t::META_COLL, soid, 0, bsize, bl);
1712
1827
store->queue_transaction(NULL, t);
1713
cleanupt->remove(meta_coll, soid);
1828
cleanupt->remove(coll_t::META_COLL, soid);
1715
1830
store->sync_and_flush();
1716
1831
utime_t end = g_clock.now();
1729
1844
logger_reset_all();
1730
1845
} else if (m->cmd.size() == 2 && m->cmd[0] == "logger" && m->cmd[1] == "reopen") {
1731
1846
logger_reopen_all();
1733
dout(0) << "unrecognized command! " << m->cmd << dendl;
1847
} else if (m->cmd.size() == 1 && m->cmd[0] == "heapdump") {
1849
if (g_conf.tcmalloc_have) {
1850
if (!g_conf.profiler_running()) {
1851
ss << "can't dump heap: profiler not running";
1853
ss << g_conf.name << "dumping heap profile now";
1854
g_conf.profiler_dump("admin request");
1857
ss << g_conf.name << " does not have tcmalloc, can't use profiler";
1859
logclient.log(LOG_INFO, ss);
1860
} else if (m->cmd.size() == 1 && m->cmd[0] == "enable_profiler_options") {
1861
char val[sizeof(int)*8+1];
1862
snprintf(val, sizeof(val), "%i", g_conf.profiler_allocation_interval);
1863
setenv("HEAP_PROFILE_ALLOCATION_INTERVAL", val, g_conf.profiler_allocation_interval);
1864
snprintf(val, sizeof(val), "%i", g_conf.profiler_highwater_interval);
1865
setenv("HEAP_PROFILE_INUSE_INTERVAL", val, g_conf.profiler_highwater_interval);
1867
ss << g_conf.name << " set heap variables from current config";
1868
logclient.log(LOG_INFO, ss);
1869
} else if (m->cmd.size() == 1 && m->cmd[0] == "start_profiler") {
1870
char location[PATH_MAX];
1871
snprintf(location, sizeof(location),
1872
"%s/%s", g_conf.log_dir, g_conf.name);
1873
g_conf.profiler_start(location);
1875
ss << g_conf.name << " started profiler with output " << location;
1876
logclient.log(LOG_INFO, ss);
1877
} else if (m->cmd.size() == 1 && m->cmd[0] == "stop_profiler") {
1878
g_conf.profiler_stop();
1880
ss << g_conf.name << " stopped profiler";
1881
logclient.log(LOG_INFO, ss);
1883
else if (m->cmd.size() > 1 && m->cmd[0] == "debug") {
1884
if (m->cmd.size() == 3 && m->cmd[1] == "dump_missing") {
1885
const string &file_name(m->cmd[2]);
1886
std::ofstream fout(file_name.c_str());
1887
if (!fout.is_open()) {
1889
ss << "failed to open file '" << file_name << "'";
1890
logclient.log(LOG_INFO, ss);
1894
std::set <pg_t> keys;
1895
for (hash_map<pg_t, PG*>::const_iterator pg_map_e = pg_map.begin();
1896
pg_map_e != pg_map.end(); ++pg_map_e) {
1897
keys.insert(pg_map_e->first);
1900
fout << "*** osd " << whoami << ": dump_missing ***" << std::endl;
1901
for (std::set <pg_t>::iterator p = keys.begin();
1902
p != keys.end(); ++p) {
1903
hash_map<pg_t, PG*>::iterator q = pg_map.find(*p);
1904
assert(q != pg_map.end());
1908
fout << *pg << std::endl;
1909
std::map<sobject_t, PG::Missing::item>::iterator mend = pg->missing.missing.end();
1910
std::map<sobject_t, PG::Missing::item>::iterator m = pg->missing.missing.begin();
1911
for (; m != mend; ++m) {
1912
fout << m->first << " -> " << m->second << std::endl;
1913
map<sobject_t, set<int> >::const_iterator mli =
1914
pg->missing_loc.find(m->first);
1915
if (mli == pg->missing_loc.end())
1917
const set<int> &mls(mli->second);
1920
fout << "missing_loc: " << mls << std::endl;
1928
else if (m->cmd.size() == 3 && m->cmd[1] == "kick_recovery_wq") {
1929
g_conf.osd_recovery_delay_start = atoi(m->cmd[2].c_str());
1931
ss << "kicking recovery queue. set osd_recovery_delay_start to "
1932
<< g_conf.osd_recovery_delay_start;
1933
logclient.log(LOG_INFO, ss);
1935
defer_recovery_until = g_clock.now();
1936
defer_recovery_until += g_conf.osd_recovery_delay_start;
1937
recovery_wq._kick();
1940
else dout(0) << "unrecognized command! " << m->cmd << dendl;
2310
bool OSD::scrub_should_schedule()
2313
if (getloadavg(loadavgs, 1) != 1) {
2314
dout(10) << "scrub_should_schedule couldn't read loadavgs\n" << dendl;
2318
if (loadavgs[0] >= g_conf.osd_scrub_load_threshold) {
2319
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
2320
<< " >= max " << g_conf.osd_scrub_load_threshold
2321
<< " = no, load too high" << dendl;
2325
bool coin_flip = (rand() % 3) == whoami % 3;
2327
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
2328
<< " < max " << g_conf.osd_scrub_load_threshold
2329
<< " = no, randomly backing off"
2334
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
2335
<< " < max " << g_conf.osd_scrub_load_threshold
2336
<< " = yes" << dendl;
2337
return loadavgs[0] < g_conf.osd_scrub_load_threshold;
2340
void OSD::sched_scrub()
2342
assert(osd_lock.is_locked());
2344
dout(20) << "sched_scrub" << dendl;
2346
pair<utime_t,pg_t> pos;
2347
utime_t max = g_clock.now();
2348
max -= g_conf.osd_scrub_max_interval;
2350
sched_scrub_lock.Lock();
2352
//dout(20) << " " << last_scrub_pg << dendl;
2354
set< pair<utime_t,pg_t> >::iterator p = last_scrub_pg.begin();
2355
while (p != last_scrub_pg.end()) {
2356
//dout(10) << "pos is " << *p << dendl;
2358
utime_t t = pos.first;
2359
pg_t pgid = pos.second;
2362
dout(10) << " " << pgid << " at " << t
2363
<< " > " << max << " (" << g_conf.osd_scrub_max_interval << " seconds ago)" << dendl;
2367
dout(10) << " on " << t << " " << pgid << dendl;
2368
sched_scrub_lock.Unlock();
2369
PG *pg = _lookup_lock_pg(pgid);
2371
if (pg->is_active() && !pg->sched_scrub()) {
2373
sched_scrub_lock.Lock();
2378
sched_scrub_lock.Lock();
2381
p = last_scrub_pg.lower_bound(pos);
2382
//dout(10) << "lb is " << *p << dendl;
2383
if (p != last_scrub_pg.end())
2386
sched_scrub_lock.Unlock();
2388
dout(20) << "sched_scrub done" << dendl;
2391
bool OSD::inc_scrubs_pending()
2393
bool result = false;
2395
sched_scrub_lock.Lock();
2396
if (scrubs_pending + scrubs_active < g_conf.osd_max_scrubs) {
2397
dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
2398
<< " (max " << g_conf.osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
2402
dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << g_conf.osd_max_scrubs << dendl;
2404
sched_scrub_lock.Unlock();
2409
void OSD::dec_scrubs_pending()
2411
sched_scrub_lock.Lock();
2412
dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
2413
<< " (max " << g_conf.osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
2415
assert(scrubs_pending >= 0);
2416
sched_scrub_lock.Unlock();
2419
void OSD::dec_scrubs_active()
2421
sched_scrub_lock.Lock();
2422
dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
2423
<< " (max " << g_conf.osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
2425
sched_scrub_lock.Unlock();
2085
2428
// =====================================================
2104
2447
void OSD::note_down_osd(int osd)
2106
messenger->mark_down(osdmap->get_addr(osd));
2449
cluster_messenger->mark_down(osdmap->get_cluster_addr(osd));
2108
2451
heartbeat_lock.Lock();
2110
heartbeat_messenger->mark_down(osdmap->get_hb_addr(osd));
2112
if (heartbeat_inst.count(osd)) {
2113
if (heartbeat_inst[osd] == osdmap->get_hb_inst(osd)) {
2114
dout(10) << "note_down_osd removing heartbeat_inst " << heartbeat_inst[osd] << dendl;
2115
heartbeat_inst.erase(osd);
2117
dout(10) << "note_down_osd leaving heartbeat_inst " << heartbeat_inst[osd]
2118
<< " != " << osdmap->get_hb_inst(osd) << dendl;
2121
dout(10) << "note_down_osd no heartbeat_inst for osd" << osd << dendl;
2453
// note: update_heartbeat_peers will mark down the heartbeat connection.
2123
2455
peer_map_epoch.erase(entity_name_t::OSD(osd));
2124
2456
failure_queue.erase(osd);
2125
2457
failure_pending.erase(osd);
2126
heartbeat_from_stamp.erase(osd);
2128
2459
heartbeat_lock.Unlock();
2381
2739
pg->write_log(t);
2742
bool do_shutdown = false;
2384
2743
if (osdmap->get_epoch() > 0 &&
2385
state != STATE_BOOTING &&
2386
(!osdmap->exists(whoami) ||
2387
(!osdmap->is_up(whoami) && osdmap->get_addr(whoami) == messenger->get_myaddr()))) {
2388
dout(0) << "map says i am down. switching to boot state." << dendl;
2392
ss << "map e" << osdmap->get_epoch() << " wrongly marked me down";
2393
logclient.log(LOG_WARN, ss);
2395
state = STATE_BOOTING;
2398
reset_heartbeat_peers();
2744
state == STATE_ACTIVE) {
2745
if (!osdmap->exists(whoami)) {
2746
dout(0) << "map says i do not exist. shutting down." << dendl;
2747
do_shutdown = true; // don't call shutdown() while we have everything paused
2748
} else if (!osdmap->is_up(whoami) ||
2749
osdmap->get_addr(whoami) != client_messenger->get_myaddr()) {
2751
ss << "map e" << osdmap->get_epoch() << " wrongly marked me down";
2752
logclient.log(LOG_WARN, ss);
2754
state = STATE_BOOTING;
2757
int cport = cluster_messenger->get_myaddr().get_port();
2758
int hbport = heartbeat_messenger->get_myaddr().get_port();
2760
int r = cluster_messenger->rebind(hbport);
2762
do_shutdown = true; // FIXME: do_restart?
2764
r = heartbeat_messenger->rebind(cport);
2766
do_shutdown = true; // FIXME: do_restart?
2768
reset_heartbeat_peers();
2401
2772
// note in the superblock that we were clean thru the prior epoch
2790
3178
assert(0); // we should have all maps.
3181
Messenger *msgr = client_messenger;
3182
if (entity_name_t::TYPE_OSD == inst.name._type)
3183
msgr = cluster_messenger;
2795
messenger->lazy_send_message(m, inst); // only if we already have an open connection
3185
msgr->lazy_send_message(m, inst); // only if we already have an open connection
2797
messenger->send_message(m, inst);
3187
msgr->send_message(m, inst);
2800
3190
bool OSD::get_map_bl(epoch_t e, bufferlist& bl)
2802
return store->read(meta_coll, get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
3192
return store->read(coll_t::META_COLL, get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
2805
3195
bool OSD::get_inc_map_bl(epoch_t e, bufferlist& bl)
2807
return store->read(meta_coll, get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
3197
return store->read(coll_t::META_COLL, get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
2810
3200
OSDMap *OSD::get_map(epoch_t epoch)
3082
3479
struct stat st;
3083
store->stat(coll_t::build_pg_coll(parentid), poid, &st);
3084
store->getattr(coll_t::build_pg_coll(parentid), poid, OI_ATTR, bv);
3480
store->stat(coll_t(parentid), poid, &st);
3481
store->getattr(coll_t(parentid), poid, OI_ATTR, bv);
3085
3482
object_info_t oi(bv);
3087
t.collection_add(coll_t::build_pg_coll(pgid), coll_t::build_pg_coll(parentid), poid);
3088
t.collection_remove(coll_t::build_pg_coll(parentid), poid);
3484
t.collection_add(coll_t(pgid), coll_t(parentid), poid);
3485
t.collection_remove(coll_t(parentid), poid);
3089
3486
if (oi.snaps.size()) {
3090
3487
snapid_t first = oi.snaps[0];
3091
t.collection_add(coll_t::build_snap_pg_coll(pgid, first), coll_t::build_pg_coll(parentid), poid);
3092
t.collection_remove(coll_t::build_snap_pg_coll(parentid, first), poid);
3488
t.collection_add(coll_t(pgid, first), coll_t(parentid), poid);
3489
t.collection_remove(coll_t(parentid, first), poid);
3093
3490
if (oi.snaps.size() > 1) {
3094
3491
snapid_t last = oi.snaps[oi.snaps.size()-1];
3095
t.collection_add(coll_t::build_snap_pg_coll(pgid, last), coll_t::build_pg_coll(parentid), poid);
3096
t.collection_remove(coll_t::build_snap_pg_coll(parentid, last), poid);
3492
t.collection_add(coll_t(pgid, last), coll_t(parentid), poid);
3493
t.collection_remove(coll_t(parentid, last), poid);
3525
dout(10) << *pg << " got " << info << " " << log << " " << missing << dendl;
3938
dout(10) << *pg << ": " << __func__ << " info: " << info << ", ";
3940
*_dout << "(log omitted)";
3942
*_dout << "log: " << log;
3945
*_dout << "(missing omitted)";
3947
*_dout << "missing: " << *missing;
3950
unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3526
3951
pg->info.history.merge(info.history);
3952
reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
3529
3955
dout(15) << *pg << " my log = ";
3530
3956
pg->log.print(*_dout);
3532
dout(15) << *pg << " osd" << from << " log = ";
3957
*_dout << std::endl;
3959
*_dout << *pg << " osd" << from << " log = ";
3534
3962
*_dout << dendl;
3536
3964
if (pg->is_primary()) {
3537
3965
// i am PRIMARY
3538
if (pg->peer_log_requested.count(from) ||
3539
pg->peer_summary_requested.count(from)) {
3540
if (!pg->is_active()) {
3541
pg->proc_replica_log(*t, info, log, missing, from);
3966
if (pg->is_active()) {
3968
dout(10) << *pg << " searching osd" << from << " log for unfound items." << dendl;
3969
pg->search_for_missing(info, missing, from);
3971
if (pg->have_unfound()) {
3972
// Make sure we've requested MISSING information from every OSD
3544
3974
map< int, map<pg_t,PG::Query> > query_map;
3545
pg->peer(*t, fin->contexts, query_map, info_map);
3975
pg->discover_all_missing(query_map);
3547
3976
do_queries(query_map);
3549
3979
dout(10) << *pg << " ignoring osd" << from << " log, pg is already active" << dendl;
3982
else if ((!log.empty()) && missing) {
3984
pg->proc_replica_log(*t, info, log, *missing, from);
3987
map< int, map<pg_t,PG::Query> > query_map;
3988
pg->do_peer(*t, fin->contexts, query_map, info_map);
3990
do_queries(query_map);
3992
} else if (!pg->info.dne()) {
3993
if (!pg->is_active()) {
3995
assert(from == pg->acting[0]);
3996
pg->merge_log(*t, info, log, from);
3998
// We should have the right logs before activating.
3999
assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
4000
assert(pg->log.head == pg->info.last_update);
4002
pg->activate(*t, fin->contexts, info_map);
3552
dout(10) << *pg << " ignoring osd" << from << " log, i didn't ask for it (recently)" << dendl;
3555
if (!pg->info.dne()) {
3557
if (!pg->is_active()) {
3558
pg->merge_log(*t, info, log, missing, from);
3559
pg->activate(*t, fin->contexts, info_map);
3561
// just update our stats
3562
dout(10) << *pg << " writing updated stats" << dendl;
3563
pg->info.stats = info.stats;
3565
// did a snap just get purged?
3566
if (info.purged_snaps.size() < pg->info.purged_snaps.size()) {
3568
ss << "pg " << pg->info.pgid << " replica got purged_snaps " << info.purged_snaps
3569
<< " had " << pg->info.purged_snaps;
3570
logclient.log(LOG_WARN, ss);
3571
pg->info.purged_snaps = info.purged_snaps;
3573
interval_set<snapid_t> p = info.purged_snaps;
3574
p.subtract(pg->info.purged_snaps);
3576
dout(10) << " purged_snaps " << pg->info.purged_snaps
3577
<< " -> " << info.purged_snaps
3578
<< " removed " << p << dendl;
3579
snapid_t sn = p.start();
3580
coll_t c = coll_t::build_snap_pg_coll(info.pgid, sn);
3581
t->remove_collection(c);
3583
pg->info.purged_snaps = info.purged_snaps;
4005
assert(pg->is_replica());
4007
// just update our stats
4008
dout(10) << *pg << " writing updated stats" << dendl;
4009
pg->info.stats = info.stats;
4011
// Handle changes to purged_snaps
4012
interval_set<snapid_t> p;
4013
p.union_of(info.purged_snaps, pg->info.purged_snaps);
4014
p.subtract(pg->info.purged_snaps);
4015
pg->info.purged_snaps = info.purged_snaps;
4017
dout(10) << " purged_snaps " << pg->info.purged_snaps
4018
<< " -> " << info.purged_snaps
4019
<< " removed " << p << dendl;
4020
pg->adjust_local_snaps(*t, p);
4027
dout(10) << *pg << ": inactive replica merging new PG log entries" << dendl;
4028
pg->merge_log(*t, info, log, from);
3592
4032
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
3902
4380
ObjectStore::Transaction *rmt = new ObjectStore::Transaction;
3904
4382
// snap collections
3905
for (set<snapid_t>::iterator p = pg->snap_collections.begin();
4383
for (interval_set<snapid_t>::iterator p = pg->snap_collections.begin();
3906
4384
p != pg->snap_collections.end();
3908
vector<sobject_t> olist;
3909
store->collection_list(coll_t::build_snap_pg_coll(pgid, *p), olist);
3910
dout(10) << "_remove_pg " << pgid << " snap " << *p << " " << olist.size() << " objects" << dendl;
3911
for (vector<sobject_t>::iterator q = olist.begin();
3914
ObjectStore::Transaction *t = new ObjectStore::Transaction;
3915
t->remove(coll_t::build_snap_pg_coll(pgid, *p), *q);
3916
t->remove(coll_t::build_pg_coll(pgid), *q); // we may hit this twice, but it's harmless
3917
int tr = store->queue_transaction(&pg->osr, t);
3920
if ((++n & 0xff) == 0) {
3923
if (!pg->deleting) {
3924
dout(10) << "_remove_pg aborted on " << *pg << dendl;
4386
for (snapid_t cur = p.get_start();
4387
cur < p.get_start() + p.get_len();
4389
vector<sobject_t> olist;
4390
store->collection_list(coll_t(pgid, cur), olist);
4391
dout(10) << "_remove_pg " << pgid << " snap " << cur << " " << olist.size() << " objects" << dendl;
4392
for (vector<sobject_t>::iterator q = olist.begin();
4395
ObjectStore::Transaction *t = new ObjectStore::Transaction;
4396
t->remove(coll_t(pgid, cur), *q);
4397
t->remove(coll_t(pgid), *q); // we may hit this twice, but it's harmless
4398
int tr = store->queue_transaction(&pg->osr, t);
4401
if ((++n & 0xff) == 0) {
4404
if (!pg->deleting) {
4405
dout(10) << "_remove_pg aborted on " << *pg << dendl;
4411
rmt->remove_collection(coll_t(pgid, cur));
3930
rmt->remove_collection(coll_t::build_snap_pg_coll(pgid, *p));
3933
4415
// (what remains of the) main collection
3934
4416
vector<sobject_t> olist;
3935
store->collection_list(coll_t::build_pg_coll(pgid), olist);
4417
store->collection_list(coll_t(pgid), olist);
3936
4418
dout(10) << "_remove_pg " << pgid << " " << olist.size() << " objects" << dendl;
3937
4419
for (vector<sobject_t>::iterator p = olist.begin();
3938
4420
p != olist.end();
3940
4422
ObjectStore::Transaction *t = new ObjectStore::Transaction;
3941
t->remove(coll_t::build_pg_coll(pgid), *p);
4423
t->remove(coll_t(pgid), *p);
3942
4424
int tr = store->queue_transaction(&pg->osr, t);
3943
4425
assert(tr == 0);
4062
4547
MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
4063
4548
m->missing = pg->missing;
4064
4549
m->log = pg->log;
4065
_share_map_outgoing(osdmap->get_inst(pg->get_primary()));
4066
messenger->send_message(m, osdmap->get_inst(pg->get_primary()));
4550
_share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary()));
4551
cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary()));
4068
4553
dout(10) << *pg << " generated backlog, peering" << dendl;
4070
4555
map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
4071
4556
ObjectStore::Transaction *t = new ObjectStore::Transaction;
4072
4557
C_Contexts *fin = new C_Contexts;
4073
pg->peer(*t, fin->contexts, query_map, NULL);
4558
pg->do_peer(*t, fin->contexts, query_map, NULL);
4074
4559
do_queries(query_map);
4075
4560
if (pg->dirty_info)
4076
4561
pg->write_info(*t);
4712
5160
// --------------------------------
4714
int OSD::get_class(const string& cname, ClassVersion& version, pg_t pgid, Message *m, ClassHandler::ClassData **cls)
5162
int OSD::get_class(const string& cname, ClassVersion& version, pg_t pgid, Message *m,
5163
ClassHandler::ClassData **pcls)
4716
5165
Mutex::Locker l(class_lock);
4717
dout(10) << "wait_for_missing_class '" << cname << "' by " << pgid << dendl;
4720
*cls = class_handler->get_class(cname, version);
4722
switch ((*cls)->status) {
5166
dout(10) << "get_class '" << cname << "' by " << m << dendl;
5168
ClassHandler::ClassData *cls = class_handler->get_class(cname, version);
5170
switch (cls->status) {
4723
5171
case ClassHandler::ClassData::CLASS_LOADED:
4725
5174
case ClassHandler::ClassData::CLASS_INVALID:
4726
dout(0) << "class not supported" << dendl;
5175
dout(1) << "class " << cname << " not supported" << dendl;
4727
5176
return -EOPNOTSUPP;
5177
case ClassHandler::ClassData::CLASS_ERROR:
5178
dout(0) << "error loading class!" << dendl;
4733
waiting_for_missing_class[cname][pgid].push_back(m);
5185
dout(10) << "get_class '" << cname << "' by " << m << " waiting for missing class" << dendl;
5186
waiting_for_missing_class[cname].push_back(m);
4734
5187
return -EAGAIN;