411
426
void resend_mon_ops();
429
* handle a budget for in-flight ops
430
* budget is taken whenever an op goes into the op_osd map
431
* and returned whenever an op is removed from the map
432
* If throttle_op needs to throttle, it will unlock client_lock.
434
int calc_op_budget(Op *op);
435
void throttle_op(Op *op, int op_size=0);
436
void take_op_budget(Op *op) {
437
int op_budget = calc_op_budget(op);
438
if (keep_balanced_budget)
439
throttle_op(op, op_budget);
441
op_throttler.take(op_budget);
443
void put_op_budget(Op *op) {
444
int op_budget = calc_op_budget(op);
445
op_throttler.put(op_budget);
447
Throttle op_throttler;
414
Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l) :
450
Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l, SafeTimer& t) :
415
451
messenger(m), monc(mc), osdmap(om),
416
452
last_tid(0), client_inc(-1),
417
453
num_unacked(0), num_uncommitted(0),
454
keep_balanced_budget(false), honor_osdmap_full(true),
418
455
last_seen_osdmap_version(0),
419
456
last_seen_pgmap_version(0),
420
client_lock(l), timer(l)
457
client_lock(l), timer(t),
458
op_throttler(g_conf.objecter_inflight_op_bytes)
466
* Tell the objecter to throttle outgoing ops according to its
467
* budget (in g_conf). If you do this, ops can block, in
468
* which case it will unlock client_lock and sleep until
469
* incoming messages reduce the used budget low enough for
470
* the ops to proceed; client_lock is then locked again.
472
/// Switch balanced-budget mode on by setting keep_balanced_budget to true.
void set_balanced_budget()
{
  keep_balanced_budget = true;
}
473
/// Switch balanced-budget mode off by setting keep_balanced_budget to false.
void unset_balanced_budget()
{
  keep_balanced_budget = false;
}
475
/// Set the honor_osdmap_full flag to true.
/// NOTE(review): presumably makes the objecter respect the OSD map's
/// "full" state — confirm against the code that reads this flag.
void set_honor_osdmap_full()
{
  honor_osdmap_full = true;
}
476
/// Set the honor_osdmap_full flag to false.
/// NOTE(review): presumably makes the objecter ignore the OSD map's
/// "full" state — confirm against the code that reads this flag.
void unset_honor_osdmap_full()
{
  honor_osdmap_full = false;
}
429
480
void dispatch(Message *m);
430
481
void handle_osd_op_reply(class MOSDOpReply *m);
431
482
void handle_osd_map(class MOSDMap *m);
483
void wait_for_osd_map();
452
504
// mid-level helpers
453
tid_t mutate(const object_t& oid, ceph_object_layout ol,
505
tid_t mutate(const object_t& oid, const object_locator_t& oloc,
454
506
ObjectOperation& op,
455
507
const SnapContext& snapc, utime_t mtime, int flags,
456
508
Context *onack, Context *oncommit) {
457
Op *o = new Op(oid, ol, op.ops, flags, onack, oncommit);
509
Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
458
510
o->priority = op.priority;
459
511
o->mtime = mtime;
460
512
o->snapc = snapc;
461
513
return op_submit(o);
463
tid_t read(const object_t& oid, ceph_object_layout ol,
515
tid_t read(const object_t& oid, const object_locator_t& oloc,
464
516
ObjectOperation& op,
465
517
snapid_t snapid, bufferlist *pbl, int flags,
466
518
Context *onack) {
467
Op *o = new Op(oid, ol, op.ops, flags, onack, NULL);
519
Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL);
468
520
o->priority = op.priority;
469
521
o->snapid = snapid;
474
526
// high-level helpers
475
tid_t stat(const object_t& oid, ceph_object_layout ol, snapid_t snap,
527
tid_t stat(const object_t& oid, const object_locator_t& oloc, snapid_t snap,
476
528
uint64_t *psize, utime_t *pmtime, int flags,
477
529
Context *onfinish) {
478
530
vector<OSDOp> ops(1);
479
531
ops[0].op.op = CEPH_OSD_OP_STAT;
480
532
C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
481
Op *o = new Op(oid, ol, ops, flags, fin, 0);
533
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0);
482
534
o->snapid = snap;
483
535
o->outbl = &fin->bl;
484
536
return op_submit(o);
487
tid_t read(const object_t& oid, ceph_object_layout ol,
539
tid_t read(const object_t& oid, const object_locator_t& oloc,
488
540
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
489
541
Context *onfinish) {
490
542
vector<OSDOp> ops(1);
493
545
ops[0].op.extent.length = len;
494
546
ops[0].op.extent.truncate_size = 0;
495
547
ops[0].op.extent.truncate_seq = 0;
496
Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
548
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
497
549
o->snapid = snap;
499
551
return op_submit(o);
501
tid_t read_trunc(const object_t& oid, ceph_object_layout ol,
553
tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
502
554
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
503
555
uint64_t trunc_size, __u32 trunc_seq,
504
556
Context *onfinish) {
508
560
ops[0].op.extent.length = len;
509
561
ops[0].op.extent.truncate_size = trunc_size;
510
562
ops[0].op.extent.truncate_seq = trunc_seq;
511
Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
563
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
568
tid_t mapext(const object_t& oid, const object_locator_t& oloc,
569
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
571
vector<OSDOp> ops(1);
572
ops[0].op.op = CEPH_OSD_OP_MAPEXT;
573
ops[0].op.extent.offset = off;
574
ops[0].op.extent.length = len;
575
ops[0].op.extent.truncate_size = 0;
576
ops[0].op.extent.truncate_seq = 0;
577
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
582
tid_t sparse_read(const object_t& oid, const object_locator_t& oloc,
583
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
585
vector<OSDOp> ops(1);
586
ops[0].op.op = CEPH_OSD_OP_SPARSE_READ;
587
ops[0].op.extent.offset = off;
588
ops[0].op.extent.length = len;
589
ops[0].op.extent.truncate_size = 0;
590
ops[0].op.extent.truncate_seq = 0;
591
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
512
592
o->snapid = snap;
514
594
return op_submit(o);
517
tid_t getxattr(const object_t& oid, ceph_object_layout ol,
597
tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
518
598
const char *name, snapid_t snap, bufferlist *pbl, int flags,
519
599
Context *onfinish) {
520
600
vector<OSDOp> ops(1);
523
603
ops[0].op.xattr.value_len = 0;
525
605
ops[0].data.append(name);
526
Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
606
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0);
527
607
o->snapid = snap;
529
609
return op_submit(o);
532
tid_t getxattrs(const object_t& oid, ceph_object_layout ol, snapid_t snap,
612
tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, snapid_t snap,
533
613
map<string,bufferlist>& attrset,
534
614
int flags, Context *onfinish) {
535
615
vector<OSDOp> ops(1);
536
616
ops[0].op.op = CEPH_OSD_OP_GETXATTRS;
537
617
C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
538
Op *o = new Op(oid, ol, ops, flags, fin, 0);
618
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0);
539
619
o->snapid = snap;
540
620
o->outbl = &fin->bl;
541
621
return op_submit(o);
544
tid_t read_full(const object_t& oid, ceph_object_layout ol,
624
tid_t read_full(const object_t& oid, const object_locator_t& oloc,
545
625
snapid_t snap, bufferlist *pbl, int flags,
546
626
Context *onfinish) {
547
return read(oid, ol, 0, 0, snap, pbl, flags, onfinish);
627
return read(oid, oloc, 0, 0, snap, pbl, flags | CEPH_OSD_FLAG_READ, onfinish);
551
tid_t _modify(const object_t& oid, ceph_object_layout ol,
631
tid_t _modify(const object_t& oid, const object_locator_t& oloc,
552
632
vector<OSDOp>& ops, utime_t mtime,
553
633
const SnapContext& snapc, int flags,
554
634
Context *onack, Context *oncommit) {
555
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
635
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
556
636
o->mtime = mtime;
557
637
o->snapc = snapc;
558
638
return op_submit(o);
560
tid_t write(const object_t& oid, ceph_object_layout ol,
640
tid_t write(const object_t& oid, const object_locator_t& oloc,
561
641
uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl,
562
642
utime_t mtime, int flags,
563
643
Context *onack, Context *oncommit) {
568
648
ops[0].op.extent.truncate_size = 0;
569
649
ops[0].op.extent.truncate_seq = 0;
570
650
ops[0].data = bl;
571
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
651
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
572
652
o->mtime = mtime;
573
653
o->snapc = snapc;
574
654
return op_submit(o);
576
tid_t write_trunc(const object_t& oid, ceph_object_layout ol,
656
tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
577
657
uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl,
578
658
utime_t mtime, int flags,
579
659
uint64_t trunc_size, __u32 trunc_seq,
585
665
ops[0].op.extent.truncate_size = trunc_size;
586
666
ops[0].op.extent.truncate_seq = trunc_seq;
587
667
ops[0].data = bl;
588
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
668
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
589
669
o->mtime = mtime;
590
670
o->snapc = snapc;
591
671
return op_submit(o);
593
tid_t write_full(const object_t& oid, ceph_object_layout ol,
673
tid_t write_full(const object_t& oid, const object_locator_t& oloc,
594
674
const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags,
595
675
Context *onack, Context *oncommit) {
596
676
vector<OSDOp> ops(1);
598
678
ops[0].op.extent.offset = 0;
599
679
ops[0].op.extent.length = bl.length();
600
680
ops[0].data = bl;
601
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
681
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
602
682
o->mtime = mtime;
603
683
o->snapc = snapc;
604
684
return op_submit(o);
606
tid_t trunc(const object_t& oid, ceph_object_layout ol,
686
tid_t trunc(const object_t& oid, const object_locator_t& oloc,
607
687
const SnapContext& snapc,
608
688
utime_t mtime, int flags,
609
689
uint64_t trunc_size, __u32 trunc_seq,
613
693
ops[0].op.extent.offset = trunc_size;
614
694
ops[0].op.extent.truncate_size = trunc_size;
615
695
ops[0].op.extent.truncate_seq = trunc_seq;
616
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
696
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
617
697
o->mtime = mtime;
618
698
o->snapc = snapc;
619
699
return op_submit(o);
621
tid_t zero(const object_t& oid, ceph_object_layout ol,
701
tid_t zero(const object_t& oid, const object_locator_t& oloc,
622
702
uint64_t off, uint64_t len, const SnapContext& snapc, utime_t mtime, int flags,
623
703
Context *onack, Context *oncommit) {
624
704
vector<OSDOp> ops(1);
625
705
ops[0].op.op = CEPH_OSD_OP_ZERO;
626
706
ops[0].op.extent.offset = off;
627
707
ops[0].op.extent.length = len;
628
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
708
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
629
709
o->mtime = mtime;
630
710
o->snapc = snapc;
631
711
return op_submit(o);
633
tid_t rollback_object(const object_t& oid, ceph_object_layout ol,
713
tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
634
714
const SnapContext& snapc, snapid_t snapid,
635
715
utime_t mtime, Context *onack, Context *oncommit) {
636
716
vector<OSDOp> ops(1);
637
717
ops[0].op.op = CEPH_OSD_OP_ROLLBACK;
638
718
ops[0].op.snap.snapid = snapid;
639
Op *o = new Op(oid, ol, ops, 0, onack, oncommit);
719
Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit);
640
720
o->mtime = mtime;
641
721
o->snapc = snapc;
642
722
return op_submit(o);
644
tid_t create(const object_t& oid, ceph_object_layout ol,
724
tid_t create(const object_t& oid, const object_locator_t& oloc,
645
725
const SnapContext& snapc, utime_t mtime,
646
726
int global_flags, int create_flags,
647
727
Context *onack, Context *oncommit) {
648
728
vector<OSDOp> ops(1);
649
729
ops[0].op.op = CEPH_OSD_OP_CREATE;
650
730
ops[0].op.flags = create_flags;
651
Op *o = new Op(oid, ol, ops, global_flags, onack, oncommit);
731
Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
652
732
o->mtime = mtime;
653
733
o->snapc = snapc;
654
734
return op_submit(o);
656
tid_t remove(const object_t& oid, ceph_object_layout ol,
736
tid_t remove(const object_t& oid, const object_locator_t& oloc,
657
737
const SnapContext& snapc, utime_t mtime, int flags,
658
738
Context *onack, Context *oncommit) {
659
739
vector<OSDOp> ops(1);
660
740
ops[0].op.op = CEPH_OSD_OP_DELETE;
661
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
741
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
662
742
o->mtime = mtime;
663
743
o->snapc = snapc;
664
744
return op_submit(o);
667
tid_t lock(const object_t& oid, ceph_object_layout ol, int op, int flags,
747
tid_t lock(const object_t& oid, const object_locator_t& oloc, int op, int flags,
668
748
Context *onack, Context *oncommit) {
669
749
SnapContext snapc; // no snapc for lock ops
670
750
vector<OSDOp> ops(1);
671
751
ops[0].op.op = op;
672
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
752
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
673
753
o->snapc = snapc;
674
754
return op_submit(o);
676
tid_t setxattr(const object_t& oid, ceph_object_layout ol,
756
tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
677
757
const char *name, const SnapContext& snapc, const bufferlist &bl,
678
758
utime_t mtime, int flags,
679
759
Context *onack, Context *oncommit) {
683
763
ops[0].op.xattr.value_len = bl.length();
685
765
ops[0].data.append(name);
686
ops[0].data.append(bl);
687
Op *o = new Op(oid, ol, ops, flags, onack, oncommit);
766
ops[0].data.append(bl);
767
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
772
tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
773
const char *name, const SnapContext& snapc,
774
utime_t mtime, int flags,
775
Context *onack, Context *oncommit) {
776
vector<OSDOp> ops(1);
777
ops[0].op.op = CEPH_OSD_OP_RMXATTR;
778
ops[0].op.xattr.name_len = (name ? strlen(name) : 0);
779
ops[0].op.xattr.value_len = 0;
781
ops[0].data.append(name);
782
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit);
694
788
void list_objects(ListContext *p, Context *onfinish);
753
847
void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl, int flags,
754
848
uint64_t trunc_size, __u32 trunc_seq, Context *onfinish) {
755
849
if (extents.size() == 1) {
756
read_trunc(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
850
read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, extents[0].length,
757
851
snap, bl, flags, trunc_size, trunc_seq, onfinish);
759
853
C_Gather *g = new C_Gather;
760
854
vector<bufferlist> resultbl(extents.size());
762
856
for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
763
read_trunc(p->oid, p->layout, p->offset, p->length,
857
read_trunc(p->oid, p->oloc, p->offset, p->length,
764
858
snap, &resultbl[i++], flags, trunc_size, trunc_seq, g->new_sub());
766
860
g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));