261
298
// read into a buffer
262
299
vector<hobject_t> sentries;
263
300
PGLSResponse response;
264
::decode(response.handle, bp);
267
if (p->op.pgls.start_epoch &&
268
p->op.pgls.start_epoch < info.history.same_primary_since) {
269
dout(10) << " pgls sequence started epoch " << p->op.pgls.start_epoch
270
<< " < same_primary_since " << info.history.same_primary_since
271
<< ", resetting cookie" << dendl;
272
response.handle = collection_list_handle_t();
275
if (response.handle.in_missing_set) {
276
// it's an offset into the missing set
277
version_t v = response.handle.index;
278
dout(10) << " handle low/missing " << v << dendl;
279
map<version_t, hobject_t>::iterator mp = missing.rmissing.lower_bound(v);
281
while (sentries.size() < p->op.pgls.count) {
282
if (mp == missing.rmissing.end()) {
283
dout(10) << " handle finished low/missing, moving to high/ondisk" << dendl;
284
response.handle.in_missing_set = false;
287
sentries.push_back(mp->second);
288
response.handle.index = mp->first + 1;
291
if (sentries.size() < p->op.pgls.count &&
292
!response.handle.in_missing_set) {
293
// it's a readdir cookie
294
dout(10) << " handle high/missing " << response.handle << dendl;
295
osr.flush(); // order wrt preceding writes
296
result = osd->store->collection_list_partial(coll, snapid,
297
sentries, p->op.pgls.count - sentries.size(),
299
response.handle.in_missing_set = false;
303
vector<hobject_t>::iterator iter;
304
for (iter = sentries.begin(); iter != sentries.end(); ++iter) {
306
// skip snapdir objects
307
if (iter->snap == CEPH_SNAPDIR)
310
if (snapid != CEPH_NOSNAP) {
311
// skip items not defined for this snapshot
312
if (iter->snap == CEPH_NOSNAP) {
314
osd->store->getattr(coll, *iter, SS_ATTR, bl);
316
if (snapid <= snapset.seq)
320
osd->store->getattr(coll, *iter, OI_ATTR, bl);
321
object_info_t oi(bl);
323
for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
328
dout(10) << *iter << " has " << oi.snaps << " .. exists=" << exists << dendl;
334
keep = pgls_filter(filter, *iter, filter_out);
337
response.entries.push_back(iter->oid);
339
::encode(response, outdata);
341
::encode(filter_out, outdata);
343
dout(10) << " pgls result=" << result << " outdata.length()=" << outdata.length() << dendl;
302
::decode(response.handle, bp);
304
catch (const buffer::error& e) {
305
dout(0) << "unable to decode PGLS handle in " << *op << dendl;
311
hobject_t current = response.handle;
313
int r = osd->store->collection_list_partial(coll, current,
324
assert(snapid == CEPH_NOSNAP || missing.missing.empty());
325
map<hobject_t, Missing::item>::iterator missing_iter =
326
missing.missing.lower_bound(current);
327
vector<hobject_t>::iterator ls_iter = sentries.begin();
329
if (ls_iter == sentries.end()) {
334
if (missing_iter == missing.missing.end() ||
335
*ls_iter < missing_iter->first) {
336
candidate = *(ls_iter++);
338
candidate = (missing_iter++)->first;
341
if (response.entries.size() == p->op.pgls.count) {
345
// skip snapdir objects
346
if (candidate.snap == CEPH_SNAPDIR)
349
if (candidate.snap < snapid)
352
if (snapid != CEPH_NOSNAP) {
354
if (candidate.snap == CEPH_NOSNAP) {
355
osd->store->getattr(coll, candidate, SS_ATTR, bl);
357
if (snapid <= snapset.seq)
361
osd->store->getattr(coll, candidate, OI_ATTR, attr_bl);
362
object_info_t oi(attr_bl);
363
vector<snapid_t>::iterator i = find(oi.snaps.begin(),
366
if (i == oi.snaps.end())
371
if (filter && !pgls_filter(filter, candidate, filter_out))
374
response.entries.push_back(make_pair(candidate.oid,
375
candidate.get_key()));
377
response.handle = next;
378
::encode(response, outdata);
380
::encode(filter_out, outdata);
381
dout(10) << " pgls result=" << result << " outdata.length()="
382
<< outdata.length() << dendl;
497
567
dout(10) << "do_op mode now " << mode << dendl;
569
// are writes blocked by another object?
570
if (obc->blocked_by) {
571
dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by "
572
<< obc->blocked_by->obs.oi.soid << dendl;
573
wait_for_degraded_object(obc->blocked_by->obs.oi.soid, op);
574
put_object_context(obc);
578
// if we have src_oids, we need to be careful of the target being
579
// before and a src being after the last_backfill line, or else the
580
// operation won't apply properly on the backfill_target. (the
581
// opposite is not a problem; if the target is after the line, we
582
// don't apply on the backfill_target and it doesn't matter.)
583
Info *backfill_target_info = NULL;
584
bool before_backfill = false;
585
if (backfill_target >= 0) {
586
backfill_target_info = &peer_info[backfill_target];
587
before_backfill = obc->obs.oi.soid < backfill_target_info->last_backfill;
500
591
map<hobject_t,ObjectContext*> src_obc;
501
592
for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
502
593
OSDOp& osd_op = *p;
503
object_locator_t src_oloc;
504
get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc);
505
hobject_t toid(osd_op.soid, src_oloc.key, op->get_pg().ps());
594
if (!ceph_osd_op_type_multi(osd_op.op.op))
506
596
if (osd_op.soid.oid.name.length()) {
507
if (!src_obc.count(toid)) {
597
object_locator_t src_oloc;
598
get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc);
599
hobject_t src_oid(osd_op.soid, src_oloc.key, op->get_pg().ps());
600
if (!src_obc.count(src_oid)) {
508
601
ObjectContext *sobc;
509
602
snapid_t ssnapid;
511
int r = find_object_context(hobject_t(toid.oid, src_oloc.key,
512
toid.snap, op->get_pg().ps()),
514
&sobc, false, &ssnapid);
604
int r = find_object_context(src_oid, src_oloc, &sobc, false, &ssnapid);
515
605
if (r == -EAGAIN) {
516
606
// missing the specific snap we need; requeue and wait.
517
607
hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, op->get_pg().ps());
518
608
wait_for_missing_object(wait_oid, op);
520
610
osd->reply_op_error(op, r);
521
} else if (is_degraded_object(sobc->obs.oi.soid)) {
522
wait_for_degraded_object(sobc->obs.oi.soid, op);
523
611
} else if (sobc->obs.oi.oloc.key != obc->obs.oi.oloc.key &&
524
612
sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
525
613
sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
526
614
dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != "
527
615
<< op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
528
616
osd->reply_op_error(op, -EINVAL);
617
} else if (is_degraded_object(sobc->obs.oi.soid) ||
618
(before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) {
619
wait_for_degraded_object(sobc->obs.oi.soid, op);
620
dout(10) << " writes for " << obc->obs.oi.soid << " now blocked by "
621
<< sobc->obs.oi.soid << dendl;
623
obc->blocked_by = sobc;
625
sobc->blocking.insert(obc);
530
src_obc[toid] = sobc;
627
dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
628
src_obc[src_oid] = sobc;
533
631
// Error cleanup below
716
816
osd->logger->inc(l_osd_op_outb, outb);
717
817
osd->logger->inc(l_osd_op_inb, inb);
718
osd->logger->fset(l_osd_op_lat, latency);
818
osd->logger->finc(l_osd_op_lat, latency);
720
820
if (op->may_read() && op->may_write()) {
721
821
osd->logger->inc(l_osd_op_rw);
722
822
osd->logger->inc(l_osd_op_rw_inb, inb);
723
823
osd->logger->inc(l_osd_op_rw_outb, outb);
724
osd->logger->fset(l_osd_op_rw_rlat, rlatency);
725
osd->logger->fset(l_osd_op_rw_lat, latency);
824
osd->logger->finc(l_osd_op_rw_rlat, rlatency);
825
osd->logger->finc(l_osd_op_rw_lat, latency);
726
826
} else if (op->may_read()) {
727
827
osd->logger->inc(l_osd_op_r);
728
828
osd->logger->inc(l_osd_op_r_outb, outb);
729
osd->logger->fset(l_osd_op_r_lat, latency);
829
osd->logger->finc(l_osd_op_r_lat, latency);
730
830
} else if (op->may_write()) {
731
831
osd->logger->inc(l_osd_op_w);
732
832
osd->logger->inc(l_osd_op_w_inb, inb);
733
osd->logger->fset(l_osd_op_w_rlat, rlatency);
734
osd->logger->fset(l_osd_op_w_lat, latency);
833
osd->logger->finc(l_osd_op_w_rlat, rlatency);
834
osd->logger->finc(l_osd_op_w_lat, latency);
814
917
sub_op_modify_reply(r);
920
// Handle a backfill scan message (MOSDPGScan). Two ops are visible here:
//  - OP_SCAN_GET_DIGEST: scan a local range and reply with its object digest.
//  - OP_SCAN_DIGEST: absorb the digest sent back by the backfill target.
// NOTE(review): this view is an excerpt with lines elided (switch header,
// braces, declaration of `bi` in the first case); confirm against the full file.
void ReplicatedPG::do_scan(MOSDPGScan *m)
922
dout(10) << "do_scan " << *m << dendl;
925
// Scan request: list local objects starting at m->begin, bounded by the
// osd_backfill_scan_min/max config values, into `bi`.
case MOSDPGScan::OP_SCAN_GET_DIGEST:
929
scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
930
// Reply carries the current osdmap epoch, the requester's query_epoch, and
// the [begin, end) bounds of the interval that was actually scanned.
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
931
get_osdmap()->get_epoch(), m->query_epoch,
932
info.pgid, bi.begin, bi.end);
933
// The object map itself travels in the message data payload.
::encode(bi.objects, reply->get_data());
934
// Answer on the same connection the request arrived on.
osd->cluster_messenger->send_message(reply, m->get_connection());
938
// Digest reply: only the current backfill target is expected to send one.
case MOSDPGScan::OP_SCAN_DIGEST:
940
int from = m->get_source().num();
941
assert(from == backfill_target);
942
BackfillInterval& bi = peer_backfill_info;
945
// Decode the peer's object map from the message payload into
// peer_backfill_info.
bufferlist::iterator p = m->get_data().begin();
946
::decode(bi.objects, p);
948
// backfill_pos is the smaller of the local and peer interval starts.
backfill_pos = backfill_info.begin > peer_backfill_info.begin ?
949
peer_backfill_info.begin : backfill_info.begin;
950
dout(10) << " backfill_pos now " << backfill_pos << dendl;
952
// A digest must have been outstanding; clear the flag and close the
// recovery op. Presumably this pairs with the start_recovery_op() issued in
// recover_backfill() when the scan was requested — confirm that bi.begin
// here matches the position used there (the assignment is not visible).
assert(waiting_on_backfill);
953
waiting_on_backfill = false;
954
finish_recovery_op(bi.begin);
962
// Handle a backfill control message (MOSDPGBackfill). Three ops are visible:
//  - OP_BACKFILL_FINISH     (asserted to arrive on a replica): acknowledge.
//  - OP_BACKFILL_PROGRESS   (asserted to arrive on a replica): record progress.
//  - OP_BACKFILL_FINISH_ACK (asserted to arrive on the primary): close the op.
// NOTE(review): this view is an excerpt with lines elided (switch header,
// braces, tail of the reply constructor, transaction contents); confirm
// against the full file.
void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
964
dout(10) << "do_backfill " << *m << dendl;
967
case MOSDPGBackfill::OP_BACKFILL_FINISH:
969
assert(is_replica());
970
// NOTE(review): osd_kill_backfill_at looks like a fault-injection knob that
// aborts at a numbered stage (1, 2, 3 in this function) — confirm.
assert(g_conf->osd_kill_backfill_at != 1);
972
MOSDPGBackfill *reply = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
973
get_osdmap()->get_epoch(), m->query_epoch,
975
// Answer on the same connection the request arrived on.
osd->cluster_messenger->send_message(reply, m->get_connection());
979
case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
981
assert(is_replica());
982
assert(g_conf->osd_kill_backfill_at != 2);
984
// Adopt the backfill position and stats carried by the message.
info.last_backfill = m->last_backfill;
985
info.stats.stats = m->stats;
987
// A transaction is queued on this PG's sequencer; what is written into it
// is on an elided line — presumably the updated info; TODO confirm.
ObjectStore::Transaction *t = new ObjectStore::Transaction;
989
int tr = osd->store->queue_transaction(&osr, t);
994
case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
996
assert(is_primary());
997
assert(g_conf->osd_kill_backfill_at != 3);
998
// Closes the recovery op keyed on hobject_t::get_max(); recover_backfill()
// starts an op with the same key when it sends OP_BACKFILL_FINISH.
finish_recovery_op(hobject_t::get_max());
817
1006
/* Returns head of snap_trimq as snap_to_trim and the relevant objects as
818
1007
* obs_to_trim */
819
1008
bool ReplicatedPG::get_obs_to_trim(snapid_t &snap_to_trim,
2885
3060
wr->set_data(repop->ctx->op->get_data()); // _copy_ bufferlist
2887
3062
// ship resulting transaction, log entries, and pg_stats
2888
::encode(repop->ctx->op_t, wr->get_data());
3063
if (peer == backfill_target && soid >= backfill_pos) {
3064
dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
3065
<< backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
3066
ObjectStore::Transaction t;
3067
::encode(t, wr->get_data());
3069
::encode(repop->ctx->op_t, wr->get_data());
2889
3071
::encode(repop->ctx->log, wr->logbl);
2890
wr->pg_stats = info.stats;
3073
if (backfill_target >= 0 && backfill_target == peer)
3074
wr->pg_stats = pinfo.stats; // reflects backfill progress
3076
wr->pg_stats = info.stats;
2893
3079
wr->pg_trim_to = pg_trim_to;
2894
osd->cluster_messenger->send_message(wr, osd->osdmap->get_cluster_inst(peer));
3080
osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
2896
3082
// keep peer_info up to date
2897
Info &in = peer_info[peer];
2898
in.last_update = ctx->at_version;
2899
if (in.last_complete == old_last_update)
2900
in.last_update = ctx->at_version;
3083
if (pinfo.last_complete == pinfo.last_update)
3084
pinfo.last_update = ctx->at_version;
3085
pinfo.last_update = ctx->at_version;
4927
5232
ObjectContext *headobc = get_object_context(head, OLOC_BLANK, false);
4929
5234
object_info_t oi(headobc->obs.oi);
4930
5236
oi.version = latest->version;
4931
5237
oi.prior_version = latest->prior_version;
4932
5238
bufferlist::iterator i = latest->snaps.begin();
4933
5239
::decode(oi.snaps, i);
4934
5240
assert(oi.snaps.size() > 0);
4935
5241
oi.copy_user_bits(headobc->obs.oi);
4936
_make_clone(*t, head, soid, &oi);
5243
ObjectContext *clone_obc = new ObjectContext(oi, true, NULL);
5245
clone_obc->ondisk_write_lock();
5246
clone_obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
5247
register_object_context(clone_obc);
5249
_make_clone(*t, head, soid, &clone_obc->obs.oi);
5251
Context *onreadable = new C_OSD_AppliedRecoveredObject(this, t, clone_obc);
5252
Context *onreadable_sync = new C_OSD_OndiskWriteUnlock(clone_obc);
5253
int tr = osd->store->queue_transaction(&osr, t, onreadable, NULL, onreadable_sync);
4938
5256
put_object_context(headobc);
4940
// XXX: track objectcontext!
4941
int tr = osd->store->queue_transaction(&osr, t);
4943
5258
missing.got(latest->soid, latest->version);
4944
5259
missing_loc.erase(latest->soid);
5129
5444
return started;
5452
* backfilled: fully pushed to replica or present in replica's missing set (both
5453
* our copy and theirs).
5455
* All objects on backfill_target in [MIN,peer_backfill_info.begin) are either
5456
* not present or backfilled (all removed objects have been removed).
5457
* There may be PG objects in this interval yet to be backfilled.
5459
* All objects in PG in [MIN,backfill_info.begin) have been backfilled to
5460
* backfill_target. There may be objects on backfill_target yet to be deleted.
5462
* All objects < MIN(peer_backfill_info.begin, backfill_info.begin) in PG are
5463
* backfilled. No deleted objects in this interval remain on backfill_target.
5465
* peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin,
5466
* backfill_info.begin, backfills_in_flight)
5468
// Run one round of backfill toward backfill_target: refresh the local scan
// interval, compare it with the peer's interval, queue removals and pushes,
// then advance the peer's last_backfill and notify it of progress or finish.
// NOTE(review): this view is an excerpt with lines elided (the main loop
// header, several braces, and the return). `max` is only seen being logged
// here — presumably it bounds the work started per call; confirm.
int ReplicatedPG::recover_backfill(int max)
5470
dout(10) << "recover_backfill (" << max << ")" << dendl;
5471
assert(backfill_target >= 0);
5473
Info& pinfo = peer_info[backfill_target];
5474
BackfillInterval& pbi = peer_backfill_info;
5476
// Initialize from prior backfill state
5477
// If the cached peer interval starts before the peer's recorded
// last_backfill, restart both intervals at last_backfill.
if (pbi.begin < pinfo.last_backfill) {
5478
pbi.reset(pinfo.last_backfill);
5479
backfill_info.reset(pinfo.last_backfill);
5482
dout(10) << " peer osd." << backfill_target
5483
<< " pos " << backfill_pos
5484
<< " info " << pinfo
5485
<< " interval " << pbi.begin << "-" << pbi.end
5486
<< " " << pbi.objects.size() << " objects" << dendl;
5488
// Listing bounds for local scans come from the object store.
int local_min = osd->store->get_ideal_list_min();
5489
int local_max = osd->store->get_ideal_list_max();
5491
// re-scan our local interval to cope with recent changes
5492
// FIXME: we could track the eversion_t when we last scanned, and invalidate
5493
// that way. or explicitly modify/invalidate when we actually change specific
5495
dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
5496
backfill_info.clear();
5498
scan_range(backfill_pos, local_min, local_max, &backfill_info);
5501
// Work collected during the comparison and applied afterwards:
//   to_push     object -> pair of versions (first is the local version)
//   to_remove   object -> version recorded for the peer's copy
//   add_to_stat objects whose stats will be credited to the peer
map<hobject_t, pair<eversion_t, eversion_t> > to_push;
5502
map<hobject_t, eversion_t> to_remove;
5503
set<hobject_t> add_to_stat;
5506
backfill_info.trim();
5509
// Local interval is used up, is not ahead of the peer's, and does not reach
// the end: scan the next local range.
if (backfill_info.begin <= pbi.begin &&
5510
!backfill_info.extends_to_end() && backfill_info.empty()) {
5512
scan_range(backfill_info.end, local_min, local_max, &backfill_info);
5513
backfill_info.trim();
5515
// backfill_pos is the smaller of the two interval starts.
backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
5517
dout(20) << " my backfill " << backfill_info.begin << "-" << backfill_info.end
5518
<< " " << backfill_info.objects << dendl;
5519
dout(20) << " peer backfill " << pbi.begin << "-" << pbi.end << " " << pbi.objects << dendl;
5521
// Peer interval is used up, is not ahead of ours, and does not reach the
// end: request the next range from the peer. waiting_on_backfill is set and
// a recovery op is started; do_scan() clears the flag when the digest arrives.
if (pbi.begin <= backfill_info.begin &&
5522
!pbi.extends_to_end() && pbi.empty()) {
5523
dout(10) << " scanning peer osd." << backfill_target << " from " << pbi.end << dendl;
5524
epoch_t e = get_osdmap()->get_epoch();
5525
MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
5526
pbi.end, hobject_t());
5527
osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
5528
waiting_on_backfill = true;
5529
start_recovery_op(pbi.end);
5534
// Both intervals are empty: nothing left to compare.
if (backfill_info.empty() && pbi.empty()) {
5535
dout(10) << " reached end for both local and peer" << dendl;
5539
// Peer's next object sorts before our next local object: queue it for
// removal on the peer.
if (pbi.begin < backfill_info.begin) {
5540
dout(20) << " removing peer " << pbi.begin << dendl;
5541
to_remove[pbi.begin] = pbi.objects.begin()->second;
5544
// Same object on both sides: push only if the versions differ.
} else if (pbi.begin == backfill_info.begin) {
5545
if (pbi.objects.begin()->second !=
5546
backfill_info.objects.begin()->second) {
5547
dout(20) << " replacing peer " << pbi.begin << " with local "
5548
<< backfill_info.objects.begin()->second << dendl;
5549
to_push[pbi.begin] = make_pair(backfill_info.objects.begin()->second,
5550
pbi.objects.begin()->second);
5553
dout(20) << " keeping peer " << pbi.begin << " "
5554
<< pbi.objects.begin()->second << dendl;
5556
add_to_stat.insert(pbi.begin);
5557
backfill_info.pop_front();
5560
// Remaining case: queue our next local object for push to the peer.
dout(20) << " pushing local " << backfill_info.begin << " "
5561
<< backfill_info.objects.begin()->second
5562
<< " to peer osd." << backfill_target << dendl;
5563
to_push[backfill_info.begin] =
5564
make_pair(backfill_info.objects.begin()->second,
5566
add_to_stat.insert(backfill_info.begin);
5567
backfill_info.pop_front();
5571
// Recompute backfill_pos after consuming entries.
backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
5573
// Compute per-object stats now and park them in pending_backfill_updates;
// they are added to the peer's stats further down, once `bound` passes them.
for (set<hobject_t>::iterator i = add_to_stat.begin();
5574
i != add_to_stat.end();
5576
ObjectContext *obc = get_object_context(*i, OLOC_BLANK, false);
5578
add_object_context_to_pg_stat(obc, &stat);
5579
pending_backfill_updates[*i] = stat;
5580
put_object_context(obc);
5582
// Issue the queued removals to the backfill target.
for (map<hobject_t, eversion_t>::iterator i = to_remove.begin();
5583
i != to_remove.end();
5585
send_remove_op(i->first, i->second, backfill_target);
5587
// Issue the queued pushes to the backfill target.
for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
5590
push_backfill_object(i->first, i->second.first, i->second.second, backfill_target);
5593
// New candidate for the peer's last_backfill: the first entry of
// backfills_in_flight if any pushes are outstanding, otherwise backfill_pos.
hobject_t bound = backfills_in_flight.size() ?
5594
*(backfills_in_flight.begin()) : backfill_pos;
5595
// Only ever move last_backfill forward.
if (bound > pinfo.last_backfill) {
5596
pinfo.last_backfill = bound;
5597
// Fold in the parked stats for every object below the new bound. The
// post-increment advances the iterator before its element is erased.
for (map<hobject_t, pg_stat_t>::iterator i = pending_backfill_updates.begin();
5598
i != pending_backfill_updates.end() && i->first < bound;
5599
pending_backfill_updates.erase(i++)) {
5600
pinfo.stats.add(i->second);
5602
epoch_t e = get_osdmap()->get_epoch();
5603
MOSDPGBackfill *m = NULL;
5604
// Reaching the max object means backfill is complete: send FINISH, after
// logging any byte/object count mismatch between the peer's stats and ours.
if (bound.is_max()) {
5605
m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH, e, e, info.pgid);
5606
if (info.stats.stats.sum.num_bytes != pinfo.stats.stats.sum.num_bytes)
5607
osd->clog.error() << info.pgid << " backfill osd." << backfill_target << " stat mismatch on finish: "
5608
<< "num_bytes " << pinfo.stats.stats.sum.num_bytes
5609
<< " != expected " << info.stats.stats.sum.num_bytes << "\n";
5610
if (info.stats.stats.sum.num_objects != pinfo.stats.stats.sum.num_objects)
5611
osd->clog.error() << info.pgid << " backfill osd." << backfill_target << " stat mismatch on finish: "
5612
<< "num_objects " << pinfo.stats.stats.sum.num_objects
5613
<< " != expected " << info.stats.stats.sum.num_objects << "\n";
5614
// Closed by do_backfill() when OP_BACKFILL_FINISH_ACK arrives.
start_recovery_op(hobject_t::get_max());
5616
// Otherwise report intermediate progress.
m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_PROGRESS, e, e, info.pgid);
5618
// Both message kinds carry the new bound and the peer's accumulated stats.
m->last_backfill = bound;
5619
m->stats = pinfo.stats.stats;
5620
osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
5623
dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects
5624
<< " / " << info.stats.stats.sum.num_objects << dendl;
5628
// Push one object to `peer` as part of backfill.
//   oid  - object to push
//   v    - version being pushed (only logged in the lines visible here)
//   have - not referenced in the lines visible here
//   peer - destination OSD id
// NOTE(review): this view is an excerpt; braces and possibly other lines are
// elided.
void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, eversion_t have, int peer)
5630
dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
5632
// Record the push as outstanding; recover_backfill() reads the first entry
// of backfills_in_flight when choosing the new last_backfill bound.
backfills_in_flight.insert(oid);
5634
start_recovery_op(oid);
5635
ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
5636
// The on-disk read lock is held for the duration of push_to_replica().
obc->ondisk_read_lock();
5637
push_to_replica(obc, oid, peer);
5638
obc->ondisk_read_unlock();
5639
// Balance the get_object_context() above.
put_object_context(obc);
5642
// Fill `bi` with an object -> version map for a slice of this PG's collection
// starting at `begin`. `min` and `max` are passed through to the store's
// partial listing call.
// NOTE(review): this view is an excerpt; the tail of the listing call, the
// declaration of `bl`, and the branch that chooses between the two version
// sources in the loop are on elided lines — confirm against the full file.
void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
5644
// Caller must hold the PG lock.
assert(is_locked());
5645
dout(10) << "scan_range from " << begin << dendl;
5647
bi->objects.clear(); // for good measure
5649
vector<hobject_t> ls;
5651
// List objects into `ls`; the log line below reports bi->end as the "next"
// position, so the elided arguments presumably include &ls and &bi->end.
int r = osd->store->collection_list_partial(coll, begin, min, max,
5654
dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl;
5655
dout(20) << ls << dendl;
5657
// Record a version for each listed object. Two sources are visible: an
// object context returned by _lookup_object_context(), and the OI_ATTR
// attribute read from the store. Presumably the context is used when one
// exists and the attribute otherwise — the condition is not visible here.
for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
5658
ObjectContext *obc = NULL;
5660
obc = _lookup_object_context(*p);
5662
bi->objects[*p] = obc->obs.oi.version;
5663
dout(20) << " " << *p << " " << obc->obs.oi.version << dendl;
5666
// Version decoded from the object_info_t stored in the OI_ATTR attribute.
int r = osd->store->getattr(coll, *p, OI_ATTR, bl);
5668
object_info_t oi(bl);
5669
bi->objects[*p] = oi.version;
5670
dout(20) << " " << *p << " " << oi.version << dendl;
5132
5676
void ReplicatedPG::remove_object_with_snap_hardlinks(ObjectStore::Transaction& t, const hobject_t& soid)
5134
5678
t.remove(coll, soid);