~ubuntu-branches/ubuntu/precise/ceph/precise

« back to all changes in this revision

Viewing changes to src/osdc/Objecter.h

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
 
27
27
#include <list>
28
28
#include <map>
 
29
#include <memory>
29
30
#include <ext/hash_map>
30
31
using namespace std;
31
32
using namespace __gnu_cxx;
119
120
    bufferlist bl;
120
121
    add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
121
122
  }
 
123
  void mapext(uint64_t off, uint64_t len) {
 
124
    bufferlist bl;
 
125
    add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
 
126
  }
 
127
  void sparse_read(uint64_t off, uint64_t len) {
 
128
    bufferlist bl;
 
129
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
 
130
  }
122
131
 
123
132
  // object attrs
124
133
  void getxattr(const char *name) {
185
194
  int client_inc;
186
195
  int num_unacked;
187
196
  int num_uncommitted;
 
197
  bool keep_balanced_budget;
 
198
  bool honor_osdmap_full;
188
199
 
189
200
  void maybe_request_map();
190
201
 
192
203
  version_t last_seen_pgmap_version;
193
204
 
194
205
  Mutex &client_lock;
195
 
  SafeTimer timer;
 
206
  SafeTimer &timer;
196
207
  
197
208
  class C_Tick : public Context {
198
209
    Objecter *ob;
211
222
    xlist<Op*>::item session_item;
212
223
 
213
224
    object_t oid;
214
 
    ceph_object_layout layout;
 
225
    object_locator_t oloc;
 
226
    pg_t pgid;
 
227
 
 
228
    Connection *con;
 
229
 
215
230
    vector<OSDOp> ops;
216
231
 
217
232
    snapid_t snapid;
218
233
    SnapContext snapc;
219
234
    utime_t mtime;
220
235
 
221
 
    bufferlist inbl;
222
236
    bufferlist *outbl;
223
237
 
224
238
    int flags, priority;
230
244
 
231
245
    bool paused;
232
246
 
233
 
    Op(const object_t& o, ceph_object_layout& l, vector<OSDOp>& op,
 
247
    Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
234
248
       int f, Context *ac, Context *co) :
235
249
      session_item(this),
236
 
      oid(o), layout(l), 
 
250
      oid(o), oloc(ol),
 
251
      con(NULL),
237
252
      snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), 
238
253
      tid(0), attempts(0),
239
254
      paused(false) {
410
425
 
411
426
  void resend_mon_ops();
412
427
 
 
428
  /**
 
429
   * handle a budget for in-flight ops
 
430
   * budget is taken whenever an op goes into the op_osd map
 
431
   * and returned whenever an op is removed from the map
 
432
   * If throttle_op needs to throttle it will unlock client_lock.
 
433
   */
 
434
  int calc_op_budget(Op *op);
 
435
  void throttle_op(Op *op, int op_size=0);
 
436
  void take_op_budget(Op *op) {
 
437
    int op_budget = calc_op_budget(op);
 
438
    if (keep_balanced_budget)
 
439
      throttle_op(op, op_budget);
 
440
    else
 
441
      op_throttler.take(op_budget);
 
442
  }
 
443
  void put_op_budget(Op *op) {
 
444
    int op_budget = calc_op_budget(op);
 
445
    op_throttler.put(op_budget);
 
446
  }
 
447
  Throttle op_throttler;
 
448
 
413
449
 public:
414
 
  Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l) : 
 
450
  Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l, SafeTimer& t) : 
415
451
    messenger(m), monc(mc), osdmap(om),
416
452
    last_tid(0), client_inc(-1),
417
453
    num_unacked(0), num_uncommitted(0),
 
454
    keep_balanced_budget(false), honor_osdmap_full(true),
418
455
    last_seen_osdmap_version(0),
419
456
    last_seen_pgmap_version(0),
420
 
    client_lock(l), timer(l)
 
457
    client_lock(l), timer(t),
 
458
    op_throttler(g_conf.objecter_inflight_op_bytes)
421
459
  { }
422
460
  ~Objecter() { }
423
461
 
424
462
  void init();
425
463
  void shutdown();
426
464
 
 
465
  /**
 
466
   * Tell the objecter to throttle outgoing ops according to its
 
467
   * budget (in g_conf). If you do this, ops can block, in
 
468
   * which case it will unlock client_lock and sleep until
 
469
   * incoming messages reduce the used budget low enough for
 
470
   * the ops to continue going; then it will lock client_lock again.
 
471
   */
 
472
  void set_balanced_budget() { keep_balanced_budget = true; }
 
473
  void unset_balanced_budget() { keep_balanced_budget = false; }
 
474
 
 
475
  void set_honor_osdmap_full() { honor_osdmap_full = true; }
 
476
  void unset_honor_osdmap_full() { honor_osdmap_full = false; }
 
477
 
427
478
  // messages
428
479
 public:
429
480
  void dispatch(Message *m);
430
481
  void handle_osd_op_reply(class MOSDOpReply *m);
431
482
  void handle_osd_map(class MOSDMap *m);
 
483
  void wait_for_osd_map();
432
484
 
433
485
private:
434
486
  // low-level
450
502
  }
451
503
 
452
504
  // mid-level helpers
453
 
  tid_t mutate(const object_t& oid, ceph_object_layout ol, 
 
505
  tid_t mutate(const object_t& oid, const object_locator_t& oloc, 
454
506
               ObjectOperation& op,
455
507
               const SnapContext& snapc, utime_t mtime, int flags,
456
508
               Context *onack, Context *oncommit) {
457
 
    Op *o = new Op(oid, ol, op.ops, flags, onack, oncommit);
 
509
    Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
458
510
    o->priority = op.priority;
459
511
    o->mtime = mtime;
460
512
    o->snapc = snapc;
461
513
    return op_submit(o);
462
514
  }
463
 
  tid_t read(const object_t& oid, ceph_object_layout ol, 
 
515
  tid_t read(const object_t& oid, const object_locator_t& oloc, 
464
516
             ObjectOperation& op,
465
517
             snapid_t snapid, bufferlist *pbl, int flags,
466
518
             Context *onack) {
467
 
    Op *o = new Op(oid, ol, op.ops, flags, onack, NULL);
 
519
    Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL);
468
520
    o->priority = op.priority;
469
521
    o->snapid = snapid;
470
522
    o->outbl = pbl;
472
524
  }
473
525
 
474
526
  // high-level helpers
475
 
  tid_t stat(const object_t& oid, ceph_object_layout ol, snapid_t snap,
 
527
  tid_t stat(const object_t& oid, const object_locator_t& oloc, snapid_t snap,
476
528
             uint64_t *psize, utime_t *pmtime, int flags, 
477
529
             Context *onfinish) {
478
530
    vector<OSDOp> ops(1);
479
531
    ops[0].op.op = CEPH_OSD_OP_STAT;
480
532
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
481
 
    Op *o = new Op(oid, ol, ops, flags, fin, 0);
 
533
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0);
482
534
    o->snapid = snap;
483
535
    o->outbl = &fin->bl;
484
536
    return op_submit(o);
485
537
  }
486
538
 
487
 
  tid_t read(const object_t& oid, ceph_object_layout ol, 
 
539
  tid_t read(const object_t& oid, const object_locator_t& oloc, 
488
540
             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
489
541
             Context *onfinish) {
490
542
    vector<OSDOp> ops(1);
493
545
    ops[0].op.extent.length = len;
494
546
    ops[0].op.extent.truncate_size = 0;
495
547
    ops[0].op.extent.truncate_seq = 0;
496
 
    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
 
548
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
497
549
    o->snapid = snap;
498
550
    o->outbl = pbl;
499
551
    return op_submit(o);
500
552
  }
501
 
  tid_t read_trunc(const object_t& oid, ceph_object_layout ol, 
 
553
  tid_t read_trunc(const object_t& oid, const object_locator_t& oloc, 
502
554
             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
503
555
             uint64_t trunc_size, __u32 trunc_seq,
504
556
             Context *onfinish) {
508
560
    ops[0].op.extent.length = len;
509
561
    ops[0].op.extent.truncate_size = trunc_size;
510
562
    ops[0].op.extent.truncate_seq = trunc_seq;
511
 
    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
 
563
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
 
564
    o->snapid = snap;
 
565
    o->outbl = pbl;
 
566
    return op_submit(o);
 
567
  }
 
568
  tid_t mapext(const object_t& oid, const object_locator_t& oloc,
 
569
             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
 
570
             Context *onfinish) {
 
571
    vector<OSDOp> ops(1);
 
572
    ops[0].op.op = CEPH_OSD_OP_MAPEXT;
 
573
    ops[0].op.extent.offset = off;
 
574
    ops[0].op.extent.length = len;
 
575
    ops[0].op.extent.truncate_size = 0;
 
576
    ops[0].op.extent.truncate_seq = 0;
 
577
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
 
578
    o->snapid = snap;
 
579
    o->outbl = pbl;
 
580
    return op_submit(o);
 
581
  }
 
582
  tid_t sparse_read(const object_t& oid, const object_locator_t& oloc,
 
583
             uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
 
584
             Context *onfinish) {
 
585
    vector<OSDOp> ops(1);
 
586
    ops[0].op.op = CEPH_OSD_OP_SPARSE_READ;
 
587
    ops[0].op.extent.offset = off;
 
588
    ops[0].op.extent.length = len;
 
589
    ops[0].op.extent.truncate_size = 0;
 
590
    ops[0].op.extent.truncate_seq = 0;
 
591
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
512
592
    o->snapid = snap;
513
593
    o->outbl = pbl;
514
594
    return op_submit(o);
515
595
  }
516
596
 
517
 
  tid_t getxattr(const object_t& oid, ceph_object_layout ol,
 
597
  tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
518
598
             const char *name, snapid_t snap, bufferlist *pbl, int flags,
519
599
             Context *onfinish) {
520
600
    vector<OSDOp> ops(1);
523
603
    ops[0].op.xattr.value_len = 0;
524
604
    if (name)
525
605
      ops[0].data.append(name);
526
 
    Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
 
606
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
527
607
    o->snapid = snap;
528
608
    o->outbl = pbl;
529
609
    return op_submit(o);
530
610
  }
531
611
 
532
 
  tid_t getxattrs(const object_t& oid, ceph_object_layout ol, snapid_t snap,
 
612
  tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, snapid_t snap,
533
613
             map<string,bufferlist>& attrset,
534
614
             int flags, Context *onfinish) {
535
615
    vector<OSDOp> ops(1);
536
616
    ops[0].op.op = CEPH_OSD_OP_GETXATTRS;
537
617
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
538
 
    Op *o = new Op(oid, ol, ops, flags, fin, 0);
 
618
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0);
539
619
    o->snapid = snap;
540
620
    o->outbl = &fin->bl;
541
621
    return op_submit(o);
542
622
  }
543
623
 
544
 
  tid_t read_full(const object_t& oid, ceph_object_layout ol,
 
624
  tid_t read_full(const object_t& oid, const object_locator_t& oloc,
545
625
                  snapid_t snap, bufferlist *pbl, int flags,
546
626
                  Context *onfinish) {
547
 
    return read(oid, ol, 0, 0, snap, pbl, flags, onfinish);
 
627
    return read(oid, oloc, 0, 0, snap, pbl, flags | CEPH_OSD_FLAG_READ, onfinish);
548
628
  }
549
629
     
550
630
  // writes
551
 
  tid_t _modify(const object_t& oid, ceph_object_layout ol, 
 
631
  tid_t _modify(const object_t& oid, const object_locator_t& oloc, 
552
632
                vector<OSDOp>& ops, utime_t mtime,
553
633
                const SnapContext& snapc, int flags,
554
634
               Context *onack, Context *oncommit) {
555
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
635
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
556
636
    o->mtime = mtime;
557
637
    o->snapc = snapc;
558
638
    return op_submit(o);
559
639
  }
560
 
  tid_t write(const object_t& oid, ceph_object_layout ol,
 
640
  tid_t write(const object_t& oid, const object_locator_t& oloc,
561
641
              uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl,
562
642
              utime_t mtime, int flags,
563
643
              Context *onack, Context *oncommit) {
568
648
    ops[0].op.extent.truncate_size = 0;
569
649
    ops[0].op.extent.truncate_seq = 0;
570
650
    ops[0].data = bl;
571
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
651
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
572
652
    o->mtime = mtime;
573
653
    o->snapc = snapc;
574
654
    return op_submit(o);
575
655
  }
576
 
  tid_t write_trunc(const object_t& oid, ceph_object_layout ol,
 
656
  tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
577
657
              uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl,
578
658
              utime_t mtime, int flags,
579
659
             uint64_t trunc_size, __u32 trunc_seq,
585
665
    ops[0].op.extent.truncate_size = trunc_size;
586
666
    ops[0].op.extent.truncate_seq = trunc_seq;
587
667
    ops[0].data = bl;
588
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
668
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
589
669
    o->mtime = mtime;
590
670
    o->snapc = snapc;
591
671
    return op_submit(o);
592
672
  }
593
 
  tid_t write_full(const object_t& oid, ceph_object_layout ol,
 
673
  tid_t write_full(const object_t& oid, const object_locator_t& oloc,
594
674
                   const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags,
595
675
                   Context *onack, Context *oncommit) {
596
676
    vector<OSDOp> ops(1);
598
678
    ops[0].op.extent.offset = 0;
599
679
    ops[0].op.extent.length = bl.length();
600
680
    ops[0].data = bl;
601
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
681
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
602
682
    o->mtime = mtime;
603
683
    o->snapc = snapc;
604
684
    return op_submit(o);
605
685
  }
606
 
  tid_t trunc(const object_t& oid, ceph_object_layout ol,
 
686
  tid_t trunc(const object_t& oid, const object_locator_t& oloc,
607
687
              const SnapContext& snapc,
608
688
              utime_t mtime, int flags,
609
689
              uint64_t trunc_size, __u32 trunc_seq,
613
693
    ops[0].op.extent.offset = trunc_size;
614
694
    ops[0].op.extent.truncate_size = trunc_size;
615
695
    ops[0].op.extent.truncate_seq = trunc_seq;
616
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
696
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
617
697
    o->mtime = mtime;
618
698
    o->snapc = snapc;
619
699
    return op_submit(o);
620
700
  }
621
 
  tid_t zero(const object_t& oid, ceph_object_layout ol, 
 
701
  tid_t zero(const object_t& oid, const object_locator_t& oloc, 
622
702
             uint64_t off, uint64_t len, const SnapContext& snapc, utime_t mtime, int flags,
623
703
             Context *onack, Context *oncommit) {
624
704
    vector<OSDOp> ops(1);
625
705
    ops[0].op.op = CEPH_OSD_OP_ZERO;
626
706
    ops[0].op.extent.offset = off;
627
707
    ops[0].op.extent.length = len;
628
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
708
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
629
709
    o->mtime = mtime;
630
710
    o->snapc = snapc;
631
711
    return op_submit(o);
632
712
  }
633
 
  tid_t rollback_object(const object_t& oid, ceph_object_layout ol,
 
713
  tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
634
714
                 const SnapContext& snapc, snapid_t snapid,
635
715
                 utime_t mtime, Context *onack, Context *oncommit) {
636
716
    vector<OSDOp> ops(1);
637
717
    ops[0].op.op = CEPH_OSD_OP_ROLLBACK;
638
718
    ops[0].op.snap.snapid = snapid;
639
 
    Op *o = new Op(oid, ol, ops, 0, onack, oncommit);
 
719
    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit);
640
720
    o->mtime = mtime;
641
721
    o->snapc = snapc;
642
722
    return op_submit(o);
643
723
  }
644
 
  tid_t create(const object_t& oid, ceph_object_layout ol, 
 
724
  tid_t create(const object_t& oid, const object_locator_t& oloc, 
645
725
             const SnapContext& snapc, utime_t mtime,
646
726
             int global_flags, int create_flags,
647
727
             Context *onack, Context *oncommit) {
648
728
    vector<OSDOp> ops(1);
649
729
    ops[0].op.op = CEPH_OSD_OP_CREATE;
650
730
    ops[0].op.flags = create_flags;
651
 
    Op *o = new Op(oid, ol, ops, global_flags, onack, oncommit);
 
731
    Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
652
732
    o->mtime = mtime;
653
733
    o->snapc = snapc;
654
734
    return op_submit(o);
655
735
  }
656
 
  tid_t remove(const object_t& oid, ceph_object_layout ol, 
 
736
  tid_t remove(const object_t& oid, const object_locator_t& oloc, 
657
737
               const SnapContext& snapc, utime_t mtime, int flags,
658
738
               Context *onack, Context *oncommit) {
659
739
    vector<OSDOp> ops(1);
660
740
    ops[0].op.op = CEPH_OSD_OP_DELETE;
661
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
741
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
662
742
    o->mtime = mtime;
663
743
    o->snapc = snapc;
664
744
    return op_submit(o);
665
745
  }
666
746
 
667
 
  tid_t lock(const object_t& oid, ceph_object_layout ol, int op, int flags,
 
747
  tid_t lock(const object_t& oid, const object_locator_t& oloc, int op, int flags,
668
748
             Context *onack, Context *oncommit) {
669
749
    SnapContext snapc;  // no snapc for lock ops
670
750
    vector<OSDOp> ops(1);
671
751
    ops[0].op.op = op;
672
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
 
752
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
673
753
    o->snapc = snapc;
674
754
    return op_submit(o);
675
755
  }
676
 
  tid_t setxattr(const object_t& oid, ceph_object_layout ol,
 
756
  tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
677
757
              const char *name, const SnapContext& snapc, const bufferlist &bl,
678
758
              utime_t mtime, int flags,
679
759
              Context *onack, Context *oncommit) {
683
763
    ops[0].op.xattr.value_len = bl.length();
684
764
    if (name)
685
765
      ops[0].data.append(name);
686
 
   ops[0].data.append(bl);
687
 
    Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
688
 
    o->mtime = mtime;
689
 
    o->snapc = snapc;
690
 
    return op_submit(o);
691
 
  }
692
 
 
 
766
    ops[0].data.append(bl);
 
767
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
 
768
    o->mtime = mtime;
 
769
    o->snapc = snapc;
 
770
    return op_submit(o);
 
771
  }
 
772
  tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
 
773
              const char *name, const SnapContext& snapc,
 
774
              utime_t mtime, int flags,
 
775
              Context *onack, Context *oncommit) {
 
776
    vector<OSDOp> ops(1);
 
777
    ops[0].op.op = CEPH_OSD_OP_RMXATTR;
 
778
    ops[0].op.xattr.name_len = (name ? strlen(name) : 0);
 
779
    ops[0].op.xattr.value_len = 0;
 
780
    if (name)
 
781
      ops[0].data.append(name);
 
782
    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
 
783
    o->mtime = mtime;
 
784
    o->snapc = snapc;
 
785
    return op_submit(o);
 
786
  }
693
787
 
694
788
  void list_objects(ListContext *p, Context *onfinish);
695
789
 
753
847
  void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl, int flags,
754
848
                uint64_t trunc_size, __u32 trunc_seq, Context *onfinish) {
755
849
    if (extents.size() == 1) {
756
 
      read_trunc(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
 
850
      read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, extents[0].length,
757
851
           snap, bl, flags, trunc_size, trunc_seq, onfinish);
758
852
    } else {
759
853
      C_Gather *g = new C_Gather;
760
854
      vector<bufferlist> resultbl(extents.size());
761
855
      int i=0;
762
856
      for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
763
 
        read_trunc(p->oid, p->layout, p->offset, p->length,
 
857
        read_trunc(p->oid, p->oloc, p->offset, p->length,
764
858
             snap, &resultbl[i++], flags, trunc_size, trunc_seq, g->new_sub());
765
859
      }
766
860
      g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
775
869
                int flags, uint64_t trunc_size, __u32 trunc_seq,
776
870
                Context *onack, Context *oncommit) {
777
871
    if (extents.size() == 1) {
778
 
      write_trunc(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
 
872
      write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, extents[0].length,
779
873
            snapc, bl, mtime, flags, trunc_size, trunc_seq, onack, oncommit);
780
874
    } else {
781
875
      C_Gather *gack = 0, *gcom = 0;
790
884
             bit++)
791
885
          bl.copy(bit->first, bit->second, cur);
792
886
        assert(cur.length() == p->length);
793
 
        write_trunc(p->oid, p->layout, p->offset, p->length, 
 
887
        write_trunc(p->oid, p->oloc, p->offset, p->length, 
794
888
              snapc, cur, mtime, flags, trunc_size, trunc_seq,
795
889
              gack ? gack->new_sub():0,
796
890
              gcom ? gcom->new_sub():0);
806
900
  void ms_handle_connect(Connection *con);
807
901
  void ms_handle_reset(Connection *con);
808
902
  void ms_handle_remote_reset(Connection *con);
809
 
 
810
903
};
811
904
 
812
905
#endif