~ubuntu-branches/ubuntu/raring/ceph/raring

« back to all changes in this revision

Viewing changes to src/osd/ReplicatedPG.cc

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2012-02-05 10:07:38 UTC
  • mfrom: (1.1.7) (0.1.11 sid)
  • Revision ID: package-import@ubuntu.com-20120205100738-00s0bxx93mamy8tk
Tags: 0.41-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
#include "messages/MOSDPGInfo.h"
29
29
#include "messages/MOSDPGRemove.h"
30
30
#include "messages/MOSDPGTrim.h"
 
31
#include "messages/MOSDPGScan.h"
 
32
#include "messages/MOSDPGBackfill.h"
31
33
 
32
34
#include "messages/MOSDPing.h"
33
35
#include "messages/MWatchNotify.h"
37
39
#include "mds/inode_backtrace.h" // Ugh
38
40
 
39
41
#include "common/config.h"
 
42
#include "include/compat.h"
40
43
 
41
44
#define DOUT_SUBSYS osd
42
 
#define DOUT_PREFIX_ARGS this, osd->whoami, osd->osdmap
 
45
#define DOUT_PREFIX_ARGS this, osd->whoami, get_osdmap()
43
46
#undef dout_prefix
44
 
#define dout_prefix _prefix(_dout, this, osd->whoami, osd->osdmap)
45
 
static ostream& _prefix(std::ostream *_dout, PG *pg, int whoami, OSDMap *osdmap) {
 
47
#define dout_prefix _prefix(_dout, this, osd->whoami, get_osdmap())
 
48
static ostream& _prefix(std::ostream *_dout, PG *pg, int whoami, OSDMapRef osdmap) {
46
49
  return *_dout << "osd." << whoami
47
50
                << " " << (osdmap ? osdmap->get_epoch():0) << " " << *pg << " ";
48
51
}
49
52
 
50
53
 
51
54
#include <sstream>
 
55
#include <utility>
52
56
 
53
57
#include <errno.h>
54
58
 
117
121
  waiting_for_missing_object[soid].push_back(m);
118
122
}
119
123
 
 
124
void ReplicatedPG::wait_for_all_missing(Message *m)
 
125
{
 
126
  waiting_for_all_missing.push_back(m);
 
127
}
 
128
 
120
129
bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
121
130
{
122
131
  if (missing.missing.count(soid))
126
135
    if (peer_missing.count(peer) &&
127
136
        peer_missing[peer].missing.count(soid))
128
137
      return true;
 
138
 
 
139
    // Object is degraded if after last_backfill AND
 
140
    // we have are backfilling it
 
141
    if (peer == backfill_target &&
 
142
        peer_info[peer].last_backfill <= soid &&
 
143
        backfill_pos >= soid &&
 
144
        backfills_in_flight.count(soid))
 
145
      return true;
129
146
  }
130
147
  return false;
131
148
}
225
242
 
226
243
 
227
244
// ==========================================================
 
245
bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
 
246
{
 
247
  if (missing.missing.empty())
 
248
    return false;
 
249
  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
 
250
    if (p->op.op == CEPH_OSD_OP_PGLS) {
 
251
      if (op->get_snapid() != CEPH_NOSNAP) {
 
252
        return true;
 
253
      }
 
254
    }
 
255
  }
 
256
  return false;
 
257
}
 
258
 
228
259
void ReplicatedPG::do_pg_op(MOSDOp *op)
229
260
{
230
261
  dout(10) << "do_pg_op " << *op << dendl;
238
269
  snapid_t snapid = op->get_snapid();
239
270
 
240
271
  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
241
 
    bufferlist::iterator bp = p->data.begin();
 
272
    bufferlist::iterator bp = p->indata.begin();
242
273
    switch (p->op.op) {
243
274
    case CEPH_OSD_OP_PGLS_FILTER:
244
 
      ::decode(cname, bp);
245
 
      ::decode(mname, bp);
246
 
 
 
275
      try {
 
276
        ::decode(cname, bp);
 
277
        ::decode(mname, bp);
 
278
      }
 
279
      catch (const buffer::error& e) {
 
280
        dout(0) << "unable to decode PGLS_FILTER description in " << *op << dendl;
 
281
        result = -EINVAL;
 
282
        break;
 
283
      }
247
284
      result = get_pgls_filter(bp, &filter);
248
285
      if (result < 0)
249
286
        break;
261
298
        // read into a buffer
262
299
        vector<hobject_t> sentries;
263
300
        PGLSResponse response;
264
 
        ::decode(response.handle, bp);
265
 
 
266
 
        // reset cookie?
267
 
        if (p->op.pgls.start_epoch &&
268
 
            p->op.pgls.start_epoch < info.history.same_primary_since) {
269
 
          dout(10) << " pgls sequence started epoch " << p->op.pgls.start_epoch
270
 
                   << " < same_primary_since " << info.history.same_primary_since
271
 
                   << ", resetting cookie" << dendl;
272
 
          response.handle = collection_list_handle_t();
273
 
        }
274
 
 
275
 
        if (response.handle.in_missing_set) {
276
 
          // it's an offset into the missing set
277
 
          version_t v = response.handle.index;
278
 
          dout(10) << " handle low/missing " << v << dendl;
279
 
          map<version_t, hobject_t>::iterator mp = missing.rmissing.lower_bound(v);
280
 
          result = 0;
281
 
          while (sentries.size() < p->op.pgls.count) {
282
 
            if (mp == missing.rmissing.end()) {
283
 
              dout(10) << " handle finished low/missing, moving to high/ondisk" << dendl;
284
 
              response.handle.in_missing_set = false;
285
 
              break;
286
 
            }
287
 
            sentries.push_back(mp->second);
288
 
            response.handle.index = mp->first + 1;
289
 
          }
290
 
        }
291
 
        if (sentries.size() < p->op.pgls.count &&
292
 
            !response.handle.in_missing_set) {
293
 
          // it's a readdir cookie
294
 
          dout(10) << " handle high/missing " << response.handle << dendl;
295
 
          osr.flush();  // order wrt preceeding writes
296
 
          result = osd->store->collection_list_partial(coll, snapid,
297
 
                                                       sentries, p->op.pgls.count - sentries.size(),
298
 
                                                       &response.handle);
299
 
          response.handle.in_missing_set = false;
300
 
        }
301
 
 
302
 
        if (result == 0) {
303
 
          vector<hobject_t>::iterator iter;
304
 
          for (iter = sentries.begin(); iter != sentries.end(); ++iter) {
305
 
            bool keep = true;
306
 
            // skip snapdir objects
307
 
            if (iter->snap == CEPH_SNAPDIR)
308
 
              continue;
309
 
 
310
 
            if (snapid != CEPH_NOSNAP) {
311
 
              // skip items not defined for this snapshot
312
 
              if (iter->snap == CEPH_NOSNAP) {
313
 
                bufferlist bl;
314
 
                osd->store->getattr(coll, *iter, SS_ATTR, bl);
315
 
                SnapSet snapset(bl);
316
 
                if (snapid <= snapset.seq)
317
 
                  continue;
318
 
              } else {
319
 
                bufferlist bl;
320
 
                osd->store->getattr(coll, *iter, OI_ATTR, bl);
321
 
                object_info_t oi(bl);
322
 
                bool exists = false;
323
 
                for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
324
 
                  if (*i == snapid) {
325
 
                    exists = true;
326
 
                    break;
327
 
                  }
328
 
                dout(10) << *iter << " has " << oi.snaps << " .. exists=" << exists << dendl;
329
 
                if (!exists)
330
 
                  continue;
331
 
              }
332
 
            }
333
 
            if (filter)
334
 
              keep = pgls_filter(filter, *iter, filter_out);
335
 
 
336
 
            if (keep)
337
 
              response.entries.push_back(iter->oid);
338
 
          }
339
 
          ::encode(response, outdata);
340
 
          if (filter)
341
 
            ::encode(filter_out, outdata);
342
 
        }
343
 
        dout(10) << " pgls result=" << result << " outdata.length()=" << outdata.length() << dendl;
 
301
        try {
 
302
          ::decode(response.handle, bp);
 
303
        }
 
304
        catch (const buffer::error& e) {
 
305
          dout(0) << "unable to decode PGLS handle in " << *op << dendl;
 
306
          result = -EINVAL;
 
307
          break;
 
308
        }
 
309
 
 
310
        hobject_t next;
 
311
        hobject_t current = response.handle;
 
312
        osr.flush();
 
313
        int r = osd->store->collection_list_partial(coll, current,
 
314
                                                    p->op.pgls.count,
 
315
                                                    p->op.pgls.count,
 
316
                                                    snapid,
 
317
                                                    &sentries,
 
318
                                                    &next);
 
319
        if (r != 0) {
 
320
          result = -EINVAL;
 
321
          break;
 
322
        }
 
323
 
 
324
        assert(snapid == CEPH_NOSNAP || missing.missing.empty());
 
325
        map<hobject_t, Missing::item>::iterator missing_iter =
 
326
          missing.missing.lower_bound(current);
 
327
        vector<hobject_t>::iterator ls_iter = sentries.begin();
 
328
        while (1) {
 
329
          if (ls_iter == sentries.end()) {
 
330
            break;
 
331
          }
 
332
 
 
333
          hobject_t candidate;
 
334
          if (missing_iter == missing.missing.end() ||
 
335
              *ls_iter < missing_iter->first) {
 
336
            candidate = *(ls_iter++);
 
337
          } else {
 
338
            candidate = (missing_iter++)->first;
 
339
          }
 
340
 
 
341
          if (response.entries.size() == p->op.pgls.count) {
 
342
            next = candidate;
 
343
          }
 
344
 
 
345
          // skip snapdir objects
 
346
          if (candidate.snap == CEPH_SNAPDIR)
 
347
            continue;
 
348
 
 
349
          if (candidate.snap < snapid)
 
350
            continue;
 
351
 
 
352
          if (snapid != CEPH_NOSNAP) {
 
353
            bufferlist bl;
 
354
            if (candidate.snap == CEPH_NOSNAP) {
 
355
              osd->store->getattr(coll, candidate, SS_ATTR, bl);
 
356
              SnapSet snapset(bl);
 
357
              if (snapid <= snapset.seq)
 
358
                continue;
 
359
            } else {
 
360
              bufferlist attr_bl;
 
361
              osd->store->getattr(coll, candidate, OI_ATTR, attr_bl);
 
362
              object_info_t oi(attr_bl);
 
363
              vector<snapid_t>::iterator i = find(oi.snaps.begin(),
 
364
                                                  oi.snaps.end(),
 
365
                                                  snapid);
 
366
              if (i == oi.snaps.end())
 
367
                continue;
 
368
            }
 
369
          }
 
370
 
 
371
          if (filter && !pgls_filter(filter, candidate, filter_out))
 
372
            continue;
 
373
 
 
374
          response.entries.push_back(make_pair(candidate.oid,
 
375
                                               candidate.get_key()));
 
376
        }
 
377
        response.handle = next;
 
378
        ::encode(response, outdata);
 
379
        if (filter)
 
380
          ::encode(filter_out, outdata);
 
381
        dout(10) << " pgls result=" << result << " outdata.length()="
 
382
                 << outdata.length() << dendl;
344
383
      }
345
384
      break;
346
385
 
351
390
  }
352
391
 
353
392
  // reply
354
 
  MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(),
 
393
  MOSDOpReply *reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(),
355
394
                                       CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); 
356
395
  reply->set_data(outdata);
357
396
  reply->set_result(result);
362
401
 
363
402
void ReplicatedPG::calc_trim_to()
364
403
{
365
 
  if (!is_degraded() && !is_scrubbing() &&
366
 
      (is_clean() ||
367
 
       log.head.version - log.tail.version > (unsigned)info.stats.stats.sum.num_objects)) {
 
404
  if (!is_degraded() && !is_scrubbing() && is_clean()) {
368
405
    if (min_last_complete_ondisk != eversion_t() &&
369
 
        min_last_complete_ondisk != pg_trim_to) {
370
 
      dout(10) << "calc_trim_to " << pg_trim_to << " -> " << min_last_complete_ondisk << dendl;
371
 
      pg_trim_to = min_last_complete_ondisk;
 
406
        min_last_complete_ondisk != pg_trim_to &&
 
407
        log.approx_size() > g_conf->osd_min_pg_log_entries) {
 
408
      size_t num_to_trim = log.approx_size() - g_conf->osd_min_pg_log_entries;
 
409
      list<Log::Entry>::const_iterator it = log.log.begin();
 
410
      eversion_t new_trim_to;
 
411
      for (size_t i = 0; i < num_to_trim; ++i) {
 
412
        new_trim_to = it->version;
 
413
        ++it;
 
414
        if (new_trim_to > min_last_complete_ondisk) {
 
415
          new_trim_to = min_last_complete_ondisk;
 
416
          dout(10) << "calc_trim_to trimming to min_last_complete_ondisk" << dendl;
 
417
          break;
 
418
        }
 
419
      }
 
420
      dout(10) << "calc_trim_to " << pg_trim_to << " -> " << new_trim_to << dendl;
 
421
      pg_trim_to = new_trim_to;
372
422
      assert(pg_trim_to <= log.head);
 
423
      assert(pg_trim_to <= min_last_complete_ondisk);
373
424
    }
374
425
  } else {
375
426
    // don't trim
396
447
 */
397
448
void ReplicatedPG::do_op(MOSDOp *op) 
398
449
{
399
 
  if ((op->get_rmw_flags() & CEPH_OSD_FLAG_PGOP))
 
450
  if ((op->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) {
 
451
    if (pg_op_must_wait(op)) {
 
452
      wait_for_all_missing(op);
 
453
      return;
 
454
    }
400
455
    return do_pg_op(op);
 
456
  }
401
457
 
402
458
  dout(10) << "do_op " << *op << (op->may_write() ? " may_write" : "") << dendl;
403
459
 
420
476
    wait_for_degraded_object(head, op);
421
477
    return;
422
478
  }
 
479
 
 
480
  // missing snapdir?
 
481
  hobject_t snapdir(op->get_oid(), op->get_object_locator().key,
 
482
                 CEPH_SNAPDIR, op->get_pg().ps());
 
483
  if (is_missing_object(snapdir)) {
 
484
    wait_for_missing_object(snapdir, op);
 
485
    return;
 
486
  }
 
487
 
 
488
  // degraded object?
 
489
  if (op->may_write() && is_degraded_object(snapdir)) {
 
490
    wait_for_degraded_object(snapdir, op);
 
491
    return;
 
492
  }
423
493
 
424
494
  entity_inst_t client = op->get_source_inst();
425
495
 
496
566
 
497
567
  dout(10) << "do_op mode now " << mode << dendl;
498
568
 
 
569
  // are writes blocked by another object?
 
570
  if (obc->blocked_by) {
 
571
    dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by "
 
572
             << obc->blocked_by->obs.oi.soid << dendl;
 
573
    wait_for_degraded_object(obc->blocked_by->obs.oi.soid, op);
 
574
    put_object_context(obc);
 
575
    return;
 
576
  }
 
577
 
 
578
  // if we have src_oids, we need to be careful of the target being
 
579
  // before and a src being after the last_backfill line, or else the
 
580
  // operation won't apply properly on the backfill_target.  (the
 
581
  // opposite is not a problem; if the target is after the line, we
 
582
  // don't apply on the backfill_target and it doesn't matter.)
 
583
  Info *backfill_target_info = NULL;
 
584
  bool before_backfill = false;
 
585
  if (backfill_target >= 0) {
 
586
    backfill_target_info = &peer_info[backfill_target];
 
587
    before_backfill = obc->obs.oi.soid < backfill_target_info->last_backfill;
 
588
  }
 
589
 
499
590
  // src_oids
500
591
  map<hobject_t,ObjectContext*> src_obc;
501
592
  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
502
593
    OSDOp& osd_op = *p;
503
 
    object_locator_t src_oloc;
504
 
    get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc);
505
 
    hobject_t toid(osd_op.soid, src_oloc.key, op->get_pg().ps());
 
594
    if (!ceph_osd_op_type_multi(osd_op.op.op))
 
595
      continue;
506
596
    if (osd_op.soid.oid.name.length()) {
507
 
      if (!src_obc.count(toid)) {
 
597
      object_locator_t src_oloc;
 
598
      get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc);
 
599
      hobject_t src_oid(osd_op.soid, src_oloc.key, op->get_pg().ps());
 
600
      if (!src_obc.count(src_oid)) {
508
601
        ObjectContext *sobc;
509
602
        snapid_t ssnapid;
510
603
 
511
 
        int r = find_object_context(hobject_t(toid.oid, src_oloc.key,
512
 
                                              toid.snap, op->get_pg().ps()),
513
 
                                    src_oloc,
514
 
                                    &sobc, false, &ssnapid);
 
604
        int r = find_object_context(src_oid, src_oloc, &sobc, false, &ssnapid);
515
605
        if (r == -EAGAIN) {
516
606
          // missing the specific snap we need; requeue and wait.
517
607
          hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, op->get_pg().ps());
518
608
          wait_for_missing_object(wait_oid, op);
519
609
        } else if (r) {
520
610
          osd->reply_op_error(op, r);
521
 
        } else if (is_degraded_object(sobc->obs.oi.soid)) { 
522
 
          wait_for_degraded_object(sobc->obs.oi.soid, op);
523
611
        } else if (sobc->obs.oi.oloc.key != obc->obs.oi.oloc.key &&
524
612
                   sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
525
613
                   sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
526
614
          dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != "
527
615
                  << op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
528
616
          osd->reply_op_error(op, -EINVAL);
 
617
        } else if (is_degraded_object(sobc->obs.oi.soid) ||
 
618
                   (before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) {
 
619
          wait_for_degraded_object(sobc->obs.oi.soid, op);
 
620
          dout(10) << " writes for " << obc->obs.oi.soid << " now blocked by "
 
621
                   << sobc->obs.oi.soid << dendl;
 
622
          obc->get();
 
623
          obc->blocked_by = sobc;
 
624
          sobc->get();
 
625
          sobc->blocking.insert(obc);
529
626
        } else {
530
 
          src_obc[toid] = sobc;
 
627
          dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
 
628
          src_obc[src_oid] = sobc;
531
629
          continue;
532
630
        }
533
631
        // Error cleanup below
536
634
      }
537
635
      // Error cleanup below
538
636
    } else {
539
 
      continue;
 
637
      dout(10) << "no src oid specified for multi op " << osd_op << dendl;
 
638
      osd->reply_op_error(op, -EINVAL);
 
639
      op->put();
540
640
    }
541
641
    put_object_contexts(src_obc);
542
642
    put_object_context(obc);
590
690
    // version
591
691
    ctx->at_version = log.head;
592
692
 
593
 
    ctx->at_version.epoch = osd->osdmap->get_epoch();
 
693
    ctx->at_version.epoch = get_osdmap()->get_epoch();
594
694
    ctx->at_version.version++;
595
695
    assert(ctx->at_version > info.last_update);
596
696
    assert(ctx->at_version > log.head);
646
746
  }
647
747
 
648
748
  // prepare the reply
649
 
  ctx->reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0); 
650
 
  ctx->reply->set_data(ctx->outdata);
 
749
  ctx->reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
 
750
  ctx->reply->claim_op_out_data(ctx->ops);
651
751
  ctx->reply->get_header().data_off = ctx->data_off;
652
752
  ctx->reply->set_result(result);
653
753
 
715
815
 
716
816
  osd->logger->inc(l_osd_op_outb, outb);
717
817
  osd->logger->inc(l_osd_op_inb, inb);
718
 
  osd->logger->fset(l_osd_op_lat, latency);
 
818
  osd->logger->finc(l_osd_op_lat, latency);
719
819
 
720
820
  if (op->may_read() && op->may_write()) {
721
821
    osd->logger->inc(l_osd_op_rw);
722
822
    osd->logger->inc(l_osd_op_rw_inb, inb);
723
823
    osd->logger->inc(l_osd_op_rw_outb, outb);
724
 
    osd->logger->fset(l_osd_op_rw_rlat, rlatency);
725
 
    osd->logger->fset(l_osd_op_rw_lat, latency);
 
824
    osd->logger->finc(l_osd_op_rw_rlat, rlatency);
 
825
    osd->logger->finc(l_osd_op_rw_lat, latency);
726
826
  } else if (op->may_read()) {
727
827
    osd->logger->inc(l_osd_op_r);
728
828
    osd->logger->inc(l_osd_op_r_outb, outb);
729
 
    osd->logger->fset(l_osd_op_r_lat, latency);
 
829
    osd->logger->finc(l_osd_op_r_lat, latency);
730
830
  } else if (op->may_write()) {
731
831
    osd->logger->inc(l_osd_op_w);
732
832
    osd->logger->inc(l_osd_op_w_inb, inb);
733
 
    osd->logger->fset(l_osd_op_w_rlat, rlatency);
734
 
    osd->logger->fset(l_osd_op_w_lat, latency);
 
833
    osd->logger->finc(l_osd_op_w_rlat, rlatency);
 
834
    osd->logger->finc(l_osd_op_w_lat, latency);
735
835
  } else
736
836
    assert(0);
737
837
 
753
853
  osd->logger->inc(l_osd_sop);
754
854
 
755
855
  osd->logger->inc(l_osd_sop_inb, inb);
756
 
  osd->logger->fset(l_osd_sop_lat, latency);
 
856
  osd->logger->finc(l_osd_sop_lat, latency);
757
857
 
758
858
  if (tag_inb)
759
859
    osd->logger->inc(tag_inb, inb);
760
 
  osd->logger->fset(tag_lat, latency);
 
860
  osd->logger->finc(tag_lat, latency);
761
861
 
762
862
  dout(15) << "log_subop_stats " << *op << " inb " << inb << " latency " << latency << dendl;
763
863
}
777
877
    case CEPH_OSD_OP_PUSH:
778
878
      sub_op_push(op);
779
879
      return;
 
880
    case CEPH_OSD_OP_DELETE:
 
881
      sub_op_remove(op);
 
882
      return;
780
883
    case CEPH_OSD_OP_SCRUB_RESERVE:
781
884
      sub_op_scrub_reserve(op);
782
885
      return;
814
917
  sub_op_modify_reply(r);
815
918
}
816
919
 
 
920
void ReplicatedPG::do_scan(MOSDPGScan *m)
 
921
{
 
922
  dout(10) << "do_scan " << *m << dendl;
 
923
 
 
924
  switch (m->op) {
 
925
  case MOSDPGScan::OP_SCAN_GET_DIGEST:
 
926
    {
 
927
      BackfillInterval bi;
 
928
      osr.flush();
 
929
      scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
 
930
      MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
 
931
                                         get_osdmap()->get_epoch(), m->query_epoch,
 
932
                                         info.pgid, bi.begin, bi.end);
 
933
      ::encode(bi.objects, reply->get_data());
 
934
      osd->cluster_messenger->send_message(reply, m->get_connection());
 
935
    }
 
936
    break;
 
937
 
 
938
  case MOSDPGScan::OP_SCAN_DIGEST:
 
939
    {
 
940
      int from = m->get_source().num();
 
941
      assert(from == backfill_target);
 
942
      BackfillInterval& bi = peer_backfill_info;
 
943
      bi.begin = m->begin;
 
944
      bi.end = m->end;
 
945
      bufferlist::iterator p = m->get_data().begin();
 
946
      ::decode(bi.objects, p);
 
947
 
 
948
      backfill_pos = backfill_info.begin > peer_backfill_info.begin ?
 
949
        peer_backfill_info.begin : backfill_info.begin;
 
950
      dout(10) << " backfill_pos now " << backfill_pos << dendl;
 
951
 
 
952
      assert(waiting_on_backfill);
 
953
      waiting_on_backfill = false;
 
954
      finish_recovery_op(bi.begin);
 
955
    }
 
956
    break;
 
957
  }
 
958
 
 
959
  m->put();
 
960
}
 
961
 
 
962
void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
 
963
{
 
964
  dout(10) << "do_backfill " << *m << dendl;
 
965
 
 
966
  switch (m->op) {
 
967
  case MOSDPGBackfill::OP_BACKFILL_FINISH:
 
968
    {
 
969
      assert(is_replica());
 
970
      assert(g_conf->osd_kill_backfill_at != 1);
 
971
 
 
972
      MOSDPGBackfill *reply = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
 
973
                                                 get_osdmap()->get_epoch(), m->query_epoch,
 
974
                                                 info.pgid);
 
975
      osd->cluster_messenger->send_message(reply, m->get_connection());
 
976
    }
 
977
    // fall-thru
 
978
 
 
979
  case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
 
980
    {
 
981
      assert(is_replica());
 
982
      assert(g_conf->osd_kill_backfill_at != 2);
 
983
 
 
984
      info.last_backfill = m->last_backfill;
 
985
      info.stats.stats = m->stats;
 
986
 
 
987
      ObjectStore::Transaction *t = new ObjectStore::Transaction;
 
988
      write_info(*t);
 
989
      int tr = osd->store->queue_transaction(&osr, t);
 
990
      assert(tr == 0);
 
991
    }
 
992
    break;
 
993
 
 
994
  case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
 
995
    {
 
996
      assert(is_primary());
 
997
      assert(g_conf->osd_kill_backfill_at != 3);
 
998
      finish_recovery_op(hobject_t::get_max());
 
999
    }
 
1000
    break;
 
1001
  }
 
1002
 
 
1003
  m->put();
 
1004
}
 
1005
 
817
1006
/* Returns head of snap_trimq as snap_to_trim and the relevant objects as 
818
1007
 * obs_to_trim */
819
1008
bool ReplicatedPG::get_obs_to_trim(snapid_t &snap_to_trim,
883
1072
  OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, ssc, this);
884
1073
  ctx->mtime = ceph_clock_now(g_ceph_context);
885
1074
 
886
 
  ctx->at_version.epoch = osd->osdmap->get_epoch();
 
1075
  ctx->at_version.epoch = get_osdmap()->get_epoch();
887
1076
  ctx->at_version.version = log.head.version + 1;
888
1077
 
889
1078
 
925
1114
    delta.num_objects--;
926
1115
    delta.num_object_clones--;
927
1116
    delta.num_bytes -= snapset.clone_size[last];
928
 
    delta.num_kb -= SHIFT_ROUND_UP(snapset.clone_size[last], 10);
929
1117
    info.stats.stats.add(delta, obc->obs.oi.category);
930
1118
 
931
1119
    snapset.clones.erase(p);
1017
1205
    if (snap_trimmer_machine.need_share_pg_info) {
1018
1206
      dout(10) << "snap_trimmer share_pg_info" << dendl;
1019
1207
      snap_trimmer_machine.need_share_pg_info = false;
1020
 
      epoch_t cur_epoch = osd->osdmap->get_epoch();
1021
 
      unlock();
1022
 
      osd->map_lock.get_read();
1023
 
      lock();
1024
 
      if (last_peering_reset > cur_epoch) {
1025
 
        osd->map_lock.put_read();
1026
 
        unlock();
1027
 
        return true;
1028
 
      }
1029
1208
      share_pg_info();
1030
 
      osd->map_lock.put_read();
1031
1209
    }
1032
1210
  } else if (is_active() && 
1033
1211
             last_complete_ondisk.epoch > info.history.last_epoch_started) {
1173
1351
    obc->ref++;
1174
1352
    for (map<entity_name_t, OSD::Session *>::iterator witer = obc->watchers.begin();
1175
1353
         witer != obc->watchers.end();
1176
 
         remove_watcher(obc, (witer++)->first));
 
1354
         remove_watcher(obc, (witer++)->first)) ;
1177
1355
    for (map<entity_name_t, Context *>::iterator iter = obc->unconnected_watchers.begin();
1178
1356
         iter != obc->unconnected_watchers.end();
1179
 
      ) {
 
1357
         ) {
1180
1358
      map<entity_name_t, Context *>::iterator i = iter++;
1181
1359
      unregister_unconnected_watcher(obc, i->first);
1182
1360
    }
1183
1361
    for (map<Watch::Notification *, bool>::iterator niter = obc->notifs.begin();
1184
1362
         niter != obc->notifs.end();
1185
 
         remove_notify(obc, (niter++)->first));
 
1363
         remove_notify(obc, (niter++)->first)) ;
1186
1364
    put_object_context(obc);
1187
1365
  }
1188
1366
  osd->watch_lock.Unlock();
1191
1369
// ========================================================================
1192
1370
// low level osd ops
1193
1371
 
1194
 
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
1195
 
                             bufferlist& odata)
 
1372
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
1196
1373
{
1197
1374
  int result = 0;
1198
1375
  SnapSetContext *ssc = ctx->obc->ssc;
1199
1376
  ObjectState& obs = ctx->new_obs;
1200
1377
  object_info_t& oi = obs.oi;
1201
 
 
1202
 
  bool maybe_created = false;
1203
 
 
1204
1378
  const hobject_t& soid = oi.soid;
1205
1379
 
 
1380
  bool first_read = false;
 
1381
 
1206
1382
  ObjectStore::Transaction& t = ctx->op_t;
1207
1383
 
1208
1384
  dout(10) << "do_osd_op " << soid << " " << ops << dendl;
1209
1385
 
1210
1386
  for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); p++) {
1211
1387
    OSDOp& osd_op = *p;
1212
 
    ceph_osd_op& op = osd_op.op; 
1213
 
 
 
1388
    ceph_osd_op& op = osd_op.op;
 
1389
 
1214
1390
    dout(10) << "do_osd_op  " << osd_op << dendl;
1215
1391
 
1216
 
    bufferlist::iterator bp = osd_op.data.begin();
 
1392
    bufferlist::iterator bp = osd_op.indata.begin();
1217
1393
 
1218
1394
    // user-visible modifcation?
1219
1395
    switch (op.op) {
1227
1403
 
1228
1404
    ObjectContext *src_obc = 0;
1229
1405
    if (ceph_osd_op_type_multi(op.op)) {
1230
 
      src_obc = ctx->src_obc[hobject_t(osd_op.soid, 
1231
 
                                       ((MOSDOp *)ctx->op)->get_object_locator().key,
1232
 
                                       soid.hash)];
 
1406
      object_locator_t src_oloc;
 
1407
      get_src_oloc(soid.oid, ((MOSDOp *)ctx->op)->get_object_locator(), src_oloc);
 
1408
      hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash);
 
1409
      src_obc = ctx->src_obc[src_oid];
 
1410
      dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
1233
1411
      assert(src_obc);
1234
1412
    }
1235
1413
 
 
1414
    // munge -1 truncate to 0 truncate
 
1415
    if (op.extent.truncate_seq == 1 && op.extent.truncate_size == (-1ULL)) {
 
1416
      op.extent.truncate_size = 0;
 
1417
      op.extent.truncate_seq = 0;
 
1418
    }
 
1419
 
1236
1420
    // munge ZERO -> TRUNCATE?  (don't munge to DELETE or we risk hosing attributes)
1237
1421
    if (op.op == CEPH_OSD_OP_ZERO &&
1238
1422
        obs.exists &&
1251
1435
        // read into a buffer
1252
1436
        bufferlist bl;
1253
1437
        int r = osd->store->read(coll, soid, op.extent.offset, op.extent.length, bl);
1254
 
        if (odata.length() == 0)
 
1438
        if (first_read) {
 
1439
          first_read = false;
1255
1440
          ctx->data_off = op.extent.offset;
1256
 
        odata.claim(bl);
 
1441
        }
 
1442
        osd_op.outdata.claim_append(bl);
1257
1443
        if (r >= 0) 
1258
1444
          op.extent.length = r;
1259
1445
        else {
1278
1464
 
1279
1465
          bufferlist keep;
1280
1466
 
1281
 
          // keep first part of odata; trim at truncation point
 
1467
          // keep first part of osd_op.outdata; trim at truncation point
1282
1468
          dout(10) << " obj " << soid << " seq " << seq
1283
1469
                   << ": trimming overlap " << from << "~" << trim << dendl;
1284
 
          keep.substr_of(odata, 0, odata.length() - trim);
1285
 
          odata.claim(keep);
 
1470
          keep.substr_of(osd_op.outdata, 0, osd_op.outdata.length() - trim);
 
1471
          osd_op.outdata.claim(keep);
1286
1472
        }
1287
1473
      }
1288
1474
      break;
1293
1479
        // read into a buffer
1294
1480
        bufferlist bl;
1295
1481
        int r = osd->store->fiemap(coll, soid, op.extent.offset, op.extent.length, bl);
1296
 
/*
1297
 
        if (odata.length() == 0)
1298
 
          ctx->data_off = op.extent.offset; */
1299
 
        odata.claim(bl);
 
1482
        osd_op.outdata.claim(bl);
1300
1483
        if (r < 0)
1301
1484
          result = r;
1302
1485
        ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
1346
1529
 
1347
1530
        op.extent.length = total_read;
1348
1531
 
1349
 
        ::encode(m, odata);
1350
 
        ::encode(data_bl, odata);
 
1532
        ::encode(m, osd_op.outdata);
 
1533
        ::encode(data_bl, osd_op.outdata);
1351
1534
 
1352
1535
        ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
1353
1536
        ctx->delta_stats.num_rd++;
1385
1568
        result = method->exec((cls_method_context_t)&ctx, indata, outdata);
1386
1569
        dout(10) << "method called response length=" << outdata.length() << dendl;
1387
1570
        op.extent.length = outdata.length();
1388
 
        odata.claim_append(outdata);
 
1571
        osd_op.outdata.claim_append(outdata);
1389
1572
      }
1390
1573
      break;
1391
1574
 
1392
1575
    case CEPH_OSD_OP_STAT:
1393
1576
      {
1394
1577
        if (obs.exists) {
1395
 
          ::encode(oi.size, odata);
1396
 
          ::encode(oi.mtime, odata);
 
1578
          ::encode(oi.size, osd_op.outdata);
 
1579
          ::encode(oi.mtime, osd_op.outdata);
1397
1580
          dout(10) << "stat oi has " << oi.size << " " << oi.mtime << dendl;
1398
1581
        } else {
1399
1582
          result = -ENOENT;
1400
1583
          dout(10) << "stat oi object does not exist" << dendl;
1401
1584
        }
1402
 
        if (1) {  // REMOVE ME LATER!
1403
 
          struct stat st;
1404
 
          memset(&st, 0, sizeof(st));
1405
 
          int checking_result = osd->store->stat(coll, soid, &st);
1406
 
          if ((checking_result != result) ||
1407
 
              ((uint64_t)st.st_size != oi.size)) {
1408
 
            osd->clog.error() << info.pgid << " " << soid << " oi.size " << oi.size
1409
 
                              << " but stat got " << checking_result << " size " << st.st_size << "\n";
1410
 
            assert(0 == "oi disagrees with stat, or error code on stat");
1411
 
          }
1412
 
        }
1413
1585
 
1414
1586
        ctx->delta_stats.num_rd++;
1415
1587
      }
1420
1592
        string aname;
1421
1593
        bp.copy(op.xattr.name_len, aname);
1422
1594
        string name = "_" + aname;
1423
 
        int r = osd->store->getattr(coll, soid, name.c_str(), odata);
 
1595
        int r = osd->store->getattr(coll, soid, name.c_str(), osd_op.outdata);
1424
1596
        if (r >= 0) {
1425
1597
          op.xattr.value_len = r;
1426
1598
          result = 0;
1444
1616
        
1445
1617
        bufferlist bl;
1446
1618
        ::encode(newattrs, bl);
1447
 
        odata.claim_append(bl);
 
1619
        osd_op.outdata.claim_append(bl);
1448
1620
      }
1449
1621
      break;
1450
1622
      
1461
1633
          result = osd->store->getattr(coll, soid, name.c_str(), xattr);
1462
1634
        else
1463
1635
          result = osd->store->getattr(coll, src_obc->obs.oi.soid, name.c_str(), xattr);
1464
 
        if (result < 0 && result != -EEXIST && result !=-ENODATA)
 
1636
        if (result < 0 && result != -EEXIST && result != -ENODATA)
1465
1637
          break;
1466
1638
        
1467
1639
        switch (op.xattr.cmp_mode) {
1595
1767
          t.truncate(coll, soid, op.extent.truncate_size);
1596
1768
          oi.truncate_seq = op.extent.truncate_seq;
1597
1769
          oi.truncate_size = op.extent.truncate_size;
 
1770
          if (op.extent.truncate_size != oi.size) {
 
1771
            ctx->delta_stats.num_bytes -= oi.size;
 
1772
            ctx->delta_stats.num_bytes += op.extent.truncate_size;
 
1773
            oi.size = op.extent.truncate_size;
 
1774
          }
1598
1775
        }
1599
 
        if (op.extent.length) {
1600
 
          bufferlist nbl;
1601
 
          bp.copy(op.extent.length, nbl);
1602
 
          t.write(coll, soid, op.extent.offset, op.extent.length, nbl);
1603
 
        } else {
1604
 
          t.touch(coll, soid);
1605
 
        }
 
1776
        bufferlist nbl;
 
1777
        bp.copy(op.extent.length, nbl);
 
1778
        t.write(coll, soid, op.extent.offset, op.extent.length, nbl);
1606
1779
        write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges,
1607
1780
                                    op.extent.offset, op.extent.length, true);
1608
 
        maybe_created = true;
 
1781
        if (!obs.exists) {
 
1782
          ctx->delta_stats.num_objects++;
 
1783
          obs.exists = true;
 
1784
        }
1609
1785
      }
1610
1786
      break;
1611
1787
      
1613
1789
      { // write full object
1614
1790
        bufferlist nbl;
1615
1791
        bp.copy(op.extent.length, nbl);
1616
 
        if (obs.exists)
 
1792
        if (obs.exists) {
1617
1793
          t.truncate(coll, soid, 0);
1618
 
        else
1619
 
          maybe_created = true;
 
1794
        } else {
 
1795
          ctx->delta_stats.num_objects++;
 
1796
          obs.exists = true;
 
1797
        }
1620
1798
        t.write(coll, soid, op.extent.offset, op.extent.length, nbl);
1621
 
        if (ssc->snapset.clones.size() && oi.size > 0) {
1622
 
          interval_set<uint64_t> ch;
 
1799
        interval_set<uint64_t> ch;
 
1800
        if (oi.size > 0)
1623
1801
          ch.insert(0, oi.size);
1624
 
          ctx->modified_ranges.union_of(ch);
1625
 
          oi.size = 0;
1626
 
        }
1627
 
        if (op.extent.length != oi.size) {
 
1802
        ctx->modified_ranges.union_of(ch);
 
1803
        if (op.extent.length + op.extent.offset != oi.size) {
1628
1804
          ctx->delta_stats.num_bytes -= oi.size;
1629
 
          ctx->delta_stats.num_kb -= SHIFT_ROUND_UP(oi.size, 10);
1630
 
          ctx->delta_stats.num_bytes += op.extent.length;
1631
 
          ctx->delta_stats.num_kb += SHIFT_ROUND_UP(op.extent.length, 10);
1632
 
          oi.size = op.extent.length;
 
1805
          oi.size = op.extent.length + op.extent.offset;
 
1806
          ctx->delta_stats.num_bytes += oi.size;
1633
1807
        }
1634
1808
        ctx->delta_stats.num_wr++;
1635
1809
        ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(op.extent.length, 10);
1645
1819
        assert(op.extent.length);
1646
1820
        if (obs.exists) {
1647
1821
          t.zero(coll, soid, op.extent.offset, op.extent.length);
1648
 
          if (ssc->snapset.clones.size()) {
1649
 
            interval_set<uint64_t> ch;
1650
 
            ch.insert(op.extent.offset, op.extent.length);
1651
 
            ctx->modified_ranges.union_of(ch);
1652
 
            add_interval_usage(ch, ctx->delta_stats);
1653
 
          }
 
1822
          interval_set<uint64_t> ch;
 
1823
          ch.insert(op.extent.offset, op.extent.length);
 
1824
          ctx->modified_ranges.union_of(ch);
1654
1825
          ctx->delta_stats.num_wr++;
1655
1826
        } else {
1656
1827
          // no-op
1658
1829
      }
1659
1830
      break;
1660
1831
    case CEPH_OSD_OP_CREATE:
1661
 
      { // zero
 
1832
      {
1662
1833
        int flags = le32_to_cpu(op.flags);
1663
 
        if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL))
 
1834
        if (obs.exists && (flags & CEPH_OSD_OP_FLAG_EXCL)) {
1664
1835
          result = -EEXIST; /* this is an exclusive create */
1665
 
        else {
1666
 
          t.touch(coll, soid);
1667
 
          maybe_created = true;
1668
 
        }
1669
 
        if (osd_op.data.length()) {
1670
 
          bufferlist::iterator p = osd_op.data.begin();
1671
 
          string category;
1672
 
          ::decode(category, p);
1673
 
          if (category.size()) {
1674
 
            if (obs.exists) {
1675
 
              if (obs.oi.category != category)
1676
 
                result = -EEXIST;
1677
 
            } else {
1678
 
              obs.oi.category = category;
 
1836
        } else {
 
1837
          if (osd_op.indata.length()) {
 
1838
            bufferlist::iterator p = osd_op.indata.begin();
 
1839
            string category;
 
1840
            ::decode(category, p);
 
1841
            if (category.size()) {
 
1842
              if (obs.exists) {
 
1843
                if (obs.oi.category != category)
 
1844
                  result = -EEXIST;  // category cannot be reset
 
1845
              } else {
 
1846
                obs.oi.category = category;
 
1847
              }
1679
1848
            }
1680
 
          }
 
1849
          }
 
1850
          if (result >= 0 && !obs.exists) {
 
1851
            t.touch(coll, soid);
 
1852
            ctx->delta_stats.num_objects++;
 
1853
            obs.exists = true;
 
1854
          }
1681
1855
        }
1682
1856
      }
1683
1857
      break;
1687
1861
      // falling through
1688
1862
 
1689
1863
    case CEPH_OSD_OP_TRUNCATE:
1690
 
      { // truncate
 
1864
      {
 
1865
        // truncate
1691
1866
        if (!obs.exists) {
1692
1867
          dout(10) << " object dne, truncate is a no-op" << dendl;
1693
1868
          break;
1707
1882
        }
1708
1883
 
1709
1884
        t.truncate(coll, soid, op.extent.offset);
1710
 
        if (ssc->snapset.clones.size()) {
1711
 
          snapid_t newest = *ssc->snapset.clones.rbegin();
 
1885
        if (oi.size > op.extent.offset) {
1712
1886
          interval_set<uint64_t> trim;
1713
 
          if (oi.size > op.extent.offset) {
1714
 
            trim.insert(op.extent.offset, oi.size-op.extent.offset);
1715
 
            ctx->modified_ranges.union_of(trim);
1716
 
            trim.intersection_of(ssc->snapset.clone_overlap[newest]);
1717
 
            add_interval_usage(trim, ctx->delta_stats);
1718
 
          }
 
1887
          trim.insert(op.extent.offset, oi.size-op.extent.offset);
 
1888
          ctx->modified_ranges.union_of(trim);
1719
1889
        }
1720
1890
        if (op.extent.offset != oi.size) {
1721
1891
          ctx->delta_stats.num_bytes -= oi.size;
1722
 
          ctx->delta_stats.num_kb -= SHIFT_ROUND_UP(oi.size, 10);
1723
1892
          ctx->delta_stats.num_bytes += op.extent.offset;
1724
 
          ctx->delta_stats.num_kb += SHIFT_ROUND_UP(op.extent.offset, 10);
1725
1893
          oi.size = op.extent.offset;
1726
1894
        }
1727
1895
        ctx->delta_stats.num_wr++;
1730
1898
      break;
1731
1899
    
1732
1900
    case CEPH_OSD_OP_DELETE:
1733
 
      if (!obs.exists) {
1734
 
        result = -ENOENT;
1735
 
      } else {
1736
 
        _delete_head(ctx);
1737
 
      }
 
1901
      result = _delete_head(ctx);
1738
1902
      break;
1739
1903
 
1740
1904
    case CEPH_OSD_OP_CLONERANGE:
1741
1905
      {
1742
1906
        if (!obs.exists) {
1743
1907
          t.touch(coll, obs.oi.soid);
1744
 
          maybe_created = true;
 
1908
          ctx->delta_stats.num_objects++;
 
1909
          obs.exists = true;
1745
1910
        }
1746
1911
        if (op.clonerange.src_offset + op.clonerange.length > src_obc->obs.oi.size) {
1747
1912
          dout(10) << " clonerange source " << osd_op.soid << " "
1808
1973
      {
1809
1974
        if (!obs.exists) {
1810
1975
          t.touch(coll, soid);
1811
 
          maybe_created = true;
 
1976
          ctx->delta_stats.num_objects++;
 
1977
          obs.exists = true;
1812
1978
        }
1813
1979
        string aname;
1814
1980
        bp.copy(op.xattr.name_len, aname);
1841
2007
        newop.op.op = CEPH_OSD_OP_WRITE;
1842
2008
        newop.op.extent.offset = oi.size;
1843
2009
        newop.op.extent.length = op.extent.length;
1844
 
        newop.data = osd_op.data;
1845
 
        do_osd_ops(ctx, nops, odata);
 
2010
        newop.indata = osd_op.indata;
 
2011
        do_osd_ops(ctx, nops);
 
2012
        osd_op.outdata.claim(newop.outdata);
1846
2013
      }
1847
2014
      break;
1848
2015
 
1859
2026
        newop.op.op = CEPH_OSD_OP_READ;
1860
2027
        newop.op.extent.offset = 0;
1861
2028
        newop.op.extent.length = 0;
1862
 
        do_osd_ops(ctx, nops, odata);
 
2029
        do_osd_ops(ctx, nops);
 
2030
        osd_op.outdata.claim(newop.outdata);
1863
2031
      }
1864
2032
      break;
1865
2033
 
1883
2051
        OSDOp& newop = nops[0];
1884
2052
        newop.op.op = CEPH_OSD_OP_WRITEFULL;
1885
2053
        newop.op.extent.offset = 0;
1886
 
        newop.op.extent.length = osd_op.data.length();
1887
 
        newop.data = osd_op.data;
1888
 
        bufferlist r;
1889
 
        do_osd_ops(ctx, nops, r);
 
2054
        newop.op.extent.length = osd_op.indata.length();
 
2055
        newop.indata = osd_op.indata;
 
2056
        do_osd_ops(ctx, nops);
1890
2057
      }
1891
2058
      break;
1892
2059
 
1895
2062
        dout(10) << "tmapup is a no-op" << dendl;
1896
2063
      } else {
1897
2064
        // read the whole object
1898
 
        bufferlist ibl;
1899
2065
        vector<OSDOp> nops(1);
1900
2066
        OSDOp& newop = nops[0];
1901
2067
        newop.op.op = CEPH_OSD_OP_READ;
1902
2068
        newop.op.extent.offset = 0;
1903
2069
        newop.op.extent.length = 0;
1904
 
        do_osd_ops(ctx, nops, ibl);
1905
 
        dout(10) << "tmapup read " << ibl.length() << dendl;
 
2070
        do_osd_ops(ctx, nops);
 
2071
 
 
2072
        dout(10) << "tmapup read " << newop.outdata.length() << dendl;
1906
2073
 
1907
2074
        dout(30) << " starting is \n";
1908
 
        ibl.hexdump(*_dout);
 
2075
        newop.outdata.hexdump(*_dout);
1909
2076
        *_dout << dendl;
1910
2077
 
1911
 
        bufferlist::iterator ip = ibl.begin();
 
2078
        bufferlist::iterator ip = newop.outdata.begin();
1912
2079
        bufferlist obl;
1913
2080
 
1914
2081
        dout(30) << "the update command is: \n";
1915
 
        osd_op.data.hexdump(*_dout);
 
2082
        osd_op.indata.hexdump(*_dout);
1916
2083
        *_dout << dendl;
1917
2084
 
1918
2085
        // header
1919
2086
        bufferlist header;
1920
2087
        __u32 nkeys = 0;
1921
 
        if (ibl.length()) {
 
2088
        if (newop.outdata.length()) {
1922
2089
          ::decode(header, ip);
1923
2090
          ::decode(nkeys, ip);
1924
2091
        }
2013
2180
        }
2014
2181
        if (!ip.end()) {
2015
2182
          bufferlist rest;
2016
 
          rest.substr_of(ibl, ip.get_off(), ibl.length() - ip.get_off());
 
2183
          rest.substr_of(newop.outdata, ip.get_off(), newop.outdata.length() - ip.get_off());
2017
2184
          dout(20) << "  keep trailing " << rest.length()
2018
 
                             << " at " << newkeydata.length() << dendl;
 
2185
                   << " at " << newkeydata.length() << dendl;
2019
2186
          newkeydata.claim_append(rest);
2020
2187
        }
2021
2188
 
2045
2212
          newop.op.op = CEPH_OSD_OP_WRITEFULL;
2046
2213
          newop.op.extent.offset = 0;
2047
2214
          newop.op.extent.length = obl.length();
2048
 
          newop.data = obl;
2049
 
          do_osd_ops(ctx, nops, odata);
 
2215
          newop.indata = obl;
 
2216
          do_osd_ops(ctx, nops);
 
2217
          osd_op.outdata.claim(newop.outdata);
2050
2218
        }
2051
2219
      }
2052
2220
      break;
2059
2227
      result = -EOPNOTSUPP;
2060
2228
    }
2061
2229
 
2062
 
    if (!obs.exists && maybe_created) {
2063
 
      ctx->delta_stats.num_objects++;
2064
 
      obs.exists = true;
2065
 
    }
 
2230
    ctx->bytes_read += osd_op.outdata.length();
2066
2231
 
2067
2232
    if (result < 0 && (op.flags & CEPH_OSD_OP_FLAG_FAILOK))
2068
2233
      result = 0;
2073
2238
  return result;
2074
2239
}
2075
2240
 
2076
 
inline void ReplicatedPG::_delete_head(OpContext *ctx)
 
2241
inline int ReplicatedPG::_delete_head(OpContext *ctx)
2077
2242
{
2078
2243
  SnapSet& snapset = ctx->new_snapset;
2079
2244
  ObjectState& obs = ctx->new_obs;
2081
2246
  const hobject_t& soid = oi.soid;
2082
2247
  ObjectStore::Transaction& t = ctx->op_t;
2083
2248
 
2084
 
  if (obs.exists)
2085
 
    t.remove(coll, soid);
2086
 
  if (snapset.clones.size()) {
2087
 
    snapid_t newest = *snapset.clones.rbegin();
2088
 
    add_interval_usage(snapset.clone_overlap[newest], ctx->delta_stats);
 
2249
  if (!obs.exists)
 
2250
    return -ENOENT;
 
2251
  
 
2252
  t.remove(coll, soid);
2089
2253
 
2090
 
    if (oi.size > 0) {
2091
 
      interval_set<uint64_t> ch;
2092
 
      ch.insert(0, oi.size);
2093
 
      ctx->modified_ranges.union_of(ch);
2094
 
    }
 
2254
  if (oi.size > 0) {
 
2255
    interval_set<uint64_t> ch;
 
2256
    ch.insert(0, oi.size);
 
2257
    ctx->modified_ranges.union_of(ch);
2095
2258
  }
2096
 
  if (obs.exists) {
2097
 
    ctx->delta_stats.num_objects--;
2098
 
    ctx->delta_stats.num_bytes -= oi.size;
2099
 
    ctx->delta_stats.num_kb -= SHIFT_ROUND_UP(oi.size, 10);
2100
 
    oi.size = 0;
2101
 
    snapset.head_exists = false;
2102
 
    obs.exists = false;
2103
 
  }      
 
2259
 
 
2260
  ctx->delta_stats.num_objects--;
 
2261
  ctx->delta_stats.num_bytes -= oi.size;
 
2262
 
 
2263
  oi.size = 0;
 
2264
  snapset.head_exists = false;
 
2265
  obs.exists = false;
 
2266
 
2104
2267
  ctx->delta_stats.num_wr++;
 
2268
  return 0;
2105
2269
}
2106
2270
 
2107
2271
int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
2193
2357
        ctx->delta_stats.num_objects++;
2194
2358
      }
2195
2359
      ctx->delta_stats.num_bytes -= obs.oi.size;
2196
 
      ctx->delta_stats.num_bytes -= SHIFT_ROUND_UP(obs.oi.size, 10);
2197
2360
      ctx->delta_stats.num_bytes += rollback_to->obs.oi.size;
2198
 
      ctx->delta_stats.num_bytes += SHIFT_ROUND_UP(rollback_to->obs.oi.size, 10);
2199
2361
      obs.oi.size = rollback_to->obs.oi.size;
2200
2362
      snapset.head_exists = true;
2201
2363
    }
2303
2465
    ctx->at_version.version++;
2304
2466
  }
2305
2467
 
2306
 
  // update most recent clone_overlap
 
2468
  // update most recent clone_overlap and usage stats
2307
2469
  if (ctx->new_snapset.clones.size() > 0) {
2308
2470
    interval_set<uint64_t> &newest_overlap = ctx->new_snapset.clone_overlap.rbegin()->second;
2309
2471
    ctx->modified_ranges.intersection_of(newest_overlap);
 
2472
    // modified_ranges is still in use by the clone
 
2473
    add_interval_usage(ctx->modified_ranges, ctx->delta_stats);
2310
2474
    newest_overlap.subtract(ctx->modified_ranges);
2311
2475
  }
2312
2476
  
2326
2490
                                               SnapSet& ss, interval_set<uint64_t>& modified,
2327
2491
                                               uint64_t offset, uint64_t length, bool count_bytes)
2328
2492
{
2329
 
  if (ss.clones.size()) {
2330
 
    interval_set<uint64_t> ch;
2331
 
    if (length)
2332
 
      ch.insert(offset, length);
2333
 
    modified.union_of(ch);
2334
 
    add_interval_usage(ch, delta_stats);
2335
 
  }
 
2493
  interval_set<uint64_t> ch;
 
2494
  if (length)
 
2495
    ch.insert(offset, length);
 
2496
  modified.union_of(ch);
2336
2497
  if (length && (offset + length > oi.size)) {
2337
2498
    uint64_t new_size = offset + length;
2338
2499
    delta_stats.num_bytes += new_size - oi.size;
2339
 
    delta_stats.num_kb += SHIFT_ROUND_UP(new_size, 10) - SHIFT_ROUND_UP(oi.size, 10);
2340
2500
    oi.size = new_size;
2341
2501
  }
2342
2502
  delta_stats.num_wr++;
2348
2508
{
2349
2509
  for (interval_set<uint64_t>::const_iterator p = s.begin(); p != s.end(); ++p) {
2350
2510
    delta_stats.num_bytes += p.get_len();
2351
 
    delta_stats.num_kb += SHIFT_ROUND_UP(p.get_start() + p.get_len(), 10) - (p.get_start() >> 10);
2352
2511
  }
2353
2512
}
2354
2513
 
2375
2534
        dout(10) << " connected to " << w << " by " << entity << " session " << session << dendl;
2376
2535
        obc->watchers[entity] = session;
2377
2536
        session->get();
2378
 
        session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
 
2537
        session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
2379
2538
        obc->ref++;
2380
2539
      } else if (iter->second == session) {
2381
2540
        // already there
2389
2548
        iter->second->put();
2390
2549
        iter->second = session;
2391
2550
        session->get();
2392
 
        session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
 
2551
        session->watches[obc] = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
2393
2552
      }
2394
2553
      map<entity_name_t, Context *>::iterator un_iter =
2395
2554
        obc->unconnected_watchers.find(entity);
2428
2587
      Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
2429
2588
      session->get();  // notif got a reference
2430
2589
      session->con->get();
2431
 
      notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
 
2590
      notif->pgid = get_osdmap()->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
2432
2591
 
2433
2592
      osd->watch->add_notification(notif);
2434
2593
 
2449
2608
          osd->client_messenger->send_message(notify_msg, s->con);
2450
2609
        } else {
2451
2610
          // unconnected
2452
 
          utime_t now = ceph_clock_now(g_ceph_context);
2453
2611
          entity_name_t name = i->first;
2454
2612
          notif->add_watcher(name, Watch::WATCHER_PENDING);
2455
2613
        }
2499
2657
  bool head_existed = ctx->obs->exists;
2500
2658
 
2501
2659
  // prepare the actual mutation
2502
 
  int result = do_osd_ops(ctx, ctx->ops, ctx->outdata);
 
2660
  int result = do_osd_ops(ctx, ctx->ops);
2503
2661
  if (result < 0)
2504
2662
    return result;
2505
2663
 
2507
2665
  if (result == 0)
2508
2666
    do_osd_op_effects(ctx);
2509
2667
 
2510
 
  ctx->bytes_read = ctx->outdata.length();
2511
 
 
2512
2668
  // read-op?  done?
2513
2669
  if (ctx->op_t.empty() && !ctx->modify) {
2514
2670
    ctx->reply_version = ctx->obs->oi.user_version;
2612
2768
  ctx->obc->ssc->snapset = ctx->new_snapset;
2613
2769
  info.stats.stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
2614
2770
 
 
2771
  if (backfill_target >= 0) {
 
2772
    Info& pinfo = peer_info[backfill_target];
 
2773
    if (soid < pinfo.last_backfill)
 
2774
      pinfo.stats.stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
 
2775
    else if (soid < backfill_pos)
 
2776
      pending_backfill_updates[soid].stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
 
2777
  }
 
2778
 
2615
2779
  return result;
2616
2780
}
2617
2781
 
2660
2824
 
2661
2825
  repop->applying = true;
2662
2826
 
 
2827
  repop->tls.push_back(&repop->ctx->local_t);
2663
2828
  repop->tls.push_back(&repop->ctx->op_t);
2664
 
  repop->tls.push_back(&repop->ctx->local_t);
2665
2829
 
2666
2830
  repop->obc->ondisk_write_lock();
2667
2831
  if (repop->ctx->clone_obc)
2715
2879
  put_object_contexts(repop->src_obc);
2716
2880
  repop->obc = 0;
2717
2881
 
 
2882
  assert(info.last_update >= repop->v);
 
2883
  assert(last_update_applied < repop->v);
2718
2884
  last_update_applied = repop->v;
2719
2885
  if (last_update_applied == info.last_update && finalizing_scrub) {
2720
2886
    dout(10) << "requeueing scrub for cleanup" << dendl;
2792
2958
        if (reply)
2793
2959
          repop->ctx->reply = NULL;
2794
2960
        else
2795
 
          reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
 
2961
          reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
2796
2962
        reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
2797
2963
        dout(10) << " sending commit on " << *repop << " " << reply << dendl;
2798
2964
        assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
2809
2975
        if (reply)
2810
2976
          repop->ctx->reply = NULL;
2811
2977
        else
2812
 
          reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
 
2978
          reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
2813
2979
        reply->add_flags(CEPH_OSD_FLAG_ACK);
2814
2980
        dout(10) << " sending ack on " << *repop << " " << reply << dendl;
2815
2981
        assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
2861
3027
 
2862
3028
  repop->v = ctx->at_version;
2863
3029
 
 
3030
  // add myself to gather set
 
3031
  repop->waitfor_ack.insert(acting[0]);
 
3032
  repop->waitfor_disk.insert(acting[0]);
 
3033
 
2864
3034
  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
2865
3035
 
2866
3036
  for (unsigned i=1; i<acting.size(); i++) {
2867
3037
    int peer = acting[i];
2868
 
    
 
3038
    Info &pinfo = peer_info[peer];
 
3039
 
 
3040
    repop->waitfor_ack.insert(peer);
 
3041
    repop->waitfor_disk.insert(peer);
 
3042
 
2869
3043
    // forward the write/update/whatever
2870
3044
    MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid,
2871
3045
                                  false, acks_wanted,
2872
 
                                  osd->osdmap->get_epoch(), 
 
3046
                                  get_osdmap()->get_epoch(),
2873
3047
                                  repop->rep_tid, repop->ctx->at_version);
2874
3048
 
2875
3049
    if (op && op->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) {
2876
3050
      // replicate original op for parallel execution on replica
 
3051
      assert(0 == "broken implementation, do not use");
2877
3052
      wr->oloc = repop->ctx->obs->oi.oloc;
2878
3053
      wr->ops = repop->ctx->ops;
2879
3054
      wr->mtime = repop->ctx->mtime;
2885
3060
      wr->set_data(repop->ctx->op->get_data());   // _copy_ bufferlist
2886
3061
    } else {
2887
3062
      // ship resulting transaction, log entries, and pg_stats
2888
 
      ::encode(repop->ctx->op_t, wr->get_data());
 
3063
      if (peer == backfill_target && soid >= backfill_pos) {
 
3064
        dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
 
3065
                 << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
 
3066
        ObjectStore::Transaction t;
 
3067
        ::encode(t, wr->get_data());
 
3068
      } else {
 
3069
        ::encode(repop->ctx->op_t, wr->get_data());
 
3070
      }
2889
3071
      ::encode(repop->ctx->log, wr->logbl);
2890
 
      wr->pg_stats = info.stats;
 
3072
 
 
3073
      if (backfill_target >= 0 && backfill_target == peer)
 
3074
        wr->pg_stats = pinfo.stats;  // reflects backfill progress
 
3075
      else
 
3076
        wr->pg_stats = info.stats;
2891
3077
    }
2892
3078
    
2893
3079
    wr->pg_trim_to = pg_trim_to;
2894
 
    osd->cluster_messenger->send_message(wr, osd->osdmap->get_cluster_inst(peer));
 
3080
    osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
2895
3081
 
2896
3082
    // keep peer_info up to date
2897
 
    Info &in = peer_info[peer];
2898
 
    in.last_update = ctx->at_version;
2899
 
    if (in.last_complete == old_last_update)
2900
 
      in.last_update = ctx->at_version;
 
3083
    if (pinfo.last_complete == pinfo.last_update)
 
3084
      pinfo.last_update = ctx->at_version;
 
3085
    pinfo.last_update = ctx->at_version;
2901
3086
  }
2902
3087
}
2903
3088
 
2915
3100
  mode.write_start();
2916
3101
  dout(10) << "new_repop mode now " << mode << " (start_write)" << dendl;
2917
3102
 
2918
 
  // initialize gather sets
2919
 
  for (unsigned i=0; i<acting.size(); i++) {
2920
 
    int osd = acting[i];
2921
 
    repop->waitfor_ack.insert(osd);
2922
 
    repop->waitfor_disk.insert(osd);
2923
 
  }
2924
 
 
2925
3103
  repop->start = ceph_clock_now(g_ceph_context);
2926
3104
 
2927
3105
  repop_queue.push_back(&repop->queue_item);
3053
3231
  OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
3054
3232
  ctx->mtime = ceph_clock_now(g_ceph_context);
3055
3233
 
3056
 
  ctx->at_version.epoch = osd->osdmap->get_epoch();
 
3234
  ctx->at_version.epoch = get_osdmap()->get_epoch();
3057
3235
  ctx->at_version.version = log.head.version + 1;
3058
3236
 
3059
3237
  entity_inst_t nobody;
3081
3259
  ::encode(obc->obs.oi, bl);
3082
3260
  t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl);
3083
3261
 
3084
 
  ctx->at_version.version++;
3085
 
 
3086
3262
  append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
3087
3263
 
3088
3264
  // obc ref swallowed by repop!
3091
3267
  eval_repop(repop);
3092
3268
}
3093
3269
 
 
3270
ReplicatedPG::ObjectContext *ReplicatedPG::_lookup_object_context(const hobject_t& oid)
 
3271
{
 
3272
  map<hobject_t, ObjectContext*>::iterator p = object_contexts.find(oid);
 
3273
  if (p != object_contexts.end())
 
3274
    return p->second;
 
3275
  return NULL;
 
3276
}
 
3277
 
3094
3278
ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
3095
3279
                                                              const object_locator_t& oloc,
3096
3280
                                                              bool can_create)
3099
3283
  ObjectContext *obc;
3100
3284
  if (p != object_contexts.end()) {
3101
3285
    obc = p->second;
3102
 
    dout(10) << "get_object_context " << soid << " " << obc->ref
 
3286
    dout(10) << "get_object_context " << obc << " " << soid << " " << obc->ref
3103
3287
             << " -> " << (obc->ref+1) << dendl;
3104
3288
  } else {
3105
3289
    // check disk
3138
3322
    } else {
3139
3323
      obc->obs.exists = false;
3140
3324
    }
3141
 
    dout(10) << "get_object_context " << soid << " read " << obc->obs.oi << dendl;
 
3325
    dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
3142
3326
  }
3143
3327
  obc->ref++;
3144
3328
  return obc;
3256
3440
 
3257
3441
void ReplicatedPG::put_object_context(ObjectContext *obc)
3258
3442
{
3259
 
  dout(10) << "put_object_context " << obc->obs.oi.soid << " "
 
3443
  dout(10) << "put_object_context " << obc << " " << obc->obs.oi.soid << " "
3260
3444
           << obc->ref << " -> " << (obc->ref-1) << dendl;
3261
3445
 
3262
3446
  if (mode.wake) {
3290
3474
  obcv.clear();
3291
3475
}
3292
3476
 
 
3477
void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *pgstat)
 
3478
{
 
3479
  object_info_t& oi = obc->obs.oi;
 
3480
 
 
3481
  dout(10) << "add_object_context_to_pg_stat " << oi.soid << dendl;
 
3482
  object_stat_sum_t stat;
 
3483
 
 
3484
  if (oi.soid.snap != CEPH_SNAPDIR)
 
3485
    stat.num_objects++;
 
3486
 
 
3487
  stat.num_bytes += oi.size;
 
3488
 
 
3489
  if (oi.soid.snap && oi.soid.snap != CEPH_NOSNAP) {
 
3490
    stat.num_object_clones++;
 
3491
 
 
3492
    if (!obc->ssc)
 
3493
      obc->ssc = get_snapset_context(oi.soid.oid,
 
3494
                                     oi.soid.get_key(),
 
3495
                                     oi.soid.hash,
 
3496
                                     false);
 
3497
 
 
3498
    // subtract off clone overlap
 
3499
    if (obc->ssc->snapset.clone_overlap.count(oi.soid.snap)) {
 
3500
      interval_set<uint64_t>& o = obc->ssc->snapset.clone_overlap[oi.soid.snap];
 
3501
      for (interval_set<uint64_t>::const_iterator r = o.begin();
 
3502
           r != o.end();
 
3503
           ++r) {
 
3504
        stat.num_bytes -= r.get_len();
 
3505
      }   
 
3506
    }
 
3507
  }
 
3508
 
 
3509
  // add it in
 
3510
  pgstat->stats.sum.add(stat);
 
3511
  if (oi.category.length())
 
3512
    pgstat->stats.cat_sum[oi.category].add(stat);
 
3513
}
 
3514
 
3293
3515
ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
3294
3516
                                                                const string& key,
3295
3517
                                                                ps_t seed,
3451
3673
 
3452
3674
  if (!rm->committed) {
3453
3675
    // send ack to acker only if we haven't sent a commit already
3454
 
    MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
 
3676
    MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
3455
3677
    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
3456
 
    osd->cluster_messenger->send_message(ack, osd->osdmap->get_cluster_inst(rm->ackerosd));
 
3678
    osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
3457
3679
  }
3458
3680
 
3459
3681
  rm->applied = true;
3460
3682
  bool done = rm->applied && rm->committed;
3461
3683
 
 
3684
  assert(info.last_update >= rm->op->version);
 
3685
  assert(last_update_applied < rm->op->version);
3462
3686
  last_update_applied = rm->op->version;
 
3687
  if (finalizing_scrub) {
 
3688
    assert(active_rep_scrub);
 
3689
    assert(info.last_update <= active_rep_scrub->scrub_to);
 
3690
    if (last_update_applied == active_rep_scrub->scrub_to) {
 
3691
      osd->rep_scrub_wq.queue(active_rep_scrub);
 
3692
      active_rep_scrub = 0;
 
3693
    }
 
3694
  }
3463
3695
 
3464
3696
  unlock();
3465
3697
  if (done) {
3481
3713
 
3482
3714
  log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
3483
3715
 
3484
 
  if (osd->osdmap->is_up(rm->ackerosd)) {
 
3716
  if (get_osdmap()->is_up(rm->ackerosd)) {
3485
3717
    last_complete_ondisk = rm->last_complete;
3486
 
    MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
 
3718
    MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
3487
3719
    commit->set_last_complete_ondisk(rm->last_complete);
3488
3720
    commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
3489
 
    osd->cluster_messenger->send_message(commit, osd->osdmap->get_cluster_inst(rm->ackerosd));
 
3721
    osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
3490
3722
  }
3491
3723
  
3492
3724
  rm->committed = true;
3531
3763
 
3532
3764
void ReplicatedPG::calc_head_subsets(SnapSet& snapset, const hobject_t& head,
3533
3765
                                     Missing& missing,
 
3766
                                     const hobject_t &last_backfill,
3534
3767
                                     interval_set<uint64_t>& data_subset,
3535
3768
                                     map<hobject_t, interval_set<uint64_t> >& clone_subsets)
3536
3769
{
3549
3782
    hobject_t c = head;
3550
3783
    c.snap = snapset.clones[j];
3551
3784
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
3552
 
    if (!missing.is_missing(c)) {
 
3785
    if (!missing.is_missing(c) && c < last_backfill) {
3553
3786
      dout(10) << "calc_head_subsets " << head << " has prev " << c
3554
3787
               << " overlap " << prev << dendl;
3555
3788
      clone_subsets[c] = prev;
3572
3805
 
3573
3806
void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
3574
3807
                                      Missing& missing,
 
3808
                                      const hobject_t &last_backfill,
3575
3809
                                      interval_set<uint64_t>& data_subset,
3576
3810
                                      map<hobject_t, interval_set<uint64_t> >& clone_subsets)
3577
3811
{
3594
3828
    hobject_t c = soid;
3595
3829
    c.snap = snapset.clones[j];
3596
3830
    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
3597
 
    if (!missing.is_missing(c)) {
 
3831
    if (!missing.is_missing(c) && c < last_backfill) {
3598
3832
      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
3599
3833
               << " overlap " << prev << dendl;
3600
3834
      clone_subsets[c] = prev;
3613
3847
    hobject_t c = soid;
3614
3848
    c.snap = snapset.clones[j];
3615
3849
    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
3616
 
    if (!missing.is_missing(c)) {
 
3850
    if (!missing.is_missing(c) && c < last_backfill) {
3617
3851
      dout(10) << "calc_clone_subsets " << soid << " has next " << c
3618
3852
               << " overlap " << next << dendl;
3619
3853
      clone_subsets[c] = next;
3654
3888
    for (set<int>::iterator p = q->second.begin();
3655
3889
         p != q->second.end();
3656
3890
         p++) {
3657
 
      if (osd->osdmap->is_up(*p)) {
 
3891
      if (get_osdmap()->is_up(*p)) {
3658
3892
        fromosd = *p;
3659
3893
        break;
3660
3894
      }
3709
3943
    // check snapset
3710
3944
    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
3711
3945
    dout(10) << " snapset " << ssc->snapset << dendl;
3712
 
    calc_clone_subsets(ssc->snapset, soid, missing,
 
3946
    calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
3713
3947
                       data_subset, clone_subsets);
3714
3948
    put_snapset_context(ssc);
3715
3949
    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
3755
3989
           << " tid " << tid << dendl;
3756
3990
 
3757
3991
  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK,
3758
 
                                   osd->osdmap->get_epoch(), tid, v);
 
3992
                                   get_osdmap()->get_epoch(), tid, v);
3759
3993
  subop->ops = vector<OSDOp>(1);
3760
3994
  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
3761
3995
  subop->data_subset = data_subset;
3765
3999
  // when the object is pushed back.
3766
4000
  //subop->clone_subsets.swap(clone_subsets);
3767
4001
 
3768
 
  osd->cluster_messenger->send_message(subop, osd->osdmap->get_cluster_inst(fromosd));
 
4002
  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(fromosd));
3769
4003
 
3770
4004
  osd->logger->inc(l_osd_pull);
3771
4005
}
3772
4006
 
 
4007
void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
 
4008
{
 
4009
  tid_t tid = osd->get_tid();
 
4010
  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
 
4011
 
 
4012
  dout(10) << "send_remove_op " << oid << " from osd." << peer
 
4013
           << " tid " << tid << dendl;
 
4014
  
 
4015
  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, oid, false, CEPH_OSD_FLAG_ACK,
 
4016
                                   get_osdmap()->get_epoch(), tid, v);
 
4017
  subop->ops = vector<OSDOp>(1);
 
4018
  subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
 
4019
 
 
4020
  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
 
4021
}
3773
4022
 
3774
4023
/*
3775
4024
 * intelligently push an object to a replica.  make use of existing
3803
4052
    }
3804
4053
 
3805
4054
    // try to base push off of clones that succeed/preceed poid
3806
 
    // we need the head (and current SnapSet) to do that.
 
4055
    // we need the head (and current SnapSet) locally to do that.
3807
4056
    if (missing.is_missing(head)) {
3808
4057
      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
3809
4058
      return push_start(soid, peer);
3812
4061
    snapdir.snap = CEPH_SNAPDIR;
3813
4062
    if (missing.is_missing(snapdir)) {
3814
4063
      dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
3815
 
      return push_start(snapdir, peer);
 
4064
      return push_start(soid, peer);
3816
4065
    }
3817
4066
    
3818
4067
    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
3819
4068
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
3820
4069
    calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
 
4070
                       peer_info[peer].last_backfill,
3821
4071
                       data_subset, clone_subsets);
3822
4072
    put_snapset_context(ssc);
3823
4073
  } else if (soid.snap == CEPH_NOSNAP) {
3825
4075
    // base this on partially on replica's clones?
3826
4076
    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
3827
4077
    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
3828
 
    calc_head_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets);
 
4078
    calc_head_subsets(ssc->snapset, soid, peer_missing[peer],
 
4079
                      peer_info[peer].last_backfill,
 
4080
                      data_subset, clone_subsets);
3829
4081
    put_snapset_context(ssc);
3830
4082
  }
3831
4083
 
3925
4177
  tid_t tid = osd->get_tid();
3926
4178
  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
3927
4179
  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
3928
 
                                   osd->osdmap->get_epoch(), tid, oi.version);
 
4180
                                   get_osdmap()->get_epoch(), tid, oi.version);
3929
4181
  subop->oloc = oi.oloc;
3930
4182
  subop->ops = vector<OSDOp>(1);
3931
4183
  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
3932
4184
  //subop->ops[0].op.extent.offset = 0;
3933
4185
  //subop->ops[0].op.extent.length = size;
3934
 
  subop->ops[0].data = bl;
 
4186
  subop->ops[0].indata = bl;
3935
4187
  subop->data_subset = data_subset;
3936
4188
  subop->clone_subsets = clone_subsets;
3937
4189
  subop->attrset.swap(attrset);
3939
4191
  subop->first = first;
3940
4192
  subop->complete = complete;
3941
4193
  osd->cluster_messenger->
3942
 
    send_message(subop, osd->osdmap->get_cluster_inst(peer));
 
4194
    send_message(subop, get_osdmap()->get_cluster_inst(peer));
3943
4195
  return 0;
3944
4196
}
3945
4197
 
3949
4201
  tid_t tid = osd->get_tid();
3950
4202
  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
3951
4203
  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
3952
 
                                   osd->osdmap->get_epoch(), tid, eversion_t());
 
4204
                                   get_osdmap()->get_epoch(), tid, eversion_t());
3953
4205
  subop->ops = vector<OSDOp>(1);
3954
4206
  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
3955
4207
  subop->first = false;
3956
4208
  subop->complete = false;
3957
 
  osd->cluster_messenger->send_message(subop, osd->osdmap->get_cluster_inst(peer));
 
4209
  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
3958
4210
}
3959
4211
 
3960
4212
void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
3989
4241
                   pi->data_subset_pushing, pi->clone_subsets);
3990
4242
    } else {
3991
4243
      // done!
3992
 
      peer_missing[peer].got(soid, pi->version);
 
4244
      if (peer == backfill_target && backfills_in_flight.count(soid))
 
4245
        backfills_in_flight.erase(soid);
 
4246
      else
 
4247
        peer_missing[peer].got(soid, pi->version);
3993
4248
      
3994
4249
      pushing[soid].erase(peer);
3995
4250
      pi = NULL;
4004
4259
          osd->requeue_ops(this, waiting_for_degraded_object[soid]);
4005
4260
          waiting_for_degraded_object.erase(soid);
4006
4261
        }
4007
 
        map<hobject_t, ObjectContext *>::iterator i =
4008
 
          object_contexts.find(soid);
4009
 
        if (i != object_contexts.end()) {
4010
 
          populate_obc_watchers(i->second);
4011
 
        }
 
4262
        finish_degraded_object(soid);
4012
4263
      } else {
4013
4264
        dout(10) << "pushed " << soid << ", still waiting for push ack from " 
4014
4265
                 << pushing[soid].size() << " others" << dendl;
4018
4269
  reply->put();
4019
4270
}
4020
4271
 
 
4272
void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
 
4273
{
 
4274
  dout(10) << "finish_degraded_object " << oid << dendl;
 
4275
  map<hobject_t, ObjectContext *>::iterator i = object_contexts.find(oid);
 
4276
  if (i != object_contexts.end()) {
 
4277
    i->second->get();
 
4278
    for (set<ObjectContext*>::iterator j = i->second->blocking.begin();
 
4279
         j != i->second->blocking.end();
 
4280
         i->second->blocking.erase(j++)) {
 
4281
      dout(10) << " no longer blocking writes for " << (*j)->obs.oi.soid << dendl;
 
4282
      (*j)->blocked_by = NULL;
 
4283
      put_object_context(*j);
 
4284
      put_object_context(i->second);
 
4285
    }
 
4286
    put_object_context(i->second);
 
4287
  }
 
4288
}
4021
4289
 
4022
4290
/** op_pull
4023
4291
 * process request to pull an entire object.
4073
4341
      if (is_replica()) {
4074
4342
        // we are fully up to date.  tell the primary!
4075
4343
        osd->cluster_messenger->
4076
 
          send_message(new MOSDPGTrim(osd->osdmap->get_epoch(), info.pgid,
 
4344
          send_message(new MOSDPGTrim(get_osdmap()->get_epoch(), info.pgid,
4077
4345
                                      last_complete_ondisk),
4078
 
                       osd->osdmap->get_cluster_inst(get_primary()));
 
4346
                       get_osdmap()->get_cluster_inst(get_primary()));
4079
4347
 
4080
4348
        // adjust local snaps!
4081
4349
        adjust_local_snaps();
4097
4365
  put();
4098
4366
}
4099
4367
 
4100
 
void ReplicatedPG::_applied_pushed_object(ObjectStore::Transaction *t, ObjectContext *obc)
 
4368
void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc)
4101
4369
{
4102
 
  dout(10) << "_applied_pushed_object " << *obc << dendl;
4103
4370
  lock();
 
4371
  dout(10) << "_applied_recovered_object " << *obc << dendl;
 
4372
  if (is_primary())
 
4373
    populate_obc_watchers(obc);
4104
4374
  put_object_context(obc);
4105
4375
  unlock();
4106
4376
  delete t;
4107
4377
}
4108
4378
 
4109
 
void ReplicatedPG::recover_primary_got(hobject_t oid, eversion_t v)
 
4379
void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
4110
4380
{
4111
4381
  if (missing.is_missing(oid, v)) {
4112
4382
    dout(10) << "got missing " << oid << " v " << v << dendl;
4130
4400
      dout(10) << "last_complete now " << info.last_complete
4131
4401
               << " log.complete_to at end" << dendl;
4132
4402
      assert(missing.num_missing() == 0);  // otherwise, complete_to was wrong.
4133
 
      if (info.last_complete != info.last_update) {
4134
 
        // this happens if the log we are recovering from was a
4135
 
        // backlog, and the most recent entry wasn't last_update.
4136
 
        info.last_complete = info.last_update;
4137
 
        dout(10) << "setting last_complete to last_update " << info.last_complete << dendl;
4138
 
      }
 
4403
      assert(info.last_complete == info.last_update);
4139
4404
    }
4140
4405
  }
4141
4406
}
4215
4480
      clone_subsets.clear();   // forget what pusher said; recalculate cloning.
4216
4481
 
4217
4482
      interval_set<uint64_t> data_needed;
4218
 
      calc_clone_subsets(ssc->snapset, soid, missing, data_needed, clone_subsets);
 
4483
      calc_clone_subsets(ssc->snapset, soid, missing, info.last_backfill,
 
4484
                         data_needed, clone_subsets);
4219
4485
      pi->data_subset = data_needed;
4220
4486
      put_snapset_context(ssc);
4221
4487
 
4261
4527
      // did we get everything we wanted?
4262
4528
      if (pi->data_subset.empty()) {
4263
4529
        complete = true;
 
4530
      } else if (data_subset.empty()) {
 
4531
        complete = false;
4264
4532
      } else {
4265
4533
        complete = pi->data_subset.range_end() == data_subset.range_end();
4266
4534
      }
4367
4635
      }
4368
4636
    }
4369
4637
 
4370
 
    recover_primary_got(soid, v);
 
4638
    recover_got(soid, v);
4371
4639
 
4372
4640
    // update pg
4373
4641
    write_info(*t);
4398
4666
        ssc->snapset.decode(sp);
4399
4667
      }
4400
4668
 
4401
 
      onreadable = new C_OSD_AppliedPushedObject(this, t, obc);
 
4669
      onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
4402
4670
      onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
4403
4671
    } else {
4404
4672
      onreadable = new ObjectStore::C_DeleteTransaction(t);
4429
4697
      update_stats();
4430
4698
    } else {
4431
4699
      // pull more
4432
 
      pi->data_subset_pulling.span_of(pi->data_subset, data_subset.range_end(), g_conf->osd_recovery_max_chunk);
 
4700
      pi->data_subset_pulling.span_of(pi->data_subset, data_subset.empty() ? 0 : data_subset.range_end(),
 
4701
                                      g_conf->osd_recovery_max_chunk);
4433
4702
      dout(10) << " pulling more, " << pi->data_subset_pulling << " of " << pi->data_subset << dendl;
4434
4703
      send_pull_op(soid, v, false, pi->data_subset_pulling, pi->from);
4435
4704
    }
4452
4721
 
4453
4722
  } else {
4454
4723
    // ack if i'm a replica and being pushed to.
4455
 
    MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
 
4724
    MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
4456
4725
    assert(entity_name_t::TYPE_OSD == op->get_connection()->peer_type);
4457
4726
    osd->cluster_messenger->send_message(reply, op->get_connection());
4458
4727
  }
4463
4732
      dout(20) << " kicking waiters on " << soid << dendl;
4464
4733
      osd->requeue_ops(this, waiting_for_missing_object[soid]);
4465
4734
      waiting_for_missing_object.erase(soid);
 
4735
      if (missing.missing.size() == 0) {
 
4736
        osd->requeue_ops(this, waiting_for_all_missing);
 
4737
        waiting_for_all_missing.clear();
 
4738
      }
4466
4739
    } else {
4467
4740
      dout(20) << " no waiters on " << soid << dendl;
4468
4741
      /*for (hash_map<hobject_t,list<class Message*> >::iterator p = waiting_for_missing_object.begin();
4500
4773
  op->put();
4501
4774
}
4502
4775
 
 
4776
void ReplicatedPG::sub_op_remove(MOSDSubOp *op)
 
4777
{
 
4778
  dout(7) << "sub_op_remove " << op->poid << dendl;
 
4779
 
 
4780
  ObjectStore::Transaction *t = new ObjectStore::Transaction;
 
4781
  remove_object_with_snap_hardlinks(*t, op->poid);
 
4782
  int r = osd->store->queue_transaction(&osr, t);
 
4783
  assert(r == 0);
 
4784
  
 
4785
  op->put();
 
4786
}
4503
4787
 
4504
4788
 
4505
4789
eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
4511
4795
  dout(10) << "pick_newest_available " << oid << " " << v << " on osd." << osd->whoami << " (local)" << dendl;
4512
4796
 
4513
4797
  for (unsigned i=1; i<acting.size(); ++i) {
4514
 
    assert(peer_missing[acting[i]].is_missing(oid));
4515
 
    eversion_t h = peer_missing[acting[i]].missing[oid].have;
4516
 
    dout(10) << "pick_newest_available " << oid << " " << h << " on osd." << acting[i] << dendl;
 
4798
    int peer = acting[i];
 
4799
    if (!peer_missing[peer].is_missing(oid)) {
 
4800
      assert(peer == backfill_target);
 
4801
      continue;
 
4802
    }
 
4803
    eversion_t h = peer_missing[peer].missing[oid].have;
 
4804
    dout(10) << "pick_newest_available " << oid << " " << h << " on osd." << peer << dendl;
4517
4805
    if (h > v)
4518
4806
      v = h;
4519
4807
  }
4585
4873
  C_PG_MarkUnfoundLost *c = new C_PG_MarkUnfoundLost(this);
4586
4874
 
4587
4875
  utime_t mtime = ceph_clock_now(g_ceph_context);
4588
 
  eversion_t old_last_update = info.last_update;
4589
 
  info.last_update.epoch = osd->osdmap->get_epoch();
 
4876
  info.last_update.epoch = get_osdmap()->get_epoch();
4590
4877
  map<hobject_t, Missing::item>::iterator m = missing.missing.begin();
4591
4878
  map<hobject_t, Missing::item>::iterator mend = missing.missing.end();
4592
4879
  while (m != mend) {
4671
4958
 
4672
4959
void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
4673
4960
{
 
4961
  lock();
4674
4962
  dout(10) << "_finish_mark_all_unfound_lost " << dendl;
4675
 
  lock();
 
4963
 
 
4964
  osd->requeue_ops(this, waiting_for_all_missing);
 
4965
  waiting_for_all_missing.clear();
 
4966
 
4676
4967
  while (!obcs.empty()) {
4677
4968
    ObjectContext *obc = obcs.front();
4678
4969
    put_object_context(obc);
4727
5018
       ++i) {
4728
5019
    populate_obc_watchers(i->second);
4729
5020
  }
 
5021
 
 
5022
  for (unsigned i = 1; i<acting.size(); i++) {
 
5023
    if (peer_info[acting[i]].last_backfill != hobject_t::get_max()) {
 
5024
      assert(backfill_target == -1);
 
5025
      backfill_target = acting[i];
 
5026
      backfill_pos = peer_info[acting[i]].last_backfill;
 
5027
      dout(10) << " chose backfill target osd." << backfill_target
 
5028
               << " from " << backfill_pos << dendl;
 
5029
    }
 
5030
  }
4730
5031
}
4731
5032
 
4732
5033
void ReplicatedPG::on_change()
4748
5049
  context_registry_on_change();
4749
5050
 
4750
5051
  // take object waiters
4751
 
  take_object_waiters(waiting_for_missing_object);
4752
 
  take_object_waiters(waiting_for_degraded_object);
 
5052
  requeue_object_waiters(waiting_for_missing_object);
 
5053
  for (map<hobject_t,list<Message*> >::iterator p = waiting_for_degraded_object.begin();
 
5054
       p != waiting_for_degraded_object.end();
 
5055
       waiting_for_degraded_object.erase(p++)) {
 
5056
    osd->requeue_ops(this, p->second);
 
5057
    finish_degraded_object(p->first);
 
5058
  }
 
5059
  osd->requeue_ops(this, waiting_for_all_missing);
 
5060
  waiting_for_all_missing.clear();
4753
5061
 
4754
5062
  // clear pushing/pulling maps
4755
5063
  pushing.clear();
4768
5076
  for (map<eversion_t, list<Message*> >::iterator p = waiting_for_ondisk.begin();
4769
5077
       p != waiting_for_ondisk.end();
4770
5078
       p++)
4771
 
    osd->take_waiters(p->second);
 
5079
    osd->requeue_ops(this, p->second);
4772
5080
  waiting_for_ondisk.clear();
4773
5081
}
4774
5082
 
4781
5089
#ifdef DEBUG_RECOVERY_OIDS
4782
5090
  recovering_oids.clear();
4783
5091
#endif
 
5092
  backfill_pos = hobject_t();
 
5093
  backfills_in_flight.clear();
 
5094
  pending_backfill_updates.clear();
4784
5095
  pulling.clear();
4785
5096
  pushing.clear();
4786
5097
  pull_from_peer.clear();
4787
5098
}
4788
5099
 
4789
 
void ReplicatedPG::check_recovery_op_pulls(const OSDMap *osdmap)
 
5100
void ReplicatedPG::check_recovery_op_pulls(const OSDMapRef osdmap)
4790
5101
{
4791
5102
  for (map<int, set<hobject_t> >::iterator j = pull_from_peer.begin();
4792
5103
       j != pull_from_peer.end();
4836
5147
    // second chance to recovery replicas
4837
5148
    started = recover_replicas(max);
4838
5149
  }
 
5150
  if (backfill_target >= 0 && started < max &&
 
5151
      missing.num_missing() == 0 &&
 
5152
      !waiting_on_backfill) {
 
5153
    started += recover_backfill(max - started);
 
5154
  }
4839
5155
 
4840
5156
  dout(10) << " started " << started << dendl;
4841
 
 
4842
5157
  osd->logger->inc(l_osd_rop, started);
4843
5158
 
4844
 
  if (started)
 
5159
  if (started || recovery_ops_active > 0)
4845
5160
    return started;
4846
5161
 
4847
 
  if (is_all_uptodate()) {
4848
 
    dout(10) << __func__ << ": all OSDs in the PG are up-to-date!" << dendl;
4849
 
    log.reset_recovery_pointers();
4850
 
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
4851
 
    C_Contexts *fin = new C_Contexts(g_ceph_context);
4852
 
    finish_recovery(*t, fin->contexts);
4853
 
    int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
4854
 
    assert(tr == 0);
4855
 
  }
4856
 
  else {
4857
 
    dout(10) << __func__ << ": some OSDs are not up-to-date yet. "
4858
 
             << "Recovery isn't finished yet." << dendl;
4859
 
  }
 
5162
  assert(recovery_ops_active == 0);
4860
5163
 
 
5164
  PG::RecoveryCtx rctx(0, 0, 0, 0, 0);
 
5165
  handle_recovery_complete(&rctx);
4861
5166
  return 0;
4862
5167
}
4863
5168
 
4927
5232
            ObjectContext *headobc = get_object_context(head, OLOC_BLANK, false);
4928
5233
 
4929
5234
            object_info_t oi(headobc->obs.oi);
 
5235
            oi.soid = soid;
4930
5236
            oi.version = latest->version;
4931
5237
            oi.prior_version = latest->prior_version;
4932
5238
            bufferlist::iterator i = latest->snaps.begin();
4933
5239
            ::decode(oi.snaps, i);
4934
5240
            assert(oi.snaps.size() > 0);
4935
5241
            oi.copy_user_bits(headobc->obs.oi);
4936
 
            _make_clone(*t, head, soid, &oi);
 
5242
 
 
5243
            ObjectContext *clone_obc = new ObjectContext(oi, true, NULL);
 
5244
            clone_obc->get();
 
5245
            clone_obc->ondisk_write_lock();
 
5246
            clone_obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
 
5247
            register_object_context(clone_obc);
 
5248
 
 
5249
            _make_clone(*t, head, soid, &clone_obc->obs.oi);
 
5250
 
 
5251
            Context *onreadable = new C_OSD_AppliedRecoveredObject(this, t, clone_obc);
 
5252
            Context *onreadable_sync = new C_OSD_OndiskWriteUnlock(clone_obc);
 
5253
            int tr = osd->store->queue_transaction(&osr, t, onreadable, NULL, onreadable_sync);
 
5254
            assert(tr == 0);
4937
5255
 
4938
5256
            put_object_context(headobc);
4939
5257
 
4940
 
            // XXX: track objectcontext!
4941
 
            int tr = osd->store->queue_transaction(&osr, t);
4942
 
            assert(tr == 0);
4943
5258
            missing.got(latest->soid, latest->version);
4944
5259
            missing_loc.erase(latest->soid);
4945
5260
            continue;
4969
5284
              obc->obs.oi.encode(b2);
4970
5285
              t->setattr(coll, soid, OI_ATTR, b2);
4971
5286
 
4972
 
              recover_primary_got(soid, latest->version);
 
5287
              recover_got(soid, latest->version);
4973
5288
 
4974
5289
              osd->store->queue_transaction(&osr, t,
4975
 
                                            new C_OSD_AppliedPushedObject(this, t, obc),
 
5290
                                            new C_OSD_AppliedRecoveredObject(this, t, obc),
4976
5291
                                            new C_OSD_CommittedPushedObject(this, NULL,
4977
5292
                                                                            info.history.same_interval_since,
4978
5293
                                                                            info.last_complete),
5129
5444
  return started;
5130
5445
}
5131
5446
 
 
5447
/**
 
5448
 * recover_backfill
 
5449
 *
 
5450
 * Invariants:
 
5451
 *
 
5452
 * backfilled: fully pushed to replica or present in replica's missing set (both
 
5453
 * our copy and theirs).
 
5454
 *
 
5455
 * All objects on backfill_target in [MIN,peer_backfill_info.begin) are either
 
5456
 * not present or backfilled (all removed objects have been removed).
 
5457
 * There may be PG objects in this interval yet to be backfilled.
 
5458
 *
 
5459
 * All objects in PG in [MIN,backfill_info.begin) have been backfilled to
 
5460
 * backfill_target.  There may be objects on backfill_target yet to be deleted.
 
5461
 *
 
5462
 * All objects < MIN(peer_backfill_info.begin, backfill_info.begin) in PG are
 
5463
 * backfilled.  No deleted objects in this interval remain on backfill_target.
 
5464
 *
 
5465
 * peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin,
 
5466
 * backfill_info.begin, backfills_in_flight)
 
5467
 */
 
5468
int ReplicatedPG::recover_backfill(int max)
 
5469
{
 
5470
  dout(10) << "recover_backfill (" << max << ")" << dendl;
 
5471
  assert(backfill_target >= 0);
 
5472
 
 
5473
  Info& pinfo = peer_info[backfill_target];
 
5474
  BackfillInterval& pbi = peer_backfill_info;
 
5475
 
 
5476
  // Initialize from prior backfill state
 
5477
  if (pbi.begin < pinfo.last_backfill) {
 
5478
    pbi.reset(pinfo.last_backfill);
 
5479
    backfill_info.reset(pinfo.last_backfill);
 
5480
  }
 
5481
 
 
5482
  dout(10) << " peer osd." << backfill_target
 
5483
           << " pos " << backfill_pos
 
5484
           << " info " << pinfo
 
5485
           << " interval " << pbi.begin << "-" << pbi.end
 
5486
           << " " << pbi.objects.size() << " objects" << dendl;
 
5487
 
 
5488
  int local_min = osd->store->get_ideal_list_min();
 
5489
  int local_max = osd->store->get_ideal_list_max();
 
5490
 
 
5491
  // re-scan our local interval to cope with recent changes
 
5492
  // FIXME: we could track the eversion_t when we last scanned, and invalidate
 
5493
  // that way.  or explicitly modify/invalidate when we actually change specific
 
5494
  // objects.
 
5495
  dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
 
5496
  backfill_info.clear();
 
5497
  osr.flush();
 
5498
  scan_range(backfill_pos, local_min, local_max, &backfill_info);
 
5499
 
 
5500
  int ops = 0;
 
5501
  map<hobject_t, pair<eversion_t, eversion_t> > to_push;
 
5502
  map<hobject_t, eversion_t> to_remove;
 
5503
  set<hobject_t> add_to_stat;
 
5504
 
 
5505
  pbi.trim();
 
5506
  backfill_info.trim();
 
5507
 
 
5508
  while (ops < max) {
 
5509
    if (backfill_info.begin <= pbi.begin &&
 
5510
        !backfill_info.extends_to_end() && backfill_info.empty()) {
 
5511
      osr.flush();
 
5512
      scan_range(backfill_info.end, local_min, local_max, &backfill_info);
 
5513
      backfill_info.trim();
 
5514
    }
 
5515
    backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
 
5516
 
 
5517
    dout(20) << "   my backfill " << backfill_info.begin << "-" << backfill_info.end
 
5518
             << " " << backfill_info.objects << dendl;
 
5519
    dout(20) << " peer backfill " << pbi.begin << "-" << pbi.end << " " << pbi.objects << dendl;
 
5520
 
 
5521
    if (pbi.begin <= backfill_info.begin &&
 
5522
        !pbi.extends_to_end() && pbi.empty()) {
 
5523
      dout(10) << " scanning peer osd." << backfill_target << " from " << pbi.end << dendl;
 
5524
      epoch_t e = get_osdmap()->get_epoch();
 
5525
      MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
 
5526
                                     pbi.end, hobject_t());
 
5527
      osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
 
5528
      waiting_on_backfill = true;
 
5529
      start_recovery_op(pbi.end);
 
5530
      ops++;
 
5531
      break;
 
5532
    }
 
5533
 
 
5534
    if (backfill_info.empty() && pbi.empty()) {
 
5535
      dout(10) << " reached end for both local and peer" << dendl;
 
5536
      break;
 
5537
    }
 
5538
 
 
5539
    if (pbi.begin < backfill_info.begin) {
 
5540
      dout(20) << " removing peer " << pbi.begin << dendl;
 
5541
      to_remove[pbi.begin] = pbi.objects.begin()->second;
 
5542
      pbi.pop_front();
 
5543
      ops++;
 
5544
    } else if (pbi.begin == backfill_info.begin) {
 
5545
      if (pbi.objects.begin()->second !=
 
5546
          backfill_info.objects.begin()->second) {
 
5547
        dout(20) << " replacing peer " << pbi.begin << " with local "
 
5548
                 << backfill_info.objects.begin()->second << dendl;
 
5549
        to_push[pbi.begin] = make_pair(backfill_info.objects.begin()->second,
 
5550
                                       pbi.objects.begin()->second);
 
5551
        ops++;
 
5552
      } else {
 
5553
        dout(20) << " keeping peer " << pbi.begin << " "
 
5554
                 << pbi.objects.begin()->second << dendl;
 
5555
      }
 
5556
      add_to_stat.insert(pbi.begin);
 
5557
      backfill_info.pop_front();
 
5558
      pbi.pop_front();
 
5559
    } else {
 
5560
      dout(20) << " pushing local " << backfill_info.begin << " "
 
5561
               << backfill_info.objects.begin()->second
 
5562
               << " to peer osd." << backfill_target << dendl;
 
5563
      to_push[backfill_info.begin] =
 
5564
        make_pair(backfill_info.objects.begin()->second,
 
5565
                  eversion_t());
 
5566
      add_to_stat.insert(backfill_info.begin);
 
5567
      backfill_info.pop_front();
 
5568
      ops++;
 
5569
    }
 
5570
  }
 
5571
  backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
 
5572
 
 
5573
  for (set<hobject_t>::iterator i = add_to_stat.begin();
 
5574
       i != add_to_stat.end();
 
5575
       ++i) {
 
5576
    ObjectContext *obc = get_object_context(*i, OLOC_BLANK, false);
 
5577
    pg_stat_t stat;
 
5578
    add_object_context_to_pg_stat(obc, &stat);
 
5579
    pending_backfill_updates[*i] = stat;
 
5580
    put_object_context(obc);
 
5581
  }
 
5582
  for (map<hobject_t, eversion_t>::iterator i = to_remove.begin();
 
5583
       i != to_remove.end();
 
5584
       ++i) {
 
5585
    send_remove_op(i->first, i->second, backfill_target);
 
5586
  }
 
5587
  for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
 
5588
       i != to_push.end();
 
5589
       ++i) {
 
5590
    push_backfill_object(i->first, i->second.first, i->second.second, backfill_target);
 
5591
  }
 
5592
 
 
5593
  hobject_t bound = backfills_in_flight.size() ?
 
5594
    *(backfills_in_flight.begin()) : backfill_pos;
 
5595
  if (bound > pinfo.last_backfill) {
 
5596
    pinfo.last_backfill = bound;
 
5597
    for (map<hobject_t, pg_stat_t>::iterator i = pending_backfill_updates.begin();
 
5598
         i != pending_backfill_updates.end() && i->first < bound;
 
5599
         pending_backfill_updates.erase(i++)) {
 
5600
      pinfo.stats.add(i->second);
 
5601
    }
 
5602
    epoch_t e = get_osdmap()->get_epoch();
 
5603
    MOSDPGBackfill *m = NULL;
 
5604
    if (bound.is_max()) {
 
5605
      m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH, e, e, info.pgid);
 
5606
      if (info.stats.stats.sum.num_bytes != pinfo.stats.stats.sum.num_bytes)
 
5607
        osd->clog.error() << info.pgid << " backfill osd." << backfill_target << " stat mismatch on finish: "
 
5608
                          << "num_bytes " << pinfo.stats.stats.sum.num_bytes
 
5609
                          << " != expected " << info.stats.stats.sum.num_bytes << "\n";
 
5610
      if (info.stats.stats.sum.num_objects != pinfo.stats.stats.sum.num_objects)
 
5611
        osd->clog.error() << info.pgid << " backfill osd." << backfill_target << " stat mismatch on finish: "
 
5612
                          << "num_objects " << pinfo.stats.stats.sum.num_objects
 
5613
                          << " != expected " << info.stats.stats.sum.num_objects << "\n";
 
5614
      start_recovery_op(hobject_t::get_max());
 
5615
    } else {
 
5616
      m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_PROGRESS, e, e, info.pgid);
 
5617
    }
 
5618
    m->last_backfill = bound;
 
5619
    m->stats = pinfo.stats.stats;
 
5620
    osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(backfill_target));
 
5621
  }
 
5622
 
 
5623
  dout(10) << " peer num_objects now " << pinfo.stats.stats.sum.num_objects
 
5624
           << " / " << info.stats.stats.sum.num_objects << dendl;
 
5625
  return ops;
 
5626
}
 
5627
 
 
5628
void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, eversion_t have, int peer)
 
5629
{
 
5630
  dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
 
5631
 
 
5632
  backfills_in_flight.insert(oid);
 
5633
 
 
5634
  start_recovery_op(oid);
 
5635
  ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
 
5636
  obc->ondisk_read_lock();
 
5637
  push_to_replica(obc, oid, peer);
 
5638
  obc->ondisk_read_unlock();
 
5639
  put_object_context(obc);
 
5640
}
 
5641
 
 
5642
void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
 
5643
{
 
5644
  assert(is_locked());
 
5645
  dout(10) << "scan_range from " << begin << dendl;
 
5646
  bi->begin = begin;
 
5647
  bi->objects.clear();  // for good measure
 
5648
 
 
5649
  vector<hobject_t> ls;
 
5650
  ls.reserve(max);
 
5651
  int r = osd->store->collection_list_partial(coll, begin, min, max,
 
5652
                                              0, &ls, &bi->end);
 
5653
  assert(r >= 0);
 
5654
  dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl;
 
5655
  dout(20) << ls << dendl;
 
5656
 
 
5657
  for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
 
5658
    ObjectContext *obc = NULL;
 
5659
    if (is_primary())
 
5660
      obc = _lookup_object_context(*p);
 
5661
    if (obc) {
 
5662
      bi->objects[*p] = obc->obs.oi.version;
 
5663
      dout(20) << "  " << *p << " " << obc->obs.oi.version << dendl;
 
5664
    } else {
 
5665
      bufferlist bl;
 
5666
      int r = osd->store->getattr(coll, *p, OI_ATTR, bl);
 
5667
      assert(r >= 0);
 
5668
      object_info_t oi(bl);
 
5669
      bi->objects[*p] = oi.version;
 
5670
      dout(20) << "  " << *p << " " << oi.version << dendl;
 
5671
    }
 
5672
  }
 
5673
}
 
5674
 
 
5675
 
5132
5676
void ReplicatedPG::remove_object_with_snap_hardlinks(ObjectStore::Transaction& t, const hobject_t& soid)
5133
5677
{
5134
5678
  t.remove(coll, soid);
5155
5699
 
5156
5700
  assert(info.last_update >= log.tail);  // otherwise we need some help!
5157
5701
 
5158
 
  if (log.backlog) {
5159
 
 
5160
 
    // be thorough.
5161
 
    vector<hobject_t> ls;
5162
 
    osd->store->collection_list(coll, ls);
5163
 
 
5164
 
    set<hobject_t> s;   
5165
 
    for (vector<hobject_t>::iterator i = ls.begin();
5166
 
         i != ls.end();
5167
 
         i++)
5168
 
      if (i->snap == CEPH_NOSNAP)
5169
 
        s.insert(*i);
5170
 
 
5171
 
    dout(10) << " " << s.size() << " local objects" << dendl;
5172
 
 
5173
 
    set<hobject_t> did;
5174
 
    for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
5175
 
         p != log.log.rend();
5176
 
         p++) {
5177
 
      if (did.count(p->soid)) continue;
5178
 
      did.insert(p->soid);
5179
 
      
5180
 
      if (p->is_delete()) {
5181
 
        if (s.count(p->soid)) {
5182
 
          dout(10) << " deleting " << p->soid
5183
 
                   << " when " << p->version << dendl;
5184
 
          remove_object_with_snap_hardlinks(t, p->soid);
5185
 
        }
5186
 
        s.erase(p->soid);
5187
 
      } else {
5188
 
        // just leave old objects.. they're missing or whatever
5189
 
        s.erase(p->soid);
5190
 
      }
5191
 
    }
5192
 
 
5193
 
    for (set<hobject_t>::iterator i = s.begin(); 
5194
 
         i != s.end();
5195
 
         i++) {
5196
 
      dout(10) << " deleting stray " << *i << dendl;
5197
 
      remove_object_with_snap_hardlinks(t, *i);
5198
 
    }
5199
 
 
5200
 
  } else {
5201
 
    // just scan the log.
5202
 
    set<hobject_t> did;
5203
 
    for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
5204
 
         p != log.log.rend();
5205
 
         p++) {
5206
 
      if (did.count(p->soid))
5207
 
        continue;
5208
 
      did.insert(p->soid);
5209
 
 
5210
 
      if (p->is_delete()) {
5211
 
        dout(10) << " deleting " << p->soid
5212
 
                 << " when " << p->version << dendl;
5213
 
        remove_object_with_snap_hardlinks(t, p->soid);
5214
 
      } else {
5215
 
        // keep old(+missing) objects, just for kicks.
5216
 
      }
 
5702
  // just scan the log.
 
5703
  set<hobject_t> did;
 
5704
  for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
 
5705
       p != log.log.rend();
 
5706
       p++) {
 
5707
    if (did.count(p->soid))
 
5708
      continue;
 
5709
    did.insert(p->soid);
 
5710
 
 
5711
    if (p->is_delete()) {
 
5712
      dout(10) << " deleting " << p->soid
 
5713
               << " when " << p->version << dendl;
 
5714
      remove_object_with_snap_hardlinks(t, p->soid);
 
5715
    } else {
 
5716
      // keep old(+missing) objects, just for kicks.
5217
5717
    }
5218
5718
  }
5219
5719
}
5247
5747
       p++) {
5248
5748
    const hobject_t& soid = p->first;
5249
5749
    object_stat_sum_t stat;
5250
 
    stat.num_objects++;
 
5750
    if (soid.snap != CEPH_SNAPDIR)
 
5751
      stat.num_objects++;
5251
5752
 
5252
5753
    // new snapset?
5253
5754
    if (soid.snap == CEPH_SNAPDIR ||
5284
5785
             r != q->second.end();
5285
5786
             ++r) {
5286
5787
          stat.num_bytes -= r.get_len();
5287
 
          stat.num_kb -= SHIFT_ROUND_UP(r.get_start()+r.get_len(), 10) - (r.get_start() >> 10);
5288
5788
        }         
5289
5789
      }
5290
5790
    }
5291
 
    if (soid.snap == CEPH_SNAPDIR)
 
5791
    if (soid.snap == CEPH_SNAPDIR) {
 
5792
      string cat;
 
5793
      cstat.add(stat, cat);
5292
5794
      continue;
 
5795
    }
5293
5796
 
5294
5797
    // basic checks.
5295
5798
    if (p->second.attrs.count(OI_ATTR) == 0) {
5311
5814
    dout(20) << mode << "  " << soid << " " << oi << dendl;
5312
5815
 
5313
5816
    stat.num_bytes += p->second.size;
5314
 
    stat.num_kb += SHIFT_ROUND_UP(p->second.size, 10);
5315
5817
 
5316
5818
    //bufferlist data;
5317
5819
    //osd->store->read(c, poid, 0, 0, data);
5354
5856
  dout(10) << mode << " got "
5355
5857
           << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
5356
5858
           << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
5357
 
           << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes, "
5358
 
           << cstat.sum.num_kb << "/" << info.stats.stats.sum.num_kb << " kb."
 
5859
           << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
5359
5860
           << dendl;
5360
5861
 
5361
5862
  if (cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
5362
5863
      cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
5363
 
      cstat.sum.num_bytes != info.stats.stats.sum.num_bytes ||
5364
 
      cstat.sum.num_kb != info.stats.stats.sum.num_kb) {
 
5864
      cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
5365
5865
    osd->clog.error() << info.pgid << " " << mode
5366
 
       << " stat mismatch, got "
5367
 
       << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
5368
 
       << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
5369
 
       << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes, "
5370
 
       << cstat.sum.num_kb << "/" << info.stats.stats.sum.num_kb << " kb.\n";
 
5866
                      << " stat mismatch, got "
 
5867
                      << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
 
5868
                      << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
 
5869
                      << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
5371
5870
    errors++;
5372
5871
 
5373
5872
    if (repair) {
5377
5876
 
5378
5877
      // tell replicas
5379
5878
      for (unsigned i=1; i<acting.size(); i++) {
5380
 
        MOSDPGInfo *m = new MOSDPGInfo(osd->osdmap->get_epoch());
 
5879
        MOSDPGInfo *m = new MOSDPGInfo(get_osdmap()->get_epoch());
5381
5880
        m->pg_info.push_back(info);
5382
 
        osd->cluster_messenger->send_message(m, osd->osdmap->get_cluster_inst(acting[i]));
 
5881
        osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(acting[i]));
5383
5882
      }
5384
5883
    }
5385
5884
  }