722
// Appends the given DTR pointer to the scheduler's `events` container.
// The pointer is stored as-is; nothing visible here copies or validates it.
void Scheduler::add_event(DTR* event) {
724
events.push_back(event);
727
728
// Gathers DTRs through DtrList.filter_pending_dtrs() into a local list and
// calls map_state_and_process() on each of them in list order.
void Scheduler::process_events(void){
728
// Local list filled by filter_pending_dtrs() below
std::list<DTR*> Events;
731
DtrList.filter_pending_dtrs(Events);
733
std::list<DTR*>::iterator Event;
734
for(Event = Events.begin(); Event != Events.end(); Event++){
735
map_state_and_process(*Event);
739
// Revises the queue of DTRs whose next receiver is PRE_PROCESSOR:
//  1. hands DTRs with a cancellation request to map_cancel_state_and_process()
//     and removes them from the local queue,
//  2. raises the priority of DTRs whose timeout has passed,
//  3. pushes queued DTRs to PRE_PROCESSOR while the running count is below
//     PreProcessorSlots and transferShares allows their share to start.
void Scheduler::revise_pre_processor_queue()
741
std::list<DTR*> PreProcessorQueue;
742
DtrList.filter_dtrs_by_next_receiver(PRE_PROCESSOR,PreProcessorQueue);
744
// Nothing is waiting for the pre-processor
if (PreProcessorQueue.empty()) return;
746
// Sort the queue by priority
747
PreProcessorQueue.sort(dtr_sort_predicate);
751
std::list<DTR*>::iterator dtr = PreProcessorQueue.begin();
752
// Priority of the DTR at the head of the sorted queue. Dereferencing begin()
// is safe here because the queue was checked to be non-empty above.
int highest_priority = (*dtr)->get_priority();
754
while (dtr != PreProcessorQueue.end()) {
757
// NOTE(review): `tmp` is not declared in the visible text; presumably it is
// the DTR* at the current iterator position - confirm.
// The cancellation requests break the normal workflow. A cancelled
758
// request will either go back to generator or be put into a
759
// post-processor state for clean up
760
if(tmp->cancel_requested()){
761
map_cancel_state_and_process(tmp);
762
// std::list::erase returns the iterator following the removed element
dtr = PreProcessorQueue.erase(dtr);
766
// To avoid the situation where DTRs get blocked due to higher
767
// priority DTRs, DTRs that have passed their timeout should have their
768
// priority boosted. But this should only happen if there are higher
769
// priority DTRs, since there could be a large queue of low priority DTRs
770
// which, after having their priority boosted, would then block new
771
// high priority requests.
772
// The simple solution here is to increase priority by 1 every 5 minutes.
773
// There is plenty of scope for more intelligent solutions.
774
// TODO reset priority back to original value after pre-processing
775
if(tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority){
776
tmp->set_priority(tmp->get_priority() + 1);
777
// 300 is presumably seconds, matching the "5 minutes" above - confirm
// against DTR::set_timeout
tmp->set_timeout(300);
782
transferShares.calculate_shares(PreProcessorSlots);
784
std::list<DTR*> InPreProcessor;
785
DtrList.filter_dtrs_by_owner(PRE_PROCESSOR, InPreProcessor);
787
// Number of the DTRs running in the pre-processor
788
int PreProcessorRunning = InPreProcessor.size();
789
// Running count equals the slot limit, so nothing more can be started
if (PreProcessorRunning == PreProcessorSlots) return;
791
// Decrease shares for those already in the pre-processor
792
for (dtr = InPreProcessor.begin(); dtr != InPreProcessor.end(); ++dtr) {
793
transferShares.decrease_number_of_slots((*dtr)->get_transfer_share());
796
// Send to pre-processor according to share and place in the queue
797
while(PreProcessorRunning < PreProcessorSlots && !PreProcessorQueue.empty()){
798
// The head is removed from the local queue before the share check, so a DTR
// whose share cannot start is dropped from this pass without being pushed
tmp = PreProcessorQueue.front();
799
PreProcessorQueue.pop_front();
800
if (transferShares.can_start(tmp->get_transfer_share())) {
801
tmp->push(PRE_PROCESSOR);
802
PreProcessorRunning++;
803
transferShares.decrease_number_of_slots(tmp->get_transfer_share());
808
// Revises the queue of DTRs whose next receiver is POST_PROCESSOR: raises the
// priority of DTRs whose timeout has passed, then pushes queued DTRs to
// POST_PROCESSOR while the running count is below PostProcessorSlots and
// transferShares allows their share to start.
void Scheduler::revise_post_processor_queue()
810
// There's no check for cancellation requests.
811
// The post-processor is special. Most DTRs
812
// with cancellation requests will go to the
813
// post-processor for cleanups, hold releases,
814
// etc., so the cancellation requests don't break
815
// normal workflow in the post-processor (as opposed
816
// to any other process), but instead act just as a
817
// sign that the post-processor should do additional
818
// cleanup activities.
820
std::list<DTR*> PostProcessorQueue;
821
DtrList.filter_dtrs_by_next_receiver(POST_PROCESSOR,PostProcessorQueue);
823
// Nothing is waiting for the post-processor
if (PostProcessorQueue.empty()) return;
825
// Sort the queue by priority
826
PostProcessorQueue.sort(dtr_sort_predicate);
830
std::list<DTR*>::iterator dtr = PostProcessorQueue.begin();
831
// Priority of the DTR at the head of the sorted queue. Dereferencing begin()
// is safe here because the queue was checked to be non-empty above.
int highest_priority = (*dtr)->get_priority();
833
while (dtr != PostProcessorQueue.end()) {
837
// NOTE(review): `tmp` is not declared in the visible text; presumably it is
// the DTR* at the current iterator position - confirm.
// see revise_pre_processor_queue() for explanation
838
if(tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority){
839
tmp->set_priority(tmp->get_priority() + 1);
840
// 300 is presumably seconds (5 minutes) - confirm against DTR::set_timeout
tmp->set_timeout(300);
845
transferShares.calculate_shares(PostProcessorSlots);
847
std::list<DTR*> InPostProcessor;
848
DtrList.filter_dtrs_by_owner(POST_PROCESSOR, InPostProcessor);
850
// Number of the DTRs running in the post-processor
851
int PostProcessorRunning = InPostProcessor.size();
852
// Running count equals the slot limit, so nothing more can be started
if (PostProcessorRunning == PostProcessorSlots) return;
854
// Decrease shares for those already in the post-processor
855
for (dtr = InPostProcessor.begin(); dtr != InPostProcessor.end(); ++dtr) {
856
transferShares.decrease_number_of_slots((*dtr)->get_transfer_share());
859
// Send to post-processor according to share and place in the queue
860
while(PostProcessorRunning < PostProcessorSlots && !PostProcessorQueue.empty()){
861
// The head is removed from the local queue before the share check, so a DTR
// whose share cannot start is dropped from this pass without being pushed
tmp = PostProcessorQueue.front();
862
PostProcessorQueue.pop_front();
863
if (transferShares.can_start(tmp->get_transfer_share())) {
864
tmp->push(POST_PROCESSOR);
865
PostProcessorRunning++;
866
transferShares.decrease_number_of_slots(tmp->get_transfer_share());
871
void Scheduler::revise_delivery_queue()
873
std::list<DTR*> DeliveryQueue;
874
DtrList.filter_dtrs_by_next_receiver(DELIVERY,DeliveryQueue);
876
// Sort the Delivery Queue according to
877
// the priorities the DTRs have.
878
DeliveryQueue.sort(dtr_sort_predicate);
881
int highest_priority = 0;
883
std::list<DTR*>::iterator dtr = DeliveryQueue.begin();
885
while (dtr != DeliveryQueue.end()) {
886
if (dtr == DeliveryQueue.begin()) highest_priority = (*dtr)->get_priority();
889
// The cancellation requests break the normal workflow. A cancelled
890
// request will either go back to generator or be put into a
891
// post-processor state for clean up
892
if(tmp->cancel_requested()){
893
map_cancel_state_and_process(tmp);
894
dtr = DeliveryQueue.erase(dtr);
898
// see revise_pre_processor_queue() for explanation
899
if(tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority){
900
tmp->set_priority(tmp->get_priority() + 1);
901
tmp->set_timeout(300);
906
transferShares.calculate_shares(DeliverySlots);
908
// Shares which have at least one DTR in Delivery
909
// Shares can only use emergency slots if they are not in this list
910
std::set<std::string> shares_in_delivery;
912
// The shares are re-calculated. Now we have to determine
913
// how many slots every share has already grabbed.
915
std::list<DTR*> shareDeliveryQueue;
916
std::list<DTR*>::iterator sharedtr;
919
DtrList.filter_dtrs_by_owner(DELIVERY,shareDeliveryQueue);
921
for(sharedtr = shareDeliveryQueue.begin(); sharedtr != shareDeliveryQueue.end(); sharedtr++){
922
sharetmp = *sharedtr;
923
// First check for cancellation - send cancellation call to Delivery
924
// and don't count as an active slot
925
if (sharetmp->cancel_requested()) {
926
// check if already cancelled
927
if (sharetmp->get_status() != DTRStatus::TRANSFERRING_CANCEL) {
928
sharetmp->get_logger()->msg(Arc::INFO, "DTR %s: Cancelling active transfer", sharetmp->get_short_id());
929
delivery.cancelDTR(sharetmp);
730
// Get all the DTRs in a staged state
731
staged_queue.clear();
732
DtrList.filter_dtrs_by_statuses(DTRStatus::StagedStates, staged_queue);
737
for (std::list<DTR*>::iterator event = events.begin(); event != events.end();) {
741
if (tmp->get_process_time() <= now) {
742
map_state_and_process(tmp);
743
// If final state, the DTR is returned to the generator and deleted
744
if (tmp->is_in_final_state()) {
745
ProcessDTRFINAL_STATE(tmp);
747
event = events.erase(event);
750
// If the event was sent on to a queue, erase it from the list
751
if (tmp->is_destined_for_pre_processor() ||
752
tmp->is_destined_for_delivery() ||
753
tmp->is_destined_for_post_processor()) {
755
event = events.erase(event);
765
void Scheduler::revise_queues() {
767
// The DTRs ready to go into a processing state
768
std::map<DTRStatus::DTRStatusType, std::list<DTR*> > DTRQueueStates;
769
DtrList.filter_dtrs_by_statuses(DTRStatus::ToProcessStates, DTRQueueStates);
771
// The active DTRs currently in processing states
772
std::map<DTRStatus::DTRStatusType, std::list<DTR*> > DTRRunningStates;
773
DtrList.filter_dtrs_by_statuses(DTRStatus::ProcessingStates, DTRRunningStates);
775
// Go through "to process" states, work out shares and push DTRs
776
for (unsigned int i = 0; i < DTRStatus::ToProcessStates.size(); ++i) {
778
std::list<DTR*> DTRQueue = DTRQueueStates[DTRStatus::ToProcessStates.at(i)];
779
std::list<DTR*> ActiveDTRs = DTRRunningStates[DTRStatus::ProcessingStates.at(i)];
781
if (DTRQueue.empty() && ActiveDTRs.empty()) continue;
783
// Transfer shares for this queue
784
TransferShares transferShares(transferSharesConf);
786
// Sort the DTR queue according to the priorities the DTRs have.
787
// Highest priority will be at the beginning of the list.
788
DTRQueue.sort(dtr_sort_predicate);
790
int highest_priority = 0;
792
// First go over the queue and check for cancellation and timeout
793
for (std::list<DTR*>::iterator dtr = DTRQueue.begin(); dtr != DTRQueue.end();) {
796
if (dtr == DTRQueue.begin()) highest_priority = tmp->get_priority();
798
// There's no check for cancellation requests for the post-processor.
799
// Most DTRs with cancellation requests will go to the post-processor
800
// for cleanups, hold releases, etc., so the cancellation requests
801
// don't break normal workflow in the post-processor (as opposed
802
// to any other process), but instead act just as a sign that the
803
// post-processor should do additional cleanup activities.
804
if (tmp->is_destined_for_pre_processor() || tmp->is_destined_for_delivery()) {
806
// The cancellation requests break the normal workflow. A cancelled
807
// request will either go back to generator or be put into a
808
// post-processor state for clean up.
809
if (tmp->cancel_requested()) {
810
map_cancel_state(tmp);
812
dtr = DTRQueue.erase(dtr);
816
// To avoid the situation where DTRs get blocked due to higher
817
// priority DTRs, DTRs that have passed their timeout should have their
818
// priority boosted. But this should only happen if there are higher
819
// priority DTRs, since there could be a large queue of low priority DTRs
820
// which, after having their priority boosted, would then block new
821
// high priority requests.
822
// The simple solution here is to increase priority by 1 every 5 minutes.
823
// There is plenty of scope for more intelligent solutions.
824
// TODO reset priority back to original value once past this stage.
825
if (tmp->get_timeout() < time(NULL) && tmp->get_priority() < highest_priority) {
826
tmp->set_priority(tmp->get_priority() + 1);
827
tmp->set_timeout(300);
829
transferShares.increase_transfer_share(tmp->get_transfer_share());
833
// Go over the active DTRs and add to transfer share
834
for (std::list<DTR*>::iterator dtr = ActiveDTRs.begin(); dtr != ActiveDTRs.end();) {
837
// If the DTR is in Delivery, check for cancellation. The pre- and
838
// post-processor DTRs don't get cancelled here but are allowed to
839
// continue processing.
840
if (tmp->get_status() == DTRStatus::TRANSFERRING && tmp->cancel_requested()) {
841
tmp->get_logger()->msg(Arc::INFO, "DTR %s: Cancelling active transfer", tmp->get_short_id());
842
delivery.cancelDTR(tmp);
843
dtr = ActiveDTRs.erase(dtr);
933
// Every active DTR for sure has its share represented
934
// in active shares. So we can just decrease the corresponding
936
transferShares.decrease_number_of_slots(sharetmp->get_transfer_share());
937
shares_in_delivery.insert(sharetmp->get_transfer_share());
941
// Refresh the number of DTRs running in the Delivery,
942
int DeliveryRunning = DtrList.number_of_dtrs_by_owner(DELIVERY);
944
// Now at the beginning of the queue we have DTRs that
945
// should be launched first. Launch them, but with respect
946
// to the transfer shares.
947
for(dtr = DeliveryQueue.begin(); dtr != DeliveryQueue.end(); dtr++){
949
// Limit reached - check if emergency slots are needed for any shares
950
// in the queue but not already running
951
if (DeliveryRunning >= DeliverySlots) {
952
if (DeliveryRunning == DeliverySlots + DeliveryEmergencySlots)
954
if ((shares_in_delivery.find(tmp->get_transfer_share()) == shares_in_delivery.end()) &&
955
transferShares.can_start(tmp->get_transfer_share())) {
956
// choose delivery service - random for now
957
// if this is a retry, try to use a different service
958
if (!delivery_services.empty()) {
959
tmp->set_delivery_endpoint(delivery_services.at(rand() % delivery_services.size()));
846
transferShares.increase_transfer_share((*dtr)->get_transfer_share());
850
// If the queue is empty we can go straight to the next state
851
if (DTRQueue.empty()) continue;
853
// Slot limit for this state
854
int slot_limit = DeliverySlots;
855
if (DTRQueue.front()->is_destined_for_pre_processor()) slot_limit = PreProcessorSlots;
856
else if (DTRQueue.front()->is_destined_for_post_processor()) slot_limit = PostProcessorSlots;
858
// Calculate the slots available for each active share
859
transferShares.calculate_shares(slot_limit);
861
// Shares which have at least one DTR active and running.
862
// Shares can only use emergency slots if they are not in this list.
863
std::set<std::string> active_shares;
864
int running = ActiveDTRs.size();
866
// Go over the active DTRs again and decrease slots in corresponding shares
867
for (std::list<DTR*>::iterator dtr = ActiveDTRs.begin(); dtr != ActiveDTRs.end(); ++dtr) {
868
transferShares.decrease_number_of_slots((*dtr)->get_transfer_share());
869
active_shares.insert((*dtr)->get_transfer_share());
872
// Now at the beginning of the queue we have DTRs that should be
873
// launched first. Launch them, but with respect to the transfer shares.
874
for (std::list<DTR*>::iterator dtr = DTRQueue.begin(); dtr != DTRQueue.end(); ++dtr) {
878
// Check if there are any shares left in the queue which might need
879
// an emergency share - if not we are done
880
if (running >= slot_limit &&
881
transferShares.active_shares().size() == active_shares.size()) break;
883
// Are there slots left for this share?
884
bool can_start = transferShares.can_start(tmp->get_transfer_share());
885
// Check if it is possible to use an emergency share
886
if (running >= slot_limit &&
887
active_shares.find(tmp->get_transfer_share()) != active_shares.end()) {
893
// If going into Delivery, choose delivery service - random for now
894
if (tmp->is_destined_for_delivery() && !delivery_services.empty()) {
896
Arc::URL delivery_endpoint(tmp->get_delivery_endpoint());
897
// If this is a retry, use a different service
898
if (tmp->get_tries_left() < tmp->get_initial_tries() && delivery_services.size() > 1) {
899
Arc::URL ep(delivery_endpoint);
900
// Find a random service different from the previous one, looping a
901
// limited number of times in case all delivery_services are the same url
902
for (unsigned int i = 0; ep == delivery_endpoint && i < delivery_services.size() * 10; ++i) {
903
ep = delivery_services.at(rand() % delivery_services.size());
905
delivery_endpoint = ep;
907
delivery_endpoint = delivery_services.at(rand() % delivery_services.size());
909
tmp->set_delivery_endpoint(delivery_endpoint);
961
911
transferShares.decrease_number_of_slots(tmp->get_transfer_share());
962
tmp->set_status(DTRStatus::TRANSFER);
965
shares_in_delivery.insert(tmp->get_transfer_share());
968
else if(transferShares.can_start(tmp->get_transfer_share())){
969
// choose delivery service - random for now
970
// if this is a retry, try to use a different service
971
if (!delivery_services.empty()) {
972
tmp->set_delivery_endpoint(delivery_services.at(rand() % delivery_services.size()));
974
transferShares.decrease_number_of_slots(tmp->get_transfer_share());
975
tmp->set_status(DTRStatus::TRANSFER);
978
shares_in_delivery.insert(tmp->get_transfer_share());
913
// Send to processor/delivery
914
if (tmp->is_destined_for_pre_processor()) tmp->push(PRE_PROCESSOR);
915
else if (tmp->is_destined_for_delivery()) tmp->push(DELIVERY);
916
else if (tmp->is_destined_for_post_processor()) tmp->push(POST_PROCESSOR);
919
active_shares.insert(tmp->get_transfer_share());
921
// Hard limit with all emergency slots used
922
if (running == slot_limit + DeliveryEmergencySlots) break;
984
927
void Scheduler::receiveDTR(DTR& request){
985
if(request.get_status() != DTRStatus::NEW) {
986
// If DTR is not NEW scheduler will pick it up itself.
929
if (request.get_status() != DTRStatus::NEW) {
933
// New DTR - first check it is valid
935
logger.msg(Arc::ERROR, "Scheduler received invalid DTR");
936
request.set_status(DTRStatus::ERROR);
937
request.push(GENERATOR);
990
941
request.registerCallback(&processor,PRE_PROCESSOR);
991
942
request.registerCallback(&processor,POST_PROCESSOR);
992
943
request.registerCallback(&delivery,DELIVERY);
994
945
// First, get the transfer share this dtr should belong to
995
std::string DtrTransferShare = transferShares.extract_share_info(request);
946
std::string DtrTransferShare = transferSharesConf.extract_share_info(request);
997
948
// If no share information could be obtained, use default share
998
949
if (DtrTransferShare.empty())