~ubuntu-branches/ubuntu/precise/ceph/precise

« back to all changes in this revision

Viewing changes to src/osd/ReplicatedPG.cc

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 * 
12
12
 */
13
13
 
14
 
 
15
14
#include "ReplicatedPG.h"
16
15
#include "OSD.h"
17
16
#include "PGLS.h"
51
50
static const int LOAD_QUEUE_SIZE = 2;
52
51
static const int LOAD_HYBRID     = 3;
53
52
 
 
53
// Blank object locator
 
54
static const object_locator_t OLOC_BLANK;
54
55
 
55
56
// =======================
56
57
// pg changes
84
85
  assert(is_missing_object(soid));
85
86
 
86
87
  // we don't have it (yet).
87
 
  eversion_t v = missing.missing[soid].need;
88
 
  if (pulling.count(soid)) {
89
 
    dout(7) << "missing "
90
 
            << soid 
91
 
            << " v " << v
92
 
            << ", already pulling"
93
 
            << dendl;
94
 
  } else {
95
 
    dout(7) << "missing " 
96
 
            << soid 
97
 
            << " v " << v
98
 
            << ", pulling"
99
 
            << dendl;
 
88
  map<sobject_t, Missing::item>::const_iterator g = missing.missing.find(soid);
 
89
  assert(g != missing.missing.end());
 
90
  const eversion_t &v(g->second.need);
 
91
 
 
92
  map<sobject_t, pull_info_t>::const_iterator p = pulling.find(soid);
 
93
  if (p != pulling.end()) {
 
94
    dout(7) << "missing " << soid << " v " << v << ", already pulling." << dendl;
 
95
  }
 
96
  else if (missing_loc.find(soid) == missing_loc.end()) {
 
97
    dout(7) << "missing " << soid << " v " << v << ", is unfound." << dendl;
 
98
  }
 
99
  else {
 
100
    dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
100
101
    pull(soid);
101
102
  }
102
103
  waiting_for_missing_object[soid].push_back(m);
137
138
 
138
139
 
139
140
// ==========================================================
140
 
 
141
 
/** preprocess_op - preprocess an op (before it gets queued).
142
 
 * fasttrack read
143
 
 */
144
 
bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
145
 
{
146
 
#if 0
147
 
  // we only care about reads here on out..
148
 
  if (op->may_write() ||
149
 
      op->ops.size() < 1)
150
 
    return false;
151
 
 
152
 
  ceph_osd_op& readop = op->ops[0];
153
 
 
154
 
  object_t oid = op->get_oid();
155
 
  sobject_t soid(oid, op->get_snapid());
156
 
 
157
 
  // -- load balance reads --
158
 
  if (is_primary()) {
159
 
    // -- read on primary+acker ---
160
 
    
161
 
    // test
162
 
    if (false) {
163
 
      if (acting.size() > 1) {
164
 
        int peer = acting[1];
165
 
        dout(-10) << "preprocess_op fwd client read op to osd" << peer
166
 
                  << " for " << op->get_orig_source() << " " << op->get_orig_source_inst() << dendl;
167
 
        osd->messenger->forward_message(op, osd->osdmap->get_inst(peer));
168
 
        return true;
169
 
      }
170
 
    }
171
 
    
172
 
#if 0
173
 
    // -- balance reads?
174
 
    if (g_conf.osd_balance_reads &&
175
 
        !op->get_source().is_osd()) {
176
 
      // flash crowd?
177
 
      bool is_flash_crowd_candidate = false;
178
 
      if (g_conf.osd_flash_crowd_iat_threshold > 0) {
179
 
        osd->iat_averager.add_sample( oid, (double)g_clock.now() );
180
 
        is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( oid );
181
 
      }
182
 
 
183
 
      // hot?
184
 
      double temp = 0;
185
 
      if (stat_object_temp_rd.count(soid))
186
 
        temp = stat_object_temp_rd[soid].get(op->get_recv_stamp());
187
 
      bool is_hotly_read = temp > g_conf.osd_balance_reads_temp;
188
 
 
189
 
      dout(20) << "balance_reads oid " << oid << " temp " << temp 
190
 
                << (is_hotly_read ? " hotly_read":"")
191
 
                << (is_flash_crowd_candidate ? " flash_crowd_candidate":"")
192
 
                << dendl;
193
 
 
194
 
      bool should_balance = is_flash_crowd_candidate || is_hotly_read;
195
 
      bool is_balanced = false;
196
 
      bool b;
197
 
      // *** FIXME *** this may block, and we're in the fast path! ***
198
 
      if (g_conf.osd_balance_reads &&
199
 
          osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &b, 1) >= 0)
200
 
        is_balanced = true;
201
 
      
202
 
      if (!is_balanced && should_balance &&
203
 
          balancing_reads.count(soid) == 0) {
204
 
        dout(-10) << "preprocess_op balance-reads on " << oid << dendl;
205
 
        balancing_reads.insert(soid);
206
 
        ceph_object_layout layout;
207
 
        layout.ol_pgid = info.pgid.u.pg64;
208
 
        layout.ol_stripe_unit = 0;
209
 
        MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
210
 
                                 oid,
211
 
                                 layout,
212
 
                                 osd->osdmap->get_epoch(),
213
 
                                 CEPH_OSD_FLAG_MODIFY);
214
 
        pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
215
 
        do_op(pop);
216
 
      }
217
 
      if (is_balanced && !should_balance &&
218
 
          !unbalancing_reads.count(soid) == 0) {
219
 
        dout(-10) << "preprocess_op unbalance-reads on " << oid << dendl;
220
 
        unbalancing_reads.insert(soid);
221
 
        ceph_object_layout layout;
222
 
        layout.ol_pgid = info.pgid.u.pg64;
223
 
        layout.ol_stripe_unit = 0;
224
 
        MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
225
 
                                 oid,
226
 
                                 layout,
227
 
                                 osd->osdmap->get_epoch(),
228
 
                                 CEPH_OSD_FLAG_MODIFY);
229
 
        pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
230
 
        do_op(pop);
231
 
      }
232
 
    }
233
 
#endif
234
 
    
235
 
    // -- read shedding
236
 
    if (g_conf.osd_shed_reads &&
237
 
        g_conf.osd_stat_refresh_interval > 0 &&
238
 
        !op->get_source().is_osd()) {        // no re-shedding!
239
 
      Mutex::Locker lock(osd->peer_stat_lock);
240
 
 
241
 
      osd->_refresh_my_stat(now);
242
 
 
243
 
      // check my load. 
244
 
      // TODO xxx we must also compare with our own load
245
 
      // if i am x percentage higher than replica , 
246
 
      // redirect the read 
247
 
 
248
 
      int shedto = -1;
249
 
      double bestscore = 0.0;  // highest positive score wins
250
 
      
251
 
      // we calculate score values such that we can interpret them as a probability.
252
 
 
253
 
      switch (g_conf.osd_shed_reads) {
254
 
      case LOAD_LATENCY:
255
 
        // above some minimum?
256
 
        if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {  
257
 
          for (unsigned i=1; i<acting.size(); ++i) {
258
 
            int peer = acting[i];
259
 
            if (osd->peer_stat.count(peer) == 0) continue;
260
 
 
261
 
            // assume a read_latency of 0 (technically, undefined) is OK, since
262
 
            // we'll be corrected soon enough if we're wrong.
263
 
 
264
 
            double plat = osd->peer_stat[peer].read_latency_mine;
265
 
 
266
 
            double diff = osd->my_stat.read_latency - plat;
267
 
            if (diff < g_conf.osd_shed_reads_min_latency_diff) continue;
268
 
 
269
 
            double c = .002; // add in a constant to smooth it a bit
270
 
            double latratio = 
271
 
              (c+osd->my_stat.read_latency) /   
272
 
              (c+plat);
273
 
            double p = (latratio - 1.0) / 2.0 / latratio;
274
 
            dout(15) << "preprocess_op " << op->get_reqid() 
275
 
                      << " my read latency " << osd->my_stat.read_latency
276
 
                      << ", peer osd" << peer << " is " << plat << " (" << osd->peer_stat[peer].read_latency << ")"
277
 
                      << ", latratio " << latratio
278
 
                      << ", p=" << p
279
 
                      << dendl;
280
 
            if (latratio > g_conf.osd_shed_reads_min_latency_ratio &&
281
 
                p > bestscore &&
282
 
                drand48() < p) {
283
 
              shedto = peer;
284
 
              bestscore = p;
285
 
            }
286
 
          }
287
 
        }
288
 
        break;
289
 
        
290
 
      case LOAD_HYBRID:
291
 
        // dumb mostly
292
 
        if (osd->my_stat.read_latency >= g_conf.osd_shed_reads_min_latency) {
293
 
          for (unsigned i=1; i<acting.size(); ++i) {
294
 
            int peer = acting[i];
295
 
            if (osd->peer_stat.count(peer) == 0/* ||
296
 
                osd->peer_stat[peer].read_latency <= 0*/) continue;
297
 
 
298
 
            if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) {
299
 
              
300
 
              if (osd->my_stat.read_latency - osd->peer_stat[peer].read_latency >
301
 
                  g_conf.osd_shed_reads_min_latency_diff) continue;
302
 
 
303
 
              double qratio = osd->pending_ops / osd->peer_stat[peer].qlen;
304
 
              
305
 
              double c = .002; // add in a constant to smooth it a bit
306
 
              double latratio = 
307
 
                (c+osd->my_stat.read_latency)/   
308
 
                (c+osd->peer_stat[peer].read_latency);
309
 
              double p = (latratio - 1.0) / 2.0 / latratio;
310
 
              
311
 
              dout(-15) << "preprocess_op " << op->get_reqid() 
312
 
                        << " my qlen / rdlat " 
313
 
                        << osd->pending_ops << " " << osd->my_stat.read_latency
314
 
                        << ", peer osd" << peer << " is "
315
 
                        << osd->peer_stat[peer].qlen << " " << osd->peer_stat[peer].read_latency
316
 
                        << ", qratio " << qratio
317
 
                        << ", latratio " << latratio
318
 
                        << ", p=" << p
319
 
                        << dendl;
320
 
              if (latratio > g_conf.osd_shed_reads_min_latency_ratio &&
321
 
                  p > bestscore &&
322
 
                  drand48() < p) {
323
 
                shedto = peer;
324
 
                bestscore = p;
325
 
              }
326
 
            }
327
 
          }
328
 
        }
329
 
        break;
330
 
 
331
 
        /*
332
 
      case LOAD_QUEUE_SIZE:
333
 
        // am i above my average?  -- dumb
334
 
        if (osd->pending_ops > osd->my_stat.qlen) {
335
 
          // yes. is there a peer who is below my average?
336
 
          for (unsigned i=1; i<acting.size(); ++i) {
337
 
            int peer = acting[i];
338
 
            if (osd->peer_stat.count(peer) == 0) continue;
339
 
            if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) {
340
 
              // calculate a probability that we should redirect
341
 
              float p = (osd->my_stat.qlen - osd->peer_stat[peer].qlen) / osd->my_stat.qlen;  // this is dumb.
342
 
              float v = 1.0 - p;
343
 
              
344
 
              dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << osd->my_stat.qlen
345
 
                       << ", peer osd" << peer << " has qlen " << osd->peer_stat[peer].qlen
346
 
                       << ", p=" << p
347
 
                       << ", v= "<< v
348
 
                       << dendl;
349
 
              
350
 
              if (v > bestscore) {
351
 
                shedto = peer;
352
 
                bestscore = v;
353
 
              }
354
 
            }
355
 
          }
356
 
        }
357
 
        break;*/
358
 
 
359
 
      }
360
 
        
361
 
      // shed?
362
 
      if (shedto >= 0) {
363
 
        dout(10) << "preprocess_op shedding read to peer osd" << shedto
364
 
                  << " " << op->get_reqid()
365
 
                  << dendl;
366
 
        op->set_peer_stat(osd->my_stat);
367
 
        osd->messenger->forward_message(op, osd->osdmap->get_inst(shedto));
368
 
        osd->stat_rd_ops_shed_out++;
369
 
        osd->logger->inc(l_osd_shdout);
370
 
        return true;
371
 
      }
372
 
    }
373
 
  } // endif balance reads
374
 
 
375
 
 
376
 
  // -- fastpath read?
377
 
  // if this is a read and the data is in the cache, do an immediate read.. 
378
 
  if ( g_conf.osd_immediate_read_from_cache ) {
379
 
    if (osd->store->is_cached(info.pgid.to_coll(), soid,
380
 
                              readop.extent.offset,
381
 
                              readop.length) == 0) {
382
 
      if (!is_primary() && !op->get_source().is_osd()) {
383
 
        // am i allowed?
384
 
        bool v;
385
 
        if (osd->store->getattr(info.pgid.to_coll(), soid, "balance-reads", &v, 1) < 0) {
386
 
          dout(-10) << "preprocess_op in-cache but no balance-reads on " << oid
387
 
                    << ", fwd to primary" << dendl;
388
 
          osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
389
 
          return true;
390
 
        }
391
 
      }
392
 
 
393
 
      // do it now
394
 
      dout(10) << "preprocess_op data is in cache, reading from cache" << *op <<  dendl;
395
 
      do_op(op);
396
 
      return true;
397
 
    }
398
 
  }
399
 
#endif
400
 
  return false;
401
 
}
402
 
 
403
141
void ReplicatedPG::do_pg_op(MOSDOp *op)
404
142
{
405
 
  dout(0) << "do_pg_op " << *op << dendl;
 
143
  dout(10) << "do_pg_op " << *op << dendl;
406
144
 
407
145
  bufferlist outdata;
408
146
  int result = 0;
421
159
        PGLSResponse response;
422
160
        response.handle = (collection_list_handle_t)(uint64_t)(p->op.pgls.cookie);
423
161
        vector<sobject_t> sentries;
424
 
        result = osd->store->collection_list_partial(coll_t::build_pg_coll(info.pgid), snapid,
 
162
        result = osd->store->collection_list_partial(coll_t(info.pgid), snapid,
425
163
                                                     sentries, p->op.pgls.count,
426
164
                                                     &response.handle);
427
165
        if (result == 0) {
435
173
              // skip items not defined for this snapshot
436
174
              if (iter->snap == CEPH_NOSNAP) {
437
175
                bufferlist bl;
438
 
                osd->store->getattr(coll_t::build_pg_coll(info.pgid), *iter, SS_ATTR, bl);
 
176
                osd->store->getattr(coll_t(info.pgid), *iter, SS_ATTR, bl);
439
177
                SnapSet snapset(bl);
440
178
                if (snapid <= snapset.seq)
441
179
                  continue;
442
180
              } else {
443
181
                bufferlist bl;
444
 
                osd->store->getattr(coll_t::build_pg_coll(info.pgid), *iter, OI_ATTR, bl);
 
182
                osd->store->getattr(coll_t(info.pgid), *iter, OI_ATTR, bl);
445
183
                object_info_t oi(bl);
446
184
                bool exists = false;
447
185
                for (vector<snapid_t>::iterator i = oi.snaps.begin(); i != oi.snaps.end(); ++i)
473
211
                                       CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); 
474
212
  reply->set_data(outdata);
475
213
  reply->set_result(result);
476
 
  osd->messenger->send_message(reply, op->get_connection());
 
214
  osd->client_messenger->send_message(reply, op->get_connection());
477
215
  op->put();
478
216
}
479
217
 
480
218
void ReplicatedPG::calc_trim_to()
481
219
{
482
 
  if (!is_degraded() &&
 
220
  if (!is_degraded() && !is_scrubbing() &&
483
221
      (is_clean() ||
484
222
       log.head.version - log.tail.version > info.stats.num_objects)) {
485
223
    if (min_last_complete_ondisk != eversion_t() &&
488
226
      pg_trim_to = min_last_complete_ondisk;
489
227
      assert(pg_trim_to <= log.head);
490
228
    }
491
 
  } else
 
229
  } else {
 
230
    // don't trim
492
231
    pg_trim_to = eversion_t();
 
232
  }
493
233
}
494
234
 
495
235
/** do_op - do an op
502
242
    return do_pg_op(op);
503
243
 
504
244
  dout(10) << "do_op " << *op << dendl;
 
245
  if (finalizing_scrub && op->may_write()) {
 
246
    dout(20) << __func__ << ": waiting for scrub" << dendl;
 
247
    waiting_for_active.push_back(op);
 
248
    return;
 
249
  }
505
250
 
506
251
  entity_inst_t client = op->get_source_inst();
507
252
 
508
253
  ObjectContext *obc;
509
254
  bool can_create = op->may_write();
510
 
  int r = find_object_context(op->get_oid(), op->get_snapid(), &obc, can_create);
 
255
  snapid_t snapid;
 
256
  int r = find_object_context(op->get_oid(), op->get_object_locator(),
 
257
                              op->get_snapid(), &obc, can_create, &snapid);
511
258
  if (r) {
512
 
    assert(r != -EAGAIN);
 
259
    if (r == -EAGAIN) {
 
260
      // missing the specific snap we need; requeue and wait.
 
261
      assert(!can_create); // only happens on a read
 
262
      sobject_t soid(op->get_oid(), snapid);
 
263
      wait_for_missing_object(soid, op);
 
264
      return;
 
265
    }
513
266
    osd->reply_op_error(op, r);
514
267
    return;
515
268
  }    
 
269
 
 
270
  if ((op->may_read()) && (obc->obs.oi.lost)) {
 
271
    // This object is lost. Reading from it returns an error.
 
272
    dout(20) << __func__ << ": object " << obc->obs.oi.soid
 
273
             << " is lost" << dendl;
 
274
    osd->reply_op_error(op, -ENFILE);
 
275
    return;
 
276
  }
 
277
  dout(25) << __func__ << ": object " << obc->obs.oi.soid
 
278
           << " has oi of " << obc->obs.oi << dendl;
516
279
  
517
280
  bool ok;
518
281
  dout(10) << "do_op mode is " << mode << dendl;
570
333
      dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
571
334
      delete ctx;
572
335
      put_object_context(obc);
573
 
      if (oldv >= last_update_ondisk) {
 
336
      if (oldv <= last_update_ondisk) {
574
337
        osd->reply_op_error(op, 0);
575
338
      } else {
576
339
        dout(10) << " waiting for " << oldv << " to commit" << dendl;
623
386
      obc->ondisk_read_unlock();
624
387
    }
625
388
 
626
 
    if (result == -EAGAIN)
 
389
    if (result == -EAGAIN) // must have referenced non-existent class
627
390
      return;
628
391
 
629
392
    // prepare the reply
639
402
      MOSDOpReply *reply = ctx->reply;
640
403
      ctx->reply = NULL;
641
404
      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
642
 
      osd->messenger->send_message(reply, op->get_connection());
 
405
      osd->client_messenger->send_message(reply, op->get_connection());
643
406
      op->put();
644
407
      delete ctx;
645
408
      put_object_context(obc);
658
421
  }
659
422
  
660
423
  // continuing on to write path, make sure object context is registered
661
 
  register_object_context(obc);
 
424
  assert(obc->registered);
662
425
 
663
426
  // issue replica writes
664
427
  tid_t rep_tid = osd->get_tid();
712
475
  if (op->ops.size() >= 1) {
713
476
    OSDOp& first = op->ops[0];
714
477
    switch (first.op.op) {
715
 
      // rep stuff
716
478
    case CEPH_OSD_OP_PULL:
717
479
      sub_op_pull(op);
718
480
      return;
722
484
    case CEPH_OSD_OP_SCRUB:
723
485
      sub_op_scrub(op);
724
486
      return;
 
487
    case CEPH_OSD_OP_SCRUB_RESERVE:
 
488
      sub_op_scrub_reserve(op);
 
489
      return;
 
490
    case CEPH_OSD_OP_SCRUB_UNRESERVE:
 
491
      sub_op_scrub_unreserve(op);
 
492
      return;
 
493
    case CEPH_OSD_OP_SCRUB_STOP:
 
494
      sub_op_scrub_stop(op);
 
495
      return;
725
496
    }
726
497
  }
727
498
 
732
503
{
733
504
  if (r->ops.size() >= 1) {
734
505
    OSDOp& first = r->ops[0];
735
 
    if (first.op.op == CEPH_OSD_OP_PUSH) {
 
506
    switch (first.op.op) {
 
507
    case CEPH_OSD_OP_PUSH:
736
508
      // continue peer recovery
737
509
      sub_op_push_reply(r);
738
510
      return;
739
 
    }
740
 
    if (first.op.op == CEPH_OSD_OP_SCRUB) {
 
511
 
 
512
    case CEPH_OSD_OP_SCRUB:
741
513
      sub_op_scrub_reply(r);
742
514
      return;
 
515
 
 
516
    case CEPH_OSD_OP_SCRUB_RESERVE:
 
517
      sub_op_scrub_reserve_reply(r);
 
518
      return;
743
519
    }
744
520
  }
745
521
 
749
525
 
750
526
bool ReplicatedPG::snap_trimmer()
751
527
{
 
528
  assert(is_primary() && is_clean());
752
529
  lock();
753
530
  dout(10) << "snap_trimmer start, purged_snaps " << info.purged_snaps << dendl;
754
531
 
 
532
  interval_set<snapid_t> s;
 
533
  s.intersection_of(snap_trimq, info.purged_snaps);
 
534
  if (!s.empty()) {
 
535
    dout(0) << "WARNING - snap_trimmer: snap_trimq contained snaps already in "
 
536
            << "purged_snaps" << dendl;
 
537
    snap_trimq.subtract(s);
 
538
  }
 
539
 
 
540
 
 
541
  epoch_t current_set_started = info.history.last_epoch_started;
 
542
 
755
543
  while (snap_trimq.size() &&
756
 
         is_primary() &&
 
544
         current_set_started == info.history.last_epoch_started &&
757
545
         is_active()) {
758
546
 
759
 
    snapid_t sn = snap_trimq.start();
760
 
    coll_t c = coll_t::build_snap_pg_coll(info.pgid, sn);
 
547
    snapid_t sn = snap_trimq.range_start();
 
548
    coll_t c(info.pgid, sn);
 
549
    if (!snap_collections.contains(sn)) {
 
550
      // adjust pg info
 
551
      info.purged_snaps.insert(sn);
 
552
      snap_trimq.erase(sn);
 
553
      dout(10) << "purged_snaps now " << info.purged_snaps << ", snap_trimq now " << snap_trimq << dendl;
 
554
      continue;
 
555
    }
761
556
    vector<sobject_t> ls;
762
557
    osd->store->collection_list(c, ls);
763
558
 
770
565
      if (!mode.try_write(nobody)) {
771
566
        dout(10) << " can't write, waiting" << dendl;
772
567
        Cond cond;
773
 
        mode.waiting_cond.push_back(&cond);
 
568
        list<Cond*>::iterator q = mode.waiting_cond.insert(mode.waiting_cond.end(), &cond);
774
569
        while (!mode.try_write(nobody))
775
570
          cond.Wait(_lock);
 
571
        mode.waiting_cond.erase(q);
776
572
        dout(10) << " done waiting" << dendl;
777
 
        if (!is_primary() || !is_active())
 
573
        if (!(current_set_started == info.history.last_epoch_started &&
 
574
              is_active())) {
778
575
          break;
 
576
        }
779
577
      }
780
578
 
781
579
      // load clone info
782
580
      bufferlist bl;
783
 
      osd->store->getattr(coll_t::build_pg_coll(info.pgid), coid, OI_ATTR, bl);
784
 
      object_info_t coi(bl);
 
581
      ObjectContext *obc;
 
582
      int r = find_object_context(coid.oid, OLOC_BLANK, sn, &obc, false, NULL);
 
583
      assert(r == 0);
 
584
      assert(obc->registered);
 
585
      object_info_t &coi = obc->obs.oi;
 
586
      vector<snapid_t>& snaps = coi.snaps;
785
587
 
786
588
      // get snap set context
787
 
      SnapSetContext *ssc = get_snapset_context(coid.oid, false);
 
589
      if (!obc->obs.ssc)
 
590
        obc->obs.ssc = get_snapset_context(coid.oid, false);
 
591
      SnapSetContext *ssc = obc->obs.ssc;
788
592
      assert(ssc);
789
593
      SnapSet& snapset = ssc->snapset;
790
 
      vector<snapid_t>& snaps = coi.snaps;
791
594
 
792
595
      dout(10) << coid << " snaps " << snaps << " old snapset " << snapset << dendl;
793
596
      assert(snapset.seq);
794
597
 
795
 
      // set up repop
796
 
      ObjectContext *obc;
797
 
      int r = find_object_context(coid.oid, sn, &obc, false);
798
 
      assert(r == 0);
799
 
      register_object_context(obc);
800
 
 
801
598
      vector<OSDOp> ops;
802
599
      tid_t rep_tid = osd->get_tid();
803
 
      osd_reqid_t reqid(osd->messenger->get_myname(), 0, rep_tid);
 
600
      osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
804
601
      OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, this);
805
602
      ctx->mtime = g_clock.now();
806
603
 
825
622
      if (newsnaps.empty()) {
826
623
        // remove clone
827
624
        dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << " ... deleting" << dendl;
828
 
        t->remove(coll_t::build_pg_coll(info.pgid), coid);
829
 
        t->collection_remove(coll_t::build_snap_pg_coll(info.pgid, snaps[0]), coid);
 
625
        t->remove(coll_t(info.pgid), coid);
 
626
        t->collection_remove(coll_t(info.pgid, snaps[0]), coid);
830
627
        if (snaps.size() > 1)
831
 
          t->collection_remove(coll_t::build_snap_pg_coll(info.pgid, snaps[snaps.size()-1]), coid);
 
628
          t->collection_remove(coll_t(info.pgid, snaps[snaps.size()-1]), coid);
832
629
 
833
630
        // ...from snapset
834
631
        snapid_t last = coid.snap;
861
658
        // save adjusted snaps for this object
862
659
        dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << dendl;
863
660
        coi.snaps.swap(newsnaps);
 
661
        coi.prior_version = coi.version;
 
662
        coi.version = ctx->at_version;
864
663
        bl.clear();
865
664
        ::encode(coi, bl);
866
 
        t->setattr(coll_t::build_pg_coll(info.pgid), coid, OI_ATTR, bl);
 
665
        t->setattr(coll_t(info.pgid), coid, OI_ATTR, bl);
867
666
 
868
667
        if (snaps[0] != newsnaps[0]) {
869
 
          t->collection_remove(coll_t::build_snap_pg_coll(info.pgid, snaps[0]), coid);
870
 
          t->collection_add(coll_t::build_snap_pg_coll(info.pgid, newsnaps[0]), coll_t::build_pg_coll(info.pgid), coid);
 
668
          t->collection_remove(coll_t(info.pgid, snaps[0]), coid);
 
669
          t->collection_add(coll_t(info.pgid, newsnaps[0]), coll_t(info.pgid), coid);
871
670
        }
872
671
        if (snaps.size() > 1 && snaps[snaps.size()-1] != newsnaps[newsnaps.size()-1]) {
873
 
          t->collection_remove(coll_t::build_snap_pg_coll(info.pgid, snaps[snaps.size()-1]), coid);
 
672
          t->collection_remove(coll_t(info.pgid, snaps[snaps.size()-1]), coid);
874
673
          if (newsnaps.size() > 1)
875
 
            t->collection_add(coll_t::build_snap_pg_coll(info.pgid, newsnaps[newsnaps.size()-1]), coll_t::build_pg_coll(info.pgid), coid);
 
674
            t->collection_add(coll_t(info.pgid, newsnaps[newsnaps.size()-1]), coll_t(info.pgid), coid);
876
675
        }             
877
676
 
878
 
        ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, coid, ctx->at_version, ctx->obs->oi.version,
 
677
        ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, coid, coi.version, coi.prior_version,
879
678
                                      osd_reqid_t(), ctx->mtime));
880
679
        ctx->at_version.version++;
881
680
      }
884
683
      dout(10) << coid << " new snapset " << snapset << dendl;
885
684
 
886
685
      sobject_t snapoid(coid.oid, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR);
887
 
      ctx->snapset_obc = get_object_context(snapoid, false);
888
 
      register_object_context(ctx->snapset_obc);
 
686
      ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false);
 
687
      assert(ctx->snapset_obc->registered);
889
688
      if (snapset.clones.empty() && !snapset.head_exists) {
890
689
        dout(10) << coid << " removing " << snapoid << dendl;
891
690
        ctx->log.push_back(Log::Entry(Log::Entry::DELETE, snapoid, ctx->at_version, 
892
691
                                      ctx->snapset_obc->obs.oi.version, osd_reqid_t(), ctx->mtime));
893
692
        ctx->snapset_obc->obs.exists = false;
894
693
 
895
 
        t->remove(coll_t::build_pg_coll(info.pgid), snapoid);
 
694
        t->remove(coll_t(info.pgid), snapoid);
896
695
      } else {
897
696
        dout(10) << coid << " updating snapset on " << snapoid << dendl;
898
697
        ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, snapoid, ctx->at_version, 
903
702
 
904
703
        bl.clear();
905
704
        ::encode(snapset, bl);
906
 
        t->setattr(coll_t::build_pg_coll(info.pgid), snapoid, SS_ATTR, bl);
 
705
        t->setattr(coll_t(info.pgid), snapoid, SS_ATTR, bl);
907
706
 
908
707
        bl.clear();
909
708
        ::encode(ctx->snapset_obc->obs.oi, bl);
910
 
        t->setattr(coll_t::build_pg_coll(info.pgid), snapoid, OI_ATTR, bl);
 
709
        t->setattr(coll_t(info.pgid), snapoid, OI_ATTR, bl);
911
710
      }
912
711
 
913
712
      log_op(ctx->log, eversion_t(), ctx->local_t);
924
723
      // give other threads a chance at this pg
925
724
      unlock();
926
725
      lock();
 
726
      if (!(current_set_started == info.history.last_epoch_started &&
 
727
            is_active())) {
 
728
        break;
 
729
      }
927
730
    }
928
731
 
929
732
    // adjust pg info
939
742
    t->remove_collection(c);
940
743
    int tr = osd->store->queue_transaction(&osr, t);
941
744
    assert(tr == 0);
 
745
 
942
746
 
943
 
    // share new PG::Info with replicas
944
 
    for (unsigned i=1; i<acting.size(); i++) {
945
 
      int peer = acting[i];
946
 
      MOSDPGInfo *m = new MOSDPGInfo(osd->osdmap->get_epoch());
947
 
      m->pg_info.push_back(info);
948
 
      osd->messenger->send_message(m, osd->osdmap->get_inst(peer));
949
 
    }
 
747
    unlock();
 
748
    osd->map_lock.get_read();
 
749
    lock();
 
750
    share_pg_info();
 
751
    unlock();
 
752
    osd->map_lock.put_read();
 
753
 
 
754
    // flush, to make sure the collection adjustments we just made are
 
755
    // reflected when we scan the next collection set.
 
756
    osd->store->flush();
 
757
    lock();
950
758
  }  
951
759
 
952
760
  // done
1074
882
 
1075
883
    case CEPH_OSD_OP_READ:
1076
884
      {
1077
 
        dout(0) << "CEPH_OSD_OP_READ" << dendl;
1078
885
        // read into a buffer
1079
886
        bufferlist bl;
1080
 
        int r = osd->store->read(coll_t::build_pg_coll(info.pgid), soid, op.extent.offset, op.extent.length, bl);
 
887
        int r = osd->store->read(coll_t(info.pgid), soid, op.extent.offset, op.extent.length, bl);
1081
888
        if (odata.length() == 0)
1082
889
          ctx->data_off = op.extent.offset;
1083
890
        odata.claim(bl);
1089
896
        }
1090
897
        info.stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
1091
898
        info.stats.num_rd++;
1092
 
        dout(0) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
 
899
        dout(10) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
1093
900
 
1094
901
        __u32 seq = oi.truncate_seq;
1095
902
        // are we beyond truncate_size?
1106
913
          bufferlist keep;
1107
914
 
1108
915
          // keep first part of odata; trim at truncation point
1109
 
          dout(0) << " obj " << soid << " seq " << seq
 
916
          dout(10) << " obj " << soid << " seq " << seq
1110
917
                   << ": trimming overlap " << from << "~" << trim << dendl;
1111
918
          keep.substr_of(odata, 0, odata.length() - trim);
1112
919
          odata.claim(keep);
1114
921
      }
1115
922
      break;
1116
923
 
 
924
    /* map extents */
 
925
    case CEPH_OSD_OP_MAPEXT:
 
926
      {
 
927
        // read into a buffer
 
928
        bufferlist bl;
 
929
        int r = osd->store->fiemap(coll, soid, op.extent.offset, op.extent.length, bl);
 
930
/*
 
931
        if (odata.length() == 0)
 
932
          ctx->data_off = op.extent.offset; */
 
933
        odata.claim(bl);
 
934
        if (r < 0)
 
935
          result = r;
 
936
        info.stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
 
937
        info.stats.num_rd++;
 
938
        dout(10) << " map_extents done on object " << soid << dendl;
 
939
      }
 
940
      break;
 
941
 
 
942
    /* map extents */
 
943
    case CEPH_OSD_OP_SPARSE_READ:
 
944
      {
 
945
        if (op.extent.truncate_seq) {
 
946
          dout(0) << "sparse_read does not support truncation sequence " << dendl;
 
947
          result = -EINVAL;
 
948
          break;
 
949
        }
 
950
        // read into a buffer
 
951
        bufferlist bl;
 
952
        int total_read = 0;
 
953
        int r = osd->store->fiemap(coll, soid, op.extent.offset, op.extent.length, bl);
 
954
        if (r < 0)  {
 
955
          result = r;
 
956
          break;
 
957
        }
 
958
        map<off_t, size_t> m;
 
959
        bufferlist::iterator iter = bl.begin();
 
960
        ::decode(m, iter);
 
961
        map<off_t, size_t>::iterator miter;
 
962
        bufferlist data_bl;
 
963
        for (miter = m.begin(); miter != m.end(); ++miter) {
 
964
          bufferlist tmpbl;
 
965
          r = osd->store->read(coll, soid, miter->first, miter->second, tmpbl);
 
966
          if (r < 0)
 
967
            break;
 
968
 
 
969
          if (r < (int)miter->second) /* this is usually happen when we get extent that exceeds the actual file size */
 
970
            miter->second = r;
 
971
          total_read += r;
 
972
          dout(10) << "sparse-read " << miter->first << "@" << miter->second << dendl;
 
973
          data_bl.claim_append(tmpbl);
 
974
        }
 
975
 
 
976
        if (r < 0) {
 
977
          result = r;
 
978
          break;
 
979
        }
 
980
 
 
981
        op.extent.length = total_read;
 
982
 
 
983
        ::encode(m, odata);
 
984
        ::encode(data_bl, odata);
 
985
 
 
986
        info.stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
 
987
        info.stats.num_rd++;
 
988
 
 
989
        dout(10) << " sparse_read got " << total_read << " bytes from object " << soid << dendl;
 
990
      }
 
991
      break;
 
992
 
1117
993
    case CEPH_OSD_OP_CALL:
1118
994
      {
1119
995
        bufferlist indata;
1148
1024
      {
1149
1025
        struct stat st;
1150
1026
        memset(&st, 0, sizeof(st));
1151
 
        result = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
 
1027
        result = osd->store->stat(coll_t(info.pgid), soid, &st);
1152
1028
        if (result >= 0) {
1153
1029
          uint64_t size = st.st_size;
1154
1030
          ::encode(size, odata);
1163
1039
        string aname;
1164
1040
        bp.copy(op.xattr.name_len, aname);
1165
1041
        string name = "_" + aname;
1166
 
        int r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, name.c_str(), odata);
 
1042
        int r = osd->store->getattr(coll_t(info.pgid), soid, name.c_str(), odata);
1167
1043
        if (r >= 0) {
1168
1044
          op.xattr.value_len = r;
1169
1045
          result = 0;
1176
1052
   case CEPH_OSD_OP_GETXATTRS:
1177
1053
      {
1178
1054
        map<string,bufferptr> attrset;
1179
 
        result = osd->store->getattrs(coll_t::build_pg_coll(info.pgid), soid, attrset, true);
 
1055
        result = osd->store->getattrs(coll_t(info.pgid), soid, attrset, true);
1180
1056
        map<string, bufferptr>::iterator iter;
1181
1057
        map<string, bufferlist> newattrs;
1182
1058
        for (iter = attrset.begin(); iter != attrset.end(); ++iter) {
1193
1069
      
1194
1070
    case CEPH_OSD_OP_CMPXATTR:
1195
1071
      {
1196
 
        dout(0) << "CEPH_OSD_OP_CMPXATTR" << dendl;
1197
1072
        string aname;
1198
1073
        bp.copy(op.xattr.name_len, aname);
1199
1074
        string name = "_" + aname;
1200
1075
        name[op.xattr.name_len + 1] = 0;
1201
1076
        
1202
1077
        bufferlist xattr;
1203
 
        result = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, name.c_str(), xattr);
 
1078
        result = osd->store->getattr(coll_t(info.pgid), soid, name.c_str(), xattr);
1204
1079
        if (result < 0 && result != -EEXIST && result !=-ENODATA)
1205
1080
          break;
1206
1081
        
1231
1106
        }
1232
1107
 
1233
1108
        if (!result) {
1234
 
          dout(0) << "comparison returned false" << dendl;
 
1109
          dout(10) << "comparison returned false" << dendl;
1235
1110
          result = -ECANCELED;
1236
1111
          break;
1237
1112
        }
1238
1113
        if (result < 0) {
1239
 
          dout(0) << "comparison returned " << result << " " << strerror(-result) << dendl;
 
1114
          dout(10) << "comparison returned " << result << " " << strerror(-result) << dendl;
1240
1115
          break;
1241
1116
        }
1242
1117
 
1243
 
        dout(0) << "comparison returned true" << dendl;
 
1118
        dout(10) << "comparison returned true" << dendl;
1244
1119
        info.stats.num_rd++;
1245
1120
      }
1246
1121
      break;
1263
1138
          // write arrives before trimtrunc
1264
1139
          dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq
1265
1140
                   << ", truncating to " << op.extent.truncate_size << dendl;
1266
 
          t.truncate(coll_t::build_pg_coll(info.pgid), soid, op.extent.truncate_size);
 
1141
          t.truncate(coll_t(info.pgid), soid, op.extent.truncate_size);
1267
1142
          oi.truncate_seq = op.extent.truncate_seq;
1268
1143
          oi.truncate_size = op.extent.truncate_size;
1269
1144
        }
1270
1145
        if (op.extent.length) {
1271
1146
          bufferlist nbl;
1272
1147
          bp.copy(op.extent.length, nbl);
1273
 
          t.write(coll_t::build_pg_coll(info.pgid), soid, op.extent.offset, op.extent.length, nbl);
 
1148
          t.write(coll_t(info.pgid), soid, op.extent.offset, op.extent.length, nbl);
1274
1149
        } else {
1275
 
          t.touch(coll_t::build_pg_coll(info.pgid), soid);
 
1150
          t.touch(coll_t(info.pgid), soid);
1276
1151
        }
1277
1152
        if (ssc->snapset.clones.size()) {
1278
1153
          snapid_t newest = *ssc->snapset.clones.rbegin();
1300
1175
        bufferlist nbl;
1301
1176
        bp.copy(op.extent.length, nbl);
1302
1177
        if (ctx->obs->exists)
1303
 
          t.truncate(coll_t::build_pg_coll(info.pgid), soid, 0);
1304
 
        t.write(coll_t::build_pg_coll(info.pgid), soid, op.extent.offset, op.extent.length, nbl);
 
1178
          t.truncate(coll_t(info.pgid), soid, 0);
 
1179
        t.write(coll_t(info.pgid), soid, op.extent.offset, op.extent.length, nbl);
1305
1180
        if (ssc->snapset.clones.size()) {
1306
1181
          snapid_t newest = *ssc->snapset.clones.rbegin();
 
1182
 
 
1183
          // Replace clone_overlap[newest] with an empty interval set since there
 
1184
          // should no longer be any overlap
1307
1185
          ssc->snapset.clone_overlap.erase(newest);
 
1186
          ssc->snapset.clone_overlap[newest];
1308
1187
          oi.size = 0;
1309
1188
        }
1310
1189
        if (op.extent.length != oi.size) {
1328
1207
      { // zero
1329
1208
        assert(op.extent.length);
1330
1209
        if (!ctx->obs->exists)
1331
 
          t.touch(coll_t::build_pg_coll(info.pgid), soid);
1332
 
        t.zero(coll_t::build_pg_coll(info.pgid), soid, op.extent.offset, op.extent.length);
 
1210
          t.touch(coll_t(info.pgid), soid);
 
1211
        t.zero(coll_t(info.pgid), soid, op.extent.offset, op.extent.length);
1333
1212
        if (ssc->snapset.clones.size()) {
1334
1213
          snapid_t newest = *ssc->snapset.clones.rbegin();
1335
1214
          interval_set<uint64_t> ch;
1348
1227
        if (ctx->obs->exists && (flags & CEPH_OSD_OP_FLAG_EXCL))
1349
1228
          result = -EEXIST; /* this is an exclusive create */
1350
1229
        else {
1351
 
          t.touch(coll_t::build_pg_coll(info.pgid), soid);
 
1230
          t.touch(coll_t(info.pgid), soid);
1352
1231
          ssc->snapset.head_exists = true;
1353
1232
        }
1354
1233
      }
1378
1257
          oi.truncate_size = op.extent.truncate_size;
1379
1258
        }
1380
1259
 
1381
 
        t.truncate(coll_t::build_pg_coll(info.pgid), soid, op.extent.offset);
 
1260
        t.truncate(coll_t(info.pgid), soid, op.extent.offset);
1382
1261
        if (ssc->snapset.clones.size()) {
1383
1262
          snapid_t newest = *ssc->snapset.clones.rbegin();
1384
1263
          interval_set<uint64_t> trim;
1414
1293
    case CEPH_OSD_OP_SETXATTR:
1415
1294
      {
1416
1295
        if (!ctx->obs->exists)
1417
 
          t.touch(coll_t::build_pg_coll(info.pgid), soid);
 
1296
          t.touch(coll_t(info.pgid), soid);
1418
1297
        string aname;
1419
1298
        bp.copy(op.xattr.name_len, aname);
1420
1299
        string name = "_" + aname;
1421
1300
        bufferlist bl;
1422
1301
        bp.copy(op.xattr.value_len, bl);
1423
1302
        if (!ctx->obs->exists)  // create object if it doesn't yet exist.
1424
 
          t.touch(coll_t::build_pg_coll(info.pgid), soid);
1425
 
        t.setattr(coll_t::build_pg_coll(info.pgid), soid, name, bl);
 
1303
          t.touch(coll_t(info.pgid), soid);
 
1304
        t.setattr(coll_t(info.pgid), soid, name, bl);
1426
1305
        ssc->snapset.head_exists = true;
1427
1306
        info.stats.num_wr++;
1428
1307
      }
1433
1312
        string aname;
1434
1313
        bp.copy(op.xattr.name_len, aname);
1435
1314
        string name = "_" + aname;
1436
 
        t.rmattr(coll_t::build_pg_coll(info.pgid), soid, name);
 
1315
        t.rmattr(coll_t(info.pgid), soid, name);
1437
1316
        info.stats.num_wr++;
1438
1317
      }
1439
1318
      break;
1703
1582
              << " " << ceph_osd_op_name(op.op)
1704
1583
              << dendl;
1705
1584
      result = -EOPNOTSUPP;
1706
 
      assert(0);  // for now
1707
1585
    }
1708
1586
 
1709
1587
    if ((is_modify) &&
1727
1605
  ObjectStore::Transaction& t = ctx->op_t;
1728
1606
 
1729
1607
  if (ctx->obs->exists)
1730
 
    t.remove(coll_t::build_pg_coll(info.pgid), soid);
 
1608
    t.remove(coll_t(info.pgid), soid);
1731
1609
  if (ssc->snapset.clones.size()) {
1732
1610
    snapid_t newest = *ssc->snapset.clones.rbegin();
1733
1611
    add_interval_usage(ssc->snapset.clone_overlap[newest], info.stats);
 
1612
 
 
1613
    // Replace clone_overlap[newest] with an empty interval set since there
 
1614
    // should no longer be any overlap
1734
1615
    ssc->snapset.clone_overlap.erase(newest);  // ok, redundant.
 
1616
    ssc->snapset.clone_overlap[newest];
1735
1617
  }
1736
1618
  if (ctx->obs->exists) {
1737
1619
    info.stats.num_objects--;
1747
1629
void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
1748
1630
{
1749
1631
  SnapSetContext *ssc = ctx->obs->ssc;
1750
 
  const sobject_t& soid = ctx->obs->oi.soid;
 
1632
  object_info_t& oi = ctx->obs->oi;
 
1633
  const sobject_t& soid = oi.soid;
1751
1634
  ObjectStore::Transaction& t = ctx->op_t;
1752
1635
  snapid_t snapid = (uint64_t)op.snap.snapid;
1753
1636
 
1754
1637
  dout(10) << "_rollback_to " << soid << " snapid " << snapid << dendl;
1755
1638
 
1756
1639
  ObjectContext *rollback_to;
1757
 
  int ret = find_object_context(soid.oid, snapid, &rollback_to, false);
1758
 
  sobject_t& rollback_to_sobject = rollback_to->obs.oi.soid;
 
1640
  int ret = find_object_context(soid.oid, oi.oloc, snapid, &rollback_to, false);
1759
1641
  if (ret) {
1760
1642
    if (-ENOENT == ret) {
1761
1643
      // there's no snapshot here, or there's no object.
1773
1655
      assert(0);
1774
1656
    }
1775
1657
  } else { //we got our context, let's use it to do the rollback!
 
1658
    sobject_t& rollback_to_sobject = rollback_to->obs.oi.soid;
1776
1659
    if (ctx->clone_obc &&
1777
 
        (ctx->clone_obc->obs.oi.prior_version == ctx->obs->oi.version)) {
 
1660
        (ctx->clone_obc->obs.oi.soid.snap == snapid)) {
1778
1661
      //just cloned the rollback target, we don't need to do anything!
1779
1662
    
1780
1663
    } else {
1782
1665
       * 2) Clone correct snapshot into head
1783
1666
       * 3) Calculate clone_overlaps by following overlaps
1784
1667
       *    forward from rollback snapshot */
1785
 
       dout(10) << "_rollback_to deleting " << soid.oid
1786
 
                << " and rolling back to old snap" << dendl;
1787
 
 
1788
 
      sobject_t new_head = get_object_context(ctx->obs->oi.soid)->obs.oi.soid;
 
1668
      dout(10) << "_rollback_to deleting " << soid.oid
 
1669
               << " and rolling back to old snap" << dendl;
1789
1670
      
1790
1671
      _delete_head(ctx);
1791
1672
      ctx->obs->exists = true; //we're about to recreate it
1792
1673
      
1793
1674
      map<string, bufferptr> attrs;
1794
 
      t.clone(coll_t::build_pg_coll(info.pgid),
1795
 
              rollback_to_sobject, new_head);
1796
 
      osd->store->getattrs(coll_t::build_pg_coll(info.pgid),
 
1675
      t.clone(coll_t(info.pgid),
 
1676
              rollback_to_sobject, soid);
 
1677
      osd->store->getattrs(coll_t(info.pgid),
1797
1678
                           rollback_to_sobject, attrs, false);
1798
1679
      osd->filter_xattrs(attrs);
1799
 
      t.setattrs(coll_t::build_pg_coll(info.pgid), new_head, attrs);
 
1680
      t.setattrs(coll_t(info.pgid), soid, attrs);
1800
1681
      ssc->snapset.head_exists = true;
1801
1682
 
 
1683
      // Adjust the cached objectcontext
 
1684
      ObjectContext *clone_context = get_object_context(rollback_to_sobject,
 
1685
                                                        oi.oloc,
 
1686
                                                        false);
 
1687
      assert(clone_context);
 
1688
      ctx->obs->oi.size = clone_context->obs.oi.size;
 
1689
 
1802
1690
      map<snapid_t, interval_set<uint64_t> >::iterator iter =
1803
1691
        ssc->snapset.clone_overlap.lower_bound(snapid);
1804
1692
      interval_set<uint64_t> overlaps = iter->second;
 
1693
      assert(iter != ssc->snapset.clone_overlap.end());
1805
1694
      for ( ;
1806
1695
            iter != ssc->snapset.clone_overlap.end();
1807
1696
            ++iter)
1819
1708
  ::encode(*poi, bv);
1820
1709
 
1821
1710
  map<string, bufferptr> attrs;
1822
 
  osd->store->getattrs(coll_t::build_pg_coll(info.pgid), head, attrs);
 
1711
  osd->store->getattrs(coll_t(info.pgid), head, attrs);
1823
1712
  osd->filter_xattrs(attrs);
1824
1713
 
1825
 
  t.clone(coll_t::build_pg_coll(info.pgid), head, coid);
1826
 
  t.setattr(coll_t::build_pg_coll(info.pgid), coid, OI_ATTR, bv);
1827
 
  t.setattrs(coll_t::build_pg_coll(info.pgid), coid, attrs);
 
1714
  t.clone(coll_t(info.pgid), head, coid);
 
1715
  t.setattr(coll_t(info.pgid), coid, OI_ATTR, bv);
 
1716
  t.setattrs(coll_t(info.pgid), coid, attrs);
1828
1717
}
1829
1718
 
1830
1719
void ReplicatedPG::make_writeable(OpContext *ctx)
1862
1751
      snaps[i] = snapc.snaps[i];
1863
1752
    
1864
1753
    // prepare clone
1865
 
    ctx->clone_obc = new ObjectContext(coid);
1866
 
    ctx->clone_obc->obs.oi.version = ctx->at_version;
1867
 
    ctx->clone_obc->obs.oi.prior_version = oi.version;
1868
 
    ctx->clone_obc->obs.oi.last_reqid = oi.last_reqid;
1869
 
    ctx->clone_obc->obs.oi.mtime = oi.mtime;
1870
 
    ctx->clone_obc->obs.oi.snaps = snaps;
1871
 
    ctx->clone_obc->obs.exists = true;
1872
 
    ctx->clone_obc->get();
1873
 
 
1874
 
    if (is_primary())
 
1754
    object_info_t static_snap_oi(coid, oi.oloc);
 
1755
    object_info_t *snap_oi;
 
1756
    if (is_primary()) {
 
1757
      ctx->clone_obc = new ObjectContext(static_snap_oi, true, NULL);
 
1758
      ctx->clone_obc->get();
1875
1759
      register_object_context(ctx->clone_obc);
1876
 
    
1877
 
    _make_clone(t, soid, coid, &ctx->clone_obc->obs.oi);
 
1760
      snap_oi = &ctx->clone_obc->obs.oi;
 
1761
    } else {
 
1762
      snap_oi = &static_snap_oi;
 
1763
    }
 
1764
    snap_oi->version = ctx->at_version;
 
1765
    snap_oi->prior_version = oi.version;
 
1766
    snap_oi->copy_user_bits(oi);
 
1767
    snap_oi->snaps = snaps;
 
1768
    _make_clone(t, soid, coid, snap_oi);
1878
1769
    
1879
1770
    // add to snap bound collections
1880
1771
    coll_t fc = make_snap_collection(t, snaps[0]);
1881
 
    t.collection_add(fc, coll_t::build_pg_coll(info.pgid), coid);
 
1772
    t.collection_add(fc, coll_t(info.pgid), coid);
1882
1773
    if (snaps.size() > 1) {
1883
1774
      coll_t lc = make_snap_collection(t, snaps[snaps.size()-1]);
1884
 
      t.collection_add(lc, coll_t::build_pg_coll(info.pgid), coid);
 
1775
      t.collection_add(lc, coll_t(info.pgid), coid);
1885
1776
    }
1886
1777
    
1887
1778
    info.stats.num_objects++;
1888
1779
    info.stats.num_object_clones++;
1889
1780
    ssc->snapset.clones.push_back(coid.snap);
1890
1781
    ssc->snapset.clone_size[coid.snap] = ctx->obs->oi.size;
 
1782
 
 
1783
    // clone_overlap should contain an entry for each clone 
 
1784
    // (an empty interval_set if there is no overlap)
 
1785
    ssc->snapset.clone_overlap[coid.snap];
1891
1786
    if (ctx->obs->oi.size)
1892
1787
      ssc->snapset.clone_overlap[coid.snap].insert(0, ctx->obs->oi.size);
1893
1788
    
1911
1806
 
1912
1807
void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, pg_stat_t& stats)
1913
1808
{
1914
 
  for (map<uint64_t,uint64_t>::iterator p = s.m.begin(); p != s.m.end(); p++) {
1915
 
    stats.num_bytes += p->second;
1916
 
    stats.num_kb += SHIFT_ROUND_UP(p->first+p->second, 10) - (p->first >> 10);
 
1809
  for (interval_set<uint64_t>::const_iterator p = s.begin(); p != s.end(); ++p) {
 
1810
    stats.num_bytes += p.get_len();
 
1811
    stats.num_kb += SHIFT_ROUND_UP(p.get_start() + p.get_len(), 10) - (p.get_start() >> 10);
1917
1812
  }
1918
1813
}
1919
1814
 
1966
1861
 
1967
1862
    bufferlist bv(sizeof(*poi));
1968
1863
    ::encode(*poi, bv);
1969
 
    ctx->op_t.setattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bv);
 
1864
    ctx->op_t.setattr(coll_t(info.pgid), soid, OI_ATTR, bv);
1970
1865
 
1971
1866
    dout(10) << " final snapset " << ctx->obs->ssc->snapset
1972
1867
             << " in " << soid << dendl;
1973
 
    ctx->op_t.setattr(coll_t::build_pg_coll(info.pgid), soid, SS_ATTR, bss);   
 
1868
    ctx->op_t.setattr(coll_t(info.pgid), soid, SS_ATTR, bss);   
1974
1869
    if (!head_existed) {
1975
1870
      // if we logically recreated the head, remove old _snapdir object
1976
1871
      sobject_t snapoid(soid.oid, CEPH_SNAPDIR);
1977
1872
 
1978
 
      ctx->snapset_obc = get_object_context(snapoid, false);
 
1873
      ctx->snapset_obc = get_object_context(snapoid, poi->oloc, false);
1979
1874
      if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
1980
 
        ctx->op_t.remove(coll_t::build_pg_coll(info.pgid), snapoid);
 
1875
        ctx->op_t.remove(coll_t(info.pgid), snapoid);
1981
1876
        dout(10) << " removing old " << snapoid << dendl;
1982
1877
 
1983
1878
        ctx->at_version.version++;
1985
1880
                                      osd_reqid_t(), ctx->mtime));
1986
1881
 
1987
1882
        ctx->snapset_obc->obs.exists = false;
1988
 
        register_object_context(ctx->snapset_obc);
 
1883
        assert(ctx->snapset_obc->registered);
1989
1884
      }
1990
1885
    }
1991
1886
  } else if (ctx->obs->ssc->snapset.clones.size()) {
1997
1892
    ctx->log.push_back(Log::Entry(Log::Entry::MODIFY, snapoid, ctx->at_version, old_version,
1998
1893
                                  osd_reqid_t(), ctx->mtime));
1999
1894
 
2000
 
    ctx->snapset_obc = get_object_context(snapoid, true);
 
1895
    ctx->snapset_obc = get_object_context(snapoid, poi->oloc, true);
2001
1896
    ctx->snapset_obc->obs.exists = true;
2002
1897
    ctx->snapset_obc->obs.oi.version = ctx->at_version;
2003
1898
    ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
2004
1899
    ctx->snapset_obc->obs.oi.mtime = ctx->mtime;
2005
 
    register_object_context(ctx->snapset_obc);
 
1900
    assert(ctx->snapset_obc->registered);
2006
1901
 
2007
1902
    bufferlist bv(sizeof(*poi));
2008
1903
    ::encode(ctx->snapset_obc->obs.oi, bv);
2009
 
    ctx->op_t.touch(coll_t::build_pg_coll(info.pgid), snapoid);
2010
 
    ctx->op_t.setattr(coll_t::build_pg_coll(info.pgid), snapoid, OI_ATTR, bv);
2011
 
    ctx->op_t.setattr(coll_t::build_pg_coll(info.pgid), snapoid, SS_ATTR, bss);
 
1904
    ctx->op_t.touch(coll_t(info.pgid), snapoid);
 
1905
    ctx->op_t.setattr(coll_t(info.pgid), snapoid, OI_ATTR, bv);
 
1906
    ctx->op_t.setattr(coll_t(info.pgid), snapoid, SS_ATTR, bss);
2012
1907
  }
2013
1908
 
2014
1909
  return result;
2083
1978
  repop->tls.push_back(&repop->ctx->local_t);
2084
1979
 
2085
1980
  repop->obc->ondisk_write_lock();
 
1981
  if (repop->ctx->clone_obc)
 
1982
    repop->ctx->clone_obc->ondisk_write_lock();
2086
1983
 
2087
1984
  Context *oncommit = new C_OSD_OpCommit(this, repop);
2088
1985
  Context *onapplied = new C_OSD_OpApplied(this, repop);
2089
 
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc);
 
1986
  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc,
 
1987
                                                        repop->ctx->clone_obc);
2090
1988
  int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync);
2091
1989
  if (r) {
2092
1990
    dout(-10) << "apply_repop  queue_transactions returned " << r << " on " << *repop << dendl;
2130
2028
  put_object_context(repop->obc);
2131
2029
  repop->obc = 0;
2132
2030
 
 
2031
  last_update_applied = repop->v;
 
2032
  if (last_update_applied == info.last_update && finalizing_scrub) {
 
2033
    kick();
 
2034
  }
2133
2035
  update_stats();
2134
2036
 
 
2037
#if 0
2135
2038
  // any completion stuff to do here?
2136
2039
  if (repop->ctx->ops.size()) {
2137
2040
    const sobject_t& soid = repop->ctx->obs->oi.soid;
2138
2041
    OSDOp& first = repop->ctx->ops[0];
2139
2042
 
2140
2043
    switch (first.op.op) { 
2141
 
#if 0
2142
2044
    case CEPH_OSD_OP_UNBALANCEREADS:
2143
2045
      dout(-10) << "op_applied  completed unbalance-reads on " << oid << dendl;
2144
2046
      unbalancing_reads.erase(oid);
2157
2059
        }
2158
2060
      */
2159
2061
      break;
2160
 
#endif
2161
2062
 
2162
2063
    case CEPH_OSD_OP_WRUNLOCK:
2163
2064
      dout(-10) << "op_applied  completed wrunlock on " << soid << dendl;
2168
2069
      break;
2169
2070
    }   
2170
2071
  }
 
2072
#endif
2171
2073
 
2172
2074
  if (!repop->aborted)
2173
2075
    eval_repop(repop);
2242
2144
          reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
2243
2145
        reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
2244
2146
        dout(10) << " sending commit on " << *repop << " " << reply << dendl;
2245
 
        osd->messenger->send_message(reply, op->get_connection());
 
2147
        assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
 
2148
        osd->client_messenger->send_message(reply, op->get_connection());
2246
2149
        repop->sent_disk = true;
2247
2150
      }
2248
2151
    }
2258
2161
          reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
2259
2162
        reply->add_flags(CEPH_OSD_FLAG_ACK);
2260
2163
        dout(10) << " sending ack on " << *repop << " " << reply << dendl;
2261
 
        osd->messenger->send_message(reply, op->get_connection());
 
2164
        osd->cluster_messenger->send_message(reply, op->get_connection());
2262
2165
        repop->sent_ack = true;
2263
2166
      }
2264
2167
      
2312
2215
 
2313
2216
    if (op && op->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) {
2314
2217
      // replicate original op for parallel execution on replica
 
2218
      wr->oloc = repop->ctx->obs->oi.oloc;
2315
2219
      wr->ops = repop->ctx->ops;
2316
2220
      wr->mtime = repop->ctx->mtime;
2317
2221
      wr->old_exists = old_exists;
2329
2233
    
2330
2234
    wr->pg_trim_to = pg_trim_to;
2331
2235
    wr->peer_stat = osd->get_my_stat_for(now, peer);
2332
 
    osd->messenger->send_message(wr, osd->osdmap->get_inst(peer));
 
2236
    osd->cluster_messenger->
 
2237
      send_message(wr, osd->osdmap->get_cluster_inst(peer));
2333
2238
 
2334
2239
    // keep peer_info up to date
2335
2240
    if (!repop->noop) {
2431
2336
// -------------------------------------------------------
2432
2337
 
2433
2338
 
2434
 
ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const sobject_t& soid, bool can_create)
 
2339
ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const sobject_t& soid,
 
2340
                                                              const object_locator_t& oloc,
 
2341
                                                              bool can_create)
2435
2342
{
2436
2343
  map<sobject_t, ObjectContext*>::iterator p = object_contexts.find(soid);
2437
2344
  ObjectContext *obc;
2442
2349
  } else {
2443
2350
    // check disk
2444
2351
    bufferlist bv;
2445
 
    int r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bv);
2446
 
    if (r < 0 && !can_create)
2447
 
      return 0;   // -ENOENT!
2448
 
 
2449
 
    obc = new ObjectContext(soid);
2450
 
 
2451
 
    if (can_create)
 
2352
    int r = osd->store->getattr(coll_t(info.pgid), soid, OI_ATTR, bv);
 
2353
    if (r < 0) {
 
2354
      if (!can_create)
 
2355
        return NULL;   // -ENOENT!
 
2356
      object_info_t oi(soid, oloc);
 
2357
      obc = new ObjectContext(oi, false, NULL);
 
2358
    }
 
2359
    else {
 
2360
      object_info_t oi(bv);
 
2361
      SnapSetContext *ssc = NULL;
 
2362
      if (can_create)
 
2363
        ssc = get_snapset_context(soid.oid, true);
 
2364
      obc = new ObjectContext(oi, true, ssc);
 
2365
    }
 
2366
    register_object_context(obc);
 
2367
 
 
2368
    if (can_create && !obc->obs.ssc)
2452
2369
      obc->obs.ssc = get_snapset_context(soid.oid, true);
2453
2370
 
2454
2371
    if (r >= 0) {
2464
2381
}
2465
2382
 
2466
2383
 
2467
 
int ReplicatedPG::find_object_context(const object_t& oid, snapid_t snapid,
 
2384
int ReplicatedPG::find_object_context(const object_t& oid, const object_locator_t& oloc,
 
2385
                                      snapid_t snapid,
2468
2386
                                      ObjectContext **pobc,
2469
 
                                      bool can_create)
 
2387
                                      bool can_create,
 
2388
                                      snapid_t *psnapid)
2470
2389
{
2471
2390
  // want the head?
2472
2391
  sobject_t head(oid, CEPH_NOSNAP);
2473
2392
  if (snapid == CEPH_NOSNAP) {
2474
 
    ObjectContext *obc = get_object_context(head, can_create);
 
2393
    ObjectContext *obc = get_object_context(head, oloc, can_create);
2475
2394
    if (!obc)
2476
2395
      return -ENOENT;
2477
2396
    dout(10) << "find_object_context " << oid << " @" << snapid << dendl;
2494
2413
  // head?
2495
2414
  if (snapid > ssc->snapset.seq) {
2496
2415
    if (ssc->snapset.head_exists) {
2497
 
      ObjectContext *obc = get_object_context(head, false);
 
2416
      ObjectContext *obc = get_object_context(head, oloc, false);
2498
2417
      dout(10) << "find_object_context  " << head
2499
2418
               << " want " << snapid << " > snapset seq " << ssc->snapset.seq
2500
2419
               << " -- HIT " << obc->obs
2533
2452
 
2534
2453
  if (missing.is_missing(soid)) {
2535
2454
    dout(20) << "get_object_context  " << soid << " missing, try again later" << dendl;
 
2455
    if (psnapid)
 
2456
      *psnapid = soid.snap;
2536
2457
    return -EAGAIN;
2537
2458
  }
2538
2459
 
2539
 
  ObjectContext *obc = get_object_context(soid);
 
2460
  ObjectContext *obc = get_object_context(soid, oloc, false);
2540
2461
 
2541
2462
  // clone
2542
2463
  dout(20) << "get_object_context  " << soid << " snaps " << obc->obs.oi.snaps << dendl;
2591
2512
  } else {
2592
2513
    bufferlist bv;
2593
2514
    sobject_t head(oid, CEPH_NOSNAP);
2594
 
    int r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), head, SS_ATTR, bv);
 
2515
    int r = osd->store->getattr(coll_t(info.pgid), head, SS_ATTR, bv);
2595
2516
    if (r < 0) {
2596
2517
      // try _snapset
2597
2518
      sobject_t snapdir(oid, CEPH_SNAPDIR);
2598
 
      r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), snapdir, SS_ATTR, bv);
 
2519
      r = osd->store->getattr(coll_t(info.pgid), snapdir, SS_ATTR, bv);
2599
2520
      if (r < 0 && !can_create)
2600
2521
        return NULL;
2601
2522
    }
2602
2523
    ssc = new SnapSetContext(oid);
 
2524
    register_snapset_context(ssc);
2603
2525
    if (r >= 0) {
2604
2526
      bufferlist::iterator bvp = bv.begin();
2605
2527
      ssc->snapset.decode(bvp);
2625
2547
  }
2626
2548
}
2627
2549
 
2628
 
 
2629
 
 
2630
 
// ----------------------
2631
 
// balance reads cruft
2632
 
 
2633
 
// for reads
2634
 
#if 0
2635
 
  // wrlocked?
2636
 
  if ((op->get_snapid() == 0 || op->get_snapid() == CEPH_NOSNAP) &&
2637
 
      block_if_wrlocked(op, *ctx.poi)) 
2638
 
    return;
2639
 
#endif
2640
 
 
2641
 
#if 0
2642
 
  // !primary and unbalanced?
2643
 
  //  (ignore ops forwarded from the primary)
2644
 
  if (!is_primary()) {
2645
 
    if (op->get_source().is_osd() &&
2646
 
        op->get_source().num() == get_primary()) {
2647
 
      // read was shed to me by the primary
2648
 
      int from = op->get_source().num();
2649
 
      assert(op->get_flags() & CEPH_OSD_FLAG_PEERSTAT);
2650
 
      osd->take_peer_stat(from, op->get_peer_stat());
2651
 
      dout(10) << "read shed IN from " << op->get_source() 
2652
 
                << " " << op->get_reqid()
2653
 
                << ", me = " << osd->my_stat.read_latency_mine
2654
 
                << ", them = " << op->get_peer_stat().read_latency
2655
 
                << (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency ? " WTF":"")
2656
 
                << dendl;
2657
 
      osd->logger->inc(l_osd_shdin);
2658
 
 
2659
 
      // does it look like they were wrong to do so?
2660
 
      Mutex::Locker lock(osd->peer_stat_lock);
2661
 
      if (osd->my_stat.read_latency_mine > op->get_peer_stat().read_latency &&
2662
 
          osd->my_stat_on_peer[from].read_latency_mine < op->get_peer_stat().read_latency) {
2663
 
        dout(-10) << "read shed IN from " << op->get_source() 
2664
 
                  << " " << op->get_reqid()
2665
 
                  << " and me " << osd->my_stat.read_latency_mine
2666
 
                  << " > them " << op->get_peer_stat().read_latency
2667
 
                  << ", but they didn't know better, sharing" << dendl;
2668
 
        osd->my_stat_on_peer[from] = osd->my_stat;
2669
 
        /*
2670
 
        osd->messenger->send_message(new MOSDPing(osd->osdmap->get_fsid(), osd->osdmap->get_epoch(),
2671
 
                                                  osd->my_stat),
2672
 
                                     osd->osdmap->get_inst(from));
2673
 
        */
2674
 
      }
2675
 
    } else {
2676
 
      // make sure i exist and am balanced, otherwise fw back to acker.
2677
 
      bool b;
2678
 
      if (!osd->store->exists(coll_t::build_pg_coll(info.pgid), soid) || 
2679
 
          osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, "balance-reads", &b, 1) < 0) {
2680
 
        dout(-10) << "read on replica, object " << soid 
2681
 
                  << " dne or no balance-reads, fw back to primary" << dendl;
2682
 
        osd->messenger->forward_message(op, osd->osdmap->get_inst(get_primary()));
2683
 
        return;
2684
 
      }
2685
 
    }
2686
 
  }
2687
 
#endif
2688
 
 
2689
 
// for writes
2690
 
#if 0
2691
 
  // balance-reads set?
2692
 
  char v;
2693
 
  if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
2694
 
      (osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, "balance-reads", &v, 1) >= 0 ||
2695
 
       balancing_reads.count(soid.oid))) {
2696
 
    
2697
 
    if (!unbalancing_reads.count(soid.oid)) {
2698
 
      // unbalance
2699
 
      dout(-10) << "preprocess_op unbalancing-reads on " << soid.oid << dendl;
2700
 
      unbalancing_reads.insert(soid.oid);
2701
 
      
2702
 
      ceph_object_layout layout;
2703
 
      layout.ol_pgid = info.pgid.u.pg64;
2704
 
      layout.ol_stripe_unit = 0;
2705
 
      MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
2706
 
                               soid.oid,
2707
 
                               layout,
2708
 
                               osd->osdmap->get_epoch(),
2709
 
                               CEPH_OSD_OP_UNBALANCEREADS, 0);
2710
 
      do_op(pop);
2711
 
    }
2712
 
 
2713
 
    // add to wait queue
2714
 
    dout(-10) << "preprocess_op waiting for unbalance-reads on " << soid.oid << dendl;
2715
 
    waiting_for_unbalanced_reads[soid.oid].push_back(op);
2716
 
    delete ctx;
2717
 
    return;
2718
 
  }
2719
 
#endif
2720
 
 
2721
 
#if 0
2722
 
  // wrlock?
2723
 
  if (!ctx->ops.empty() &&  // except noop; we just want to flush
2724
 
      block_if_wrlocked(op, obc->oi)) {
2725
 
    put_object_context(obc);
2726
 
    delete ctx;
2727
 
    return; // op will be handled later, after the object unlocks
2728
 
  }
2729
 
#endif
2730
 
 
2731
 
 
2732
 
 
2733
 
 
2734
 
 
2735
 
 
2736
2550
// sub op modify
2737
2551
 
2738
2552
void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
2794
2608
 
2795
2609
    } else {
2796
2610
      // do op
2797
 
      ObjectState obs(op->poid);
2798
 
      obs.oi.version = op->old_version;
2799
 
      obs.oi.size = op->old_size;
2800
 
      obs.exists = op->old_exists;
 
2611
      assert(0);
 
2612
 
 
2613
      // TODO: this is severely broken because we don't know whether this object is really lost or
 
2614
      // not. We just always assume that it's not right now.
 
2615
      // Also, we're taking the address of a variable on the stack. 
 
2616
      object_info_t oi(soid, op->oloc);
 
2617
      oi.lost = false; // I guess?
 
2618
      oi.version = op->old_version;
 
2619
      oi.size = op->old_size;
 
2620
      ObjectState obs(oi, op->old_exists, NULL);
2801
2621
      
2802
2622
      rm->ctx = new OpContext(op, op->reqid, op->ops, &obs, this);
2803
2623
      
2833
2653
    derr(0) << "error applying transaction: r = " << r << dendl;
2834
2654
    assert(0);
2835
2655
  }
 
2656
  // op is cleaned up by oncommit/onapply when both are executed
2836
2657
}
2837
2658
 
2838
2659
void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
2845
2666
    MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
2846
2667
    ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), rm->ackerosd));
2847
2668
    ack->set_priority(CEPH_MSG_PRIO_HIGH);
2848
 
    osd->messenger->send_message(ack, osd->osdmap->get_inst(rm->ackerosd));
 
2669
    osd->cluster_messenger->
 
2670
      send_message(ack, osd->osdmap->get_cluster_inst(rm->ackerosd));
2849
2671
  }
2850
2672
 
2851
2673
  rm->applied = true;
2852
2674
  bool done = rm->applied && rm->committed;
2853
2675
 
 
2676
  last_update_applied = rm->op->version;
 
2677
  if (last_update_applied == info.last_update && finalizing_scrub) {
 
2678
    kick();
 
2679
  }
 
2680
 
2854
2681
  unlock();
2855
2682
  if (done) {
2856
2683
    delete rm->ctx;
2877
2704
    MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ONDISK);
2878
2705
    commit->set_last_complete_ondisk(rm->last_complete);
2879
2706
    commit->set_peer_stat(osd->get_my_stat_for(g_clock.now(), rm->ackerosd));
2880
 
    osd->messenger->send_message(commit, osd->osdmap->get_inst(rm->ackerosd));
 
2707
    osd->cluster_messenger->
 
2708
      send_message(commit, osd->osdmap->get_cluster_inst(rm->ackerosd));
2881
2709
  }
2882
2710
  
2883
2711
  rm->committed = true;
2931
2759
           << " clone_overlap " << snapset.clone_overlap << dendl;
2932
2760
 
2933
2761
  struct stat st;
2934
 
  osd->store->stat(coll_t::build_pg_coll(info.pgid), head, &st);
 
2762
  osd->store->stat(coll_t(info.pgid), head, &st);
2935
2763
 
2936
2764
  interval_set<uint64_t> cloning;
2937
2765
  interval_set<uint64_t> prev;
3137
2965
  // do not include clone_subsets in pull request; we will recalculate this
3138
2966
  // when the object is pushed back.
3139
2967
  //subop->clone_subsets.swap(clone_subsets);
3140
 
  osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd));
 
2968
  osd->cluster_messenger->
 
2969
    send_message(subop, osd->osdmap->get_cluster_inst(fromosd));
3141
2970
}
3142
2971
 
3143
2972
 
3205
3034
void ReplicatedPG::push_start(const sobject_t& soid, int peer)
3206
3035
{
3207
3036
  struct stat st;
3208
 
  int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
 
3037
  int r = osd->store->stat(coll_t(info.pgid), soid, &st);
3209
3038
  assert(r == 0);
3210
3039
  uint64_t size = st.st_size;
3211
3040
 
3212
3041
  bufferlist bl;
3213
 
  r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, bl);
 
3042
  r = osd->store->getattr(coll_t(info.pgid), soid, OI_ATTR, bl);
3214
3043
  object_info_t oi(bl);
3215
3044
 
3216
3045
  interval_set<uint64_t> data_subset;
3254
3083
  bufferlist bl;
3255
3084
  map<string,bufferptr> attrset;
3256
3085
 
3257
 
  for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
3258
 
       p != data_subset.m.end();
3259
 
       p++) {
 
3086
  for (interval_set<uint64_t>::iterator p = data_subset.begin();
 
3087
       p != data_subset.end();
 
3088
       ++p) {
3260
3089
    bufferlist bit;
3261
 
    osd->store->read(coll_t::build_pg_coll(info.pgid), soid, p->first, p->second, bit);
3262
 
    if (p->second != bit.length()) {
3263
 
      dout(10) << " extent " << p->first << "~" << p->second
3264
 
               << " is actually " << p->first << "~" << bit.length() << dendl;
3265
 
      p->second = bit.length();
 
3090
    osd->store->read(coll_t(info.pgid),
 
3091
                     soid, p.get_start(), p.get_len(), bit);
 
3092
    if (p.get_len() != bit.length()) {
 
3093
      dout(10) << " extent " << p.get_start() << "~" << p.get_len()
 
3094
               << " is actually " << p.get_start() << "~" << bit.length() << dendl;
 
3095
      p.set_len(bit.length());
3266
3096
    }
3267
3097
    bl.claim_append(bit);
3268
3098
  }
3269
3099
 
3270
 
  osd->store->getattrs(coll_t::build_pg_coll(info.pgid), soid, attrset);
 
3100
  osd->store->getattrs(coll_t(info.pgid), soid, attrset);
3271
3101
 
3272
3102
  bufferlist bv;
3273
3103
  bv.push_back(attrset[OI_ATTR]);
3288
3118
  osd_reqid_t rid;  // useless?
3289
3119
  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
3290
3120
                                   osd->osdmap->get_epoch(), osd->get_tid(), oi.version);
 
3121
  subop->oloc = oi.oloc;
3291
3122
  subop->ops = vector<OSDOp>(1);
3292
3123
  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
3293
3124
  //subop->ops[0].op.extent.offset = 0;
3299
3130
  subop->old_size = size;
3300
3131
  subop->first = first;
3301
3132
  subop->complete = complete;
3302
 
  osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
 
3133
  osd->cluster_messenger->
 
3134
    send_message(subop, osd->osdmap->get_cluster_inst(peer));
3303
3135
}
3304
3136
 
3305
3137
void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
3321
3153
 
3322
3154
    bool complete = false;
3323
3155
    if (pi->data_subset.empty() ||
3324
 
        pi->data_subset.end() == pi->data_subset_pushing.end())
 
3156
        pi->data_subset.range_end() == pi->data_subset_pushing.range_end())
3325
3157
      complete = true;
3326
3158
 
3327
3159
    if (!complete) {
3328
3160
      // push more
3329
 
      uint64_t from = pi->data_subset_pushing.end();
 
3161
      uint64_t from = pi->data_subset_pushing.range_end();
3330
3162
      pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf.osd_recovery_max_chunk);
3331
3163
      dout(10) << " pushing more, " << pi->data_subset_pushing << " of " << pi->data_subset << dendl;
3332
 
      complete = pi->data_subset.end() == pi->data_subset_pushing.end();
 
3164
      complete = pi->data_subset.range_end() == pi->data_subset_pushing.range_end();
3333
3165
      send_push_op(soid, peer, pi->size, false, complete, pi->data_subset_pushing, pi->clone_subsets);
3334
3166
    } else {
3335
3167
      // done!
3336
3168
      peer_missing[peer].got(soid, pi->version);
3337
 
      if (peer_missing[peer].num_missing() == 0) 
3338
 
        uptodate_set.insert(peer);
3339
3169
      
3340
3170
      pushing[soid].erase(peer);
3341
3171
      pi = NULL;
3376
3206
  assert(!is_primary());  // we should be a replica or stray.
3377
3207
 
3378
3208
  struct stat st;
3379
 
  int r = osd->store->stat(coll_t::build_pg_coll(info.pgid), soid, &st);
3380
 
  assert(r == 0);
3381
 
  uint64_t size = st.st_size;
3382
 
 
3383
 
  bool complete = false;
3384
 
  if (!op->data_subset.empty() && op->data_subset.end() >= size)
3385
 
    complete = true;
3386
 
 
3387
 
  send_push_op(soid, op->get_source().num(), size, op->first, complete, op->data_subset, op->clone_subsets);
 
3209
  int r = osd->store->stat(coll_t(info.pgid), soid, &st);
 
3210
  if (r != 0) {
 
3211
    stringstream ss;
 
3212
    char buf[80];
 
3213
    ss << op->get_source() << " tried to pull " << soid << " in " << info.pgid
 
3214
       << " but got " << strerror_r(-r, buf, sizeof(buf));
 
3215
    osd->logclient.log(LOG_ERROR, ss);
 
3216
 
 
3217
    // FIXME: do something more intelligent.. mark the pg as needing repair?
 
3218
 
 
3219
  } else {
 
3220
    uint64_t size = st.st_size;
 
3221
 
 
3222
    bool complete = false;
 
3223
    if (!op->data_subset.empty() && op->data_subset.range_end() >= size)
 
3224
      complete = true;
 
3225
 
 
3226
    // complete==true implies we are definitely complete.
 
3227
    // complete==false means nothing.  we don't know because the primary may
 
3228
    // not be pulling the entire object.
 
3229
 
 
3230
    send_push_op(soid, op->get_source().num(), size, op->first, complete, op->data_subset, op->clone_subsets);
 
3231
  }
3388
3232
  op->put();
3389
3233
}
3390
3234
 
3413
3257
    if (last_complete_ondisk == info.last_update) {
3414
3258
      if (is_replica()) {
3415
3259
        // we are fully up to date.  tell the primary!
3416
 
        osd->messenger->send_message(new MOSDPGTrim(osd->osdmap->get_epoch(), info.pgid,
3417
 
                                                    last_complete_ondisk),
3418
 
                                     osd->osdmap->get_inst(get_primary()));
 
3260
        osd->cluster_messenger->
 
3261
          send_message(new MOSDPGTrim(osd->osdmap->get_epoch(), info.pgid,
 
3262
                                      last_complete_ondisk),
 
3263
                       osd->osdmap->get_cluster_inst(get_primary()));
3419
3264
      } else if (is_primary()) {
3420
3265
        // we are the primary.  tell replicas to trim?
3421
3266
        if (calc_min_last_complete_ondisk())
3449
3294
  dout(7) << "op_push " 
3450
3295
          << soid 
3451
3296
          << " v " << v 
 
3297
          << " " << op->oloc
3452
3298
          << " len " << push.op.extent.length
3453
3299
          << " data_subset " << op->data_subset
3454
3300
          << " clone_subsets " << op->clone_subsets
3478
3324
  pull_info_t *pi = 0;
3479
3325
  bool first = op->first;
3480
3326
  bool complete = op->complete;
 
3327
 
 
3328
  // op->complete == true means we reached the end of the object (file size)
 
3329
  // op->complete == false means nothing; we may not have asked for the whole thing.
 
3330
 
3481
3331
  if (is_primary()) {
3482
3332
    if (pulling.count(soid) == 0) {
3483
3333
      dout(10) << " not pulling, ignoring" << dendl;
3503
3353
      interval_set<uint64_t> data_needed;
3504
3354
      calc_clone_subsets(ssc->snapset, soid, missing, data_needed, clone_subsets);
3505
3355
      put_snapset_context(ssc);
 
3356
 
 
3357
      interval_set<uint64_t> overlap;
 
3358
      overlap.intersection_of(data_subset, data_needed);
3506
3359
      
3507
 
      dout(10) << "sub_op_push need " << data_needed << ", got " << data_subset << dendl;
3508
 
      if (!data_needed.subset_of(data_subset)) {
3509
 
        dout(0) << " we did not get enough of " << soid << " object data" << dendl;
3510
 
        op->put();
3511
 
        return;
3512
 
      }
 
3360
      dout(10) << "sub_op_push need " << data_needed << ", got " << data_subset
 
3361
               << ", overlap " << overlap << dendl;
3513
3362
 
3514
3363
      // did we get more data than we need?
3515
3364
      if (!data_subset.subset_of(data_needed)) {
3519
3368
 
3520
3369
        bufferlist result;
3521
3370
        int off = 0;
3522
 
        for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
3523
 
             p != data_subset.m.end();
3524
 
             p++) {
 
3371
        for (interval_set<uint64_t>::const_iterator p = data_subset.begin();
 
3372
             p != data_subset.end();
 
3373
             ++p) {
3525
3374
          interval_set<uint64_t> x;
3526
 
          x.insert(p->first, p->second);
 
3375
          x.insert(p.get_start(), p.get_len());
3527
3376
          x.intersection_of(data_needed);
3528
 
          dout(20) << " data_subset object extent " << p->first << "~" << p->second << " need " << x << dendl;
 
3377
          dout(20) << " data_subset object extent " << p.get_start() << "~" << p.get_len() << " need " << x << dendl;
3529
3378
          if (!x.empty()) {
3530
 
            uint64_t first = x.m.begin()->first;
3531
 
            uint64_t len = x.m.begin()->second;
 
3379
            uint64_t first = x.begin().get_start();
 
3380
            uint64_t len = x.begin().get_len();
3532
3381
            bufferlist sub;
3533
 
            int boff = off + (first - p->first);
 
3382
            int boff = off + (first - p.get_start());
3534
3383
            dout(20) << "   keeping buffer extent " << boff << "~" << len << dendl;
3535
3384
            sub.substr_of(data, boff, len);
3536
3385
            result.claim_append(sub);
3537
3386
          }
3538
 
          off += p->second;
 
3387
          off += p.get_len();
3539
3388
        }
3540
3389
        data.claim(result);
3541
3390
        dout(20) << " new data len is " << data.length() << dendl;
3542
3391
      }
 
3392
 
 
3393
      // did we get everything we wanted?
 
3394
      if (pi->data_subset.empty()) {
 
3395
        complete = true;
 
3396
      } else {
 
3397
        complete = pi->data_subset.range_end() == data_subset.range_end();
 
3398
      }
 
3399
 
 
3400
      if (op->complete && !complete) {
 
3401
        dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl;
 
3402
 
 
3403
        // hmm, do we have another source?
 
3404
        int from = op->get_source().num();
 
3405
        set<int>& reps = missing_loc[soid];
 
3406
        dout(0) << " we have reps on osds " << reps << dendl;
 
3407
        set<int>::iterator q = reps.begin();
 
3408
        if (q != reps.end() && *q == from) {
 
3409
          q++;
 
3410
          if (q != reps.end()) {
 
3411
            dout(0) << " trying next replica on osd" << *q << dendl;
 
3412
            reps.erase(reps.begin());  // forget about the bad replica...
 
3413
            finish_recovery_op(soid);  // close out this attempt,
 
3414
            pulling.erase(soid);
 
3415
            pull(soid);                // and try again.
 
3416
          }
 
3417
        }
 
3418
        op->put();
 
3419
        return;
 
3420
      }
 
3421
 
3543
3422
    } else {
3544
 
      // head|unversioned. for now, primary will _only_ pull full copies of the head.
 
3423
      // head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning)
3545
3424
      assert(op->clone_subsets.empty());
3546
3425
    }
3547
 
 
3548
 
    if (pi->data_subset.empty()) {
3549
 
      complete = true;
3550
 
    } else {
3551
 
      complete = pi->data_subset.end() == data_subset.end();
3552
 
    }
3553
 
    assert(complete == op->complete);
3554
3426
  }
3555
3427
  dout(15) << " data_subset " << data_subset
3556
3428
           << " clone_subsets " << clone_subsets
3559
3431
 
3560
3432
  coll_t target;
3561
3433
  if (first && complete)
3562
 
    target = coll_t::build_pg_coll(info.pgid);
 
3434
    target = coll_t(info.pgid);
3563
3435
  else
3564
 
    target = coll_t(coll_t::TYPE_TEMP);
 
3436
    target = coll_t::TEMP_COLL;
3565
3437
 
3566
3438
  // write object and add it to the PG
3567
3439
  ObjectStore::Transaction *t = new ObjectStore::Transaction;
3573
3445
 
3574
3446
  // write data
3575
3447
  uint64_t boff = 0;
3576
 
  for (map<uint64_t,uint64_t>::iterator p = data_subset.m.begin();
3577
 
       p != data_subset.m.end(); 
3578
 
       p++) {
 
3448
  for (interval_set<uint64_t>::const_iterator p = data_subset.begin();
 
3449
       p != data_subset.end();
 
3450
       ++p) {
3579
3451
    bufferlist bit;
3580
 
    bit.substr_of(data, boff, p->second);
3581
 
    dout(15) << " write " << p->first << "~" << p->second << dendl;
3582
 
    t->write(target, soid, p->first, p->second, bit);
3583
 
    boff += p->second;
 
3452
    bit.substr_of(data, boff, p.get_len());
 
3453
    dout(15) << " write " << p.get_start() << "~" << p.get_len() << dendl;
 
3454
    t->write(target, soid, p.get_start(), p.get_len(), bit);
 
3455
    boff += p.get_len();
3584
3456
  }
3585
3457
  
3586
3458
  if (complete) {
3587
3459
    if (!first) {
3588
 
      t->remove(coll_t::build_pg_coll(info.pgid), soid);
3589
 
      t->collection_add(coll_t::build_pg_coll(info.pgid), target, soid);
 
3460
      t->remove(coll_t(info.pgid), soid);
 
3461
      t->collection_add(coll_t(info.pgid), target, soid);
3590
3462
      t->collection_remove(target, soid);
3591
3463
    }
3592
3464
 
3593
3465
    // clone bits
3594
 
    for (map<sobject_t, interval_set<uint64_t> >::iterator p = clone_subsets.begin();
 
3466
    for (map<sobject_t, interval_set<uint64_t> >::const_iterator p = clone_subsets.begin();
3595
3467
         p != clone_subsets.end();
3596
 
         p++)
3597
 
      for (map<uint64_t,uint64_t>::iterator q = p->second.m.begin();
3598
 
           q != p->second.m.end(); 
3599
 
         q++) {
3600
 
        dout(15) << " clone_range " << p->first << " " << q->first << "~" << q->second << dendl;
3601
 
        t->clone_range(coll_t::build_pg_coll(info.pgid), p->first, soid, q->first, q->second);
 
3468
         ++p)
 
3469
    {
 
3470
      for (interval_set<uint64_t>::const_iterator q = p->second.begin();
 
3471
           q != p->second.end();
 
3472
           ++q)
 
3473
      {
 
3474
        dout(15) << " clone_range " << p->first << " "
 
3475
                 << q.get_start() << "~" << q.get_len() << dendl;
 
3476
        t->clone_range(coll_t(info.pgid), p->first, soid,
 
3477
                       q.get_start(), q.get_len());
3602
3478
      }
 
3479
    }
3603
3480
 
3604
3481
    if (data_subset.empty())
3605
 
      t->touch(coll_t::build_pg_coll(info.pgid), soid);
 
3482
      t->touch(coll_t(info.pgid), soid);
3606
3483
 
3607
 
    t->setattrs(coll_t::build_pg_coll(info.pgid), soid, op->attrset);
 
3484
    t->setattrs(coll_t(info.pgid), soid, op->attrset);
3608
3485
    if (soid.snap && soid.snap < CEPH_NOSNAP &&
3609
3486
        op->attrset.count(OI_ATTR)) {
3610
3487
      bufferlist bl;
3612
3489
      object_info_t oi(bl);
3613
3490
      if (oi.snaps.size()) {
3614
3491
        coll_t lc = make_snap_collection(*t, oi.snaps[0]);
3615
 
        t->collection_add(lc, coll_t::build_pg_coll(info.pgid), soid);
 
3492
        t->collection_add(lc, coll_t(info.pgid), soid);
3616
3493
        if (oi.snaps.size() > 1) {
3617
3494
          coll_t hc = make_snap_collection(*t, oi.snaps[oi.snaps.size()-1]);
3618
 
          t->collection_add(hc, coll_t::build_pg_coll(info.pgid), soid);
 
3495
          t->collection_add(hc, coll_t(info.pgid), soid);
3619
3496
        }
3620
3497
      }
3621
3498
    }
3623
3500
    if (missing.is_missing(soid, v)) {
3624
3501
      dout(10) << "got missing " << soid << " v " << v << dendl;
3625
3502
      missing.got(soid, v);
 
3503
      if (is_primary())
 
3504
        missing_loc.erase(soid);
3626
3505
      
3627
3506
      // raise last_complete?
3628
3507
      while (log.complete_to != log.log.end()) {
3644
3523
    // track ObjectContext
3645
3524
    if (is_primary()) {
3646
3525
      dout(10) << " setting up obc for " << soid << dendl;
3647
 
      ObjectContext *obc = get_object_context(soid, true);
3648
 
      register_object_context(obc);
 
3526
      ObjectContext *obc = get_object_context(soid, op->oloc, true);
 
3527
      assert(obc->registered);
3649
3528
      obc->ondisk_write_lock();
3650
3529
      
3651
3530
      obc->obs.exists = true;
3676
3555
                                        onreadable_sync);
3677
3556
  assert(r == 0);
3678
3557
 
3679
 
  osd->logger->inc(l_osd_r_pull);
3680
 
  osd->logger->inc(l_osd_r_pullb, data.length());
 
3558
  osd->logger->inc(l_osd_r_push);
 
3559
  osd->logger->inc(l_osd_r_pushb, data.length());
3681
3560
 
3682
3561
  if (is_primary()) {
3683
3562
    assert(pi);
3684
3563
 
3685
3564
    if (complete) {
3686
 
      missing_loc.erase(soid);
3687
 
 
3688
3565
      // close out pull op
3689
3566
      pulling.erase(soid);
3690
3567
      finish_recovery_op(soid);
3691
3568
      
3692
3569
      update_stats();
3693
 
 
3694
 
      if (info.is_uptodate())
3695
 
        uptodate_set.insert(osd->get_nodeid());
3696
3570
    } else {
3697
3571
      // pull more
3698
 
      pi->data_subset_pulling.span_of(pi->data_subset, data_subset.end(), g_conf.osd_recovery_max_chunk);
 
3572
      pi->data_subset_pulling.span_of(pi->data_subset, data_subset.range_end(), g_conf.osd_recovery_max_chunk);
3699
3573
      dout(10) << " pulling more, " << pi->data_subset_pulling << " of " << pi->data_subset << dendl;
3700
3574
      send_pull_op(soid, v, false, pi->data_subset_pulling, pi->from);
3701
3575
    }
3718
3592
 
3719
3593
  } else {
3720
3594
    // ack if i'm a replica and being pushed to.
3721
 
    MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK); 
3722
 
    osd->messenger->send_message(reply, op->get_connection());
 
3595
    MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
 
3596
    assert(entity_name_t::TYPE_OSD == op->get_connection()->peer_type);
 
3597
    osd->cluster_messenger->send_message(reply, op->get_connection());
3723
3598
  }
3724
3599
 
3725
3600
  if (complete) {
3788
3663
{
3789
3664
  dout(10) << "on_change" << dendl;
3790
3665
  apply_and_flush_repops(is_primary());
3791
 
  
 
3666
 
 
3667
  // clear reserved scrub state
 
3668
  clear_scrub_reserved();
 
3669
 
 
3670
  // take object waiters
 
3671
  take_object_waiters(waiting_for_missing_object);
 
3672
  take_object_waiters(waiting_for_degraded_object);
 
3673
 
3792
3674
  // clear pushing/pulling maps
3793
3675
  pushing.clear();
3794
3676
  pulling.clear();
3804
3686
       p++)
3805
3687
    osd->take_waiters(p->second);
3806
3688
  waiting_for_ondisk.clear();
3807
 
 
3808
 
  // take object waiters
3809
 
  take_object_waiters(waiting_for_missing_object);
3810
 
  take_object_waiters(waiting_for_degraded_object);
3811
3689
}
3812
3690
 
3813
3691
 
3828
3706
{
3829
3707
  int started = 0;
3830
3708
  assert(is_primary());
3831
 
  
3832
 
  while (max > 0) {
3833
 
    int n;
3834
 
    if (uptodate_set.count(osd->whoami))
3835
 
      n = recover_replicas(max);
3836
 
    else
3837
 
      n = recover_primary(max);
3838
 
    started += n;
3839
 
    osd->logger->inc(l_osd_rop, n);
3840
 
    if (n < max)
3841
 
      break;
3842
 
    max -= n;
3843
 
  }
3844
 
  return started;
 
3709
 
 
3710
  int num_missing = missing.num_missing();
 
3711
  int num_unfound = get_num_unfound();
 
3712
 
 
3713
  if (num_missing == 0) {
 
3714
    info.last_complete = info.last_update;
 
3715
  }
 
3716
 
 
3717
  if (num_missing == num_unfound) {
 
3718
    // All of the missing objects we have are unfound.
 
3719
    // Recover the replicas.
 
3720
    started = recover_replicas(max);
 
3721
  } else {
 
3722
    // We still have missing objects that we should grab from replicas.
 
3723
    started = recover_primary(max);
 
3724
  }
 
3725
 
 
3726
  osd->logger->inc(l_osd_rop, started);
 
3727
 
 
3728
  if (started)
 
3729
    return started;
 
3730
 
 
3731
  if (is_all_uptodate()) {
 
3732
    dout(10) << __func__ << ": all OSDs in the PG are up-to-date!" << dendl;
 
3733
    log.reset_recovery_pointers();
 
3734
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
 
3735
    C_Contexts *fin = new C_Contexts;
 
3736
    finish_recovery(*t, fin->contexts);
 
3737
    int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
 
3738
    assert(tr == 0);
 
3739
  }
 
3740
  else {
 
3741
    dout(10) << __func__ << ": some OSDs are not up-to-date yet. "
 
3742
             << "Recovery isn't finished yet." << dendl;
 
3743
  }
 
3744
 
 
3745
  return 0;
3845
3746
}
3846
3747
 
3847
 
 
3848
 
 
3849
 
 
3850
3748
/**
3851
3749
 * do one recovery op.
3852
3750
 * return true if done, false if nothing left to do.
3878
3776
      soid = p->second;
3879
3777
    }
3880
3778
    Missing::item& item = missing.missing[p->second];
 
3779
    p++;
3881
3780
 
3882
3781
    sobject_t head = soid;
3883
3782
    head.snap = CEPH_NOSNAP;
3884
3783
 
 
3784
    bool unfound = (missing_loc.find(soid) == missing_loc.end());
 
3785
 
3885
3786
    dout(10) << "recover_primary "
3886
3787
             << soid << " " << item.need
 
3788
             << (unfound ? "":" (unfound)")
3887
3789
             << (missing.is_missing(soid) ? " (missing)":"")
3888
3790
             << (missing.is_missing(head) ? " (missing head)":"")
3889
3791
             << (pulling.count(soid) ? " (pulling)":"")
3893
3795
    if (!pulling.count(soid)) {
3894
3796
      if (pulling.count(head)) {
3895
3797
        ++skipped;
 
3798
      } else if (unfound) {
 
3799
        ++skipped;
3896
3800
      } else {
3897
3801
        // is this a clone operation that we can do locally?
3898
3802
        if (latest && latest->op == Log::Entry::CLONE) {
3903
3807
                     << " snaps " << latest->snaps << dendl;
3904
3808
            ObjectStore::Transaction *t = new ObjectStore::Transaction;
3905
3809
 
3906
 
            ObjectContext *headobc = get_object_context(head);
 
3810
            // NOTE: we know headobc exists on disk, and the oloc will be loaded with it, so
 
3811
            // it is safe to pass in a blank one here.
 
3812
            ObjectContext *headobc = get_object_context(head, OLOC_BLANK, false);
3907
3813
 
3908
 
            object_info_t oi(soid);
 
3814
            object_info_t oi(headobc->obs.oi);
3909
3815
            oi.version = latest->version;
3910
3816
            oi.prior_version = latest->prior_version;
3911
 
            oi.last_reqid = headobc->obs.oi.last_reqid;
3912
 
            oi.mtime = headobc->obs.oi.mtime;
3913
3817
            ::decode(oi.snaps, latest->snaps);
 
3818
            oi.copy_user_bits(headobc->obs.oi);
3914
3819
            _make_clone(*t, head, soid, &oi);
3915
3820
 
3916
3821
            put_object_context(headobc);
3933
3838
      }
3934
3839
    }
3935
3840
    
3936
 
    p++;
3937
 
 
3938
3841
    // only advance last_requested if we haven't skipped anything
3939
3842
    if (!skipped)
3940
3843
      log.last_requested = v;
3941
3844
  }
3942
 
 
3943
 
  // done?
3944
 
  if (!pulling.empty()) {
3945
 
    dout(7) << "recover_primary requested everything, still waiting" << dendl;
3946
 
    return started;
3947
 
  }
3948
 
  if (missing.num_missing()) {
3949
 
    dout(7) << "recover_primary still missing " << missing << dendl;
3950
 
    return started;
3951
 
  }
3952
 
 
3953
 
  // done.
3954
 
  if (info.last_complete != info.last_update) {
3955
 
    dout(7) << "recover_primary last_complete " << info.last_complete << " -> " << info.last_update << dendl;
3956
 
    info.last_complete = info.last_update;
3957
 
  }
3958
 
 
3959
 
  log.reset_recovery_pointers();
3960
 
 
3961
 
  uptodate_set.insert(osd->whoami);
3962
 
  if (is_all_uptodate()) {
3963
 
    dout(-7) << "recover_primary complete" << dendl;
3964
 
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
3965
 
    C_Contexts *fin = new C_Contexts;
3966
 
    finish_recovery(*t, fin->contexts);
3967
 
    int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
3968
 
    assert(tr == 0);
3969
 
  } else {
3970
 
    dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
3971
 
  }
3972
 
 
3973
3845
  return started;
3974
3846
}
3975
3847
 
3979
3851
 
3980
3852
  dout(10) << "recover_object_replicas " << soid << dendl;
3981
3853
 
3982
 
  ObjectContext *obc = get_object_context(soid);
 
3854
  // NOTE: we know we will get a valid oloc off of disk here.
 
3855
  ObjectContext *obc = get_object_context(soid, OLOC_BLANK, false);
3983
3856
  dout(10) << " ondisk_read_lock for " << soid << dendl;
3984
3857
  obc->ondisk_read_lock();
3985
3858
  
4004
3877
 
4005
3878
int ReplicatedPG::recover_replicas(int max)
4006
3879
{
 
3880
  dout(10) << __func__ << "(" << max << ")" << dendl;
4007
3881
  int started = 0;
4008
 
  dout(-10) << "recover_replicas" << dendl;
4009
3882
 
4010
3883
  // this is FAR from an optimal recovery order.  pretty lame, really.
4011
3884
  for (unsigned i=1; i<acting.size(); i++) {
4012
3885
    int peer = acting[i];
4013
 
    assert(peer_missing.count(peer));
 
3886
    map<int, Missing>::const_iterator pm = peer_missing.find(peer);
 
3887
    assert(pm != peer_missing.end());
 
3888
    size_t m_sz = pm->second.num_missing();
4014
3889
 
4015
 
    dout(10) << " peer osd" << peer << " missing " << peer_missing[peer] << dendl;
4016
 
    dout(20) << "   " << peer_missing[peer].missing << dendl;
 
3890
    dout(10) << " peer osd" << peer << " missing " << m_sz << " objects." << dendl;
 
3891
    dout(20) << " peer osd" << peer << " missing " << pm->second.missing << dendl;
4017
3892
 
4018
3893
    // oldest first!
4019
 
    Missing &m = peer_missing[peer];
4020
 
    for (map<eversion_t, sobject_t>::iterator p = m.rmissing.begin();
 
3894
    const Missing &m(pm->second);
 
3895
    for (map<eversion_t, sobject_t>::const_iterator p = m.rmissing.begin();
4021
3896
           p != m.rmissing.end() && started < max;
4022
 
           p++) {
4023
 
      sobject_t soid = p->second;
4024
 
      if (pushing.count(soid))
4025
 
        dout(10) << " already pushing " << soid << dendl;
4026
 
      else
4027
 
        started += recover_object_replicas(soid);
 
3897
           ++p) {
 
3898
      const sobject_t soid(p->second);
 
3899
      if (pushing.count(soid)) {
 
3900
        dout(10) << __func__ << ": already pushing " << soid << dendl;
 
3901
        continue;
 
3902
      }
 
3903
      if (missing.is_missing(soid)) {
 
3904
        if (missing_loc.find(soid) == missing_loc.end())
 
3905
          dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
 
3906
        else
 
3907
          dout(10) << __func__ << ": " << soid << " still missing on primary" << dendl;
 
3908
        continue;
 
3909
      }
 
3910
 
 
3911
      dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
 
3912
      started += recover_object_replicas(soid);
4028
3913
    }
4029
3914
  }
4030
 
  
4031
 
  // nothing to do!
4032
 
  dout(-10) << "recover_replicas - nothing to do!" << dendl;
4033
 
 
4034
 
  if (is_all_uptodate()) {
4035
 
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
4036
 
    C_Contexts *fin = new C_Contexts;
4037
 
    finish_recovery(*t, fin->contexts);
4038
 
    int tr = osd->store->queue_transaction(&osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
4039
 
    assert(tr == 0);
4040
 
  } else {
4041
 
    dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl;
4042
 
  }
4043
3915
 
4044
3916
  return started;
4045
3917
}
4046
3918
 
4047
 
 
4048
3919
void ReplicatedPG::remove_object_with_snap_hardlinks(ObjectStore::Transaction& t, const sobject_t& soid)
4049
3920
{
4050
 
  t.remove(coll_t::build_pg_coll(info.pgid), soid);
 
3921
  t.remove(coll_t(info.pgid), soid);
4051
3922
  if (soid.snap < CEPH_MAXSNAP) {
4052
3923
    bufferlist ba;
4053
 
    int r = osd->store->getattr(coll_t::build_pg_coll(info.pgid), soid, OI_ATTR, ba);
 
3924
    int r = osd->store->getattr(coll_t(info.pgid), soid, OI_ATTR, ba);
4054
3925
    if (r >= 0) {
4055
3926
      // grr, need first snap bound, too.
4056
3927
      object_info_t oi(ba);
4057
3928
      if (oi.snaps[0] != soid.snap)
4058
 
        t.remove(coll_t::build_snap_pg_coll(info.pgid, oi.snaps[0]), soid);
4059
 
      t.remove(coll_t::build_snap_pg_coll(info.pgid, soid.snap), soid);
 
3929
        t.remove(coll_t(info.pgid, oi.snaps[0]), soid);
 
3930
      t.remove(coll_t(info.pgid, soid.snap), soid);
4060
3931
    }
4061
3932
  }
4062
3933
}
4077
3948
    
4078
3949
    // be thorough.
4079
3950
    vector<sobject_t> ls;
4080
 
    osd->store->collection_list(coll_t::build_pg_coll(info.pgid), ls);
 
3951
    osd->store->collection_list(coll_t(info.pgid), ls);
4081
3952
 
4082
3953
    set<sobject_t> s;   
4083
3954
    for (vector<sobject_t>::iterator i = ls.begin();
4147
4018
{
4148
4019
  dout(10) << "_scrub" << dendl;
4149
4020
 
4150
 
  coll_t c = coll_t::build_pg_coll(info.pgid);
 
4021
  coll_t c(info.pgid);
4151
4022
  bool repair = state_test(PG_STATE_REPAIR);
4152
4023
  const char *mode = repair ? "repair":"scrub";
4153
4024
 
4154
4025
  // traverse in reverse order.
4155
4026
  sobject_t head;
4156
4027
  SnapSet snapset;
4157
 
  unsigned curclone = 0;
 
4028
  vector<snapid_t>::reverse_iterator curclone;
4158
4029
 
4159
4030
  pg_stat_t stat;
4160
4031
 
4161
4032
  bufferlist last_data;
4162
4033
 
4163
 
  for (vector<ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin(); 
 
4034
  for (map<sobject_t,ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin(); 
4164
4035
       p != scrubmap.objects.rend(); 
4165
4036
       p++) {
4166
 
    const sobject_t& soid = p->poid;
 
4037
    const sobject_t& soid = p->first;
4167
4038
    stat.num_objects++;
4168
4039
 
4169
4040
    // new snapset?
4170
4041
    if (soid.snap == CEPH_SNAPDIR ||
4171
4042
        soid.snap == CEPH_NOSNAP) {
4172
 
      if (p->attrs.count(SS_ATTR) == 0) {
 
4043
      if (p->second.attrs.count(SS_ATTR) == 0) {
4173
4044
        dout(0) << mode << " no '" << SS_ATTR << "' attr on " << soid << dendl;
4174
4045
        errors++;
4175
4046
        continue;
4176
4047
      }
4177
4048
      bufferlist bl;
4178
 
      bl.push_back(p->attrs[SS_ATTR]);
 
4049
      bl.push_back(p->second.attrs[SS_ATTR]);
4179
4050
      bufferlist::iterator blp = bl.begin();
4180
4051
      ::decode(snapset, blp);
4181
4052
 
4189
4060
      // what will be next?
4190
4061
      if (snapset.clones.empty())
4191
4062
        head = sobject_t();  // no clones.
4192
 
      else
4193
 
        curclone = snapset.clones.size()-1;
 
4063
      else {
 
4064
        curclone = snapset.clones.rbegin();
 
4065
        head = p->first;
 
4066
      }
4194
4067
 
4195
4068
      // subtract off any clone overlap
4196
4069
      for (map<snapid_t,interval_set<uint64_t> >::iterator q = snapset.clone_overlap.begin();
4197
4070
           q != snapset.clone_overlap.end();
4198
 
           q++) {
4199
 
        for (map<uint64_t,uint64_t>::iterator r = q->second.m.begin();
4200
 
             r != q->second.m.end();
4201
 
             r++) {
4202
 
          stat.num_bytes -= r->second;
4203
 
          stat.num_kb -= SHIFT_ROUND_UP(r->first+r->second, 10) - (r->first >> 10);
 
4071
           ++q) {
 
4072
        for (interval_set<uint64_t>::const_iterator r = q->second.begin();
 
4073
             r != q->second.end();
 
4074
             ++r) {
 
4075
          stat.num_bytes -= r.get_len();
 
4076
          stat.num_kb -= SHIFT_ROUND_UP(r.get_start()+r.get_len(), 10) - (r.get_start() >> 10);
4204
4077
        }         
4205
4078
      }
4206
4079
    }
4208
4081
      continue;
4209
4082
 
4210
4083
    // basic checks.
4211
 
    if (p->attrs.count(OI_ATTR) == 0) {
 
4084
    if (p->second.attrs.count(OI_ATTR) == 0) {
4212
4085
      dout(0) << mode << " no '" << OI_ATTR << "' attr on " << soid << dendl;
4213
4086
      errors++;
4214
4087
      continue;
4215
4088
    }
4216
4089
    bufferlist bv;
4217
 
    bv.push_back(p->attrs[OI_ATTR]);
 
4090
    bv.push_back(p->second.attrs[OI_ATTR]);
4218
4091
    object_info_t oi(bv);
4219
4092
 
4220
4093
    dout(20) << mode << "  " << soid << " " << oi << dendl;
4221
4094
 
4222
 
    stat.num_bytes += p->size;
4223
 
    stat.num_kb += SHIFT_ROUND_UP(p->size, 10);
 
4095
    stat.num_bytes += p->second.size;
 
4096
    stat.num_kb += SHIFT_ROUND_UP(p->second.size, 10);
4224
4097
 
4225
4098
    //bufferlist data;
4226
4099
    //osd->store->read(c, poid, 0, 0, data);
4238
4111
 
4239
4112
      stat.num_object_clones++;
4240
4113
      
4241
 
      assert(soid.snap == snapset.clones[curclone]);
 
4114
      assert(soid.snap == *curclone);
4242
4115
 
4243
 
      assert(p->size == snapset.clone_size[curclone]);
 
4116
      assert(p->second.size == snapset.clone_size[*curclone]);
4244
4117
 
4245
4118
      // verify overlap?
4246
4119
      // ...
4247
4120
 
4248
4121
      // what's next?
4249
 
      curclone++;
4250
 
      if (curclone == snapset.clones.size())
 
4122
      if (curclone != snapset.clones.rend())
 
4123
        curclone++;
 
4124
 
 
4125
      if (curclone == snapset.clones.rend())
4251
4126
        head = sobject_t();
4252
4127
 
4253
4128
    } else {
4287
4162
      for (unsigned i=1; i<acting.size(); i++) {
4288
4163
        MOSDPGInfo *m = new MOSDPGInfo(osd->osdmap->get_epoch());
4289
4164
        m->pg_info.push_back(info);
4290
 
        osd->messenger->send_message(m, osd->osdmap->get_inst(acting[i]));
 
4165
        osd->cluster_messenger->
 
4166
          send_message(m, osd->osdmap->get_cluster_inst(acting[i]));
4291
4167
      }
4292
4168
    }
4293
4169
  }