14
14
#include <arc/data/DataHandle.h>
15
15
#include <arc/data/URLMap.h>
16
16
#include <arc/message/MCC.h>
17
#include <arc/Utils.h>
18
19
#include "AREXClient.h"
19
20
#include "JobControllerPluginARC1.h"
21
#include "JobStateARC1.h"
23
25
Logger JobControllerPluginARC1::logger(Logger::getRootLogger(), "JobControllerPlugin.ARC1");
27
URL JobControllerPluginARC1::GetAddressOfResource(const Job& job) {
28
return URL(XMLNode(job.IDFromEndpoint)["Address"]);
25
31
bool JobControllerPluginARC1::isEndpointNotSupported(const std::string& endpoint) const {
26
32
const std::string::size_type pos = endpoint.find("://");
30
36
void JobControllerPluginARC1::UpdateJobs(std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
32
usercfg.ApplyToConfig(cfg);
34
37
for (std::list<Job*>::iterator it = jobs.begin(); it != jobs.end(); ++it) {
35
AREXClient ac((*it)->Cluster, cfg, usercfg.Timeout());
38
AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(**it), true));
37
40
AREXClient::createActivityIdentifier((*it)->JobID, idstr);
38
if (!ac.stat(idstr, **it)) {
41
if (!ac->stat(idstr, **it)) {
39
42
logger.msg(WARNING, "Job information not found in the information system: %s", (*it)->JobID.fullstr());
40
43
IDsNotProcessed.push_back((*it)->JobID);
44
((AREXClients&)clients).release(ac.Release());
43
47
IDsProcessed.push_back((*it)->JobID);
48
((AREXClients&)clients).release(ac.Release());
47
52
bool JobControllerPluginARC1::CleanJobs(const std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
49
usercfg.ApplyToConfig(cfg);
51
54
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
53
AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
56
AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
55
58
AREXClient::createActivityIdentifier(job.JobID, idstr);
56
if (!ac.clean(idstr)) {
59
if (!ac->clean(idstr)) {
58
61
IDsNotProcessed.push_back(job.JobID);
62
((AREXClients&)clients).release(ac.Release());
61
65
IDsProcessed.push_back(job.JobID);
66
((AREXClients&)clients).release(ac.Release());
67
72
bool JobControllerPluginARC1::CancelJobs(const std::list<Job*>& jobs, std::list<URL>& IDsProcessed, std::list<URL>& IDsNotProcessed, bool isGrouped) const {
69
usercfg.ApplyToConfig(cfg);
71
74
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
73
AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
76
AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
75
78
AREXClient::createActivityIdentifier(job.JobID, idstr);
76
if (!ac.kill(idstr)) {
79
if (!ac->kill(idstr)) {
78
81
IDsNotProcessed.push_back(job.JobID);
82
((AREXClients&)clients).release(ac.Release());
85
job.State = JobStateARC1("killed");
81
86
IDsProcessed.push_back(job.JobID);
87
((AREXClients&)clients).release(ac.Release());
108
112
logger.msg(VERBOSE, "Resuming job: %s at state: %s (%s)", job.JobID.fullstr(), job.RestartState.GetGeneralState(), job.RestartState());
110
AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
114
AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
111
115
std::string idstr;
112
116
AREXClient::createActivityIdentifier(job.JobID, idstr);
113
if (!ac.resume(idstr)) {
117
if (!ac->resume(idstr)) {
115
119
IDsNotProcessed.push_back(job.JobID);
120
((AREXClients&)clients).release(ac.Release());
119
124
IDsProcessed.push_back(job.JobID);
125
((AREXClients&)clients).release(ac.Release());
120
126
logger.msg(VERBOSE, "Job resuming successful");
153
159
bool JobControllerPluginARC1::GetJobDescription(const Job& job, std::string& desc_str) const {
155
161
usercfg.ApplyToConfig(cfg);
156
AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
162
AutoPointer<AREXClient> ac(((AREXClients&)clients).acquire(GetAddressOfResource(job), true));
157
163
std::string idstr;
158
164
AREXClient::createActivityIdentifier(job.JobID, idstr);
159
if (ac.getdesc(idstr, desc_str)) {
165
if (ac->getdesc(idstr, desc_str)) {
160
166
std::list<JobDescription> descs;
161
167
if (JobDescription::Parse(desc_str, descs) && !descs.empty()) {
168
((AREXClients&)clients).release(ac.Release());
172
((AREXClients&)clients).release(ac.Release());
166
174
logger.msg(ERROR, "Failed retrieving job description for job: %s", job.JobID.fullstr());