~ubuntu-branches/ubuntu/precise/nordugrid-arc/precise

« back to all changes in this revision

Viewing changes to src/libs/data-staging/Scheduler.cpp

  • Committer: Package Import Robot
  • Author(s): Mattias Ellert
  • Date: 2012-03-01 19:48:16 UTC
  • mfrom: (3.1.5 sid)
  • Revision ID: package-import@ubuntu.com-20120301194816-m1ezrwnwt2qgnc2e
Tags: 1.1.1-1
* 1.1.1 Bugfix Release
* Fixes FTBFS (Closes: #661774) (LP: #935007)
* Fix typo in package description (Closes: #646979)
* Split binary rule in debian/rules for arch and indep

Show diffs side-by-side

added added

removed removed

Lines of Context:
50
50
      preferred_pattern = pattern;
51
51
  }
52
52
 
53
 
  void Scheduler::SetTransferShares(const TransferShares& shares) {
54
 
    if (scheduler_state == INITIATED)
55
 
      transferShares = shares;
56
 
  }
57
 
 
58
 
  void Scheduler::AddSharePriority(const std::string& name, int priority) {
59
 
    if (scheduler_state == INITIATED)
60
 
      transferShares.set_reference_share(name, priority);
61
 
  }
62
 
 
63
 
  void Scheduler::SetSharePriorities(const std::map<std::string, int>& shares) {
64
 
    if (scheduler_state == INITIATED)
65
 
      transferShares.set_reference_shares(shares);
66
 
  }
67
 
 
68
 
  void Scheduler::SetShareType(TransferShares::ShareType share_type) {
69
 
    if (scheduler_state == INITIATED)
70
 
      transferShares.set_share_type(share_type);
 
53
  void Scheduler::SetTransferSharesConf(const TransferSharesConf& share_conf) {
 
54
    if (scheduler_state == INITIATED)
 
55
      transferSharesConf = share_conf;
71
56
  }
72
57
 
73
58
  void Scheduler::SetTransferParameters(const TransferParameters& params) {
95
80
      services.push_back(DTR::LOCAL_DELIVERY);
96
81
      delivery_services = services;
97
82
    }
98
 
    DeliverySlots *= delivery_services.size();
99
 
    DeliveryEmergencySlots *= delivery_services.size();
100
83
    Arc::CreateThreadFunction(&main_thread, this);
101
84
    return true;
102
85
  }
103
86
 
 
87
  void Scheduler::log_to_root_logger(Arc::LogLevel level, const std::string& message) {
 
88
    Arc::Logger::getRootLogger().addDestinations(root_destinations);
 
89
    logger.msg(level, message);
 
90
    Arc::Logger::getRootLogger().removeDestinations();
 
91
  }
 
92
 
104
93
  /* Function to sort the list of the pointers to DTRs 
105
94
   * according to the priorities the DTRs have.
106
95
   * DTRs with higher priority go first to the beginning,
231
220
  }
232
221
 
233
222
  void Scheduler::ProcessDTRNEW(DTR* request){
234
 
    // Fresh DTRs should receive the initial priority
235
 
    // This is to be implemented
236
 
    //compute_priority(request);
 
223
 
237
224
    request->get_logger()->msg(Arc::INFO, "Scheduler received new DTR %s with source: %s,"
238
225
        " destination: %s, assigned to transfer share %s with priority %d",
239
226
        request->get_id(), request->get_source()->str(), request->get_destination()->str(),
248
235
      // take a long time to download a big file
249
236
      request->set_timeout(3600);
250
237
      request->get_logger()->msg(Arc::VERBOSE, "DTR %s: File is cacheable, will check cache", request->get_short_id());
251
 
      request->set_status(DTRStatus::CHECK_CACHE);
 
238
      if (DtrList.is_being_cached(request)) {
 
239
        Arc::Period cache_wait_period(10);
 
240
        request->get_logger()->msg(Arc::VERBOSE, "DTR %s: File is currently being cached, will wait %is",
 
241
                                   request->get_short_id(), cache_wait_period.GetPeriod());
 
242
        request->set_process_time(cache_wait_period);
 
243
        request->set_status(DTRStatus::CACHE_WAIT);
 
244
      } else {
 
245
        request->set_status(DTRStatus::CHECK_CACHE);
 
246
      }
252
247
    }
253
248
  }
254
249
  
264
259
                                "Timed out while waiting for cache for " + request->get_source()->str());
265
260
      request->get_logger()->msg(Arc::ERROR, "DTR %s: Timed out while waiting for cache lock", request->get_short_id());
266
261
      request->set_status(DTRStatus::CACHE_PROCESSED);
 
262
    } else if (DtrList.is_being_cached(request)) {
 
263
      // TODO A low priority DTR holding the cache lock can block a high priority DTR
 
264
      // downloading the same file. Here we should cancel the low priority one to let
 
265
      // the high priority one go through
 
266
      Arc::Period cache_wait_period(10);
 
267
      request->get_logger()->msg(Arc::VERBOSE, "DTR %s: File is currently being cached, will wait %is",
 
268
                                 request->get_short_id(), cache_wait_period.GetPeriod());
 
269
      request->set_process_time(cache_wait_period);
267
270
    } else {
268
271
      // Try to check cache again
269
272
      request->get_logger()->msg(Arc::VERBOSE, "DTR %s: Checking cache again", request->get_short_id());
275
278
    // There's no need to check additionally for cache error
276
279
    // If the error has occurred -- we just proceed the normal
277
280
    // workflow as if it was not cached at all.
278
 
    // But we should clear error flag if it was set by
279
 
    // the pre-processor
 
281
    // But we should clear error flag if it was set by the pre-processor
280
282
    request->reset_error_status();
 
283
    if (request->get_cache_state() == CACHEABLE) DtrList.caching_started(request);
281
284
 
282
285
    if(request->get_cache_state() == CACHE_ALREADY_PRESENT){
283
286
      // File is on place already. After the post-processor
374
377
      // to limit per remote host. For now count staging transfers in this
375
378
      // share already in transfer queue and apply limit. In order not to block
376
379
      // the highest priority DTRs here we allow them to bypass the limit.
377
 
      std::list<DTR*> DeliveryQueue;
378
 
      DtrList.filter_dtrs_by_next_receiver(DELIVERY,DeliveryQueue);
379
 
 
380
380
      int share_queue = 0, highest_priority = 0;
381
 
      for (std::list<DTR*>::iterator dtr = DeliveryQueue.begin(); dtr != DeliveryQueue.end(); ++dtr) {
 
381
      for (std::list<DTR*>::iterator dtr = staged_queue.begin(); dtr != staged_queue.end(); ++dtr) {
382
382
        if ((*dtr)->get_transfer_share() == request->get_transfer_share() &&
383
383
            ((*dtr)->get_source()->IsStageable() ||
384
384
             (*dtr)->get_destination()->IsStageable())) {
386
386
          if ((*dtr)->get_priority() > highest_priority) highest_priority = (*dtr)->get_priority();
387
387
        }
388
388
      }
389
 
      if (share_queue >= DeliverySlots*2 && request->get_priority() <= highest_priority) {
390
 
        request->get_logger()->msg(Arc::INFO, "DTR %s: Large transfer queue - will wait 10s before staging", request->get_short_id());
 
389
      if (share_queue >= DeliverySlots*4 && request->get_priority() <= highest_priority) {
 
390
        request->get_logger()->msg(Arc::VERBOSE, "DTR %s: Large transfer queue - will wait 10s before staging", request->get_short_id());
391
391
        request->set_process_time(10);
392
392
      }
393
393
      else {
395
395
        request->set_timeout(3600);
396
396
        // processor will take care of staging source or destination or both
397
397
        request->get_logger()->msg(Arc::VERBOSE, "DTR %s: Source or destination requires staging", request->get_short_id());
 
398
        staged_queue.push_back(request);
398
399
        request->set_status(DTRStatus::STAGE_PREPARE);
399
400
      }
400
401
    }
458
459
      }
459
460
    }
460
461
 
461
 
    // Probably operations in the preprocessor may
462
 
    // influence the priority in the delivery.
463
 
    // Recomputation might be needed.
464
 
    // compute_priority(request);
465
 
    // After normal workflow the DTR has become ready for delivery
 
462
    // After normal workflow the DTR is ready for delivery
466
463
    request->get_logger()->msg(Arc::VERBOSE, "DTR %s: DTR is ready for transfer, moving to delivery queue", request->get_short_id());
467
464
 
468
465
    // set long timeout for waiting for transfer slot
469
466
    // (setting timeouts for active transfers is done in Delivery)
470
467
    request->set_timeout(7200);
471
 
    request->set_status(DTRStatus::TRANSFER_WAIT);
 
468
    request->set_status(DTRStatus::TRANSFER);
472
469
  }
473
470
  
474
471
  void Scheduler::ProcessDTRTRANSFERRED(DTR* request){
475
 
    // We don't do check if the error has happened
476
 
    // If it has -- the post-processor will take needed
477
 
    // steps in RELEASE_REQUEST in any case
478
 
    // The error flag will work now as a sign to return
479
 
    // the DTR to QUERY_REPLICA again
 
472
    // We don't check if error has happened - if it has the post-processor
 
473
    // will take needed steps in RELEASE_REQUEST in any case. The error flag
 
474
    // will work now as a sign to return the DTR to QUERY_REPLICA again.
480
475
 
481
476
    // Delivery will clean up destination physical file on error
482
477
    if (request->error())
483
478
      request->get_logger()->msg(Arc::ERROR, "DTR %s: Transfer failed: %s", request->get_short_id(), request->get_error_status().GetDesc());
484
479
 
485
 
    // Resuming normal workflow after the DTR
486
 
    // has finished transferring
 
480
    // Resuming normal workflow after the DTR has finished transferring
487
481
    // The next state is RELEASE_REQUEST
488
482
 
489
483
    // if cacheable and no cancellation or error, mark the DTR as CACHE_DOWNLOADED
547
541
  void Scheduler::ProcessDTRCACHE_PROCESSED(DTR* request){
548
542
    // Final stage within scheduler. Retries are initiated from here if necessary,
549
543
    // otherwise report success or failure to generator
 
544
 
 
545
    // First remove from caching list
 
546
    DtrList.caching_finished(request);
 
547
 
550
548
    if (request->cancel_requested()) {
551
549
      // Cancellation steps finished
552
550
      request->get_logger()->msg(Arc::VERBOSE, "DTR %s: Cancellation complete", request->get_short_id());
554
552
    }
555
553
    else if(request->error()) {
556
554
      // If the error occurred in cache processing we send back
557
 
      // to REPLICA_QUERIED to try the same replica again without cache.
558
 
      // If there was a cache timeout we go back to CACHE_CHECKED. If in
 
555
      // to REPLICA_QUERIED to try the same replica again without cache,
 
556
      // or to CACHE_CHECKED if the file was already in cache. If there
 
557
      // was a cache timeout we also go back to CACHE_CHECKED. If in
559
558
      // another place we are finished and report error to generator
560
559
      if (request->get_error_status().GetLastErrorState() == DTRStatus::PROCESSING_CACHE) {
561
560
        request->get_logger()->msg(Arc::ERROR, "DTR %s: Error in cache processing, will retry without caching", request->get_short_id());
 
561
        request->reset_error_status();
 
562
        if (request->get_cache_state() == CACHE_ALREADY_PRESENT) request->set_status(DTRStatus::CACHE_CHECKED);
 
563
        else request->set_status(DTRStatus::REPLICA_QUERIED);
562
564
        request->set_cache_state(CACHE_SKIP);
563
 
        request->reset_error_status();
564
 
        request->set_status(DTRStatus::REPLICA_QUERIED);
565
565
        return;
566
566
      }
567
567
      else if (request->get_error_status().GetLastErrorState() == DTRStatus::CACHE_WAIT) {
579
579
            request->get_error_status().GetErrorStatus() == DTRErrorStatus::TRANSFER_SPEED_ERROR ||
580
580
            request->get_error_status().GetErrorStatus() == DTRErrorStatus::INTERNAL_PROCESS_ERROR) {
581
581
          if (request->get_tries_left() > 0) {
582
 
            request->set_process_time(10);
 
582
            // exponential back off - 10s, 40s, 90s, ...
 
583
            request->set_process_time(10*(request->get_initial_tries()-request->get_tries_left())*
 
584
                                         (request->get_initial_tries()-request->get_tries_left()));
583
585
            request->get_logger()->msg(Arc::INFO, "DTR %s: %i retries left, will wait until %s before next attempt",
584
586
                                       request->get_short_id(), request->get_tries_left(), request->get_process_time().str());
585
587
            // set state depending on where the error occurred
610
612
  }
611
613
  
612
614
  void Scheduler::ProcessDTRFINAL_STATE(DTR* request){
613
 
        /* The only place where the DTR is returned to the generator 
614
 
         * and deleted from the global list
615
 
         */
 
615
        // This is the only place where the DTR is returned to the generator
 
616
        // and deleted from the global list
 
617
 
616
618
        // Return to the generator
617
619
    request->get_logger()->msg(Arc::INFO, "DTR %s: Returning to generator", request->get_short_id());
618
620
    request->push(GENERATOR);
619
 
    // Decrease the corresponding transfer share
620
 
    transferShares.decrease_transfer_share(request->get_transfer_share());
621
621
    // Delete from the global list
622
622
    DtrList.delete_dtr(request);
623
623
  }
624
624
  
625
625
  void Scheduler::map_state_and_process(DTR* request){
626
 
    // DTRs that were requested to be cancelled are processed not here
627
 
    if(request->cancel_requested()) map_cancel_state_and_process(request);
 
626
    // For cancelled DTRs set the appropriate post-processor state
 
627
    if(request->cancel_requested()) map_cancel_state(request);
628
628
    // Loop until the DTR is sent somewhere for some action to be done
629
629
    // This is more efficient because many DTRs will skip some states and
630
630
    // we don't want to have to wait for the full list to be processed before
652
652
        default: ; //DoNothing
653
653
      }
654
654
    }
655
 
    if (request->is_in_final_state()) {
656
 
      // If we came here -- we were in the final state,
657
 
      // so the DTR is returned to the generator and deleted
658
 
      ProcessDTRFINAL_STATE(request);
659
 
    }
 
655
 
660
656
  }
661
657
  
662
 
  void Scheduler::map_cancel_state_and_process(DTR* request){
 
658
  void Scheduler::map_cancel_state(DTR* request){
663
659
    switch (request->get_status().GetStatus()) {
664
660
      case DTRStatus::NEW:
665
661
      case DTRStatus::CHECK_CACHE:
 
662
      case DTRStatus::CACHE_WAIT:
666
663
        {
667
664
          // Nothing has yet been done to require cleanup or additional
668
665
          // activities. Return to the generator via CACHE_PROCESSED.
669
666
          request->set_status(DTRStatus::CACHE_PROCESSED);
670
667
        }
671
668
        break;
672
 
      case DTRStatus::CACHE_WAIT:
673
669
      case DTRStatus::CACHE_CHECKED:
674
670
      case DTRStatus::RESOLVE:
675
671
        {
693
689
        break;
694
690
      case DTRStatus::STAGING_PREPARING_WAIT:
695
691
      case DTRStatus::STAGED_PREPARED:
696
 
      case DTRStatus::TRANSFER_WAIT:
697
692
      case DTRStatus::TRANSFER:
698
693
        {
699
694
          // At this stage we in addition to cache work
724
719
    }
725
720
  }
726
721
  
 
722
  void Scheduler::add_event(DTR* event) {
 
723
    event_lock.lock();
 
724
    events.push_back(event);
 
725
    event_lock.unlock();
 
726
  }
 
727
 
727
728
  void Scheduler::process_events(void){
728
 
    std::list<DTR*> Events;
729
 
    
730
 
    // Get the events
731
 
    DtrList.filter_pending_dtrs(Events);
732
 
    
733
 
    std::list<DTR*>::iterator Event;
734
 
    for(Event = Events.begin(); Event != Events.end(); Event++){
735
 
      map_state_and_process(*Event);
736
 
    }
737
 
  }
738
 
 
739
 
  void Scheduler::revise_pre_processor_queue()
740
 
  {
741
 
    std::list<DTR*> PreProcessorQueue;
742
 
    DtrList.filter_dtrs_by_next_receiver(PRE_PROCESSOR,PreProcessorQueue);
743
 
 
744
 
    if (PreProcessorQueue.empty()) return;
745
 
 
746
 
    // Sort the queue by priority
747
 
    PreProcessorQueue.sort(dtr_sort_predicate);
748
 
 
749
 
    DTR* tmp;
750
 
 
751
 
    std::list<DTR*>::iterator dtr = PreProcessorQueue.begin();
752
 
    int highest_priority = (*dtr)->get_priority();
753
 
 
754
 
    while (dtr != PreProcessorQueue.end()) {
755
 
 
756
 
      tmp = *dtr;
757
 
      // The cancellation requests break the normal workflow. A cancelled
758
 
      // request will either go back to generator or be put into a
759
 
      // post-processor state for clean up
760
 
      if(tmp->cancel_requested()){
761
 
        map_cancel_state_and_process(tmp);
762
 
        dtr = PreProcessorQueue.erase(dtr);
763
 
        continue;
764
 
      }
765
 
 
766
 
      // To avoid the situation where DTRs get blocked due to higher
767
 
      // priority DTRs, DTRs that have passed their timeout should have their
768
 
      // priority boosted. But this should only happen if there are higher
769
 
      // priority DTRs, since there could be a large queue of low priority DTRs
770
 
      // which, after having their priority boosted, would then block new
771
 
      // high priority requests.
772
 
      // The simple solution here is to increase priority by 1 every 5 minutes.
773
 
      // There is plenty of scope for more intelligent solutions.
774
 
      // TODO reset priority back to original value after pre-processing
775
 
      if(tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority){
776
 
        tmp->set_priority(tmp->get_priority() + 1);
777
 
        tmp->set_timeout(300);
778
 
      }
779
 
      ++dtr;
780
 
    }
781
 
 
782
 
    transferShares.calculate_shares(PreProcessorSlots);
783
 
 
784
 
    std::list<DTR*> InPreProcessor;
785
 
    DtrList.filter_dtrs_by_owner(PRE_PROCESSOR, InPreProcessor);
786
 
 
787
 
    // Number of the DTRs running in the pre-processor
788
 
    int PreProcessorRunning = InPreProcessor.size();
789
 
    if (PreProcessorRunning == PreProcessorSlots) return;
790
 
 
791
 
    // Decrease shares for those already in the pre-processor
792
 
    for (dtr = InPreProcessor.begin(); dtr != InPreProcessor.end(); ++dtr) {
793
 
      transferShares.decrease_number_of_slots((*dtr)->get_transfer_share());
794
 
    }
795
 
 
796
 
    // Send to pre-processor according to share and place in the queue
797
 
    while(PreProcessorRunning < PreProcessorSlots && !PreProcessorQueue.empty()){
798
 
      tmp = PreProcessorQueue.front();
799
 
      PreProcessorQueue.pop_front();
800
 
      if (transferShares.can_start(tmp->get_transfer_share())) {
801
 
        tmp->push(PRE_PROCESSOR);
802
 
        PreProcessorRunning++;
803
 
        transferShares.decrease_number_of_slots(tmp->get_transfer_share());
804
 
      }
805
 
    }
806
 
  }
807
 
  
808
 
  void Scheduler::revise_post_processor_queue()
809
 
  {
810
 
    // There's no check for cancellation requests.
811
 
    // The post-processor is special. Most DTRs
812
 
    // with cancellation requests will go to the
813
 
    // post-processor for cleanups, hold releases,
814
 
    // etc., so the cancellation requests don't break
815
 
    // normal workflow in the post-processor (as opposed
816
 
    // to any other process), but instead act just as a
817
 
    // sign that the post-processor should do additional
818
 
    // cleanup activities.
819
 
 
820
 
    std::list<DTR*> PostProcessorQueue;
821
 
    DtrList.filter_dtrs_by_next_receiver(POST_PROCESSOR,PostProcessorQueue);
822
 
 
823
 
    if (PostProcessorQueue.empty()) return;
824
 
 
825
 
    // Sort the queue by priority
826
 
    PostProcessorQueue.sort(dtr_sort_predicate);
827
 
 
828
 
    DTR* tmp;
829
 
 
830
 
    std::list<DTR*>::iterator dtr = PostProcessorQueue.begin();
831
 
    int highest_priority = (*dtr)->get_priority();
832
 
 
833
 
    while (dtr != PostProcessorQueue.end()) {
834
 
 
835
 
      tmp = *dtr;
836
 
 
837
 
      // see revise_pre_processor_queue() for explanation
838
 
      if(tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority){
839
 
        tmp->set_priority(tmp->get_priority() + 1);
840
 
        tmp->set_timeout(300);
841
 
      }
842
 
      ++dtr;
843
 
    }
844
 
 
845
 
    transferShares.calculate_shares(PostProcessorSlots);
846
 
 
847
 
    std::list<DTR*> InPostProcessor;
848
 
    DtrList.filter_dtrs_by_owner(POST_PROCESSOR, InPostProcessor);
849
 
 
850
 
    // Number of the DTRs running in the post-processor
851
 
    int PostProcessorRunning = InPostProcessor.size();
852
 
    if (PostProcessorRunning == PostProcessorSlots) return;
853
 
 
854
 
    // Decrease shares for those already in the post-processor
855
 
    for (dtr = InPostProcessor.begin(); dtr != InPostProcessor.end(); ++dtr) {
856
 
      transferShares.decrease_number_of_slots((*dtr)->get_transfer_share());
857
 
    }
858
 
 
859
 
    // Send to pre-processor according to share and place in the queue
860
 
    while(PostProcessorRunning < PostProcessorSlots && !PostProcessorQueue.empty()){
861
 
      tmp = PostProcessorQueue.front();
862
 
      PostProcessorQueue.pop_front();
863
 
      if (transferShares.can_start(tmp->get_transfer_share())) {
864
 
        tmp->push(POST_PROCESSOR);
865
 
        PostProcessorRunning++;
866
 
        transferShares.decrease_number_of_slots(tmp->get_transfer_share());
867
 
      }
868
 
    }
869
 
  }
870
 
  
871
 
  void Scheduler::revise_delivery_queue()
872
 
  {
873
 
    std::list<DTR*> DeliveryQueue;    
874
 
    DtrList.filter_dtrs_by_next_receiver(DELIVERY,DeliveryQueue);
875
 
 
876
 
    // Sort the Delivery Queue according to
877
 
    // the priorities the DTRs have.
878
 
    DeliveryQueue.sort(dtr_sort_predicate);
879
 
 
880
 
    DTR* tmp;
881
 
    int highest_priority = 0;
882
 
 
883
 
    std::list<DTR*>::iterator dtr = DeliveryQueue.begin();
884
 
 
885
 
    while (dtr != DeliveryQueue.end()) {
886
 
      if (dtr == DeliveryQueue.begin()) highest_priority = (*dtr)->get_priority();
887
 
 
888
 
      tmp = *dtr;
889
 
      // The cancellation requests break the normal workflow. A cancelled
890
 
      // request will either go back to generator or be put into a
891
 
      // post-processor state for clean up
892
 
      if(tmp->cancel_requested()){
893
 
        map_cancel_state_and_process(tmp);
894
 
        dtr = DeliveryQueue.erase(dtr);
895
 
        continue;
896
 
      }
897
 
 
898
 
      // see revise_pre_processor_queue() for explanation
899
 
      if(tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority){
900
 
        tmp->set_priority(tmp->get_priority() + 1);
901
 
        tmp->set_timeout(300);
902
 
      }
903
 
      dtr++;
904
 
    }
905
 
 
906
 
    transferShares.calculate_shares(DeliverySlots);
907
 
    
908
 
    // Shares which have at least one DTR in Delivery
909
 
    // Shares can only use emergency slots if they are not in this list
910
 
    std::set<std::string> shares_in_delivery;
911
 
 
912
 
    // The shares are re-calculated. Now we have to determine
913
 
    // how many slots every share has already grabbed.
914
 
    {
915
 
      std::list<DTR*> shareDeliveryQueue;
916
 
      std::list<DTR*>::iterator sharedtr;
917
 
      DTR* sharetmp;
918
 
 
919
 
      DtrList.filter_dtrs_by_owner(DELIVERY,shareDeliveryQueue);
920
 
        
921
 
      for(sharedtr = shareDeliveryQueue.begin(); sharedtr != shareDeliveryQueue.end(); sharedtr++){
922
 
        sharetmp = *sharedtr;
923
 
        // First check for cancellation - send cancellation call to Delivery
924
 
        // and don't count as an active slot
925
 
        if (sharetmp->cancel_requested()) {
926
 
          // check if already cancelled
927
 
          if (sharetmp->get_status() != DTRStatus::TRANSFERRING_CANCEL) {
928
 
            sharetmp->get_logger()->msg(Arc::INFO, "DTR %s: Cancelling active transfer", sharetmp->get_short_id());
929
 
            delivery.cancelDTR(sharetmp);
 
729
    
 
730
    // Get all the DTRs in a staged state
 
731
    staged_queue.clear();
 
732
    DtrList.filter_dtrs_by_statuses(DTRStatus::StagedStates, staged_queue);
 
733
 
 
734
    Arc::Time now;
 
735
    event_lock.lock();
 
736
 
 
737
    for (std::list<DTR*>::iterator event = events.begin(); event != events.end();) {
 
738
      DTR* tmp = *event;
 
739
      event_lock.unlock();
 
740
 
 
741
      if (tmp->get_process_time() <= now) {
 
742
        map_state_and_process(tmp);
 
743
        // If final state, the DTR is returned to the generator and deleted
 
744
        if (tmp->is_in_final_state()) {
 
745
          ProcessDTRFINAL_STATE(tmp);
 
746
          event_lock.lock();
 
747
          event = events.erase(event);
 
748
          continue;
 
749
        }
 
750
        // If the event was sent on to a queue, erase it from the list
 
751
        if (tmp->is_destined_for_pre_processor() ||
 
752
            tmp->is_destined_for_delivery() ||
 
753
            tmp->is_destined_for_post_processor()) {
 
754
          event_lock.lock();
 
755
          event = events.erase(event);
 
756
          continue;
 
757
        }
 
758
      }
 
759
      event_lock.lock();
 
760
      ++event;
 
761
    }
 
762
    event_lock.unlock();
 
763
  }
 
764
 
 
765
  void Scheduler::revise_queues() {
 
766
 
 
767
    // The DTRs ready to go into a processing state
 
768
    std::map<DTRStatus::DTRStatusType, std::list<DTR*> > DTRQueueStates;
 
769
    DtrList.filter_dtrs_by_statuses(DTRStatus::ToProcessStates, DTRQueueStates);
 
770
 
 
771
    // The active DTRs currently in processing states
 
772
    std::map<DTRStatus::DTRStatusType, std::list<DTR*> > DTRRunningStates;
 
773
    DtrList.filter_dtrs_by_statuses(DTRStatus::ProcessingStates, DTRRunningStates);
 
774
 
 
775
    // Go through "to process" states, work out shares and push DTRs
 
776
    for (unsigned int i = 0; i < DTRStatus::ToProcessStates.size(); ++i) {
 
777
 
 
778
      std::list<DTR*> DTRQueue = DTRQueueStates[DTRStatus::ToProcessStates.at(i)];
 
779
      std::list<DTR*> ActiveDTRs = DTRRunningStates[DTRStatus::ProcessingStates.at(i)];
 
780
 
 
781
      if (DTRQueue.empty() && ActiveDTRs.empty()) continue;
 
782
 
 
783
      // Transfer shares for this queue
 
784
      TransferShares transferShares(transferSharesConf);
 
785
 
 
786
      // Sort the DTR queue according to the priorities the DTRs have.
 
787
      // Highest priority will be at the beginning of the list.
 
788
      DTRQueue.sort(dtr_sort_predicate);
 
789
 
 
790
      int highest_priority = 0;
 
791
 
 
792
      // First go over the queue and check for cancellation and timeout
 
793
      for (std::list<DTR*>::iterator dtr = DTRQueue.begin(); dtr != DTRQueue.end();) {
 
794
 
 
795
        DTR* tmp = *dtr;
 
796
        if (dtr == DTRQueue.begin()) highest_priority = tmp->get_priority();
 
797
 
 
798
        // There's no check for cancellation requests for the post-processor.
 
799
        // Most DTRs with cancellation requests will go to the post-processor
 
800
        // for cleanups, hold releases, etc., so the cancellation requests
 
801
        // don't break normal workflow in the post-processor (as opposed
 
802
        // to any other process), but instead act just as a sign that the
 
803
        // post-processor should do additional cleanup activities.
 
804
        if (tmp->is_destined_for_pre_processor() || tmp->is_destined_for_delivery()) {
 
805
 
 
806
          // The cancellation requests break the normal workflow. A cancelled
 
807
          // request will either go back to generator or be put into a
 
808
          // post-processor state for clean up.
 
809
          if (tmp->cancel_requested()) {
 
810
            map_cancel_state(tmp);
 
811
            add_event(tmp);
 
812
            dtr = DTRQueue.erase(dtr);
 
813
            continue;
930
814
          }
 
815
        }
 
816
        // To avoid the situation where DTRs get blocked due to higher
 
817
        // priority DTRs, DTRs that have passed their timeout should have their
 
818
        // priority boosted. But this should only happen if there are higher
 
819
        // priority DTRs, since there could be a large queue of low priority DTRs
 
820
        // which, after having their priority boosted, would then block new
 
821
        // high priority requests.
 
822
        // The simple solution here is to increase priority by 1 every 5 minutes.
 
823
        // There is plenty of scope for more intelligent solutions.
 
824
        // TODO reset priority back to original value once past this stage.
 
825
        if (tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority) {
 
826
          tmp->set_priority(tmp->get_priority() + 1);
 
827
          tmp->set_timeout(300);
 
828
        }
 
829
        transferShares.increase_transfer_share(tmp->get_transfer_share());
 
830
        ++dtr;
 
831
      }
 
832
 
 
833
      // Go over the active DTRs and add to transfer share
 
834
      for (std::list<DTR*>::iterator dtr = ActiveDTRs.begin(); dtr != ActiveDTRs.end();) {
 
835
 
 
836
        DTR* tmp = *dtr;
 
837
        // If the DTR is in Delivery, check for cancellation. The pre- and
 
838
        // post-processor DTRs don't get cancelled here but are allowed to
 
839
        // continue processing.
 
840
        if (tmp->get_status() == DTRStatus::TRANSFERRING && tmp->cancel_requested()) {
 
841
          tmp->get_logger()->msg(Arc::INFO, "DTR %s: Cancelling active transfer", tmp->get_short_id());
 
842
          delivery.cancelDTR(tmp);
 
843
          dtr = ActiveDTRs.erase(dtr);
931
844
          continue;
932
845
        }
933
 
        // Every active DTR for sure has its share represented
934
 
        // in active shares. So we can just decrease the corresponding
935
 
        // number
936
 
        transferShares.decrease_number_of_slots(sharetmp->get_transfer_share());
937
 
        shares_in_delivery.insert(sharetmp->get_transfer_share());
938
 
      }
939
 
    }
940
 
    
941
 
    // Refresh the number of DTRs running in the Delivery,
942
 
    int DeliveryRunning = DtrList.number_of_dtrs_by_owner(DELIVERY);
943
 
 
944
 
    // Now at the beginning of the queue we have DTRs that
945
 
    // should be launched first. Launch them, but with respect
946
 
    // to the transfer shares.
947
 
    for(dtr = DeliveryQueue.begin(); dtr != DeliveryQueue.end(); dtr++){
948
 
      tmp = *dtr;
949
 
      // Limit reached - check if emergency slots are needed for any shares
950
 
      // in the queue but not already running
951
 
      if (DeliveryRunning >= DeliverySlots) {
952
 
        if (DeliveryRunning == DeliverySlots + DeliveryEmergencySlots)
953
 
          break;
954
 
        if ((shares_in_delivery.find(tmp->get_transfer_share()) == shares_in_delivery.end()) &&
955
 
            transferShares.can_start(tmp->get_transfer_share())) {
956
 
          // choose delivery service - random for now
957
 
          // if this is a retry, try to use a different service
958
 
          if (!delivery_services.empty()) {
959
 
            tmp->set_delivery_endpoint(delivery_services.at(rand() % delivery_services.size()));
 
846
        transferShares.increase_transfer_share((*dtr)->get_transfer_share());
 
847
        ++dtr;
 
848
      }
 
849
 
 
850
      // If the queue is empty we can go straight to the next state
 
851
      if (DTRQueue.empty()) continue;
 
852
 
 
853
      // Slot limit for this state
 
854
      int slot_limit = DeliverySlots;
 
855
      if (DTRQueue.front()->is_destined_for_pre_processor()) slot_limit = PreProcessorSlots;
 
856
      else if (DTRQueue.front()->is_destined_for_post_processor()) slot_limit = PostProcessorSlots;
 
857
 
 
858
      // Calculate the slots available for each active share
 
859
      transferShares.calculate_shares(slot_limit);
 
860
 
 
861
      // Shares which have at least one DTR active and running.
 
862
      // Shares can only use emergency slots if they are not in this list.
 
863
      std::set<std::string> active_shares;
 
864
      int running = ActiveDTRs.size();
 
865
 
 
866
      // Go over the active DTRs again and decrease slots in corresponding shares
 
867
      for (std::list<DTR*>::iterator dtr = ActiveDTRs.begin(); dtr != ActiveDTRs.end(); ++dtr) {
 
868
        transferShares.decrease_number_of_slots((*dtr)->get_transfer_share());
 
869
        active_shares.insert((*dtr)->get_transfer_share());
 
870
      }
 
871
 
 
872
      // Now at the beginning of the queue we have DTRs that should be
 
873
      // launched first. Launch them, but with respect to the transfer shares.
 
874
      for (std::list<DTR*>::iterator dtr = DTRQueue.begin(); dtr != DTRQueue.end(); ++dtr) {
 
875
 
 
876
        DTR* tmp = *dtr;
 
877
 
 
878
        // Check if there are any shares left in the queue which might need
 
879
        // an emergency share - if not we are done
 
880
        if (running >= slot_limit &&
 
881
            transferShares.active_shares().size() == active_shares.size()) break;
 
882
 
 
883
        // Are there slots left for this share?
 
884
        bool can_start = transferShares.can_start(tmp->get_transfer_share());
 
885
        // Check if it is possible to use an emergency share
 
886
        if (running >= slot_limit &&
 
887
            active_shares.find(tmp->get_transfer_share()) != active_shares.end()) {
 
888
          can_start = false;
 
889
        }
 
890
 
 
891
        if (can_start) {
 
892
 
 
893
          // If going into Delivery, choose delivery service - random for now
 
894
          if (tmp->is_destined_for_delivery() && !delivery_services.empty()) {
 
895
            // previous endpoint
 
896
            Arc::URL delivery_endpoint(tmp->get_delivery_endpoint());
 
897
            // If this is a retry, use a different service
 
898
            if (tmp->get_tries_left() < tmp->get_initial_tries() && delivery_services.size() > 1) {
 
899
              Arc::URL ep(delivery_endpoint);
 
900
              // Find a random service different from the previous one, looping a
 
901
              // limited number of times in case all delivery_services are the same url
 
902
              for (unsigned int i = 0; ep == delivery_endpoint && i < delivery_services.size() * 10; ++i) {
 
903
                ep = delivery_services.at(rand() % delivery_services.size());
 
904
              }
 
905
              delivery_endpoint = ep;
 
906
            } else {
 
907
              delivery_endpoint = delivery_services.at(rand() % delivery_services.size());
 
908
            }
 
909
            tmp->set_delivery_endpoint(delivery_endpoint);
960
910
          }
961
911
          transferShares.decrease_number_of_slots(tmp->get_transfer_share());
962
 
          tmp->set_status(DTRStatus::TRANSFER);
963
 
          tmp->push(DELIVERY);
964
 
          DeliveryRunning++;
965
 
          shares_in_delivery.insert(tmp->get_transfer_share());
966
 
        }
967
 
      }
968
 
      else if(transferShares.can_start(tmp->get_transfer_share())){
969
 
        // choose delivery service - random for now
970
 
        // if this is a retry, try to use a different service
971
 
        if (!delivery_services.empty()) {
972
 
          tmp->set_delivery_endpoint(delivery_services.at(rand() % delivery_services.size()));
973
 
        }
974
 
        transferShares.decrease_number_of_slots(tmp->get_transfer_share());
975
 
        tmp->set_status(DTRStatus::TRANSFER);
976
 
        tmp->push(DELIVERY);
977
 
        DeliveryRunning++;
978
 
        shares_in_delivery.insert(tmp->get_transfer_share());
 
912
 
 
913
          // Send to processor/delivery
 
914
          if (tmp->is_destined_for_pre_processor()) tmp->push(PRE_PROCESSOR);
 
915
          else if (tmp->is_destined_for_delivery()) tmp->push(DELIVERY);
 
916
          else if (tmp->is_destined_for_post_processor()) tmp->push(POST_PROCESSOR);
 
917
 
 
918
          ++running;
 
919
          active_shares.insert(tmp->get_transfer_share());
 
920
        }
 
921
        // Hard limit with all emergency slots used
 
922
        if (running == slot_limit + DeliveryEmergencySlots) break;
979
923
      }
980
924
    }
981
 
    
982
925
  }
983
926
 
984
927
  void Scheduler::receiveDTR(DTR& request){
985
 
    if(request.get_status() != DTRStatus::NEW) {
986
 
       // If DTR is not NEW scheduler will pick it up itself.
987
 
       return;
988
 
    }
989
 
    
 
928
 
 
929
    if (request.get_status() != DTRStatus::NEW) {
 
930
      add_event(&request);
 
931
      return;
 
932
    }
 
933
    // New DTR - first check it is valid
 
934
    if (!request) {
 
935
      logger.msg(Arc::ERROR, "Scheduler received invalid DTR");
 
936
      request.set_status(DTRStatus::ERROR);
 
937
      request.push(GENERATOR);
 
938
      return;
 
939
    }
 
940
 
990
941
    request.registerCallback(&processor,PRE_PROCESSOR);
991
942
    request.registerCallback(&processor,POST_PROCESSOR);
992
943
    request.registerCallback(&delivery,DELIVERY);
993
944
    /* Shares part*/
994
945
    // First, get the transfer share this dtr should belong to
995
 
    std::string DtrTransferShare = transferShares.extract_share_info(request);
 
946
    std::string DtrTransferShare = transferSharesConf.extract_share_info(request);
996
947
 
997
948
    // If no share information could be obtained, use default share
998
949
    if (DtrTransferShare.empty())
1000
951
 
1001
952
    // If this share is a reference share, we have to add the sub-share
1002
953
    // to the reference list
1003
 
    bool in_reference = transferShares.is_configured(DtrTransferShare);
1004
 
    int priority = transferShares.get_basic_priority(DtrTransferShare);
 
954
    bool in_reference = transferSharesConf.is_configured(DtrTransferShare);
 
955
    int priority = transferSharesConf.get_basic_priority(DtrTransferShare);
1005
956
 
1006
957
    request.set_transfer_share(DtrTransferShare);
1007
958
    DtrTransferShare = request.get_transfer_share();
1008
959
 
1009
960
    // Now the sub-share is added to DtrTransferShare, add it to reference
1010
 
    // shares if appropriate
1011
 
    if (in_reference && !transferShares.is_configured(DtrTransferShare))
1012
 
      transferShares.set_reference_share(DtrTransferShare, priority);
1013
 
        
1014
 
    // Increase the number of DTRs belonging to this share
1015
 
    transferShares.increase_transfer_share(DtrTransferShare);
 
961
    // shares if appropriate and update each TransferShare
 
962
    if (in_reference && !transferSharesConf.is_configured(DtrTransferShare)) {
 
963
      transferSharesConf.set_reference_share(DtrTransferShare, priority);
 
964
    }
1016
965
    
1017
 
    // Compute the priority this DTR receives
1018
 
    // This is the priority of the share adjusted by the priority
1019
 
    // of the parent job
1020
 
    request.set_priority(int(transferShares.get_basic_priority(DtrTransferShare) * request.get_priority() * 0.01));
 
966
    // Compute the priority this DTR receives - this is the priority of the
 
967
    // share adjusted by the priority of the parent job
 
968
    request.set_priority(int(transferSharesConf.get_basic_priority(DtrTransferShare) * request.get_priority() * 0.01));
1021
969
    /* Shares part ends*/               
1022
 
    
1023
 
    DtrList.add_dtr(request);
1024
 
    
1025
 
    // Accepted successfully
1026
 
    return;
 
970
 
 
971
    DTR * new_dtr = DtrList.add_dtr(request);
 
972
    if (new_dtr) {
 
973
      add_event(new_dtr);
 
974
    }
1027
975
  }
1028
976
 
1029
977
  bool Scheduler::cancelDTRs(const std::string& jobid) {
1063
1011
    logger.msg(Arc::INFO, "  Delivery slots: %i", DeliverySlots);
1064
1012
    logger.msg(Arc::INFO, "  Emergency Delivery slots: %i", DeliveryEmergencySlots);
1065
1013
    logger.msg(Arc::INFO, "  Post-processor slots: %i", PostProcessorSlots);
1066
 
    logger.msg(Arc::INFO, "  Shares configuration:\n%s", transferShares.conf());
 
1014
    logger.msg(Arc::INFO, "  Shares configuration:\n%s", transferSharesConf.conf());
1067
1015
    for (std::vector<Arc::URL>::iterator i = delivery_services.begin(); i != delivery_services.end(); ++i) {
1068
1016
      if (*i == DTR::LOCAL_DELIVERY) logger.msg(Arc::INFO, "  Delivery service: LOCAL");
1069
1017
      else logger.msg(Arc::INFO, "  Delivery service: %s", i->str());
1070
1018
    }
1071
1019
 
1072
 
    // disconnect from root logger
1073
 
    // messages are logged to per-DTR Logger
 
1020
    // Disconnect from root logger so that messages are logged to per-DTR Logger
1074
1021
    Arc::Logger::getRootLogger().setThreadContext();
 
1022
    root_destinations = Arc::Logger::getRootLogger().getDestinations();
1075
1023
    Arc::Logger::getRootLogger().removeDestinations();
1076
1024
 
1077
1025
    // flag to say when to dump state
1078
1026
    bool dump = true;
1079
 
    while(scheduler_state != TO_STOP || !DtrList.all_dtrs().empty()) {
 
1027
    while(scheduler_state != TO_STOP || !DtrList.empty()) {
1080
1028
      // first check for cancelled jobs
1081
1029
      cancelled_jobs_lock.lock();
1082
1030
      std::list<std::string>::iterator jobid = cancelled_jobs.begin();
1094
1042
      // Dealing with pending events, i.e. DTRs from another processes
1095
1043
      process_events();
1096
1044
      // Revise all the internal queues and take actions
1097
 
      revise_pre_processor_queue();
1098
 
      revise_delivery_queue();
1099
 
      revise_post_processor_queue();
1100
 
      // log states for debugging
1101
 
      std::list<DTR*> DeliveryQueue;
1102
 
      DtrList.filter_dtrs_by_next_receiver(DELIVERY,DeliveryQueue);
1103
 
      // this logger is now disconnected, but in future this information will
1104
 
      // be available in other ways
1105
 
      logger.msg(Arc::DEBUG, "Pre-processor %i, DeliveryQueue %i, Delivery %i, Post-processor %i",
1106
 
                             DtrList.number_of_dtrs_by_owner(PRE_PROCESSOR),
1107
 
                             DeliveryQueue.size(),
1108
 
                             DtrList.number_of_dtrs_by_owner(DELIVERY),
1109
 
                             DtrList.number_of_dtrs_by_owner(POST_PROCESSOR));
 
1045
      revise_queues();
 
1046
 
1110
1047
      // every 5 seconds, dump state
1111
1048
      if (!dumplocation.empty() && Arc::Time().GetTime() % 5 == 0) {
1112
1049
        if (dump) {
1118
1055
      Glib::usleep(50000);
1119
1056
    }
1120
1057
    DtrList.dumpState(dumplocation);
1121
 
    logger.msg(Arc::INFO, "Scheduler loop exited");
 
1058
    log_to_root_logger(Arc::INFO, "Scheduler loop exited");
1122
1059
    run_signal.signal();
1123
1060
  }
1124
1061