~ubuntu-branches/ubuntu/precise/ceph/precise

« back to all changes in this revision

Viewing changes to src/client/Client.cc

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
149
149
  // osd interfaces
150
150
  osdmap = new OSDMap;     // initially blank.. see mount()
151
151
  mdsmap = new MDSMap;
152
 
  objecter = new Objecter(messenger, monclient, osdmap, client_lock);
 
152
  objecter = new Objecter(messenger, monclient, osdmap, client_lock, timer);
153
153
  objecter->set_client_incarnation(0);  // client always 0, for now.
154
154
  objectcacher = new ObjectCacher(objecter, client_lock, 
155
155
                                  0,                            // all ack callback
256
256
void Client::init() 
257
257
{
258
258
  Mutex::Locker lock(client_lock);
 
259
  timer.init();
259
260
 
260
261
  objectcacher->start();
261
262
 
298
299
  objectcacher->stop();  // outside of client_lock! this does a join.
299
300
 
300
301
  client_lock.Lock();
 
302
  timer.shutdown();
301
303
  objecter->shutdown();
302
304
  client_lock.Unlock();
303
305
  monclient->shutdown();
357
359
  dout(25) << "truncate_seq: mds " << truncate_seq <<  " local "
358
360
           << in->truncate_seq << " time_warp_seq: mds " << time_warp_seq
359
361
           << " local " << in->time_warp_seq << dendl;
 
362
  uint64_t prior_size = in->size;
 
363
 
360
364
  if (truncate_seq > in->truncate_seq ||
361
365
      (truncate_seq == in->truncate_seq && size > in->size)) {
362
366
    dout(10) << "size " << in->size << " -> " << size << dendl;
371
375
  }
372
376
  if (truncate_seq >= in->truncate_seq &&
373
377
      in->truncate_size != truncate_size) {
374
 
    dout(10) << "truncate_size " << in->truncate_size << " -> "
375
 
             << truncate_size << dendl;
376
 
    in->truncate_size = truncate_size;
377
 
    in->oset.truncate_size = truncate_size;
 
378
    if (in->is_file()) {
 
379
      dout(10) << "truncate_size " << in->truncate_size << " -> "
 
380
               << truncate_size << dendl;
 
381
      in->truncate_size = truncate_size;
 
382
      in->oset.truncate_size = truncate_size;
 
383
      if (g_conf.client_oc && prior_size) { //do actual truncation
 
384
        vector<ObjectExtent> ls;
 
385
        filer->file_to_extents(in->ino, &in->layout,
 
386
                               truncate_size, prior_size - truncate_size,
 
387
                               ls);
 
388
        objectcacher->truncate_set(&in->oset, ls);
 
389
      }
 
390
    } else {
 
391
      dout(0) << "Hmmm, truncate_seq && truncate_size changed on non-file inode!" << dendl;
 
392
    }
378
393
  }
379
394
  
380
395
  // be careful with size, mtime, atime
421
436
Inode * Client::add_update_inode(InodeStat *st, utime_t from, int mds)
422
437
{
423
438
  Inode *in;
424
 
  if (inode_map.count(st->vino))
 
439
  if (inode_map.count(st->vino)) {
425
440
    in = inode_map[st->vino];
426
 
  else {
 
441
    dout(12) << "add_update_inode had " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
 
442
  } else {
427
443
    in = new Inode(st->vino, &st->layout);
428
444
    inode_map[st->vino] = in;
429
445
    bool new_root = false;
442
458
    if (new_root) dout(10) << "setting this inode as root! " << *in << "; ino: " << root->ino << "; snapid" << root->snapid << dendl;
443
459
    if (in->is_symlink())
444
460
      in->symlink = st->symlink;
 
461
 
 
462
    dout(12) << "add_update_inode adding " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
445
463
  }
446
464
 
447
 
  dout(12) << "add_update_inode " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
448
465
  if (!st->cap.caps)
449
466
    return in;   // as with readdir returning indoes in different snaprealms (no caps!)
450
467
 
451
 
  int implemented = 0;
452
 
  int issued = in->caps_issued(&implemented) | in->caps_dirty();
453
 
  issued |= implemented;
 
468
  // only update inode if mds info is strictly newer, or it is the same and projected (odd).
 
469
  if (st->version == 0 ||
 
470
      (in->version & ~1) < st->version) {
 
471
 
 
472
    int implemented = 0;
 
473
    int issued = in->caps_issued(&implemented) | in->caps_dirty();
 
474
    issued |= implemented;
 
475
 
 
476
    in->version = st->version;
 
477
 
 
478
    if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
 
479
      in->mode = st->mode;
 
480
      in->uid = st->uid;
 
481
      in->gid = st->gid;
 
482
    }
 
483
 
 
484
    if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
 
485
      in->nlink = st->nlink;
 
486
    }
 
487
 
 
488
    if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
 
489
        st->xattrbl.length() &&
 
490
        st->xattr_version > in->xattr_version) {
 
491
      bufferlist::iterator p = st->xattrbl.begin();
 
492
      ::decode(in->xattrs, p);
 
493
      in->xattr_version = st->xattr_version;
 
494
    }
 
495
 
 
496
    in->dirstat = st->dirstat;
 
497
    in->rstat = st->rstat;
 
498
 
 
499
    if (in->is_dir()) {
 
500
      in->dir_layout = st->dir_layout;
 
501
      dout(20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << dendl;
 
502
    }
 
503
 
 
504
    in->layout = st->layout;
 
505
    in->ctime = st->ctime;
 
506
    in->max_size = st->max_size;  // right?
 
507
  
 
508
    update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
 
509
                           st->time_warp_seq, st->ctime, st->mtime, st->atime,
 
510
                           issued);
 
511
 
 
512
    if (in->is_dir() &&
 
513
        (st->cap.caps & CEPH_CAP_FILE_SHARED) &&
 
514
        (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 
515
        in->dirstat.nfiles == 0 &&
 
516
        in->dirstat.nsubdirs == 0) {
 
517
      dout(10) << " marking I_COMPLETE on empty dir " << *in << dendl;
 
518
      in->flags |= I_COMPLETE;
 
519
      if (in->dir) {
 
520
        dout(0) << "WARNING: dir is open on empty dir " << in->ino << " with "
 
521
                << in->dir->dentry_map.size() << " entries" << dendl;
 
522
        in->dir->max_offset = 2;
 
523
        
 
524
        // FIXME: tear down dir?
 
525
      }
 
526
    }  
 
527
  }
 
528
 
 
529
  // move me if/when version reflects fragtree changes.
 
530
  in->dirfragtree = st->dirfragtree;
454
531
 
455
532
  if (in->snapid == CEPH_NOSNAP)
456
533
    add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags);
457
534
  else
458
535
    in->snap_caps |= st->cap.caps;
459
536
 
460
 
  in->dirfragtree = st->dirfragtree;  // FIXME look at the mask!
461
 
 
462
 
  if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
463
 
    in->mode = st->mode;
464
 
    in->uid = st->uid;
465
 
    in->gid = st->gid;
466
 
  }
467
 
 
468
 
  if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
469
 
    in->nlink = st->nlink;
470
 
  }
471
 
 
472
 
  if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
473
 
      st->xattrbl.length() &&
474
 
      st->xattr_version > in->xattr_version) {
475
 
    bufferlist::iterator p = st->xattrbl.begin();
476
 
    ::decode(in->xattrs, p);
477
 
    in->xattr_version = st->xattr_version;
478
 
  }
479
 
 
480
 
  in->dirstat = st->dirstat;
481
 
  in->rstat = st->rstat;
482
 
 
483
 
  in->layout = st->layout;
484
 
  in->ctime = st->ctime;
485
 
  in->max_size = st->max_size;  // right?
486
 
  
487
 
  update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size,
488
 
                         st->time_warp_seq, st->ctime, st->mtime, st->atime,
489
 
                         issued);
490
 
  
491
 
  if (in->is_dir() &&
492
 
      (st->cap.caps & CEPH_CAP_FILE_SHARED) &&
493
 
      in->dirstat.nfiles == 0 &&
494
 
      in->dirstat.nsubdirs == 0) {
495
 
    dout(10) << " marking I_COMPLETE on empty dir " << *in << dendl;
496
 
    in->flags |= I_COMPLETE;
497
 
  }
498
 
 
499
537
  return in;
500
538
}
501
539
 
503
541
/*
504
542
 * insert_dentry_inode - insert + link a single dentry + inode into the metadata cache.
505
543
 */
506
 
void Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
507
 
                                 Inode *in, utime_t from, int mds)
 
544
Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, 
 
545
                                    Inode *in, utime_t from, int mds, bool set_offset)
508
546
{
509
547
  utime_t dttl = from;
510
548
  dttl += (float)dlease->duration_ms / 1000.0;
514
552
    dn = dir->dentries[dname];
515
553
  
516
554
  dout(12) << "insert_dentry_inode " << dname << " vino " << in->vino()
517
 
           << " in dir " << dir->parent_inode->ino
 
555
           << " in dir " << dir->parent_inode->vino()
518
556
           << dendl;
519
557
  
520
558
  if (dn && dn->inode) {
545
583
               << " unlinked, linking" << dendl;
546
584
      dn = link(dir, dname, in, dn);
547
585
    }
 
586
    if (set_offset) {
 
587
      dout(15) << " setting dn offset to " << dir->max_offset << dendl;
 
588
      dn->offset = dir->max_offset++;
 
589
    }
548
590
  }
549
591
 
550
592
  assert(dn && dn->inode);
560
602
    }
561
603
  }
562
604
  dn->cap_shared_gen = dir->parent_inode->shared_gen;
 
605
  return dn;
563
606
}
564
607
 
565
608
 
615
658
    return NULL;
616
659
  }
617
660
 
 
661
  Connection *con = request->reply->get_connection();
 
662
  int features = con->get_features();
 
663
  dout(10) << " features 0x" << hex << features << dec << dendl;
 
664
 
618
665
  // snap trace
619
666
  if (reply->snapbl.length())
620
667
    update_snap_trace(reply->snapbl);
631
678
  InodeStat ist;
632
679
 
633
680
  if (reply->head.is_dentry) {
634
 
    dirst.decode(p);
 
681
    dirst.decode(p, features);
635
682
    dst.decode(p);
636
683
    ::decode(dname, p);
637
684
    ::decode(dlease, p);
639
686
 
640
687
  Inode *in = 0;
641
688
  if (reply->head.is_target) {
642
 
    ist.decode(p);
 
689
    ist.decode(p, features);
643
690
    in = add_update_inode(&ist, from, mds);
644
691
  }
645
692
 
649
696
 
650
697
    if (in) {
651
698
      Dir *dir = diri->open_dir();
652
 
      insert_dentry_inode(dir, dname, &dlease, in, from, mds);
 
699
      insert_dentry_inode(dir, dname, &dlease, in, from, mds, true);
653
700
    } else {
654
701
      Dentry *dn = NULL;
655
702
      if (diri->dir && diri->dir->dentries.count(dname)) {
674
721
 
675
722
    if (in) {
676
723
      Dir *dir = diri->open_dir();
677
 
      insert_dentry_inode(dir, dname, &dlease, in, from, mds);
 
724
      insert_dentry_inode(dir, dname, &dlease, in, from, mds, true);
678
725
    } else {
679
726
      Dentry *dn = NULL;
680
727
      if (diri->dir && diri->dir->dentries.count(dname)) {
686
733
  }
687
734
 
688
735
  // insert readdir results too
 
736
  assert(request->readdir_result.empty());
689
737
 
690
738
  // the rest?
691
 
  p = reply->get_dir_bl().begin();
 
739
  p = reply->get_extra_bl().begin();
692
740
  if (!p.end()) {
 
741
    // snapdir?
 
742
    if (request->head.op == CEPH_MDS_OP_LSSNAP)
 
743
      in = open_snapdir(in);
 
744
 
693
745
    // only open dir if we're actually adding stuff to it!
694
746
    Dir *dir = in->open_dir();
695
747
    assert(dir);
702
754
    ::decode(end, p);
703
755
    ::decode(complete, p);
704
756
    
 
757
    dout(10) << "insert_trace " << numdn << " readdir items, end=" << (int)end
 
758
             << ", offset " << request->readdir_offset << dendl;
 
759
 
 
760
    request->readdir_end = end;
 
761
    request->readdir_num = numdn;
 
762
 
 
763
    map<string,Dentry*>::iterator pd = dir->dentry_map.upper_bound(request->readdir_start);
 
764
 
 
765
    frag_t fg = request->readdir_frag;
 
766
    Inode *diri = in;
 
767
 
705
768
    string dname;
706
769
    LeaseStat dlease;
707
 
    while (numdn) {
 
770
    for (unsigned i=0; i<numdn; i++) {
708
771
      ::decode(dname, p);
709
772
      ::decode(dlease, p);
710
 
      InodeStat ist(p);
 
773
      InodeStat ist(p, features);
711
774
      
712
775
      Inode *in = add_update_inode(&ist, from, mds);
713
 
      insert_dentry_inode(dir, dname, &dlease, in, from, mds);
 
776
      Dentry *dn = insert_dentry_inode(dir, dname, &dlease, in, from, mds, false);
 
777
      dn->offset = DirResult::make_fpos(request->readdir_frag, i + request->readdir_offset);
 
778
 
 
779
      // remove any extra names
 
780
      while (pd != dir->dentry_map.end() && pd->first <= dname) {
 
781
        if (pd->first < dname &&
 
782
            diri->dirfragtree[ceph_str_hash_linux(pd->first.c_str(),
 
783
                                                  pd->first.length())] == fg) {  // do not remove items in earlier frags
 
784
          dout(15) << "insert_trace  unlink '" << pd->first << "'" << dendl;
 
785
          Dentry *dn = pd->second;
 
786
          pd++;
 
787
          unlink(dn, true);
 
788
        } else
 
789
          pd++;
 
790
      }
714
791
      
715
 
      numdn--;
 
792
      // add to cached result list
 
793
      in->get();
 
794
      request->readdir_result.push_back(pair<string,Inode*>(dname, in));
 
795
 
 
796
      dout(15) << "insert_trace  " << hex << dn->offset << dec << ": '" << dname << "' -> " << in->ino << dendl;
716
797
    }
 
798
    request->readdir_last_name = dname;
717
799
    
 
800
    // remove trailing names
 
801
    if (end) {
 
802
      while (pd != dir->dentry_map.end()) {
 
803
        if (diri->dirfragtree[ceph_str_hash_linux(pd->first.c_str(),
 
804
                                                  pd->first.length())] == fg) {
 
805
          dout(15) << "insert_trace  unlink '" << pd->first << "'" << dendl;
 
806
          Dentry *dn = pd->second;
 
807
          pd++;
 
808
          unlink(dn, true);
 
809
        } else 
 
810
          pd++;
 
811
      }
 
812
    }
 
813
 
718
814
    if (dir->is_empty())
719
815
      close_dir(dir);
720
816
  }
731
827
  __u32 hash = 0;
732
828
  bool is_hash = false;
733
829
 
734
 
  Inode *dir_inode = NULL;
 
830
  Inode *in = NULL;
735
831
  InodeCap *cap = NULL;
736
832
 
737
833
  if (req->resend_mds >= 0) {
744
840
  if (g_conf.client_use_random_mds) goto random_mds;
745
841
 
746
842
  if (req->inode) {
747
 
    dir_inode = req->inode;
 
843
    in = req->inode;
 
844
    if (req->path.depth()) {
 
845
      hash = ceph_str_hash(in->dir_layout.dl_dir_hash,
 
846
                           req->path[0].data(),
 
847
                           req->path[0].length());
 
848
      dout(20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << " on " << req->path[0]
 
849
               << " => " << hash << dendl;
 
850
      is_hash = true;
 
851
 
 
852
    }
748
853
  } else if (req->dentry) {
749
854
    if (req->dentry->inode) {
750
 
      dir_inode = req->dentry->inode;
 
855
      in = req->dentry->inode;
751
856
    } else {
752
 
      dir_inode = req->dentry->dir->parent_inode;
753
 
      hash = ceph_str_hash_linux(req->dentry->name.data(),
754
 
                                 req->dentry->name.length());
 
857
      in = req->dentry->dir->parent_inode;
 
858
      hash = ceph_str_hash(in->dir_layout.dl_dir_hash,
 
859
                           req->dentry->name.data(),
 
860
                           req->dentry->name.length());
 
861
      dout(20) << " dir hash is " << (int)in->dir_layout.dl_dir_hash << " on " << req->dentry->name
 
862
               << " => " << hash << dendl;
755
863
      is_hash = true;
756
864
    }
757
865
  }
 
866
  if (in && in->snapid != CEPH_NOSNAP) {
 
867
    dout(10) << "choose_target_mds " << *in << " is snapped, using nonsnap parent" << dendl;
 
868
    while (in->snapid != CEPH_NOSNAP) {
 
869
      if (in->snapid == CEPH_SNAPDIR)
 
870
        in = in->snapdir_parent;
 
871
      else if (in->dn)
 
872
        in = in->dn->dir->parent_inode;
 
873
      else {
 
874
        dout(10) << "got unlinked inode, can't look at parent" << dendl;
 
875
        break;
 
876
      }
 
877
    }
 
878
    is_hash = false;
 
879
  }
758
880
  
759
 
  dout(20) << "choose_target_mds dir_inode" << dir_inode << " is_hash=" << is_hash
 
881
  dout(20) << "choose_target_mds " << in << " is_hash=" << is_hash
760
882
           << " hash=" << hash << dendl;
761
883
 
762
 
  if (!dir_inode) goto random_mds;
 
884
  if (!in) goto random_mds;
763
885
 
764
 
  if (is_hash && S_ISDIR(dir_inode->mode) && !dir_inode->dirfragtree.empty()) {
765
 
    if (dir_inode->dirfragtree.contains(hash)) {
766
 
      mds = dir_inode->fragmap[dir_inode->dirfragtree[hash].value()];
 
886
  if (is_hash && S_ISDIR(in->mode) && !in->dirfragtree.empty()) {
 
887
    if (in->dirfragtree.contains(hash)) {
 
888
      mds = in->fragmap[in->dirfragtree[hash].value()];
767
889
      dout(10) << "choose_target_mds from dirfragtree hash" << dendl;
768
890
      goto out;
769
891
    }
770
892
  }
771
893
 
772
894
  if (req->auth_is_best())
773
 
    cap = dir_inode->auth_cap;
774
 
  if (!cap && !dir_inode->caps.empty())
775
 
    cap = dir_inode->caps.begin()->second;
 
895
    cap = in->auth_cap;
 
896
  if (!cap && !in->caps.empty())
 
897
    cap = in->caps.begin()->second;
776
898
  if (!cap)
777
899
    goto random_mds;
778
900
  mds = cap->session->mds_num;
796
918
void Client::connect_mds_targets(int mds)
797
919
{
798
920
  //this function shouldn't be called unless we lost a connection
799
 
  assert (mds_sessions.count(mds));
800
 
  MDSSession *s = mds_sessions[mds];
801
 
  if (!s->requests.empty()) {
802
 
    const MDSMap::mds_info_t& info = mdsmap->get_mds_info(mds);
803
 
    for (set<int>::const_iterator q = info.export_targets.begin();
804
 
         q != info.export_targets.end();
805
 
         q++) {
806
 
      if (mds_sessions.count(*q) == 0 && waiting_for_session.count(mds) == 0) {
807
 
        dout(10) << "check_mds_sessions opening mds" << mds
808
 
                 << " export target mds" << *q << dendl;
809
 
        messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
810
 
                                mdsmap->get_inst(*q));
811
 
        waiting_for_session[*q].size();
812
 
      }
 
921
  dout(10) << "connect_mds_targets for mds" << mds << dendl;
 
922
  assert(mds_sessions.count(mds));
 
923
  const MDSMap::mds_info_t& info = mdsmap->get_mds_info(mds);
 
924
  for (set<int>::const_iterator q = info.export_targets.begin();
 
925
       q != info.export_targets.end();
 
926
       q++) {
 
927
    if (mds_sessions.count(*q) == 0 && waiting_for_session.count(mds) == 0) {
 
928
      dout(10) << "check_mds_sessions opening mds" << mds
 
929
               << " export target mds" << *q << dendl;
 
930
      messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
 
931
                              mdsmap->get_inst(*q));
 
932
      waiting_for_session[*q].size();
813
933
    }
814
934
  }
815
935
}
932
1052
 
933
1053
  int r = reply->get_result();
934
1054
  if (pdirbl)
935
 
    pdirbl->claim(reply->get_dir_bl());
 
1055
    pdirbl->claim(reply->get_extra_bl());
936
1056
  reply->put();
937
1057
  return r;
938
1058
}
939
1059
 
940
 
int Client::encode_inode_release(Inode *in, MClientRequest *req,
 
1060
int Client::encode_inode_release(Inode *in, MetaRequest *req,
941
1061
                         int mds, int drop,
942
1062
                         int unless, int force)
943
1063
{
969
1089
    rel.wanted = caps->wanted;
970
1090
    rel.dname_len = 0;
971
1091
    rel.dname_seq = 0;
972
 
    req->releases.push_back(MClientRequest::Release(rel,""));
 
1092
    req->cap_releases.push_back(MClientRequest::Release(rel,""));
973
1093
  }
974
1094
  dout(25) << "encode_inode_release exit(in:" << *in << ") released:"
975
1095
           << released << dendl;
976
1096
  return released;
977
1097
}
978
1098
 
979
 
void Client::encode_dentry_release(Dentry *dn, MClientRequest *req,
 
1099
void Client::encode_dentry_release(Dentry *dn, MetaRequest *req,
980
1100
                           int mds, int drop, int unless)
981
1101
{
982
1102
  dout(20) << "encode_dentry_release enter(dn:"
985
1105
                                      mds, drop, unless, 1);
986
1106
  if (released && dn->lease_mds == mds) {
987
1107
    dout(25) << "preemptively releasing dn to mds" << dendl;
988
 
    MClientRequest::Release& rel = req->releases.back();
 
1108
    MClientRequest::Release& rel = req->cap_releases.back();
989
1109
    rel.item.dname_len = dn->name.length();
990
1110
    rel.item.dname_seq = dn->lease_seq;
991
1111
    rel.dname = dn->name;
1001
1121
 * Additionally, if you set any *drop member, you'd better have
1002
1122
 * set the corresponding dentry!
1003
1123
 */
1004
 
void Client::encode_cap_releases(MetaRequest *req, MClientRequest *m, int mds) {
 
1124
void Client::encode_cap_releases(MetaRequest *req, int mds) {
1005
1125
  dout(20) << "encode_cap_releases enter (req: "
1006
1126
           << req << ", mds: " << mds << ")" << dendl;
1007
1127
  if (req->inode_drop && req->inode)
1008
 
    encode_inode_release(req->inode, m,
 
1128
    encode_inode_release(req->inode, req,
1009
1129
                         mds, req->inode_drop,
1010
1130
                         req->inode_unless);
1011
1131
  
1012
1132
  if (req->old_inode_drop && req->old_inode)
1013
 
    encode_inode_release(req->old_inode, m,
 
1133
    encode_inode_release(req->old_inode, req,
1014
1134
                         mds, req->old_inode_drop,
1015
1135
                         req->old_inode_unless);
1016
1136
  
1017
1137
  if (req->dentry_drop && req->dentry)
1018
 
    encode_dentry_release(req->dentry, m,
 
1138
    encode_dentry_release(req->dentry, req,
1019
1139
                          mds, req->dentry_drop,
1020
1140
                          req->dentry_unless);
1021
1141
  
1022
1142
  if (req->old_dentry_drop && req->old_dentry)
1023
 
    encode_dentry_release(req->old_dentry, m,
 
1143
    encode_dentry_release(req->old_dentry, req,
1024
1144
                          mds, req->old_dentry_drop,
1025
1145
                          req->old_dentry_unless);
1026
1146
  dout(25) << "encode_cap_releases exit (req: "
1095
1215
    r->set_replayed_op();
1096
1216
  r->set_mdsmap_epoch(mdsmap->get_epoch());
1097
1217
 
1098
 
  encode_cap_releases(request, r, mds);
 
1218
  if (request->cap_releases.empty())
 
1219
    encode_cap_releases(request, mds);
 
1220
  r->releases = request->cap_releases;
1099
1221
 
1100
1222
  if (request->mds == -1) {
1101
1223
    request->sent_stamp = g_clock.now();
1140
1262
  req->set_filepath2(request->get_filepath2());
1141
1263
  req->set_data(request->data);
1142
1264
  req->set_retry_attempt(request->retry_attempt);
 
1265
  req->head.num_fwd = request->num_fwd;
1143
1266
  return req;
1144
1267
}
1145
1268
 
1420
1543
                 path.get_ino(), path.get_path(),   // ino
1421
1544
                 in->caps_wanted(), // wanted
1422
1545
                 in->caps[mds]->issued,     // issued
1423
 
                 in->size, in->mtime, in->atime, in->snaprealm->ino);
 
1546
                 in->snaprealm->ino);
1424
1547
 
1425
1548
      if (did_snaprealm.count(in->snaprealm->ino) == 0) {
1426
1549
        dout(10) << " snaprealm " << *in->snaprealm << dendl;
1659
1782
    }
1660
1783
    
1661
1784
    if (endoff >= 0 && endoff > (loff_t)in->max_size) {
1662
 
      dout(10) << "waiting on max_size" << dendl;
 
1785
      dout(10) << "waiting on max_size, endoff " << endoff << " max_size " << in->max_size << dendl;
1663
1786
    } else if (!in->cap_snaps.empty() && in->cap_snaps.rbegin()->second.writing) {
1664
1787
      dout(10) << "waiting on cap_snap write to complete" << dendl;
1665
1788
    } else {
1738
1861
  
1739
1862
  m->head.nlink = in->nlink;
1740
1863
  
1741
 
  m->head.xattr_len = 0; // FIXME
 
1864
  if (flush & CEPH_CAP_XATTR_EXCL) {
 
1865
    ::encode(in->xattrs, m->xattrbl);
 
1866
    m->head.xattr_version = in->xattr_version;
 
1867
  }
1742
1868
  
1743
1869
  m->head.layout = in->layout;
1744
1870
  m->head.size = in->size;
1876
2002
void Client::queue_cap_snap(Inode *in, snapid_t seq)
1877
2003
{
1878
2004
  int used = in->caps_used();
 
2005
  int dirty = in->caps_dirty();
1879
2006
  dout(10) << "queue_cap_snap " << *in << " seq " << seq << " used " << ccap_string(used) << dendl;
1880
2007
 
1881
2008
  if (in->cap_snaps.size() &&
1883
2010
    dout(10) << "queue_cap_snap already have pending cap_snap on " << *in << dendl;
1884
2011
    return;
1885
2012
  } else if (in->caps_dirty() ||
1886
 
            (used & CEPH_CAP_FILE_WR)) {
 
2013
            (used & CEPH_CAP_FILE_WR) ||
 
2014
             (dirty & CEPH_CAP_ANY_WR)) {
1887
2015
    in->get();
1888
2016
    CapSnap *capsnap = &in->cap_snaps[seq];
1889
2017
    capsnap->context = in->snaprealm->cached_snap_context;
1892
2020
    
1893
2021
    capsnap->dirty_data = (used & CEPH_CAP_FILE_BUFFER);
1894
2022
    
 
2023
    capsnap->uid = in->uid;
 
2024
    capsnap->gid = in->gid;
 
2025
    capsnap->mode = in->mode;
 
2026
    capsnap->xattrs = in->xattrs;
 
2027
    capsnap->xattr_version = in->xattr_version;
 
2028
 
1895
2029
    if (used & CEPH_CAP_FILE_WR) {
1896
2030
      dout(10) << "queue_cap_snap WR used on " << *in << dendl;
1897
2031
      capsnap->writing = 1;
1911
2045
  capsnap->atime = in->atime;
1912
2046
  capsnap->ctime = in->ctime;
1913
2047
  capsnap->time_warp_seq = in->time_warp_seq;
 
2048
 
1914
2049
  if (used & CEPH_CAP_FILE_BUFFER) {
1915
2050
    dout(10) << "finish_cap_snap " << *in << " cap_snap " << capsnap << " used " << used
1916
2051
             << " WRBUFFER, delaying" << dendl;
1954
2089
    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino, in->snaprealm->ino, 0, mseq);
1955
2090
    m->set_client_tid(p->second.flush_tid);
1956
2091
    m->head.snap_follows = p->first;
1957
 
    m->head.size = p->second.size;
 
2092
 
1958
2093
    m->head.caps = p->second.issued;
1959
2094
    m->head.dirty = p->second.dirty;
 
2095
 
 
2096
    m->head.uid = p->second.uid;
 
2097
    m->head.gid = p->second.gid;
 
2098
    m->head.mode = p->second.mode;
 
2099
 
 
2100
    m->head.size = p->second.size;
 
2101
 
 
2102
    m->head.xattr_version = p->second.xattr_version;
 
2103
    ::encode(p->second.xattrs, m->xattrbl);
 
2104
 
1960
2105
    p->second.ctime.encode_timeval(&m->head.ctime);
1961
2106
    p->second.mtime.encode_timeval(&m->head.mtime);
1962
2107
    p->second.atime.encode_timeval(&m->head.atime);
 
2108
    m->head.time_warp_seq = p->second.time_warp_seq;
 
2109
 
1963
2110
    messenger->send_message(m, mdsmap->get_inst(mds));
1964
2111
  }
1965
2112
}
2092
2239
 
2093
2240
  check_cap_issue(in, cap, issued);
2094
2241
 
2095
 
  if (flags & CEPH_CAP_FLAG_AUTH)
2096
 
    in->auth_cap = cap;
 
2242
  if (flags & CEPH_CAP_FLAG_AUTH) {
 
2243
    if (in->auth_cap != cap) {
 
2244
      if (in->auth_cap && in->flushing_cap_item.is_on_list()) {
 
2245
        dout(10) << "add_update_cap changing auth cap: removing myself from flush_caps list" << dendl;
 
2246
        in->flushing_cap_item.remove_myself();
 
2247
      }
 
2248
      in->auth_cap = cap;
 
2249
    }
 
2250
  }
2097
2251
 
2098
2252
  unsigned old_caps = cap->issued;
2099
2253
  cap->cap_id = cap_id;
2133
2287
  
2134
2288
  cap->cap_item.remove_myself();
2135
2289
 
2136
 
  if (in->auth_cap == cap)
 
2290
  if (in->auth_cap == cap) {
 
2291
    if (in->flushing_cap_item.is_on_list()) {
 
2292
      dout(10) << " removing myself from flushing_cap list" << dendl;
 
2293
      in->flushing_cap_item.remove_myself();
 
2294
    }
2137
2295
    in->auth_cap = NULL;
 
2296
  }
2138
2297
  assert(in->caps.count(mds));
2139
2298
  in->caps.erase(mds);
2140
2299
 
2495
2654
 
2496
2655
  Inode *in = 0;
2497
2656
  vinodeno_t vino(m->get_ino(), CEPH_NOSNAP);
2498
 
  if (inode_map.count(vino)) in = inode_map[vino];
 
2657
  if (inode_map.count(vino))
 
2658
    in = inode_map[vino];
2499
2659
  if (!in) {
2500
2660
    dout(5) << "handle_caps don't have vino " << vino << dendl;
2501
2661
    m->put();
2556
2716
void Client::handle_cap_export(Inode *in, MClientCaps *m)
2557
2717
{
2558
2718
  int mds = m->get_source().num();
2559
 
  assert(in->caps[mds]);
2560
 
  InodeCap *cap = in->caps[mds];
 
2719
  InodeCap *cap = NULL;
2561
2720
 
2562
2721
  // note?
2563
2722
  bool found_higher_mseq = false;
2564
2723
  for (map<int,InodeCap*>::iterator p = in->caps.begin();
2565
2724
       p != in->caps.end();
2566
2725
       p++) {
 
2726
    if (p->first == mds)
 
2727
      cap = p->second;
2567
2728
    if (p->second->mseq > m->get_mseq()) {
2568
2729
      found_higher_mseq = true;
2569
2730
      dout(5) << "handle_cap_export ino " << m->get_ino() << " mseq " << m->get_mseq() 
2570
2731
              << " EXPORT from mds" << mds
2571
2732
              << ", but mds" << p->first << " has higher mseq " << p->second->mseq << dendl;
2572
 
      break;
2573
2733
    }
2574
2734
  }
2575
2735
 
2576
 
  if (!found_higher_mseq) {
2577
 
    dout(5) << "handle_cap_export ino " << m->get_ino() << " mseq " << m->get_mseq() 
2578
 
            << " EXPORT from mds" << mds
2579
 
            << ", setting exporting_issued " << ccap_string(cap->issued) << dendl;
2580
 
    in->exporting_issued = cap->issued;
2581
 
    in->exporting_mseq = m->get_mseq();
2582
 
    in->exporting_mds = mds;
2583
 
  } else 
2584
 
    dout(5) << "handle_cap_export ino " << m->get_ino() << " mseq " << m->get_mseq() 
2585
 
            << " EXPORT from mds" << mds
2586
 
            << ", just removing old cap" << dendl;
2587
 
 
2588
 
  remove_cap(in, mds);
 
2736
  if (cap) {
 
2737
    if (!found_higher_mseq) {
 
2738
      dout(5) << "handle_cap_export ino " << m->get_ino() << " mseq " << m->get_mseq() 
 
2739
              << " EXPORT from mds" << mds
 
2740
              << ", setting exporting_issued " << ccap_string(cap->issued) << dendl;
 
2741
      in->exporting_issued = cap->issued;
 
2742
      in->exporting_mseq = m->get_mseq();
 
2743
      in->exporting_mds = mds;
 
2744
 
 
2745
      // open export targets, so we'll get the matching IMPORT
 
2746
      connect_mds_targets(mds);
 
2747
    } else 
 
2748
      dout(5) << "handle_cap_export ino " << m->get_ino() << " mseq " << m->get_mseq() 
 
2749
              << " EXPORT from mds" << mds
 
2750
              << ", just removing old cap" << dendl;
 
2751
 
 
2752
    remove_cap(in, mds);
 
2753
  }
 
2754
  // else we already released it
2589
2755
 
2590
2756
  m->put();
2591
2757
}
2598
2764
  dout(10) << "handle_cap_trunc on ino " << *in
2599
2765
           << " size " << in->size << " -> " << m->get_size()
2600
2766
           << dendl;
2601
 
  // trim filecache?
2602
 
  if (g_conf.client_oc &&
2603
 
      m->get_size() < in->size) {
2604
 
    // map range to objects
2605
 
    vector<ObjectExtent> ls;
2606
 
    filer->file_to_extents(in->ino, &in->layout, 
2607
 
                           m->get_size(), in->size - m->get_size(),
2608
 
                           ls);
2609
 
    objectcacher->truncate_set(&in->oset, ls);
2610
 
  }
2611
2767
  
2612
 
  in->reported_size = in->size = m->get_size(); 
 
2768
  int implemented = 0;
 
2769
  int issued = in->caps_issued(&implemented) | in->caps_dirty();
 
2770
  issued |= implemented;
 
2771
  update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(),
 
2772
                         m->get_size(), m->get_time_warp_seq(), m->get_ctime(),
 
2773
                         m->get_mtime(), m->get_atime(), issued);
2613
2774
  m->put();
2614
2775
}
2615
2776
 
3723
3884
}
3724
3885
 
3725
3886
 
3726
 
int Client::getdir(const char *relpath, list<string>& contents)
3727
 
{
3728
 
  dout(3) << "getdir(" << relpath << ")" << dendl;
3729
 
  {
3730
 
    Mutex::Locker lock(client_lock);
3731
 
    tout << "getdir" << std::endl;
3732
 
    tout << relpath << std::endl;
3733
 
  }
3734
 
 
3735
 
  DIR *d;
3736
 
  int r = opendir(relpath, &d);
3737
 
  if (r < 0) return r;
3738
 
 
3739
 
  struct dirent de;
3740
 
  int n = 0;
3741
 
  while (readdir_r(d, &de) > 0) {
3742
 
    contents.push_back(de.d_name);
3743
 
    n++;
3744
 
  }
3745
 
  closedir(d);
3746
 
 
3747
 
  return n;
3748
 
}
3749
 
 
3750
3887
int Client::opendir(const char *relpath, DIR **dirpp) 
3751
3888
{
3752
3889
  Mutex::Locker lock(client_lock);
3775
3912
  return 0;
3776
3913
}
3777
3914
 
3778
 
void Client::_readdir_add_dirent(DirResult *dirp, const string& name, Inode *in)
3779
 
{
3780
 
  struct stat st;
3781
 
  int stmask = fill_stat(in, &st);  
3782
 
  frag_t fg = dirp->frag();
3783
 
  dirp->buffer[fg].push_back(DirEntry(name, st, stmask));
3784
 
  dout(10) << "_readdir_add_dirent " << dirp << " added '" << name << "' -> " << in->ino
3785
 
           << ", size now " << dirp->buffer[fg].size() << dendl;
3786
 
}
 
3915
 
 
3916
int Client::closedir(DIR *dir) 
 
3917
{
 
3918
  Mutex::Locker lock(client_lock);
 
3919
  tout << "closedir" << std::endl;
 
3920
  tout << (unsigned long)dir << std::endl;
 
3921
 
 
3922
  dout(3) << "closedir(" << dir << ") = 0" << dendl;
 
3923
  _closedir((DirResult*)dir);
 
3924
  return 0;
 
3925
}
 
3926
 
 
3927
void Client::_closedir(DirResult *dirp)
 
3928
{
 
3929
  dout(10) << "_closedir(" << dirp << ")" << dendl;
 
3930
  if (dirp->inode) {
 
3931
    dout(10) << "_closedir detaching inode " << dirp->inode << dendl;
 
3932
    put_inode(dirp->inode);
 
3933
    dirp->inode = 0;
 
3934
  }
 
3935
  _readdir_drop_dirp_buffer(dirp);
 
3936
  delete dirp;
 
3937
}
 
3938
 
 
3939
void Client::rewinddir(DIR *dirp)
 
3940
{
 
3941
  dout(3) << "rewinddir(" << dirp << ")" << dendl;
 
3942
  DirResult *d = (DirResult*)dirp;
 
3943
  d->reset();
 
3944
}
 
3945
 
 
3946
loff_t Client::telldir(DIR *dirp)
 
3947
{
 
3948
  DirResult *d = (DirResult*)dirp;
 
3949
  dout(3) << "telldir(" << dirp << ") = " << d->offset << dendl;
 
3950
  return d->offset;
 
3951
}
 
3952
 
 
3953
void Client::seekdir(DIR *dirp, loff_t offset)
 
3954
{
 
3955
  dout(3) << "seekdir(" << dirp << ", " << offset << ")" << dendl;
 
3956
  DirResult *d = (DirResult*)dirp;
 
3957
 
 
3958
  if (offset == 0 ||
 
3959
      DirResult::fpos_frag(offset) != d->frag() ||
 
3960
      DirResult::fpos_off(offset) < d->fragpos()) {
 
3961
    d->reset();
 
3962
  }
 
3963
 
 
3964
  if (offset > d->offset)
 
3965
    d->release_count--;   // bump if we do a forward seek
 
3966
 
 
3967
  d->offset = offset;
 
3968
  if (!d->frag().is_leftmost() && d->next_offset == 2)
 
3969
    d->next_offset = 0;  // not 2 on non-leftmost frags!
 
3970
}
 
3971
 
 
3972
 
 
3973
 
 
3974
 
3787
3975
 
3788
3976
//struct dirent {
3789
3977
//  ino_t          d_ino;       /* inode number */
3792
3980
//  unsigned char  d_type;      /* type of file */
3793
3981
//  char           d_name[256]; /* filename */
3794
3982
//};
3795
 
void Client::_readdir_fill_dirent(struct dirent *de, DirEntry *entry, loff_t off)
 
3983
void Client::fill_dirent(struct dirent *de, const char *name, int type, uint64_t ino, loff_t next_off)
3796
3984
{
3797
 
  strncpy(de->d_name, entry->d_name.c_str(), 256);
 
3985
  strncpy(de->d_name, name, 256);
3798
3986
#ifndef __CYGWIN__
3799
 
  de->d_ino = entry->st.st_ino;
 
3987
  de->d_ino = ino;
3800
3988
#ifndef DARWIN
3801
 
  de->d_off = off + 1;
 
3989
  de->d_off = next_off;
3802
3990
#endif
3803
3991
  de->d_reclen = 1;
3804
 
  de->d_type = MODE_TO_DT(entry->st.st_mode);
3805
 
  dout(10) << "_readdir_fill_dirent '" << de->d_name << "' -> " << de->d_ino
3806
 
           << " type " << (int)de->d_type << " at off " << off << dendl;
 
3992
  de->d_type = MODE_TO_DT(type);
 
3993
  dout(10) << "fill_dirent '" << de->d_name << "' -> " << inodeno_t(de->d_ino)
 
3994
           << " type " << (int)de->d_type << " w/ next_off " << hex << next_off << dec << dendl;
3807
3995
#endif
3808
3996
}
3809
3997
 
3811
3999
{
3812
4000
  frag_t fg = dirp->frag();
3813
4001
 
3814
 
  // hose old data
3815
 
  assert(dirp->buffer.count(fg));
3816
 
  dirp->buffer.erase(fg);
3817
 
 
3818
4002
  // advance
3819
4003
  dirp->next_frag();
3820
4004
  if (dirp->at_end()) {
3836
4020
  }
3837
4021
}
3838
4022
 
 
4023
void Client::_readdir_drop_dirp_buffer(DirResult *dirp)
 
4024
{
 
4025
  dout(10) << "_readdir_drop_dirp_buffer " << dirp << dendl;
 
4026
  if (dirp->buffer) {
 
4027
    for (unsigned i = 0; i < dirp->buffer->size(); i++)
 
4028
      put_inode((*dirp->buffer)[i].second);
 
4029
    delete dirp->buffer;
 
4030
    dirp->buffer = NULL;
 
4031
  }
 
4032
}
 
4033
 
3839
4034
int Client::_readdir_get_frag(DirResult *dirp)
3840
4035
{
3841
4036
  // get the current frag.
3842
4037
  frag_t fg = dirp->frag();
3843
 
  assert(dirp->buffer.count(fg) == 0);
3844
4038
  
3845
 
  dout(10) << "_readdir_get_frag " << dirp << " on " << dirp->inode->ino << " fg " << fg << dendl;
 
4039
  dout(10) << "_readdir_get_frag " << dirp << " on " << dirp->inode->ino << " fg " << fg
 
4040
           << " next_offset " << dirp->next_offset
 
4041
           << dendl;
3846
4042
 
3847
4043
  int op = CEPH_MDS_OP_READDIR;
3848
4044
  if (dirp->inode && dirp->inode->snapid == CEPH_SNAPDIR)
3856
4052
  req->set_filepath(path); 
3857
4053
  req->inode = diri;
3858
4054
  req->head.args.readdir.frag = fg;
 
4055
  if (dirp->last_name.length()) {
 
4056
    req->path2.set_path(dirp->last_name.c_str());
 
4057
    req->readdir_start = dirp->last_name;
 
4058
  }
 
4059
  req->readdir_offset = dirp->next_offset;
 
4060
  req->readdir_frag = fg;
 
4061
  
3859
4062
  
3860
4063
  bufferlist dirbl;
3861
4064
  int res = make_request(req, -1, -1, 0, -1, &dirbl);
3870
4073
    // stuff dir contents to cache, DirResult
3871
4074
    assert(diri);
3872
4075
 
3873
 
    // create empty result vector
3874
 
    dirp->buffer[fg].clear();
3875
 
 
3876
 
    if (fg.is_leftmost()) {
3877
 
      // add . and ..?
3878
 
      string dot(".");
3879
 
      _readdir_add_dirent(dirp, dot, diri);
3880
 
      string dotdot("..");
3881
 
      if (diri->dn)
3882
 
        _readdir_add_dirent(dirp, dotdot, diri->dn->dir->parent_inode);
3883
 
      //else
3884
 
      //_readdir_add_dirent(dirp, dotdot, DT_DIR);
3885
 
    }
3886
 
    
3887
 
    // the rest?
3888
 
    bufferlist::iterator p = dirbl.begin();
3889
 
    if (!p.end()) {
3890
 
      // dirstat
3891
 
      DirStat dst(p);
3892
 
      __u32 numdn;
3893
 
      __u8 complete, end;
3894
 
      ::decode(numdn, p);
3895
 
      ::decode(end, p);
3896
 
      ::decode(complete, p);
3897
 
 
3898
 
      string dname;
3899
 
      LeaseStat dlease;
3900
 
      while (numdn) {
3901
 
        ::decode(dname, p);
3902
 
        ::decode(dlease, p);
3903
 
        InodeStat ist(p);
3904
 
 
3905
 
        Inode *in = _ll_get_inode(ist.vino);
3906
 
        dout(15) << "_readdir_get_frag got " << dname << " to " << in->ino << dendl;
3907
 
        _readdir_add_dirent(dirp, dname, in);
3908
 
 
3909
 
        numdn--;
3910
 
      }
 
4076
    _readdir_drop_dirp_buffer(dirp);
 
4077
 
 
4078
    dirp->buffer = new vector<pair<string,Inode*> >;
 
4079
    dirp->buffer->swap(req->readdir_result);
 
4080
    dirp->buffer_frag = fg;
 
4081
 
 
4082
    dirp->this_offset = dirp->next_offset;
 
4083
    dout(10) << "_readdir_get_frag " << dirp << " got frag " << dirp->buffer_frag
 
4084
             << " this_offset " << dirp->this_offset
 
4085
             << " size " << dirp->buffer->size() << dendl;
 
4086
 
 
4087
    if (req->readdir_end) {
 
4088
      dirp->last_name.clear();
 
4089
      if (fg.is_rightmost())
 
4090
        dirp->next_offset = 2;
 
4091
      else
 
4092
        dirp->next_offset = 0;
 
4093
    } else {
 
4094
      dirp->last_name = req->readdir_last_name;
 
4095
      dirp->next_offset += req->readdir_num;
 
4096
    }
 
4097
  } else {
 
4098
    dout(10) << "_readdir_get_frag got error " << res << ", setting end flag" << dendl;
 
4099
    dirp->set_end();
 
4100
  }
 
4101
 
 
4102
  return res;
 
4103
}
 
4104
 
 
4105
int Client::_readdir_cache_cb(DirResult *dirp, add_dirent_cb_t cb, void *p)
 
4106
{
 
4107
  dout(10) << "_readdir_cache_cb " << dirp << " on " << dirp->inode->ino
 
4108
           << " at_cache_name " << dirp->at_cache_name << " offset " << hex << dirp->offset << dec
 
4109
           << dendl;
 
4110
  Dir *dir = dirp->inode->dir;
 
4111
 
 
4112
  if (!dir) {
 
4113
    dout(10) << " dir is empty" << dendl;
 
4114
    dirp->set_end();
 
4115
    return 1;
 
4116
  }
 
4117
 
 
4118
  map<string,Dentry*>::iterator pd;
 
4119
  if (dirp->at_cache_name.length()) {
 
4120
    pd = dir->dentry_map.find(dirp->at_cache_name);
 
4121
    if (pd == dir->dentry_map.end())
 
4122
      return -EAGAIN;  // weird, i give up
 
4123
    pd++;
 
4124
  } else {
 
4125
    pd = dir->dentry_map.begin();
 
4126
  }
 
4127
 
 
4128
  string prev_name;
 
4129
  while (pd != dir->dentry_map.end()) {
 
4130
    Dentry *dn = pd->second;
 
4131
    if (dn->inode == NULL) {
 
4132
      dout(15) << " skipping null '" << pd->first << "'" << dendl;
 
4133
      pd++;
 
4134
      continue;
 
4135
    }
 
4136
 
 
4137
    struct stat st;
 
4138
    struct dirent de;
 
4139
    int stmask = fill_stat(dn->inode, &st);  
 
4140
    fill_dirent(&de, pd->first.c_str(), st.st_mode, st.st_ino, dirp->offset + 1);
 
4141
      
 
4142
    uint64_t next_off = dn->offset + 1;
 
4143
    pd++;
 
4144
    if (pd == dir->dentry_map.end())
 
4145
      next_off = DirResult::END;
 
4146
 
 
4147
    int r = cb(p, &de, &st, stmask, next_off);  // _next_ offset
 
4148
    dout(15) << " de " << de.d_name << " off " << hex << dn->offset << dec
 
4149
             << " = " << r
 
4150
             << dendl;
 
4151
    if (r < 0) {
 
4152
      dirp->next_offset = dn->offset;
 
4153
      dirp->at_cache_name = prev_name;
 
4154
      return r;
 
4155
    }
 
4156
 
 
4157
    prev_name = dn->name;
 
4158
    dirp->offset = next_off;
 
4159
  }
 
4160
 
 
4161
  dout(10) << "_readdir_cache_cb " << dirp << " on " << dirp->inode->ino << " at end" << dendl;
 
4162
  dirp->set_end();
 
4163
  return 1;
 
4164
}
 
4165
 
 
4166
int Client::readdir_r_cb(DIR *d, add_dirent_cb_t cb, void *p)
 
4167
{
 
4168
  DirResult *dirp = (DirResult*)d;
 
4169
 
 
4170
  dout(10) << "readdir_r_cb " << *dirp->inode << " offset " << hex << dirp->offset << dec
 
4171
           << " frag " << dirp->frag() << " fragpos " << hex << dirp->fragpos() << dec
 
4172
           << " at_end=" << dirp->at_end()
 
4173
           << dendl;
 
4174
 
 
4175
  struct dirent de;
 
4176
  struct stat st;
 
4177
  memset(&de, 0, sizeof(de));
 
4178
  memset(&st, 0, sizeof(st));
 
4179
 
 
4180
  frag_t fg = dirp->frag();
 
4181
  uint32_t off = dirp->fragpos();
 
4182
 
 
4183
  Inode *diri = dirp->inode;
 
4184
 
 
4185
  if (dirp->at_end())
 
4186
    return 0;
 
4187
 
 
4188
  if (dirp->offset == 0) {
 
4189
    dout(15) << " including ." << dendl;
 
4190
    uint64_t next_off = diri->dn ? 1 : 2;
 
4191
 
 
4192
    fill_dirent(&de, ".", S_IFDIR, diri->ino, next_off);
 
4193
 
 
4194
    fill_stat(diri, &st);
 
4195
 
 
4196
    int r = cb(p, &de, &st, -1, next_off);
 
4197
    if (r < 0)
 
4198
      return r;
 
4199
 
 
4200
    dirp->offset = next_off;
 
4201
    off = next_off;
 
4202
  }
 
4203
  if (dirp->offset == 1) {
 
4204
    dout(15) << " including .." << dendl;
 
4205
    assert(diri->dn);
 
4206
    Inode *in = diri->dn->inode;
 
4207
    fill_dirent(&de, "..", S_IFDIR, in->ino, 2);
 
4208
 
 
4209
    fill_stat(in, &st);
 
4210
 
 
4211
    int r = cb(p, &de, &st, -1, 2);
 
4212
    if (r < 0)
 
4213
      return r;
 
4214
 
 
4215
    dirp->offset = 2;
 
4216
    off = 2;
 
4217
  }
 
4218
 
 
4219
  // can we read from our cache?
 
4220
  dout(10) << "offset " << hex << dirp->offset << dec << " at_cache_name " << dirp->at_cache_name
 
4221
           << " snapid " << dirp->inode->snapid << " complete " << (bool)(dirp->inode->flags & I_COMPLETE)
 
4222
           << " issued " << ccap_string(dirp->inode->caps_issued())
 
4223
           << dendl;
 
4224
  if ((dirp->offset == 2 || dirp->at_cache_name.length()) &&
 
4225
      dirp->inode->snapid != CEPH_SNAPDIR &&
 
4226
      (dirp->inode->flags & I_COMPLETE) &&
 
4227
      dirp->inode->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
 
4228
    int err = _readdir_cache_cb(dirp, cb, p);
 
4229
    if (err != -EAGAIN)
 
4230
      return err;
 
4231
  }                                         
 
4232
  if (dirp->at_cache_name.length()) {
 
4233
    dirp->last_name = dirp->at_cache_name;
 
4234
    dirp->at_cache_name.clear();
 
4235
  }
 
4236
 
 
4237
  while (1) {
 
4238
    if (dirp->at_end())
 
4239
      return 0;
 
4240
 
 
4241
    if (dirp->buffer_frag != dirp->frag() || dirp->buffer == NULL) {
 
4242
      Mutex::Locker lock(client_lock);
 
4243
      int r = _readdir_get_frag(dirp);
 
4244
      if (r)
 
4245
        return r;
 
4246
      fg = dirp->buffer_frag;
 
4247
    }
 
4248
 
 
4249
    dout(10) << "off " << off << " this_offset " << hex << dirp->this_offset << dec << " size " << dirp->buffer->size()
 
4250
             << " frag " << fg << dendl;
 
4251
    while (off - dirp->this_offset >= 0 &&
 
4252
           off - dirp->this_offset < dirp->buffer->size()) {
 
4253
      uint64_t pos = DirResult::make_fpos(fg, off);
 
4254
      pair<string,Inode*>& ent = (*dirp->buffer)[off - dirp->this_offset];
 
4255
 
 
4256
      int stmask = fill_stat(ent.second, &st);  
 
4257
      fill_dirent(&de, ent.first.c_str(), st.st_mode, st.st_ino, dirp->offset + 1);
 
4258
      
 
4259
      int r = cb(p, &de, &st, stmask, dirp->offset + 1);  // _next_ offset
 
4260
      dout(15) << " de " << de.d_name << " off " << hex << dirp->offset << dec
 
4261
               << " = " << r
 
4262
               << dendl;
 
4263
      if (r < 0)
 
4264
        return r;
 
4265
      
 
4266
      off++;
 
4267
      dirp->offset = pos + 1;
 
4268
    }
 
4269
 
 
4270
    if (dirp->last_name.length()) {
 
4271
      dout(10) << " fetching next chunk of this frag" << dendl;
 
4272
      _readdir_drop_dirp_buffer(dirp);
 
4273
      continue;  // more!
 
4274
    }
 
4275
 
 
4276
    if (!fg.is_rightmost()) {
 
4277
      // next frag!
 
4278
      dirp->next_frag();
 
4279
      off = 0;
 
4280
      dout(10) << " advancing to next frag: " << fg << " -> " << dirp->frag() << dendl;
 
4281
      fg = dirp->frag();
 
4282
      continue;
3911
4283
    }
3912
4284
 
3913
4285
    if (diri->dir && diri->dir->release_count == dirp->release_count) {
3914
4286
      dout(10) << " marking I_COMPLETE on " << *diri << dendl;
3915
4287
      diri->flags |= I_COMPLETE;
 
4288
      if (diri->dir)
 
4289
        diri->dir->max_offset = dirp->offset;
3916
4290
    }
3917
 
  } else {
3918
 
    dout(10) << "_readdir_get_frag got error " << res << ", setting end flag" << dendl;
 
4291
 
3919
4292
    dirp->set_end();
 
4293
    return 1;
3920
4294
  }
3921
 
 
3922
 
  return res;
 
4295
  assert(0);
 
4296
  return 0;
3923
4297
}
3924
4298
 
 
4299
 
 
4300
 
 
4301
 
3925
4302
int Client::readdir_r(DIR *d, struct dirent *de)
3926
4303
{  
3927
4304
  return readdirplus_r(d, de, 0, 0);
3928
4305
}
3929
4306
 
3930
4307
/*
 
4308
 * readdirplus_r
 
4309
 *
3931
4310
 * returns
3932
4311
 *  1 if we got a dirent
3933
4312
 *  0 for end of directory
3934
4313
 * <0 on error
3935
4314
 */
 
4315
 
 
4316
struct single_readdir {
 
4317
  struct dirent *de;
 
4318
  struct stat *st;
 
4319
  int *stmask;
 
4320
  bool full;
 
4321
};
 
4322
 
 
4323
static int _readdir_single_dirent_cb(void *p, struct dirent *de, struct stat *st, int stmask, off_t off)
 
4324
{
 
4325
  single_readdir *c = (single_readdir *)p;
 
4326
 
 
4327
  if (c->full)
 
4328
    return -1;
 
4329
 
 
4330
  *c->de = *de;
 
4331
  *c->st = *st;
 
4332
  *c->stmask = stmask;
 
4333
  c->full = true;
 
4334
  return 0;  
 
4335
}
 
4336
 
3936
4337
int Client::readdirplus_r(DIR *d, struct dirent *de, struct stat *st, int *stmask)
3937
4338
{  
3938
 
  DirResult *dirp = (DirResult*)d;
3939
 
  
3940
 
  dout(10) << "readdirplus_r " << *dirp->inode << " offset " << dirp->offset
3941
 
           << " frag " << dirp->frag() << " fragpos " << dirp->fragpos()
3942
 
           << " at_end=" << dirp->at_end()
3943
 
           << dendl;
3944
 
 
3945
 
  while (1) {
3946
 
    if (dirp->at_end())
3947
 
      return 0;
3948
 
 
3949
 
    if (dirp->buffer.count(dirp->frag()) == 0) {
3950
 
      Mutex::Locker lock(client_lock);
3951
 
      _readdir_get_frag(dirp);
3952
 
      if (dirp->at_end())
3953
 
        return 0;
3954
 
    }
3955
 
 
3956
 
    frag_t fg = dirp->frag();
3957
 
    uint32_t pos = dirp->fragpos();
3958
 
    assert(dirp->buffer.count(fg));   
3959
 
    vector<DirEntry> &ent = dirp->buffer[fg];
3960
 
 
3961
 
    if (ent.empty() ||
3962
 
        pos >= ent.size()) {
3963
 
      dout(10) << "empty frag " << fg << ", moving on to next" << dendl;
3964
 
      _readdir_next_frag(dirp);
3965
 
      continue;
3966
 
    }
3967
 
 
3968
 
    assert(pos < ent.size());
3969
 
    _readdir_fill_dirent(de, &ent[pos], dirp->offset);
3970
 
    if (st)
3971
 
      *st = ent[pos].st;
3972
 
    if (stmask)
3973
 
      *stmask = ent[pos].stmask;
3974
 
    pos++;
3975
 
    dirp->offset++;
3976
 
 
3977
 
    if (pos == ent.size()) 
3978
 
      _readdir_next_frag(dirp);
3979
 
 
 
4339
  single_readdir sr;
 
4340
  sr.de = de;
 
4341
  sr.st = st;
 
4342
  sr.stmask = stmask;
 
4343
  sr.full = false;
 
4344
 
 
4345
  int r = readdir_r_cb(d, _readdir_single_dirent_cb, (void *)&sr);
 
4346
  if (r < 0)
 
4347
    return r;
 
4348
  if (sr.full)
3980
4349
    return 1;
 
4350
  return 0;
 
4351
}
 
4352
 
 
4353
 
 
4354
/* getdents */
 
4355
struct getdents_result {
 
4356
  char *buf;
 
4357
  int buflen;
 
4358
  int pos;
 
4359
  bool fullent;
 
4360
};
 
4361
 
 
4362
static int _readdir_getdent_cb(void *p, struct dirent *de, struct stat *st, int stmask, off_t off)
 
4363
{
 
4364
  struct getdents_result *c = (getdents_result *)p;
 
4365
 
 
4366
  int dlen;
 
4367
  if (c->fullent)
 
4368
    dlen = sizeof(*de);
 
4369
  else
 
4370
    dlen = strlen(de->d_name) + 1;
 
4371
 
 
4372
  if (c->pos + dlen > c->buflen)
 
4373
    return -1;  // doesn't fit
 
4374
 
 
4375
  if (c->fullent) {
 
4376
    memcpy(c->buf + c->pos, de, sizeof(*de));
 
4377
  } else {
 
4378
    memcpy(c->buf + c->pos, de->d_name, dlen);
3981
4379
  }
3982
 
  assert(0);
 
4380
  c->pos += dlen;
 
4381
  return 0;
3983
4382
}
3984
4383
 
3985
4384
int Client::_getdents(DIR *dir, char *buf, int buflen, bool fullent)
3986
4385
{
3987
 
  DirResult *dirp = (DirResult *)dir;
3988
 
  int ret = 0;
3989
 
  
3990
 
  while (1) {
3991
 
    if (dirp->at_end())
3992
 
      return ret;
3993
 
 
3994
 
    if (dirp->buffer.count(dirp->frag()) == 0) {
3995
 
      Mutex::Locker lock(client_lock);
3996
 
      _readdir_get_frag(dirp);
3997
 
      if (dirp->at_end())
3998
 
        return ret;
3999
 
    }
4000
 
 
4001
 
    frag_t fg = dirp->frag();
4002
 
    uint32_t pos = dirp->fragpos();
4003
 
    assert(dirp->buffer.count(fg));   
4004
 
    vector<DirEntry> &ent = dirp->buffer[fg];
4005
 
 
4006
 
    if (ent.empty()) {
4007
 
      dout(10) << "empty frag " << fg << ", moving on to next" << dendl;
4008
 
      _readdir_next_frag(dirp);
4009
 
      continue;
4010
 
    }
4011
 
 
4012
 
    assert(pos < ent.size());
4013
 
 
4014
 
    // is there room?
4015
 
    int dlen = ent[pos].d_name.length();
4016
 
    if (fullent)
4017
 
      dlen += sizeof(struct dirent);
4018
 
    else
4019
 
      dlen += 1; // null terminator
4020
 
    if (ret + dlen > buflen) {
4021
 
      if (!ret)
4022
 
        return -ERANGE;  // the buffer is too small for the first name!
4023
 
      return ret;
4024
 
    }
4025
 
    if (fullent)
4026
 
      _readdir_fill_dirent((struct dirent *)(buf + ret), &ent[pos], dirp->offset);
4027
 
    else
4028
 
      memcpy(buf + ret, ent[pos].d_name.c_str(), dlen);
4029
 
    ret += dlen;
4030
 
 
4031
 
    pos++;
4032
 
    dirp->offset++;
4033
 
 
4034
 
    if (pos == ent.size()) 
4035
 
      _readdir_next_frag(dirp);
4036
 
  }
4037
 
  assert(0);
 
4386
  getdents_result gr;
 
4387
  gr.buf = buf;
 
4388
  gr.buflen = buflen;
 
4389
  gr.fullent = fullent;
 
4390
  gr.pos = 0;
 
4391
 
 
4392
  int r = readdir_r_cb(dir, _readdir_getdent_cb, (void *)&gr);
 
4393
  if (r < 0) 
 
4394
    return r;
 
4395
 
 
4396
  if (gr.pos == 0)
 
4397
    return -ERANGE;
 
4398
 
 
4399
  return gr.pos;
4038
4400
}
4039
4401
 
4040
4402
 
 
4403
/* getdir */
 
4404
struct getdir_result {
 
4405
  list<string> *contents;
 
4406
  int num;
 
4407
};
4041
4408
 
4042
 
int Client::closedir(DIR *dir) 
 
4409
static int _getdir_cb(void *p, struct dirent *de, struct stat *st, int stmask, off_t off)
4043
4410
{
4044
 
  Mutex::Locker lock(client_lock);
4045
 
  tout << "closedir" << std::endl;
4046
 
  tout << (unsigned long)dir << std::endl;
 
4411
  getdir_result *r = (getdir_result *)p;
4047
4412
 
4048
 
  dout(3) << "closedir(" << dir << ") = 0" << dendl;
4049
 
  _closedir((DirResult*)dir);
 
4413
  r->contents->push_back(de->d_name);
 
4414
  r->num++;
4050
4415
  return 0;
4051
4416
}
4052
4417
 
4053
 
void Client::_closedir(DirResult *dirp)
 
4418
int Client::getdir(const char *relpath, list<string>& contents)
4054
4419
{
4055
 
  dout(10) << "_closedir(" << dirp << ")" << dendl;
4056
 
  if (dirp->inode) {
4057
 
    dout(10) << "_closedir detaching inode " << dirp->inode << dendl;
4058
 
    put_inode(dirp->inode);
4059
 
    dirp->inode = 0;
 
4420
  dout(3) << "getdir(" << relpath << ")" << dendl;
 
4421
  {
 
4422
    Mutex::Locker lock(client_lock);
 
4423
    tout << "getdir" << std::endl;
 
4424
    tout << relpath << std::endl;
4060
4425
  }
4061
 
  delete dirp;
4062
 
}
4063
 
 
4064
 
void Client::rewinddir(DIR *dirp)
4065
 
{
4066
 
  dout(3) << "rewinddir(" << dirp << ")" << dendl;
4067
 
  DirResult *d = (DirResult*)dirp;
4068
 
  d->offset = 0;
4069
 
  d->buffer.clear();
4070
 
}
4071
 
 
4072
 
loff_t Client::telldir(DIR *dirp)
4073
 
{
4074
 
  DirResult *d = (DirResult*)dirp;
4075
 
  dout(3) << "telldir(" << dirp << ") = " << d->offset << dendl;
4076
 
  return d->offset;
4077
 
}
4078
 
 
4079
 
void Client::seekdir(DIR *dirp, loff_t offset)
4080
 
{
4081
 
  dout(3) << "seekdir(" << dirp << ", " << offset << ")" << dendl;
4082
 
  DirResult *d = (DirResult*)dirp;
4083
 
  d->offset = offset;
 
4426
 
 
4427
  DIR *d;
 
4428
  int r = opendir(relpath, &d);
 
4429
  if (r < 0)
 
4430
    return r;
 
4431
 
 
4432
  getdir_result gr;
 
4433
  gr.contents = &contents;
 
4434
  gr.num = 0;
 
4435
  r = readdir_r_cb(d, _getdir_cb, (void *)&gr);
 
4436
 
 
4437
  closedir(d);
 
4438
 
 
4439
  if (r < 0)
 
4440
    return r;
 
4441
  return gr.num;
4084
4442
}
4085
4443
 
4086
4444
 
6065
6423
  assert(extents.size() == 1);
6066
6424
 
6067
6425
  // now we have the object and its 'layout'
6068
 
  pg_t pg = (pg_t)extents[0].layout.ol_pgid;
 
6426
  pg_t pg = osdmap->object_locator_to_pg(extents[0].oid, extents[0].oloc);
6069
6427
  vector<int> osds;
6070
6428
  osdmap->pg_to_osds(pg, osds);
6071
6429
  if (!osds.size())