1
// -*- indent-tabs-mode: nil -*-
11
#include <arc/ArcConfig.h>
12
#include <arc/FileLock.h>
13
#include <arc/IString.h>
14
#include <arc/Logger.h>
15
#include <arc/StringConv.h>
16
#include <arc/XMLNode.h>
17
#include <arc/client/JobControllerPlugin.h>
18
#include <arc/data/DataHandle.h>
19
#include <arc/data/DataMover.h>
20
#include <arc/data/FileCache.h>
24
#define JXMLTOSTRING(NAME) \
26
NAME = (std::string)job[ #NAME ];\
29
#define JXMLSTRINGTO(TYPE, NAME) \
31
TYPE temp##TYPE##NAME;\
32
if (stringto((std::string)job[ #NAME ], temp##TYPE##NAME)) {\
33
NAME = temp##TYPE##NAME;\
37
#define JXMLTOTIME(NAME) \
39
Time temp##NAME((std::string)job[ #NAME ]);\
40
if (temp##NAME.GetTime() != -1) {\
45
#define JXMLTOSTRINGLIST(NAME) \
47
for (XMLNode n = job[ #NAME ]; n; ++n) {\
48
NAME.push_back((std::string)n);\
51
#define STRINGTOXML(NAME) \
52
if (!(NAME).empty()) {\
53
node.NewChild( #NAME ) = NAME;\
56
#define URLTOXML(NAME) \
58
node.NewChild( #NAME ) = NAME.fullstr();\
61
#define INTTOXML(NAME) \
63
node.NewChild( #NAME ) = tostring(NAME);\
66
#define TIMETOSTRING(NAME) \
68
node.NewChild( #NAME ) = (NAME).str(UTCTime);\
71
#define PERIODTOSTRING(NAME) \
73
node.NewChild( #NAME ) = (std::string)NAME;\
76
#define STRINGLISTTOXML(NAME) \
77
for (std::list<std::string>::const_iterator it = NAME.begin();\
78
it != NAME.end(); it++) {\
79
node.NewChild( #NAME ) = *it;\
84
// Logger shared by all Job objects; constructed from the root logger with the name "Job".
Logger Job::logger(Logger::getRootLogger(), "Job");
86
// Static, default-constructed JobControllerPluginLoader shared by all Job objects.
JobControllerPluginLoader Job::loader;
88
// Static source handle used by CopyJobFile. Starts out NULL; CopyJobFile
// (re)creates it with new when it is unset, invalid, or SetURL() on it fails,
// and otherwise keeps using the existing handle.
DataHandle* Job::data_source = NULL;
89
// Static destination handle used by CopyJobFile. Starts out NULL; CopyJobFile
// (re)creates it with new when it is unset, invalid, or SetURL() on it fails.
// NOTE(review): it appears to be deleted and reset to NULL after a failed
// transfer - confirm against the full CopyJobFile body.
DataHandle* Job::data_destination = NULL;
94
RequestedTotalWallTime(-1),
95
RequestedTotalCPUTime(-1),
97
UsedTotalWallTime(-1),
100
LocalSubmissionTime(-1),
102
ComputingManagerSubmissionTime(-1),
104
ComputingManagerEndTime(-1),
106
WorkingAreaEraseTime(-1),
107
ProxyExpirationTime(-1),
110
VirtualMachine(false),
115
Job::Job(const Job& j)
118
RequestedTotalWallTime(-1),
119
RequestedTotalCPUTime(-1),
121
UsedTotalWallTime(-1),
122
UsedTotalCPUTime(-1),
124
LocalSubmissionTime(-1),
126
ComputingManagerSubmissionTime(-1),
128
ComputingManagerEndTime(-1),
130
WorkingAreaEraseTime(-1),
131
ProxyExpirationTime(-1),
134
VirtualMachine(false),
142
RequestedTotalWallTime(-1),
143
RequestedTotalCPUTime(-1),
145
UsedTotalWallTime(-1),
146
UsedTotalCPUTime(-1),
148
LocalSubmissionTime(-1),
150
ComputingManagerSubmissionTime(-1),
152
ComputingManagerEndTime(-1),
154
WorkingAreaEraseTime(-1),
155
ProxyExpirationTime(-1),
158
VirtualMachine(false) {
162
Job& Job::operator=(const Job& j) {
167
InterfaceName = j.InterfaceName;
171
IDFromEndpoint = j.IDFromEndpoint;
172
LocalIDFromManager = j.LocalIDFromManager;
173
JobDescription = j.JobDescription;
174
JobDescriptionDocument = j.JobDescriptionDocument;
176
RestartState = j.RestartState;
177
ExitCode = j.ExitCode;
178
ComputingManagerExitCode = j.ComputingManagerExitCode;
180
WaitingPosition = j.WaitingPosition;
181
UserDomain = j.UserDomain;
183
LocalOwner = j.LocalOwner;
184
RequestedTotalWallTime = j.RequestedTotalWallTime;
185
RequestedTotalCPUTime = j.RequestedTotalCPUTime;
186
RequestedSlots = j.RequestedSlots;
187
RequestedApplicationEnvironment = j.RequestedApplicationEnvironment;
192
ExecutionNode = j.ExecutionNode;
194
UsedTotalWallTime = j.UsedTotalWallTime;
195
UsedTotalCPUTime = j.UsedTotalCPUTime;
196
UsedMainMemory = j.UsedMainMemory;
197
RequestedApplicationEnvironment = j.RequestedApplicationEnvironment;
198
RequestedSlots = j.RequestedSlots;
199
LocalSubmissionTime = j.LocalSubmissionTime;
200
SubmissionTime = j.SubmissionTime;
201
ComputingManagerSubmissionTime = j.ComputingManagerSubmissionTime;
202
StartTime = j.StartTime;
203
ComputingManagerEndTime = j.ComputingManagerEndTime;
205
WorkingAreaEraseTime = j.WorkingAreaEraseTime;
206
ProxyExpirationTime = j.ProxyExpirationTime;
207
SubmissionHost = j.SubmissionHost;
208
SubmissionClientName = j.SubmissionClientName;
209
CreationTime = j.CreationTime;
210
Validity = j.Validity;
211
OtherMessages = j.OtherMessages;
213
ActivityOldID = j.ActivityOldID;
214
LocalInputFiles = j.LocalInputFiles;
216
VirtualMachine = j.VirtualMachine;
217
UsedCPUType = j.UsedCPUType;
218
UsedOSFamily = j.UsedOSFamily;
219
UsedPlatform = j.UsedPlatform;
224
// Equality is decided solely by comparing the JobID members of the two jobs;
// no other member takes part in the comparison.
int Job::operator==(const Job& other) {
  return (this->JobID == other.JobID);
}
226
Job& Job::operator=(XMLNode job) {
229
// Information specific to how job is stored in jobs list
231
JobID = URL((std::string)job["JobID"]);
232
} else if (job["IDFromEndpoint"]) { // Backwardscompatibility: Pre 2.0.0 format.
233
JobID = URL((std::string)job["IDFromEndpoint"]);
235
if (job["IDFromEndpoint"]
236
/* If this is pre 2.0.0 format then JobID element would not
237
* exist, and the usage of IDFromEndpoint element corresponded
238
* to the current usage of the JobID element. Therefore only set
239
* the IDFromEndpoint member if the JobID _does_ exist.
243
IDFromEndpoint = (std::string)job["IDFromEndpoint"];
247
JXMLTOSTRING(Cluster)
248
if (job["InterfaceName"]) {
249
JXMLTOSTRING(InterfaceName)
251
else if (job["Flavour"]) {
252
if ((std::string)job["Flavour"] == "ARC0") InterfaceName = "org.nordugrid.gridftpjob";
253
else if ((std::string)job["Flavour"] == "BES") InterfaceName = "org.ogf.bes";
254
else if ((std::string)job["Flavour"] == "ARC1") InterfaceName = "org.nordugrid.xbes";
255
else if ((std::string)job["Flavour"] == "EMIES") InterfaceName = "org.ogf.emies";
256
else if ((std::string)job["Flavour"] == "TEST") InterfaceName = "org.nordugrid.test";
259
if (job["InfoEndpoint"] && job["Flavour"] && (std::string)job["Flavour"] == "ARC0") {
260
IDFromEndpoint = (std::string)job["InfoEndpoint"];
263
if (job["JobDescription"]) {
264
const std::string sjobdesc = job["JobDescription"];
265
if (job["JobDescriptionDocument"] || job["State"] ||
266
!job["LocalSubmissionTime"]) {
267
// If the 'JobDescriptionDocument' or 'State' element is set assume that the 'JobDescription' element is the GLUE2 one.
268
// Default is to assume it is the GLUE2 one.
269
JobDescription = sjobdesc;
272
// If the 'LocalSubmissionTime' element is set assume that the 'JobDescription' element contains the actual job description.
273
JobDescriptionDocument = sjobdesc;
277
JXMLTOSTRING(JobDescriptionDocument)
278
JXMLTOTIME(LocalSubmissionTime)
280
if (job["Associations"]["ActivityOldID"]) {
281
ActivityOldID.clear();
282
for (XMLNode n = job["Associations"]["ActivityOldID"]; n; ++n) {
283
ActivityOldID.push_back((std::string)n);
286
else if (job["OldJobID"]) { // Included for backwards compatibility.
287
ActivityOldID.clear();
288
for (XMLNode n = job["OldJobID"]; n; ++n) {
289
ActivityOldID.push_back((std::string)n);
293
if (job["Associations"]["LocalInputFile"]) {
294
LocalInputFiles.clear();
295
for (XMLNode n = job["Associations"]["LocalInputFile"]; n; ++n) {
296
if (n["Source"] && n["CheckSum"]) {
297
LocalInputFiles[(std::string)n["Source"]] = (std::string)n["CheckSum"];
301
else if (job["LocalInputFiles"]["File"]) { // Included for backwards compatibility.
302
LocalInputFiles.clear();
303
for (XMLNode n = job["LocalInputFiles"]["File"]; n; ++n) {
304
if (n["Source"] && n["CheckSum"]) {
305
LocalInputFiles[(std::string)n["Source"]] = (std::string)n["CheckSum"];
310
// Pick generic GLUE2 information
316
void Job::Update(XMLNode job) {
320
// TODO: find out how to treat IDFromEndpoint in case of pure GLUE2
322
JXMLTOSTRING(LocalIDFromManager)
324
/* Earlier the 'JobDescription' element in a XMLNode representing a Job
325
* object contained the actual job description, but in GLUE2 the name
326
* 'JobDescription' specifies the job description language which was used to
327
* describe the job. Due to the name clash we must guess what is meant when
328
* parsing the 'JobDescription' element.
331
// TODO: same for JobDescription
333
// Parse libarcclient special state format.
334
if (job["State"]["General"] && job["State"]["Specific"]) {
335
State.state = (std::string)job["State"]["Specific"];
336
State.type = JobState::GetStateType((std::string)job["State"]["General"]);
338
// Only use the first state. ACC modules should set the state themselves.
339
else if (job["State"] && job["State"].Size() == 0) {
340
State.state = (std::string)job["State"];
341
State.type = JobState::OTHER;
343
if (job["RestartState"]["General"] && job["RestartState"]["Specific"]) {
344
RestartState.state = (std::string)job["RestartState"]["Specific"];
345
RestartState.type = JobState::GetStateType((std::string)job["RestartState"]["General"]);
347
// Only use the first state. ACC modules should set the state themselves.
348
else if (job["RestartState"] && job["RestartState"].Size() == 0) {
349
RestartState.state = (std::string)job["RestartState"];
350
RestartState.type = JobState::OTHER;
353
JXMLSTRINGTO(int, ExitCode)
354
JXMLTOSTRING(ComputingManagerExitCode)
355
JXMLTOSTRINGLIST(Error)
356
JXMLSTRINGTO(int, WaitingPosition)
357
JXMLTOSTRING(UserDomain)
359
JXMLTOSTRING(LocalOwner)
360
JXMLSTRINGTO(long, RequestedTotalWallTime)
361
JXMLSTRINGTO(long, RequestedTotalCPUTime)
362
JXMLSTRINGTO(int, RequestedSlots)
363
JXMLTOSTRINGLIST(RequestedApplicationEnvironment)
368
JXMLTOSTRINGLIST(ExecutionNode)
370
JXMLSTRINGTO(long, UsedTotalWallTime)
371
JXMLSTRINGTO(long, UsedTotalCPUTime)
372
JXMLSTRINGTO(int, UsedMainMemory)
373
JXMLTOTIME(SubmissionTime)
374
JXMLTOTIME(ComputingManagerSubmissionTime)
375
JXMLTOTIME(StartTime)
376
JXMLTOTIME(ComputingManagerEndTime)
378
JXMLTOTIME(WorkingAreaEraseTime)
379
JXMLTOTIME(ProxyExpirationTime)
380
JXMLTOSTRING(SubmissionHost)
381
JXMLTOSTRING(SubmissionClientName)
382
JXMLTOSTRINGLIST(OtherMessages)
385
void Job::ToXML(XMLNode node) const {
389
STRINGTOXML(InterfaceName)
391
STRINGTOXML(IDFromEndpoint)
392
STRINGTOXML(LocalIDFromManager)
393
STRINGTOXML(JobDescription)
394
STRINGTOXML(JobDescriptionDocument)
396
node.NewChild("State");
397
node["State"].NewChild("Specific") = State();
398
node["State"].NewChild("General") = State.GetGeneralState();
401
node.NewChild("RestartState");
402
node["RestartState"].NewChild("Specific") = RestartState();
403
node["RestartState"].NewChild("General") = RestartState.GetGeneralState();
406
STRINGTOXML(ComputingManagerExitCode)
407
STRINGLISTTOXML(Error)
408
INTTOXML(WaitingPosition)
409
STRINGTOXML(UserDomain)
411
STRINGTOXML(LocalOwner)
412
PERIODTOSTRING(RequestedTotalWallTime)
413
PERIODTOSTRING(RequestedTotalCPUTime)
414
INTTOXML(RequestedSlots)
415
STRINGLISTTOXML(RequestedApplicationEnvironment)
420
STRINGLISTTOXML(ExecutionNode)
422
PERIODTOSTRING(UsedTotalWallTime)
423
PERIODTOSTRING(UsedTotalCPUTime)
424
INTTOXML(UsedMainMemory)
425
TIMETOSTRING(LocalSubmissionTime)
426
TIMETOSTRING(SubmissionTime)
427
TIMETOSTRING(ComputingManagerSubmissionTime)
428
TIMETOSTRING(StartTime)
429
TIMETOSTRING(ComputingManagerEndTime)
430
TIMETOSTRING(EndTime)
431
TIMETOSTRING(WorkingAreaEraseTime)
432
TIMETOSTRING(ProxyExpirationTime)
433
STRINGTOXML(SubmissionHost)
434
STRINGTOXML(SubmissionClientName)
435
STRINGLISTTOXML(OtherMessages)
437
if ((ActivityOldID.size() > 0 || LocalInputFiles.size() > 0) && !node["Associations"]) {
438
node.NewChild("Associations");
441
for (std::list<std::string>::const_iterator it = ActivityOldID.begin();
442
it != ActivityOldID.end(); it++) {
443
node["Associations"].NewChild("ActivityOldID") = *it;
446
for (std::map<std::string, std::string>::const_iterator it = LocalInputFiles.begin();
447
it != LocalInputFiles.end(); it++) {
448
XMLNode lif = node["Associations"].NewChild("LocalInputFile");
449
lif.NewChild("Source") = it->first;
450
lif.NewChild("CheckSum") = it->second;
454
void Job::SaveToStream(std::ostream& out, bool longlist) const {
455
out << IString("Job: %s", JobID.fullstr()) << std::endl;
457
out << IString(" Name: %s", Name) << std::endl;
458
if (!State().empty())
459
out << IString(" State: %s (%s)", State.GetGeneralState(), State())
461
if (State == JobState::QUEUING && WaitingPosition != -1) {
462
out << IString(" Waiting Position: %d", WaitingPosition) << std::endl;
466
out << IString(" Exit Code: %d", ExitCode) << std::endl;
467
if (!Error.empty()) {
468
for (std::list<std::string>::const_iterator it = Error.begin();
469
it != Error.end(); it++)
470
out << IString(" Job Error: %s", *it) << std::endl;
475
out << IString(" Owner: %s", Owner) << std::endl;
476
if (!OtherMessages.empty())
477
for (std::list<std::string>::const_iterator it = OtherMessages.begin();
478
it != OtherMessages.end(); it++)
479
out << IString(" Other Messages: %s", *it)
482
out << IString(" Queue: %s", Queue) << std::endl;
483
if (RequestedSlots != -1)
484
out << IString(" Requested Slots: %d", RequestedSlots) << std::endl;
485
if (WaitingPosition != -1)
486
out << IString(" Waiting Position: %d", WaitingPosition)
489
out << IString(" Stdin: %s", StdIn) << std::endl;
491
out << IString(" Stdout: %s", StdOut) << std::endl;
493
out << IString(" Stderr: %s", StdErr) << std::endl;
495
out << IString(" Grid Manager Log Directory: %s", LogDir)
497
if (SubmissionTime != -1)
498
out << IString(" Submitted: %s",
499
(std::string)SubmissionTime) << std::endl;
501
out << IString(" End Time: %s", (std::string)EndTime)
503
if (!SubmissionHost.empty())
504
out << IString(" Submitted from: %s", SubmissionHost)
506
if (!SubmissionClientName.empty())
507
out << IString(" Submitting client: %s",
508
SubmissionClientName) << std::endl;
509
if (RequestedTotalCPUTime != -1)
510
out << IString(" Requested CPU Time: %s",
511
RequestedTotalCPUTime.istr())
513
if (UsedTotalCPUTime != -1)
514
out << IString(" Used CPU Time: %s",
515
UsedTotalCPUTime.istr()) << std::endl;
516
if (UsedTotalWallTime != -1)
517
out << IString(" Used Wall Time: %s",
518
UsedTotalWallTime.istr()) << std::endl;
519
if (UsedMainMemory != -1)
520
out << IString(" Used Memory: %d", UsedMainMemory)
522
if (WorkingAreaEraseTime != -1)
523
out << IString((State == JobState::DELETED) ?
524
istring(" Results were deleted: %s") :
525
istring(" Results must be retrieved before: %s"),
526
(std::string)WorkingAreaEraseTime)
528
if (ProxyExpirationTime != -1)
529
out << IString(" Proxy valid until: %s",
530
(std::string)ProxyExpirationTime)
532
if (CreationTime != -1)
533
out << IString(" Entry valid from: %s",
534
(std::string)CreationTime) << std::endl;
536
out << IString(" Entry valid for: %s",
537
Validity.istr()) << std::endl;
539
if (!ActivityOldID.empty()) {
540
out << IString(" Old job IDs:") << std::endl;
541
for (std::list<std::string>::const_iterator it = ActivityOldID.begin();
542
it != ActivityOldID.end(); ++it) {
543
out << " " << *it << std::endl;
547
out << IString(" Cluster: %s", Cluster.fullstr()) << std::endl;
548
out << IString(" Management Interface: %s", InterfaceName) << std::endl;
554
// Resolve the URL of the requested job resource by delegating to the job's
// controller plugin (jc).
//   resource - which resource of the job the URL is wanted for.
//   url      - passed through to the plugin, which receives it by reference.
// Returns false when no plugin is set; otherwise returns whatever
// jc->GetURLToJobResource() returns.
bool Job::GetURLToResource(ResourceType resource, URL& url) const {
  if (!jc) {
    return false;
  }
  return jc->GetURLToJobResource(*this, resource, url);
}
556
bool Job::Retrieve(const UserConfig& uc, const URL& destination, bool force) const {
558
logger.msg(ERROR, "Invalid download destination path specified (%s)", destination.fullstr());
563
logger.msg(DEBUG, "Unable to download job (%s), no JobControllerPlugin plugin was set to handle the job.", JobID.str());
567
logger.msg(VERBOSE, "Downloading job: %s", JobID.str());
569
URL src, dst(destination);
570
if (!jc->GetURLToJobResource(*this, STAGEOUTDIR, src)) {
571
logger.msg(ERROR, "Cant retrieve job files for job (%s) - unable to determine URL of stage out directory", JobID.fullstr());
576
logger.msg(ERROR, "Invalid stage out path specified (%s)", src.fullstr());
580
if (!force && Glib::file_test(dst.Path(), Glib::FILE_TEST_EXISTS)) {
581
logger.msg(WARNING, "%s directory exist! Skipping job.", dst.Path());
585
std::list<std::string> files;
586
if (!ListFilesRecursive(uc, src, files)) {
587
logger.msg(ERROR, "Unable to retrieve list of job files to download for job %s", JobID.fullstr());
591
const std::string srcpath = src.Path() + (src.Path().empty() || *src.Path().rbegin() != '/' ? "/" : "");
592
const std::string dstpath = dst.Path() + (dst.Path().empty() || *dst.Path().rbegin() != G_DIR_SEPARATOR ? G_DIR_SEPARATOR_S : "");
595
for (std::list<std::string>::const_iterator it = files.begin(); it != files.end(); ++it) {
596
src.ChangePath(srcpath + *it);
597
dst.ChangePath(dstpath + *it);
598
if (!CopyJobFile(uc, src, dst)) {
599
logger.msg(INFO, "Failed downloading %s to %s", src.str(), dst.str());
607
bool Job::ListFilesRecursive(const UserConfig& uc, const URL& dir, std::list<std::string>& files, const std::string& prefix) {
608
std::list<FileInfo> outputfiles;
610
DataHandle handle(dir, uc);
612
logger.msg(INFO, "Unable to list files at %s", dir.str());
615
if(!handle->List(outputfiles, (Arc::DataPoint::DataPointInfoType)
616
(DataPoint::INFO_TYPE_NAME | DataPoint::INFO_TYPE_TYPE))) {
617
logger.msg(INFO, "Unable to list files at %s", dir.str());
621
for (std::list<FileInfo>::iterator i = outputfiles.begin();
622
i != outputfiles.end(); i++) {
623
if (i->GetName() == ".." || i->GetName() == ".") {
627
if (i->GetType() == FileInfo::file_type_unknown ||
628
i->GetType() == FileInfo::file_type_file) {
629
files.push_back(prefix + i->GetName());
631
else if (i->GetType() == FileInfo::file_type_dir) {
632
std::string path = dir.Path();
633
if (path[path.size() - 1] != '/') {
637
tmpdir.ChangePath(path + i->GetName());
639
std::string dirname = i->GetName();
640
if (dirname[dirname.size() - 1] != '/') {
643
if (!ListFilesRecursive(uc, tmpdir, files, dirname)) {
652
bool Job::CopyJobFile(const UserConfig& uc, const URL& src, const URL& dst) {
657
mover.verbose(false);
659
logger.msg(VERBOSE, "Now copying (from -> to)");
660
logger.msg(VERBOSE, " %s -> %s", src.str(), dst.str());
664
src_.AddOption("checksum=no");
665
dst_.AddOption("checksum=no");
667
if ((!data_source) || (!*data_source) ||
668
(!(*data_source)->SetURL(src_))) {
669
if(data_source) delete data_source;
670
data_source = new DataHandle(src_, uc);
672
DataHandle& source = *data_source;
674
logger.msg(ERROR, "Unable to initialise connection to source: %s", src.str());
678
if ((!data_destination) || (!*data_destination) ||
679
(!(*data_destination)->SetURL(dst_))) {
680
if(data_destination) delete data_destination;
681
data_destination = new DataHandle(dst_, uc);
683
DataHandle& destination = *data_destination;
685
logger.msg(ERROR, "Unable to initialise connection to destination: %s",
690
// Set desired number of retries. Also resets any lost
691
// tries from previous files.
692
source->SetTries((src.Protocol() == "file")?1:3);
693
destination->SetTries((dst.Protocol() == "file")?1:3);
695
// Turn off all features we do not need
696
source->SetAdditionalChecks(false);
697
destination->SetAdditionalChecks(false);
701
mover.Transfer(*source, *destination, cache, URLMap(), 0, 0, 0,
704
if (!res.GetDesc().empty())
705
logger.msg(ERROR, "File download failed: %s - %s", std::string(res), res.GetDesc());
707
logger.msg(ERROR, "File download failed: %s", std::string(res));
708
// Reset connection because one can't be sure how failure
709
// affects server and/or connection state.
710
// TODO: Investigate/define DMC behavior in such case.
713
delete data_destination;
714
data_destination = NULL;
721
bool Job::ReadAllJobsFromFile(const std::string& filename, std::list<Job>& jobs, unsigned nTries, unsigned tryInterval) {
725
FileLock lock(filename);
726
bool acquired = false;
727
for (int tries = (int)nTries; tries > 0; --tries) {
728
acquired = lock.acquire();
730
if (!jobstorage.ReadFromFile(filename)) {
739
logger.msg(VERBOSE, "Waiting for lock on job list file %s", filename);
742
Glib::usleep(tryInterval);
749
XMLNodeList xmljobs = jobstorage.Path("Job");
750
for (XMLNodeList::iterator xit = xmljobs.begin(); xit != xmljobs.end(); ++xit) {
751
jobs.push_back(*xit);
757
bool Job::ReadJobsFromFile(const std::string& filename, std::list<Job>& jobs, std::list<std::string>& jobIdentifiers, bool all, const std::list<std::string>& endpoints, const std::list<std::string>& rEndpoints, unsigned nTries, unsigned tryInterval) {
758
if (!ReadAllJobsFromFile(filename, jobs, nTries, tryInterval)) { return false; }
760
std::list<std::string> jobIdentifiersCopy = jobIdentifiers;
761
for (std::list<Arc::Job>::iterator itJ = jobs.begin();
762
itJ != jobs.end();) {
763
// Check if the job (itJ) is selected by the job identifiers, either by job ID or Name.
764
std::list<std::string>::iterator itJIdentifier = jobIdentifiers.begin();
765
for (;itJIdentifier != jobIdentifiers.end(); ++itJIdentifier) {
766
if ((!itJ->Name.empty() && itJ->Name == *itJIdentifier) ||
767
(itJ->JobID.fullstr() == URL(*itJIdentifier).fullstr())) {
771
if (itJIdentifier != jobIdentifiers.end()) {
772
// Job explicitly specified. Remove id from the copy list, in order to keep track of used identifiers.
773
std::list<std::string>::iterator itJIdentifierCopy = std::find(jobIdentifiersCopy.begin(), jobIdentifiersCopy.end(), *itJIdentifier);
774
if (itJIdentifierCopy != jobIdentifiersCopy.end()) {
775
jobIdentifiersCopy.erase(itJIdentifierCopy);
782
// Check if the job (itJ) is selected by endpoints.
783
std::list<std::string>::const_iterator itC = endpoints.begin();
784
for (; itC != endpoints.end(); ++itC) {
785
if (itJ->Cluster.StringMatches(*itC)) {
789
if (itC != endpoints.end()) {
790
// Cluster on which the job resides is explicitly specified.
795
// Job is not selected - remove it.
796
itJ = jobs.erase(itJ);
803
// Filter jobs on rejected clusters.
804
for (std::list<std::string>::const_iterator itC = rEndpoints.begin();
805
itC != rEndpoints.end(); ++itC) {
806
for (std::list<Arc::Job>::iterator itJ = jobs.begin(); itJ != jobs.end();) {
807
if (itJ->Cluster.StringMatches(*itC)) {
808
itJ = jobs.erase(itJ);
816
jobIdentifiers = jobIdentifiersCopy;
821
bool Job::WriteJobsToTruncatedFile(const std::string& filename, const std::list<Job>& jobs, unsigned nTries, unsigned tryInterval) {
823
std::map<std::string, XMLNode> jobIDXMLMap;
824
for (std::list<Job>::const_iterator it = jobs.begin();
825
it != jobs.end(); it++) {
826
std::map<std::string, XMLNode>::iterator itJobXML = jobIDXMLMap.find(it->JobID.fullstr());
827
if (itJobXML == jobIDXMLMap.end()) {
828
XMLNode xJob = jobfile.NewChild("Job");
830
jobIDXMLMap[it->JobID.fullstr()] = xJob;
833
itJobXML->second.Replace(XMLNode(NS(), "Job"));
834
it->ToXML(itJobXML->second);
838
FileLock lock(filename);
839
for (int tries = (int)nTries; tries > 0; --tries) {
840
if (lock.acquire()) {
841
if (!jobfile.SaveToFile(filename)) {
850
logger.msg(WARNING, "Waiting for lock on job list file %s", filename);
853
Glib::usleep(tryInterval);
859
bool Job::WriteJobsToFile(const std::string& filename, const std::list<Job>& jobs, unsigned nTries, unsigned tryInterval) {
860
std::list<const Job*> newJobs;
861
return WriteJobsToFile(filename, jobs, newJobs, nTries, tryInterval);
864
bool Job::WriteJobsToFile(const std::string& filename, const std::list<Job>& jobs, std::list<const Job*>& newJobs, unsigned nTries, unsigned tryInterval) {
865
FileLock lock(filename);
866
for (int tries = (int)nTries; tries > 0; --tries) {
867
if (lock.acquire()) {
869
jobfile.ReadFromFile(filename);
871
// Use std::map to store job IDs to be searched for duplicates.
872
std::map<std::string, XMLNode> jobIDXMLMap;
873
for (Arc::XMLNode j = jobfile["Job"]; j; ++j) {
874
if (!((std::string)j["JobID"]).empty()) {
875
jobIDXMLMap[(std::string)j["JobID"]] = j;
879
std::map<std::string, const Job*> newJobsMap;
880
for (std::list<Job>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
881
std::map<std::string, XMLNode>::iterator itJobXML = jobIDXMLMap.find(it->JobID.fullstr());
882
if (itJobXML == jobIDXMLMap.end()) {
883
XMLNode xJob = jobfile.NewChild("Job");
885
jobIDXMLMap[it->JobID.fullstr()] = xJob;
886
newJobsMap[it->JobID.fullstr()] = &(*it);
889
// Duplicate found, replace it.
890
itJobXML->second.Replace(XMLNode(NS(), "Job"));
891
it->ToXML(itJobXML->second);
893
// Only add to newJobsMap if this is a new job, i.e. not previously present in jobfile.
894
std::map<std::string, const Job*>::iterator itNewJobsMap = newJobsMap.find(it->JobID.fullstr());
895
if (itNewJobsMap != newJobsMap.end()) {
896
itNewJobsMap->second = &(*it);
901
// Add pointers to new Job objects to the newJobs list.
902
for (std::map<std::string, const Job*>::const_iterator it = newJobsMap.begin();
903
it != newJobsMap.end(); ++it) {
904
newJobs.push_back(it->second);
907
if (!jobfile.SaveToFile(filename)) {
916
logger.msg(WARNING, "Waiting for lock on job list file %s", filename);
919
Glib::usleep(tryInterval);
925
bool Job::RemoveJobsFromFile(const std::string& filename, const std::list<URL>& jobids, unsigned nTries, unsigned tryInterval) {
926
if (jobids.empty()) {
930
FileLock lock(filename);
931
for (int tries = nTries; tries > 0; --tries) {
932
if (lock.acquire()) {
934
if (!jobstorage.ReadFromFile(filename)) {
939
XMLNodeList xmlJobs = jobstorage.Path("Job");
940
for (std::list<URL>::const_iterator it = jobids.begin(); it != jobids.end(); ++it) {
941
for (XMLNodeList::iterator xJIt = xmlJobs.begin(); xJIt != xmlJobs.end(); ++xJIt) {
942
if ((*xJIt)["JobID"] == it->fullstr() ||
943
(*xJIt)["IDFromEndpoint"] == it->fullstr() // Included for backwards compatibility.
945
xJIt->Destroy(); // Do not break, since for some reason there might be multiple identical jobs in the file.
950
if (!jobstorage.SaveToFile(filename)) {
959
logger.msg(VERBOSE, "Waiting for lock on job list file %s", filename);
961
Glib::usleep(tryInterval);
967
bool Job::ReadJobIDsFromFile(const std::string& filename, std::list<std::string>& jobids, unsigned nTries, unsigned tryInterval) {
968
if (!Glib::file_test(filename, Glib::FILE_TEST_IS_REGULAR)) return false;
970
FileLock lock(filename);
971
for (int tries = (int)nTries; tries > 0; --tries) {
972
if (lock.acquire()) {
973
std::ifstream is(filename.c_str());
980
while (std::getline(is, line)) {
981
line = Arc::trim(line, " \t");
982
if (!line.empty() && line[0] != '#') {
983
jobids.push_back(line);
992
logger.msg(WARNING, "Waiting for lock on file %s", filename);
995
Glib::usleep(tryInterval);
1001
bool Job::WriteJobIDToFile(const URL& jobid, const std::string& filename, unsigned nTries, unsigned tryInterval) {
1002
if (Glib::file_test(filename, Glib::FILE_TEST_IS_DIR)) return false;
1004
FileLock lock(filename);
1005
for (int tries = (int)nTries; tries > 0; --tries) {
1006
if (lock.acquire()) {
1007
std::ofstream os(filename.c_str(), std::ios::app);
1013
os << jobid.fullstr() << std::endl;
1014
bool good = os.good();
1021
logger.msg(WARNING, "Waiting for lock on file %s", filename);
1024
Glib::usleep(tryInterval);
1030
bool Job::WriteJobIDsToFile(const std::list<URL>& jobids, const std::string& filename, unsigned nTries, unsigned tryInterval) {
1031
if (Glib::file_test(filename, Glib::FILE_TEST_IS_DIR)) return false;
1032
FileLock lock(filename);
1033
for (int tries = (int)nTries; tries > 0; --tries) {
1034
if (lock.acquire()) {
1035
std::ofstream os(filename.c_str(), std::ios::app);
1041
for (std::list<URL>::const_iterator it = jobids.begin();
1042
it != jobids.end(); ++it) {
1043
os << it->fullstr() << std::endl;
1046
bool good = os.good();
1053
logger.msg(WARNING, "Waiting for lock on file %s", filename);
1056
Glib::usleep(tryInterval);
1062
bool Job::WriteJobIDsToFile(const std::list<Job>& jobs, const std::string& filename, unsigned nTries, unsigned tryInterval) {
1063
if (Glib::file_test(filename, Glib::FILE_TEST_IS_DIR)) return false;
1065
FileLock lock(filename);
1066
for (int tries = (int)nTries; tries > 0; --tries) {
1067
if (lock.acquire()) {
1068
std::ofstream os(filename.c_str(), std::ios::app);
1074
for (std::list<Job>::const_iterator it = jobs.begin();
1075
it != jobs.end(); ++it) {
1076
os << it->JobID.fullstr() << std::endl;
1079
bool good = os.good();
1086
logger.msg(WARNING, "Waiting for lock on file %s", filename);
1089
Glib::usleep(tryInterval);