~ubuntu-branches/ubuntu/trusty/nordugrid-arc/trusty

« back to all changes in this revision

Viewing changes to src/hed/acc/ARC1/JobControllerPluginARC1.cpp

  • Committer: Package Import Robot
  • Author(s): Mattias Ellert
  • Date: 2012-12-13 16:41:31 UTC
  • mfrom: (3.1.11 sid)
  • Revision ID: package-import@ubuntu.com-20121213164131-wii0p2fcv7e3en93
Tags: 2.0.1-1
* 2.0.1 Release
* Drop patches accepted upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
#include <arc/data/DataHandle.h>
15
15
#include <arc/data/URLMap.h>
16
16
#include <arc/message/MCC.h>
 
17
#include <arc/Utils.h>
17
18
 
18
19
#include "AREXClient.h"
19
20
#include "JobControllerPluginARC1.h"
 
21
#include "JobStateARC1.h"
20
22
 
21
23
namespace Arc {
22
24
 
23
25
  Logger JobControllerPluginARC1::logger(Logger::getRootLogger(), "JobControllerPlugin.ARC1");
 
26
  
 
27
  URL JobControllerPluginARC1::GetAddressOfResource(const Job& job) {
 
28
    return URL(XMLNode(job.IDFromEndpoint)["Address"]); 
 
29
  }
24
30
 
25
31
  bool JobControllerPluginARC1::isEndpointNotSupported(const std::string& endpoint) const {
26
32
    const std::string::size_type pos = endpoint.find("://");
28
34
  }
29
35
 
30
36
  void JobControllerPluginARC1::UpdateJobs(std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
31
 
    MCCConfig cfg;
32
 
    usercfg.ApplyToConfig(cfg);
33
 
 
34
37
    for (std::list<Job*>::iterator it = jobs.begin(); it != jobs.end(); ++it) {
35
 
      AREXClient ac((*it)->Cluster, cfg, usercfg.Timeout());
 
38
      AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(**it), true));
36
39
      std::string idstr;
37
40
      AREXClient::createActivityIdentifier((*it)->JobID, idstr);
38
 
      if (!ac.stat(idstr, **it)) {
 
41
      if (!ac->stat(idstr, **it)) {
39
42
        logger.msg(WARNING, "Job information not found in the information system: %s", (*it)->JobID.fullstr());
40
43
        IDsNotProcessed.push_back((*it)->JobID);
 
44
        ((AREXClients&)clients).release(ac.Release());
41
45
        continue;
42
46
      }
43
47
      IDsProcessed.push_back((*it)->JobID);
 
48
      ((AREXClients&)clients).release(ac.Release());
44
49
    }
45
50
  }
46
51
 
47
52
  bool JobControllerPluginARC1::CleanJobs(const std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
48
 
    MCCConfig cfg;
49
 
    usercfg.ApplyToConfig(cfg);
50
53
    bool ok = true;
51
54
    for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
52
55
      Job& job = **it;
53
 
      AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
 
56
      AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
54
57
      std::string idstr;
55
58
      AREXClient::createActivityIdentifier(job.JobID, idstr);
56
 
      if (!ac.clean(idstr)) {
 
59
      if (!ac->clean(idstr)) {
57
60
        ok = false;
58
61
        IDsNotProcessed.push_back(job.JobID);
 
62
        ((AREXClients&)clients).release(ac.Release());
59
63
        continue;
60
64
      }
61
65
      IDsProcessed.push_back(job.JobID);
 
66
      ((AREXClients&)clients).release(ac.Release());
62
67
    }
63
68
    
64
69
    return ok;
65
70
  }
66
71
 
67
72
  bool JobControllerPluginARC1::CancelJobs(const std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
68
 
    MCCConfig cfg;
69
 
    usercfg.ApplyToConfig(cfg);
70
73
    bool ok = true;
71
74
    for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
72
75
      Job& job = **it;
73
 
      AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
 
76
      AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
74
77
      std::string idstr;
75
78
      AREXClient::createActivityIdentifier(job.JobID, idstr);
76
 
      if (!ac.kill(idstr)) {
 
79
      if (!ac->kill(idstr)) {
77
80
        ok = false;
78
81
        IDsNotProcessed.push_back(job.JobID);
 
82
        ((AREXClients&)clients).release(ac.Release());
79
83
        continue;
80
84
      }
 
85
      job.State = JobStateARC1("killed");
81
86
      IDsProcessed.push_back(job.JobID);
 
87
      ((AREXClients&)clients).release(ac.Release());
82
88
    }
83
89
    
84
90
    return ok;
93
99
  }
94
100
 
95
101
  bool JobControllerPluginARC1::ResumeJobs(const std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
96
 
    MCCConfig cfg;
97
 
    usercfg.ApplyToConfig(cfg);
98
102
    bool ok = true;
99
103
    for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
100
104
      Job& job = **it;
107
111
  
108
112
      logger.msg(VERBOSE, "Resuming job: %s at state: %s (%s)", job.JobID.fullstr(), job.RestartState.GetGeneralState(), job.RestartState());
109
113
  
110
 
      AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
 
114
      AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
111
115
      std::string idstr;
112
116
      AREXClient::createActivityIdentifier(job.JobID, idstr);
113
 
      if (!ac.resume(idstr)) {
 
117
      if (!ac->resume(idstr)) {
114
118
        ok = false;
115
119
        IDsNotProcessed.push_back(job.JobID);
 
120
        ((AREXClients&)clients).release(ac.Release());
116
121
        continue;
117
122
      }
118
123
 
119
124
      IDsProcessed.push_back(job.JobID);
 
125
      ((AREXClients&)clients).release(ac.Release());
120
126
      logger.msg(VERBOSE, "Job resuming successful");
121
127
    }
122
128
    
153
159
  bool JobControllerPluginARC1::GetJobDescription(const Job& job, std::string& desc_str) const {
154
160
    MCCConfig cfg;
155
161
    usercfg.ApplyToConfig(cfg);
156
 
    AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
 
162
    AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
157
163
    std::string idstr;
158
164
    AREXClient::createActivityIdentifier(job.JobID, idstr);
159
 
    if (ac.getdesc(idstr, desc_str)) {
 
165
    if (ac->getdesc(idstr, desc_str)) {
160
166
      std::list<JobDescription> descs;
161
167
      if (JobDescription::Parse(desc_str, descs) && !descs.empty()) {
 
168
        ((AREXClients&)clients).release(ac.Release());
162
169
        return true;
163
170
      }
164
171
    }
 
172
    ((AREXClients&)clients).release(ac.Release());
165
173
 
166
174
    logger.msg(ERROR, "Failed retrieving job description for job: %s", job.JobID.fullstr());
167
175
    return false;