139
140
// ==========================================================
141
/** preprocess_op - preprocess an op (before it gets queued).
144
bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
147
// we only care about reads here on out..
148
if (op->may_write() ||
152
ceph_osd_op& readop = op->ops[0];
154
object_t oid = op->get_oid();
155
sobject_t soid(oid, op->get_snapid());
157
// -- load balance reads --
159
// -- read on primary+acker ---
163
if (acting.size() > 1) {
164
int peer = acting[1];
165
dout(-10) << "preprocess_op fwd client read op to osd" << peer
166
<< " for " << op->get_orig_source() << " " << op->get_orig_source_inst() << dendl;
167
osd->messenger->forward_message(op, osd->osdmap->get_inst(peer));
174
if (g_conf.osd_balance_reads &&
175
!op->get_source().is_osd()) {
177
bool is_flash_crowd_candidate = false;
178
if (g_conf.osd_flash_crowd_iat_threshold > 0) {
179
osd->iat_averager.add_sample( oid, (double)g_clock.now() );
180
is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( oid );
185
if (stat_object_temp_rd.count(soid))
186
temp = stat_object_temp_rd[soid].get(op->get_recv_stamp());
187
bool is_hotly_read = temp > g_conf.osd_balance_reads_temp;
189
dout(20) << "balance_reads oid " << oid << " temp " << temp
190
<< (is_hotly_read ? " hotly_read":"")
191
<< (is_flash_crowd_candidate ? " flash_crowd_candidate":"")
194
bool should_balance = is_flash_crowd_candidate || is_hotly_read;
195
bool is_balanced = false;
197
// *** FIXME *** this may block, and we're in the fast path! ***
198
if (g_conf.osd_balance_reads &&
199
osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) >= 0)
202
if (!is_balanced && should_balance &&
203
balancing_reads.count(soid) == 0) {
204
dout(-10) << "preprocess_op balance-reads on " << oid << dendl;
205
balancing_reads.insert(soid);
206
ceph_object_layout layout;
207
layout.ol_pgid = info.pgid.u.pg64;
208
layout.ol_stripe_unit = 0;
209
MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
212
osd->osdmap->get_epoch(),
213
CEPH_OSD_FLAG_MODIFY);
214
pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
217
if (is_balanced && !should_balance &&
218
!unbalancing_reads.count(soid) == 0) {
219
dout(-10) << "preprocess_op unbalance-reads on " << oid << dendl;
220
unbalancing_reads.insert(soid);
221
ceph_object_layout layout;
222
layout.ol_pgid = info.pgid.u.pg64;
223
layout.ol_stripe_unit = 0;
224
MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
227
osd->osdmap->get_epoch(),
228
CEPH_OSD_FLAG_MODIFY);
229
pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
236
if (g_conf.osd_shed_reads &&
237
g_conf.osd_stat_refresh_interval > 0 &&
238
!op->get_source().is_osd()) { // no re-shedding!
239
Mutex::Locker lock(osd->peer_stat_lock);
241
osd->_refresh_my_stat(now);
244
// TODO xxx we must also compare with our own load
245
// if i am x percentage higher than replica ,
249
double bestscore = 0.0; // highest positive score wins
251
// we calculate score values such that we can interpret them as a probability.
253
switch (g_conf.osd_shed_reads) {
255
// above some minimum?
256
if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {
257
for (unsigned i=1; i<acting.size(); ++i) {
258
int peer = acting[i];
259
if (osd->peer_stat.count(peer) == 0) continue;
261
// assume a read_latency of 0 (technically, undefined) is OK, since
262
// we'll be corrected soon enough if we're wrong.
264
double plat = osd->peer_stat[peer].read_latency_mine;
266
double diff = osd->my_stat.read_latency - plat;
267
if (diff < g_conf.osd_shed_reads_min_latency_diff) continue;
269
double c = .002; // add in a constant to smooth it a bit
271
(c+osd->my_stat.read_latency) /
273
double p = (latratio - 1.0) / 2.0 / latratio;
274
dout(15) << "preprocess_op " << op->get_reqid()
275
<< " my read latency " << osd->my_stat.read_latency
276
<< ", peer osd" << peer << " is " << plat << " (" << osd->peer_stat[peer].read_latency << ")"
277
<< ", latratio " << latratio
280
if (latratio > g_conf.osd_shed_reads_min_latency_ratio &&
292
if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {
293
for (unsigned i=1; i<acting.size(); ++i) {
294
int peer = acting[i];
295
if (osd->peer_stat.count(peer) == 0/* ||
296
osd->peer_stat[peer].read_latency <= 0*/) continue;
298
if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) {
300
if (osd->my_stat.read_latency - osd->peer_stat[peer].read_latency >
301
g_conf.osd_shed_reads_min_latency_diff) continue;
303
double qratio = osd->pending_ops / osd->peer_stat[peer].qlen;
305
double c = .002; // add in a constant to smooth it a bit
307
(c+osd->my_stat.read_latency)/
308
(c+osd->peer_stat[peer].read_latency);
309
double p = (latratio - 1.0) / 2.0 / latratio;
311
dout(-15) << "preprocess_op " << op->get_reqid()
312
<< " my qlen / rdlat "
313
<< osd->pending_ops << " " << osd->my_stat.read_latency
314
<< ", peer osd" << peer << " is "
315
<< osd->peer_stat[peer].qlen << " " << osd->peer_stat[peer].read_latency
316
<< ", qratio " << qratio
317
<< ", latratio " << latratio
320
if (latratio > g_conf.osd_shed_reads_min_latency_ratio &&
332
case LOAD_QUEUE_SIZE:
333
// am i above my average? -- dumb
334
if (osd->pending_ops > osd->my_stat.qlen) {
335
// yes. is there a peer who is below my average?
336
for (unsigned i=1; i<acting.size(); ++i) {
337
int peer = acting[i];
338
if (osd->peer_stat.count(peer) == 0) continue;
339
if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) {
340
// calculate a probability that we should redirect
341
float p = (osd->my_stat.qlen - osd->peer_stat[peer].qlen) / osd->my_stat.qlen; // this is dumb.
344
dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << osd->my_stat.qlen
345
<< ", peer osd" << peer << " has qlen " << osd->peer_stat[peer].qlen
363
dout(10) << "preprocess_op shedding read to peer osd" << shedto
364
<< " " << op->get_reqid()
366
op->set_peer_stat(osd->my_stat);
367
osd->messenger->forward_message(op, osd->osdmap->get_inst(shedto));
368
osd->stat_rd_ops_shed_out++;
369
osd->logger->inc(l_osd_shdout);
373
} // endif balance reads
377
// if this is a read and the data is in the cache, do an immediate read..
378
if ( g_conf.osd_immediate_read_from_cache ) {
379
if (osd->store->is_cached(info.pgid.to_coll(), soid,
380
readop.extent.offset,
381
readop.length) == 0) {
382
if (!is_primary() && !op->get_source().is_osd()) {
385
if (osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &v, 1) < 0) {
386
dout(-10) << "preprocess_op in-cache but no balance-reads on " << oid
387
<< ", fwd to primary" << dendl;
388
osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
394
dout(10) << "preprocess_op data is in cache, reading from cache" << *op << dendl;
403
141
void ReplicatedPG::do_pg_op(MOSDOp *op)
405
dout(0) << "do_pg_op " << *op << dendl;
143
dout(10) << "do_pg_op " << *op << dendl;
407
145
bufferlist outdata;
502
242
return do_pg_op(op);
504
244
dout(10) << "do_op " << *op << dendl;
245
if (finalizing_scrub && op->may_write()) {
246
dout(20) << __func__ << ": waiting for scrub" << dendl;
247
waiting_for_active.push_back(op);
506
251
entity_inst_t client = op->get_source_inst();
508
253
ObjectContext *obc;
509
254
bool can_create = op->may_write();
510
int r = find_object_context(op->get_oid(), op->get_snapid(), &obc, can_create);
256
int r = find_object_context(op->get_oid(), op->get_object_locator(),
257
op->get_snapid(), &obc, can_create, &snapid);
512
assert(r != -EAGAIN);
260
// missing the specific snap we need; requeue and wait.
261
assert(!can_create); // only happens on a read
262
sobject_t soid(op->get_oid(), snapid);
263
wait_for_missing_object(soid, op);
513
266
osd->reply_op_error(op, r);
270
if ((op->may_read()) && (obc->obs.oi.lost)) {
271
// This object is lost. Reading from it returns an error.
272
dout(20) << __func__ << ": object " << obc->obs.oi.soid
273
<< " is lost" << dendl;
274
osd->reply_op_error(op, -ENFILE);
277
dout(25) << __func__ << ": object " << obc->obs.oi.soid
278
<< " has oi of " << obc->obs.oi << dendl;
518
281
dout(10) << "do_op mode is " << mode << dendl;
770
565
if (!mode.try_write(nobody)) {
771
566
dout(10) << " can't write, waiting" << dendl;
773
mode.waiting_cond.push_back(&cond);
568
list<Cond*>::iterator q = mode.waiting_cond.insert(mode.waiting_cond.end(), &cond);
774
569
while (!mode.try_write(nobody))
775
570
cond.Wait(_lock);
571
mode.waiting_cond.erase(q);
776
572
dout(10) << " done waiting" << dendl;
777
if (!is_primary() || !is_active())
573
if (!(current_set_started == info.history.last_epoch_started &&
781
579
// load clone info
783
osd->store->getattr(coll_t::build_pg_coll(info.pgid), coid, OI_ATTR, bl);
784
object_info_t coi(bl);
582
int r = find_object_context(coid.oid, OLOC_BLANK, sn, &obc, false, NULL);
584
assert(obc->registered);
585
object_info_t &coi = obc->obs.oi;
586
vector<snapid_t>& snaps = coi.snaps;
786
588
// get snap set context
787
SnapSetContext *ssc = get_snapset_context(coid.oid, false);
590
obc->obs.ssc = get_snapset_context(coid.oid, false);
591
SnapSetContext *ssc = obc->obs.ssc;
789
593
SnapSet& snapset = ssc->snapset;
790
vector<snapid_t>& snaps = coi.snaps;
792
595
dout(10) << coid << " snaps " << snaps << " old snapset " << snapset << dendl;
793
596
assert(snapset.seq);
797
int r = find_object_context(coid.oid, sn, &obc, false);
799
register_object_context(obc);
801
598
vector<OSDOp> ops;
802
599
tid_t rep_tid = osd->get_tid();
803
osd_reqid_t reqid(osd->messenger->get_myname(), 0, rep_tid);
600
osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
804
601
OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, this);
805
602
ctx->mtime = g_clock.now();
861
658
// save adjusted snaps for this object
862
659
dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << dendl;
863
660
coi.snaps.swap(newsnaps);
661
coi.prior_version = coi.version;
662
coi.version = ctx->at_version;
865
664
::encode(coi, bl);
866
t->setattr(coll_t::build_pg_coll(info.pgid), coid, OI_ATTR, bl);
665
t->setattr(coll_t(info.pgid), coid, OI_ATTR, bl);
868
667
if (snaps[0] != newsnaps[0]) {
869
t->collection_remove(coll_t::build_snap_pg_coll(info.pgid, snaps[0]), coid);
870
t->collection_add(coll_t::build_snap_pg_coll(info.pgid, newsnaps[0]), coll_t::build_pg_coll(info.pgid), coid);
668
t->collection_remove(coll_t(info.pgid, snaps[0]), coid);
669
t->collection_add(coll_t(info.pgid, newsnaps[0]), coll_t(info.pgid), coid);
872
671
if (snaps.size() > 1 && snaps[snaps.size()-1] != newsnaps[newsnaps.size()-1]) {
873
t->collection_remove(coll_t::build_snap_pg_coll(info.pgid, snaps[snaps.size()-1]), coid);
672
t->collection_remove(coll_t(info.pgid, snaps[snaps.size()-1]), coid);
874
673
if (newsnaps.size() > 1)
875
t->collection_add(coll_t::build_snap_pg_coll(info.pgid, newsnaps[newsnaps.size()-1]), coll_t::build_pg_coll(info.pgid), coid);
674
t->collection_add(coll_t(info.pgid, newsnaps[newsnaps.size()-1]), coll_t(info.pgid), coid);
878
ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, coid, ctx->at_version, ctx->obs->oi.version,
677
ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, coid, coi.version, coi.prior_version,
879
678
osd_reqid_t(), ctx->mtime));
880
679
ctx->at_version.version++;
925
case CEPH_OSD_OP_MAPEXT:
927
// read into a buffer
929
int r = osd->store->fiemap(coll, soid, op.extent.offset, op.extent.length, bl);
931
if (odata.length() == 0)
932
ctx->data_off = op.extent.offset; */
936
info.stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
938
dout(10) << " map_extents done on object " << soid << dendl;
943
case CEPH_OSD_OP_SPARSE_READ:
945
if (op.extent.truncate_seq) {
946
dout(0) << "sparse_read does not support truncation sequence " << dendl;
950
// read into a buffer
953
int r = osd->store->fiemap(coll, soid, op.extent.offset, op.extent.length, bl);
958
map<off_t, size_t> m;
959
bufferlist::iterator iter = bl.begin();
961
map<off_t, size_t>::iterator miter;
963
for (miter = m.begin(); miter != m.end(); ++miter) {
965
r = osd->store->read(coll, soid, miter->first, miter->second, tmpbl);
969
if (r < (int)miter->second) /* this is usually happen when we get extent that exceeds the actual file size */
972
dout(10) << "sparse-read " << miter->first << "@" << miter->second << dendl;
973
data_bl.claim_append(tmpbl);
981
op.extent.length = total_read;
984
::encode(data_bl, odata);
986
info.stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
989
dout(10) << " sparse_read got " << total_read << " bytes from object " << soid << dendl;
1117
993
case CEPH_OSD_OP_CALL:
1119
995
bufferlist indata;
1782
1665
* 2) Clone correct snapshot into head
1783
1666
* 3) Calculate clone_overlaps by following overlaps
1784
1667
* forward from rollback snapshot */
1785
dout(10) << "_rollback_to deleting " << soid.oid
1786
<< " and rolling back to old snap" << dendl;
1788
sobject_t new_head = get_object_context(ctx->obs->oi.soid)->obs.oi.soid;
1668
dout(10) << "_rollback_to deleting " << soid.oid
1669
<< " and rolling back to old snap" << dendl;
1790
1671
_delete_head(ctx);
1791
1672
ctx->obs->exists = true; //we're about to recreate it
1793
1674
map<string, bufferptr> attrs;
1794
t.clone(coll_t::build_pg_coll(info.pgid),
1795
rollback_to_sobject, new_head);
1796
osd->store->getattrs(coll_t::build_pg_coll(info.pgid),
1675
t.clone(coll_t(info.pgid),
1676
rollback_to_sobject, soid);
1677
osd->store->getattrs(coll_t(info.pgid),
1797
1678
rollback_to_sobject, attrs, false);
1798
1679
osd->filter_xattrs(attrs);
1799
t.setattrs(coll_t::build_pg_coll(info.pgid), new_head, attrs);
1680
t.setattrs(coll_t(info.pgid), soid, attrs);
1800
1681
ssc->snapset.head_exists = true;
1683
// Adjust the cached objectcontext
1684
ObjectContext *clone_context = get_object_context(rollback_to_sobject,
1687
assert(clone_context);
1688
ctx->obs->oi.size = clone_context->obs.oi.size;
1802
1690
map<snapid_t, interval_set<uint64_t> >::iterator iter =
1803
1691
ssc->snapset.clone_overlap.lower_bound(snapid);
1804
1692
interval_set<uint64_t> overlaps = iter->second;
1693
assert(iter != ssc->snapset.clone_overlap.end());
1806
1695
iter != ssc->snapset.clone_overlap.end();
1862
1751
snaps[i] = snapc.snaps[i];
1864
1753
// prepare clone
1865
ctx->clone_obc = new ObjectContext(coid);
1866
ctx->clone_obc->obs.oi.version = ctx->at_version;
1867
ctx->clone_obc->obs.oi.prior_version = oi.version;
1868
ctx->clone_obc->obs.oi.last_reqid = oi.last_reqid;
1869
ctx->clone_obc->obs.oi.mtime = oi.mtime;
1870
ctx->clone_obc->obs.oi.snaps = snaps;
1871
ctx->clone_obc->obs.exists = true;
1872
ctx->clone_obc->get();
1754
object_info_t static_snap_oi(coid, oi.oloc);
1755
object_info_t *snap_oi;
1757
ctx->clone_obc = new ObjectContext(static_snap_oi, true, NULL);
1758
ctx->clone_obc->get();
1875
1759
register_object_context(ctx->clone_obc);
1877
_make_clone(t, soid, coid, &ctx->clone_obc->obs.oi);
1760
snap_oi = &ctx->clone_obc->obs.oi;
1762
snap_oi = &static_snap_oi;
1764
snap_oi->version = ctx->at_version;
1765
snap_oi->prior_version = oi.version;
1766
snap_oi->copy_user_bits(oi);
1767
snap_oi->snaps = snaps;
1768
_make_clone(t, soid, coid, snap_oi);
1879
1770
// add to snap bound collections
1880
1771
coll_t fc = make_snap_collection(t, snaps[0]);
1881
t.collection_add(fc, coll_t::build_pg_coll(info.pgid), coid);
1772
t.collection_add(fc, coll_t(info.pgid), coid);
1882
1773
if (snaps.size() > 1) {
1883
1774
coll_t lc = make_snap_collection(t, snaps[snaps.size()-1]);
1884
t.collection_add(lc, coll_t::build_pg_coll(info.pgid), coid);
1775
t.collection_add(lc, coll_t(info.pgid), coid);
1887
1778
info.stats.num_objects++;
1888
1779
info.stats.num_object_clones++;
1889
1780
ssc->snapset.clones.push_back(coid.snap);
1890
1781
ssc->snapset.clone_size[coid.snap] = ctx->obs->oi.size;
1783
// clone_overlap should contain an entry for each clone
1784
// (an empty interval_set if there is no overlap)
1785
ssc->snapset.clone_overlap[coid.snap];
1891
1786
if (ctx->obs->oi.size)
1892
1787
ssc->snapset.clone_overlap[coid.snap].insert(0, ctx->obs->oi.size);
1997
1892
ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, snapoid, ctx->at_version, old_version,
1998
1893
osd_reqid_t(), ctx->mtime));
2000
ctx->snapset_obc = get_object_context(snapoid, true);
1895
ctx->snapset_obc = get_object_context(snapoid, poi->oloc, true);
2001
1896
ctx->snapset_obc->obs.exists = true;
2002
1897
ctx->snapset_obc->obs.oi.version = ctx->at_version;
2003
1898
ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
2004
1899
ctx->snapset_obc->obs.oi.mtime = ctx->mtime;
2005
register_object_context(ctx->snapset_obc);
1900
assert(ctx->snapset_obc->registered);
2007
1902
bufferlist bv(sizeof(*poi));
2008
1903
::encode(ctx->snapset_obc->obs.oi, bv);
2009
ctx->op_t.touch(coll_t::build_pg_coll(info.pgid), snapoid);
2010
ctx->op_t.setattr(coll_t::build_pg_coll(info.pgid), snapoid, OI_ATTR, bv);
2011
ctx->op_t.setattr(coll_t::build_pg_coll(info.pgid), snapoid, SS_ATTR, bss);
1904
ctx->op_t.touch(coll_t(info.pgid), snapoid);
1905
ctx->op_t.setattr(coll_t(info.pgid), snapoid, OI_ATTR, bv);
1906
ctx->op_t.setattr(coll_t(info.pgid), snapoid, SS_ATTR, bss);
2630
// ----------------------
2631
// balance reads cruft
2636
if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) &&
2637
block_if_wrlocked(op, *ctx.poi))
2642
// !primary and unbalanced?
2643
// (ignore ops forwarded from the primary)
2644
if (!is_primary()) {
2645
if (op->get_source().is_osd() &&
2646
op->get_source().num() == get_primary()) {
2647
// read was shed to me by the primary
2648
int from = op->get_source().num();
2649
assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT);
2650
osd->take_peer_stat(from, op->get_peer_stat());
2651
dout(10) << "read shed IN from " << op->get_source()
2652
<< " " << op->get_reqid()
2653
<< ", me = " << osd->my_stat.read_latency_mine
2654
<< ", them = " << op->get_peer_stat().read_latency
2655
<< (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"")
2657
osd->logger->inc(l_osd_shdin);
2659
// does it look like they were wrong to do so?
2660
Mutex::Locker lock(osd->peer_stat_lock);
2661
if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency &&
2662
osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) {
2663
dout(-10) << "read shed IN from " << op->get_source()
2664
<< " " << op->get_reqid()
2665
<< " and me " << osd->my_stat.read_latency_mine
2666
<< " > them " << op->get_peer_stat().read_latency
2667
<< ", but they didn't know better, sharing" << dendl;
2668
osd->my_stat_on_peer[from] = osd->my_stat;
2670
osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(),
2672
osd->osdmap->get_inst(from));
2676
// make sure i exist and am balanced, otherwise fw back to acker.
2678
if (!osd->store->exists(coll_t::build_pg_coll(info.pgid), soid) ||
2679
osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, "balance-reads", &b, 1) < 0) {
2680
dout(-10) << "read on replica, object " << soid
2681
<< " dne or no balance-reads, fw back to primary" << dendl;
2682
osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
2691
// balance-reads set?
2693
if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
2694
(osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, "balance-reads", &v, 1) >= 0 ||
2695
balancing_reads.count(soid.oid))) {
2697
if (!unbalancing_reads.count(soid.oid)) {
2699
dout(-10) << "preprocess_op unbalancing-reads on " << soid.oid << dendl;
2700
unbalancing_reads.insert(soid.oid);
2702
ceph_object_layout layout;
2703
layout.ol_pgid = info.pgid.u.pg64;
2704
layout.ol_stripe_unit = 0;
2705
MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
2708
osd->osdmap->get_epoch(),
2709
CEPH_OSD_OP_UNBALANCEREADS, 0);
2713
// add to wait queue
2714
dout(-10) << "preprocess_op waiting for unbalance-reads on " << soid.oid << dendl;
2715
waiting_for_unbalanced_reads[soid.oid].push_back(op);
2723
if (!ctx->ops.empty() && // except noop; we just want to flush
2724
block_if_wrlocked(op, obc->oi)) {
2725
put_object_context(obc);
2727
return; // op will be handled later, after the object unlocks
2736
2550
// sub op modify
2738
2552
void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
3376
3206
assert(!is_primary()); // we should be a replica or stray.
3378
3208
struct stat st;
3379
int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
3381
uint64_t size = st.st_size;
3383
bool complete = false;
3384
if (!op->data_subset.empty() && op->data_subset.end() >= size)
3387
send_push_op(soid, op->get_source().num(), size, op->first, complete, op->data_subset, op->clone_subsets);
3209
int r = osd->store->stat(coll_t(info.pgid), soid, &st);
3213
ss << op->get_source() << " tried to pull " << soid << " in " << info.pgid
3214
<< " but got " << strerror_r(-r, buf, sizeof(buf));
3215
osd->logclient.log(LOG_ERROR, ss);
3217
// FIXME: do something more intelligent.. mark the pg as needing repair?
3220
uint64_t size = st.st_size;
3222
bool complete = false;
3223
if (!op->data_subset.empty() && op->data_subset.range_end() >= size)
3226
// complete==true implies we are definitely complete.
3227
// complete==false means nothing. we don't know because the primary may
3228
// not be pulling the entire object.
3230
send_push_op(soid, op->get_source().num(), size, op->first, complete, op->data_subset, op->clone_subsets);
3520
3369
bufferlist result;
3522
for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
3523
p != data_subset.m.end();
3371
for (interval_set<uint64_t>::const_iterator p = data_subset.begin();
3372
p != data_subset.end();
3525
3374
interval_set<uint64_t> x;
3526
x.insert(p->first, p->second);
3375
x.insert(p.get_start(), p.get_len());
3527
3376
x.intersection_of(data_needed);
3528
dout(20) << " data_subset object extent " << p->first << "~" << p->second << " need " << x << dendl;
3377
dout(20) << " data_subset object extent " << p.get_start() << "~" << p.get_len() << " need " << x << dendl;
3529
3378
if (!x.empty()) {
3530
uint64_t first = x.m.begin()->first;
3531
uint64_t len = x.m.begin()->second;
3379
uint64_t first = x.begin().get_start();
3380
uint64_t len = x.begin().get_len();
3532
3381
bufferlist sub;
3533
int boff = off + (first - p->first);
3382
int boff = off + (first - p.get_start());
3534
3383
dout(20) << " keeping buffer extent " << boff << "~" << len << dendl;
3535
3384
sub.substr_of(data, boff, len);
3536
3385
result.claim_append(sub);
3540
3389
data.claim(result);
3541
3390
dout(20) << " new data len is " << data.length() << dendl;
3393
// did we get everything we wanted?
3394
if (pi->data_subset.empty()) {
3397
complete = pi->data_subset.range_end() == data_subset.range_end();
3400
if (op->complete && !complete) {
3401
dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl;
3403
// hmm, do we have another source?
3404
int from = op->get_source().num();
3405
set<int>& reps = missing_loc[soid];
3406
dout(0) << " we have reps on osds " << reps << dendl;
3407
set<int>::iterator q = reps.begin();
3408
if (q != reps.end() && *q == from) {
3410
if (q != reps.end()) {
3411
dout(0) << " trying next replica on osd" << *q << dendl;
3412
reps.erase(reps.begin()); // forget about the bad replica...
3413
finish_recovery_op(soid); // close out this attempt,
3414
pulling.erase(soid);
3415
pull(soid); // and try again.
3544
// head|unversioned. for now, primary will _only_ pull full copies of the head.
3423
// head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning)
3545
3424
assert(op->clone_subsets.empty());
3548
if (pi->data_subset.empty()) {
3551
complete = pi->data_subset.end() == data_subset.end();
3553
assert(complete == op->complete);
3555
3427
dout(15) << " data_subset " << data_subset
3556
3428
<< " clone_subsets " << clone_subsets
3575
3447
uint64_t boff = 0;
3576
for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
3577
p != data_subset.m.end();
3448
for (interval_set<uint64_t>::const_iterator p = data_subset.begin();
3449
p != data_subset.end();
3579
3451
bufferlist bit;
3580
bit.substr_of(data, boff, p->second);
3581
dout(15) << " write " << p->first << "~" << p->second << dendl;
3582
t->write(target, soid, p->first, p->second, bit);
3452
bit.substr_of(data, boff, p.get_len());
3453
dout(15) << " write " << p.get_start() << "~" << p.get_len() << dendl;
3454
t->write(target, soid, p.get_start(), p.get_len(), bit);
3455
boff += p.get_len();
3586
3458
if (complete) {
3588
t->remove(coll_t::build_pg_coll(info.pgid), soid);
3589
t->collection_add(coll_t::build_pg_coll(info.pgid), target, soid);
3460
t->remove(coll_t(info.pgid), soid);
3461
t->collection_add(coll_t(info.pgid), target, soid);
3590
3462
t->collection_remove(target, soid);
3594
for (map<sobject_t, interval_set<uint64_t> >::iterator p = clone_subsets.begin();
3466
for (map<sobject_t, interval_set<uint64_t> >::const_iterator p = clone_subsets.begin();
3595
3467
p != clone_subsets.end();
3597
for (map<uint64_t,uint64_t>::iterator q = p->second.m.begin();
3598
q != p->second.m.end();
3600
dout(15) << " clone_range " << p->first << " " << q->first << "~" << q->second << dendl;
3601
t->clone_range(coll_t::build_pg_coll(info.pgid), p->first, soid, q->first, q->second);
3470
for (interval_set<uint64_t>::const_iterator q = p->second.begin();
3471
q != p->second.end();
3474
dout(15) << " clone_range " << p->first << " "
3475
<< q.get_start() << "~" << q.get_len() << dendl;
3476
t->clone_range(coll_t(info.pgid), p->first, soid,
3477
q.get_start(), q.get_len());
3604
3481
if (data_subset.empty())
3605
t->touch(coll_t::build_pg_coll(info.pgid), soid);
3482
t->touch(coll_t(info.pgid), soid);
3607
t->setattrs(coll_t::build_pg_coll(info.pgid), soid, op->attrset);
3484
t->setattrs(coll_t(info.pgid), soid, op->attrset);
3608
3485
if (soid.snap && soid.snap < CEPH_NOSNAP &&
3609
3486
op->attrset.count(OI_ATTR)) {
3829
3707
int started = 0;
3830
3708
assert(is_primary());
3834
if (uptodate_set.count(osd->whoami))
3835
n = recover_replicas(max);
3837
n = recover_primary(max);
3839
osd->logger->inc(l_osd_rop, n);
3710
int num_missing = missing.num_missing();
3711
int num_unfound = get_num_unfound();
3713
if (num_missing == 0) {
3714
info.last_complete = info.last_update;
3717
if (num_missing == num_unfound) {
3718
// All of the missing objects we have are unfound.
3719
// Recover the replicas.
3720
started = recover_replicas(max);
3722
// We still have missing objects that we should grab from replicas.
3723
started = recover_primary(max);
3726
osd->logger->inc(l_osd_rop, started);
3731
if (is_all_uptodate()) {
3732
dout(10) << __func__ << ": all OSDs in the PG are up-to-date!" << dendl;
3733
log.reset_recovery_pointers();
3734
ObjectStore::Transaction *t = new ObjectStore::Transaction;
3735
C_Contexts *fin = new C_Contexts;
3736
finish_recovery(*t, fin->contexts);
3737
int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
3741
dout(10) << __func__ << ": some OSDs are not up-to-date yet. "
3742
<< "Recovery isn't finished yet." << dendl;
3851
3749
* do one recovery op.
3852
3750
* return true if done, false if nothing left to do.
4005
3878
int ReplicatedPG::recover_replicas(int max)
3880
dout(10) << __func__ << "(" << max << ")" << dendl;
4007
3881
int started = 0;
4008
dout(-10) << "recover_replicas" << dendl;
4010
3883
// this is FAR from an optimal recovery order. pretty lame, really.
4011
3884
for (unsigned i=1; i<acting.size(); i++) {
4012
3885
int peer = acting[i];
4013
assert(peer_missing.count(peer));
3886
map<int, Missing>::const_iterator pm = peer_missing.find(peer);
3887
assert(pm != peer_missing.end());
3888
size_t m_sz = pm->second.num_missing();
4015
dout(10) << " peer osd" << peer << " missing " << peer_missing[peer] << dendl;
4016
dout(20) << " " << peer_missing[peer].missing << dendl;
3890
dout(10) << " peer osd" << peer << " missing " << m_sz << " objects." << dendl;
3891
dout(20) << " peer osd" << peer << " missing " << pm->second.missing << dendl;
4018
3893
// oldest first!
4019
Missing &m = peer_missing[peer];
4020
for (map<eversion_t, sobject_t>::iterator p = m.rmissing.begin();
3894
const Missing &m(pm->second);
3895
for (map<eversion_t, sobject_t>::const_iterator p = m.rmissing.begin();
4021
3896
p != m.rmissing.end() && started < max;
4023
sobject_t soid = p->second;
4024
if (pushing.count(soid))
4025
dout(10) << " already pushing " << soid << dendl;
4027
started += recover_object_replicas(soid);
3898
const sobject_t soid(p->second);
3899
if (pushing.count(soid)) {
3900
dout(10) << __func__ << ": already pushing " << soid << dendl;
3903
if (missing.is_missing(soid)) {
3904
if (missing_loc.find(soid) == missing_loc.end())
3905
dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
3907
dout(10) << __func__ << ": " << soid << " still missing on primary" << dendl;
3911
dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
3912
started += recover_object_replicas(soid);
4032
dout(-10) << "recover_replicas - nothing to do!" << dendl;
4034
if (is_all_uptodate()) {
4035
ObjectStore::Transaction *t = new ObjectStore::Transaction;
4036
C_Contexts *fin = new C_Contexts;
4037
finish_recovery(*t, fin->contexts);
4038
int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
4041
dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl;
4044
3916
return started;
4048
3919
void ReplicatedPG::remove_object_with_snap_hardlinks(ObjectStore::Transaction& t, const sobject_t& soid)
4050
t.remove(coll_t::build_pg_coll(info.pgid), soid);
3921
t.remove(coll_t(info.pgid), soid);
4051
3922
if (soid.snap < CEPH_MAXSNAP) {
4053
int r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, ba);
3924
int r = osd->store->getattr(coll_t(info.pgid), soid, OI_ATTR, ba);
4055
3926
// grr, need first snap bound, too.
4056
3927
object_info_t oi(ba);
4057
3928
if (oi.snaps[0] != soid.snap)
4058
t.remove(coll_t::build_snap_pg_coll(info.pgid, oi.snaps[0]), soid);
4059
t.remove(coll_t::build_snap_pg_coll(info.pgid, soid.snap), soid);
3929
t.remove(coll_t(info.pgid, oi.snaps[0]), soid);
3930
t.remove(coll_t(info.pgid, soid.snap), soid);