1
// -*- indent-tabs-mode: nil -*-
9
#include <arc/ArcConfig.h>
10
#include <arc/FileLock.h>
11
#include <arc/Logger.h>
12
#include <arc/StringConv.h>
13
#include <arc/Utils.h>
15
#include "JobInformationStorage.h"
19
// Logger used by the XML job storage back-end; constructed from the root
// logger with the subsystem name "JobInformationStorageXML".
Logger JobInformationStorageXML::logger(Logger::getRootLogger(), "JobInformationStorageXML");
21
// Loads the jobs stored in the XML job list file `name` into `jobs`, testing
// each job's JobManagementURL against the patterns in `rEndpoints`.
// NOTE(review): this listing is missing several original lines (the bare
// numbers interleaved below are original line numbers and they have gaps), so
// the comments describe only the statements that are actually visible.
bool JobInformationStorageXML::ReadAll(std::list<Job>& jobs, const std::list<std::string>& rEndpoints) {
26
bool acquired = false;
27
// Attempt to take the file lock at most nTries times.
for (int tries = (int)nTries; tries > 0; --tries) {
28
acquired = lock.acquire();
30
// A failure to read the job list file is detected here; the handling
// branch is not part of this listing.
if (!jobstorage.ReadFromFile(name)) {
39
logger.msg(VERBOSE, "Waiting for lock on job list file %s", name);
42
// Sleep for tryInterval (handed to Glib::usleep) between attempts.
Glib::usleep(tryInterval);
49
// Nodes selected from the parsed document by the path "Job".
XMLNodeList xmljobs = jobstorage.Path("Job");
50
for (XMLNodeList::iterator xit = xmljobs.begin(); xit != xmljobs.end(); ++xit) {
52
// Test the last element of `jobs` against every pattern in rEndpoints.
// NOTE(review): the action taken on a match is not visible here —
// presumably the job is dropped again; confirm against the full source.
for (std::list<std::string>::const_iterator rEIt = rEndpoints.begin();
53
rEIt != rEndpoints.end(); ++rEIt) {
54
if (jobs.back().JobManagementURL.StringMatches(*rEIt)) {
64
// Reads all jobs through ReadAll() and then narrows `jobs` down to those
// selected by `jobIdentifiers` (job ID or job name) or by `endpoints`
// (matched against the job's JobManagementURL); unselected jobs are erased.
// On completion `jobIdentifiers` is overwritten with a copy from which every
// identifier that matched a job has been removed.
// Returns false when ReadAll() fails.
// NOTE(review): several original lines are missing from this listing; only
// the visible statements are described.
bool JobInformationStorageXML::Read(std::list<Job>& jobs, std::list<std::string>& jobIdentifiers, const std::list<std::string>& endpoints, const std::list<std::string>& rEndpoints) {
65
if (!ReadAll(jobs, rEndpoints)) { return false; }
67
// Working copy used to record which identifiers have been consumed.
std::list<std::string> jobIdentifiersCopy = jobIdentifiers;
68
for (std::list<Job>::iterator itJ = jobs.begin();
70
// Check whether the job (itJ) is selected by the job identifiers, either by job ID or by name.
71
std::list<std::string>::iterator itJIdentifier = jobIdentifiers.begin();
72
for (;itJIdentifier != jobIdentifiers.end(); ++itJIdentifier) {
73
// A name comparison is only made for jobs with a non-empty Name; the
// JobID comparison is always made.
if ((!itJ->Name.empty() && itJ->Name == *itJIdentifier) ||
74
(itJ->JobID == *itJIdentifier)) {
78
if (itJIdentifier != jobIdentifiers.end()) {
79
// Job explicitly specified. Remove every occurrence of the identifier from the copy list, in order to keep track of used identifiers.
80
std::list<std::string>::iterator itJIdentifierCopy;
81
while ((itJIdentifierCopy = std::find(jobIdentifiersCopy.begin(), jobIdentifiersCopy.end(), *itJIdentifier))
82
!= jobIdentifiersCopy.end()) {
83
jobIdentifiersCopy.erase(itJIdentifierCopy);
89
// Check whether the job (itJ) is selected by endpoints.
90
std::list<std::string>::const_iterator itC = endpoints.begin();
91
for (; itC != endpoints.end(); ++itC) {
92
if (itJ->JobManagementURL.StringMatches(*itC)) {
96
if (itC != endpoints.end()) {
97
// The cluster on which the job resides is explicitly specified.
102
// Job is not selected - remove it.
103
// list::erase returns the iterator following the removed element.
itJ = jobs.erase(itJ);
106
// Hand the unused identifiers back to the caller.
jobIdentifiers = jobIdentifiersCopy;
111
// Saves `jobfile` to the job list file `name` while holding the file lock,
// retrying the lock up to nTries times.
// NOTE(review): `jobfile` and `lock` are declared on lines missing from this
// listing; presumably `jobfile` is an empty document so that saving it clears
// the stored job list — confirm against the full source.
bool JobInformationStorageXML::Clean() {
114
for (int tries = (int)nTries; tries > 0; --tries) {
115
if (lock.acquire()) {
116
// A failed save is detected here; the handling branch is not shown.
if (!jobfile.SaveToFile(name)) {
125
logger.msg(WARNING, "Waiting for lock on job list file %s", name);
128
// Sleep for tryInterval (handed to Glib::usleep) between attempts.
Glib::usleep(tryInterval);
134
// Merges `jobs` into the XML job list file `name` under the file lock:
//  - existing entries whose ServiceInformationURL host is in `prunedServices`
//    are destroyed,
//  - jobs whose JobID is not yet in the file get a new "Job" element,
//  - jobs whose JobID is already in the file replace the stored element.
// Pointers to the jobs recorded in newJobsMap are appended to `newJobs`.
// NOTE(review): several original lines are missing from this listing; only
// the visible statements are described.
bool JobInformationStorageXML::Write(const std::list<Job>& jobs, const std::set<std::string>& prunedServices, std::list<const Job*>& newJobs) {
136
for (int tries = (int)nTries; tries > 0; --tries) {
137
if (lock.acquire()) {
139
// NOTE(review): the result of ReadFromFile is not examined on this line —
// presumably deliberate so that a missing file is treated as empty; confirm.
jobfile.ReadFromFile(name);
141
// Use std::map to store job IDs to be searched for duplicates.
142
std::map<std::string, XMLNode> jobIDXMLMap;
143
std::map<std::string, XMLNode> jobsToRemove;
144
for (Arc::XMLNode j = jobfile["Job"]; j; ++j) {
145
// Entries with an empty JobID are not indexed.
if (!((std::string)j["JobID"]).empty()) {
146
std::string serviceName = URL(j["ServiceInformationURL"]).Host();
147
if (!serviceName.empty() && prunedServices.count(serviceName)) {
148
logger.msg(DEBUG, "Will remove %s on service %s.",
149
((std::string)j["JobID"]).c_str(), serviceName);
150
jobsToRemove[(std::string)j["JobID"]] = j;
153
jobIDXMLMap[(std::string)j["JobID"]] = j;
158
// Remove jobs which belong to our list of endpoints to prune.
159
for (std::map<std::string, XMLNode>::iterator it = jobsToRemove.begin();
160
it != jobsToRemove.end(); ++it) {
161
it->second.Destroy();
164
// JobID -> job pointer, for jobs to be reported through `newJobs`.
std::map<std::string, const Job*> newJobsMap;
165
for (std::list<Job>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
166
std::map<std::string, XMLNode>::iterator itJobXML = jobIDXMLMap.find(it->JobID);
167
if (itJobXML == jobIDXMLMap.end()) {
168
// JobID not seen before: create a new "Job" element and index it.
XMLNode xJob = jobfile.NewChild("Job");
170
jobIDXMLMap[it->JobID] = xJob;
172
// A job that was just pruned from the file is not recorded as new.
std::map<std::string, XMLNode>::iterator itRemovedJobs = jobsToRemove.find(it->JobID);
173
if (itRemovedJobs == jobsToRemove.end()) {
174
newJobsMap[it->JobID] = &(*it);
178
// Duplicate found, replace it.
179
itJobXML->second.Replace(XMLNode(NS(), "Job"));
180
it->ToXML(itJobXML->second);
182
// Only update newJobsMap if this is a new job, i.e. one not previously present in jobfile.
183
std::map<std::string, const Job*>::iterator itNewJobsMap = newJobsMap.find(it->JobID);
184
if (itNewJobsMap != newJobsMap.end()) {
185
// Point the entry at the latest Job object carrying this JobID.
itNewJobsMap->second = &(*it);
190
// Add pointers to new Job objects to the newJobs list.
191
for (std::map<std::string, const Job*>::const_iterator it = newJobsMap.begin();
192
it != newJobsMap.end(); ++it) {
193
newJobs.push_back(it->second);
196
// A failed save is detected here; the handling branch is not shown.
if (!jobfile.SaveToFile(name)) {
205
logger.msg(WARNING, "Waiting for lock on job list file %s", name);
208
// Sleep for tryInterval (handed to Glib::usleep) between attempts.
Glib::usleep(tryInterval);
214
// Removes from the XML job list file `name` every "Job" element whose JobID
// (or legacy IDFromEndpoint) equals one of `jobids`, then saves the file.
// The file lock is retried up to nTries times.
// NOTE(review): several original lines are missing from this listing
// (including the branch taken when `jobids` is empty); only the visible
// statements are described.
bool JobInformationStorageXML::Remove(const std::list<std::string>& jobids) {
215
if (jobids.empty()) {
220
for (int tries = nTries; tries > 0; --tries) {
221
if (lock.acquire()) {
223
// A failure to read the job list file is detected here; handling not shown.
if (!jobstorage.ReadFromFile(name)) {
228
// Nodes selected from the parsed document by the path "Job".
XMLNodeList xmlJobs = jobstorage.Path("Job");
229
// Compare every requested id against every stored job element.
for (std::list<std::string>::const_iterator it = jobids.begin(); it != jobids.end(); ++it) {
230
for (XMLNodeList::iterator xJIt = xmlJobs.begin(); xJIt != xmlJobs.end(); ++xJIt) {
231
if ((*xJIt)["JobID"] == *it ||
232
(*xJIt)["IDFromEndpoint"] == *it // Included for backwards compatibility.
234
xJIt->Destroy(); // Do not break, since for some reason there might be multiple identical jobs in the file.
239
// A failed save is detected here; the handling branch is not shown.
if (!jobstorage.SaveToFile(name)) {
248
logger.msg(VERBOSE, "Waiting for lock on job list file %s", name);
250
// Sleep for tryInterval (handed to Glib::usleep) between attempts.
Glib::usleep(tryInterval);
257
#ifdef DBJSTORE_ENABLED
258
// Writes `str` into `buf` as a length-prefixed record: four bytes holding the
// 32-bit length, least significant byte first, followed by the string bytes
// (no terminator is copied). The caller must supply a buffer of at least
// 4 + str.length() bytes; no bounds are checked here.
// NOTE(review): the end of this function (including its return statement) is
// missing from this listing.
static void* store_string(const std::string& str, void* buf) {
259
uint32_t l = str.length();
260
unsigned char* p = (unsigned char*)buf;
261
// Emit the length one byte at a time, lowest byte first.
*p = (unsigned char)l; l >>= 8; ++p;
262
*p = (unsigned char)l; l >>= 8; ++p;
263
*p = (unsigned char)l; l >>= 8; ++p;
264
*p = (unsigned char)l; l >>= 8; ++p;
265
// Payload follows the 4-byte length prefix.
::memcpy(p,str.c_str(),str.length());
270
// Reads one length-prefixed record (4-byte length, least significant byte
// first, then the payload) from `buf` into `str`. `size` is the number of
// bytes available at `buf` and is decremented as bytes are consumed.
// NOTE(review): the declaration of `l` and the end of this function
// (including its final return) are missing from this listing.
static void* parse_string(std::string& str, const void* buf, uint32_t& size) {
272
const unsigned char* p = (unsigned char*)buf;
273
// Fewer than four bytes left: no complete length prefix. Consume what
// remains and return a pointer just past it; `str` is left untouched.
if(size < 4) { p += size; size = 0; return (void*)p; };
274
// Assemble the 32-bit length from four bytes, lowest byte first.
l |= ((uint32_t)(*p)) << 0; ++p; --size;
275
l |= ((uint32_t)(*p)) << 8; ++p; --size;
276
l |= ((uint32_t)(*p)) << 16; ++p; --size;
277
l |= ((uint32_t)(*p)) << 24; ++p; --size;
278
// Clamp the declared length to the bytes actually remaining.
if(l > size) l = size;
279
// TODO: sanity check
280
str.assign((const char*)p,l);
285
// Serialises the listed attributes of job `j` into `data` as a sequence of
// nItems length-prefixed strings (see store_string), the first being the
// record format version "3.0.0". The buffer is obtained with malloc and
// handed to `data`; JobID is not part of the record.
// NOTE(review): several original lines are missing from this listing (among
// them the declaration of `l` and whatever follows the malloc call); only the
// visible statements are described.
static void serialiseJob(const Job& j, Dbt& data) {
286
const std::string version = "3.0.0";
287
// Must equal the number of entries in dataItems below.
const unsigned nItems = 14;
288
const std::string dataItems[] =
289
{version, j.IDFromEndpoint, j.Name,
290
j.JobStatusInterfaceName, j.JobStatusURL.fullstr(),
291
j.JobManagementInterfaceName, j.JobManagementURL.fullstr(),
292
j.ServiceInformationInterfaceName, j.ServiceInformationURL.fullstr(),
293
j.SessionDir.fullstr(), j.StageInDir.fullstr(), j.StageOutDir.fullstr(),
294
j.JobDescriptionDocument, tostring(j.LocalSubmissionTime.GetTime())};
296
// Start from an empty Dbt.
data.set_data(NULL); data.set_size(0);
298
// Each item costs a 4-byte length prefix plus its payload.
for (unsigned i = 0; i < nItems; ++i) l += 4 + dataItems[i].length();
299
void* d = (void*)::malloc(l);
301
data.set_data(d); data.set_size(l);
303
// store_string's return value is used as the write position of the next item.
for (unsigned i = 0; i < nItems; ++i) d = store_string(dataItems[i], d);
306
// Fills the attributes of job `j` from a record produced by serialiseJob().
// Only records whose leading version string is "3.0.0" are decoded on the
// visible lines. JobID is not stored in the record and is not set here.
// NOTE(review): the declarations of `d`, `size`, `version` and `s` are on
// lines missing from this listing.
static void deserialiseJob(Job& j, const Dbt& data) {
310
d = (void*)data.get_data();
311
size = (uint32_t)data.get_size();
314
// First item of every record is the format version.
d = parse_string(version, d, size);
315
if (version == "3.0.0") {
316
/* Order of items in record. Version 3.0.0
317
{version, j.IDFromEndpoint, j.Name,
318
j.JobStatusInterfaceName, j.JobStatusURL.fullstr(),
319
j.JobManagementInterfaceName, j.JobManagementURL.fullstr(),
320
j.ServiceInformationInterfaceName, j.ServiceInformationURL.fullstr(),
321
j.SessionDir.fullstr(), j.StageInDir.fullstr(), j.StageOutDir.fullstr(),
322
j.JobDescriptionDocument, tostring(j.LocalSubmissionTime.GetTime())};
325
// Items are read back in exactly the order listed above; URL-valued
// attributes go through the temporary string `s`.
d = parse_string(j.IDFromEndpoint, d, size);
326
d = parse_string(j.Name, d, size);
327
d = parse_string(j.JobStatusInterfaceName, d, size);
328
d = parse_string(s, d, size); j.JobStatusURL = URL(s);
329
d = parse_string(j.JobManagementInterfaceName, d, size);
330
d = parse_string(s, d, size); j.JobManagementURL = URL(s);
331
d = parse_string(j.ServiceInformationInterfaceName, d, size);
332
d = parse_string(s, d, size); j.ServiceInformationURL = URL(s);
333
d = parse_string(s, d, size); j.SessionDir = URL(s);
334
d = parse_string(s, d, size); j.StageInDir = URL(s);
335
d = parse_string(s, d, size); j.StageOutDir = URL(s);
336
d = parse_string(j.JobDescriptionDocument, d, size);
337
// The submission time was stored as text and is converted with stringtoi.
d = parse_string(s, d, size); j.LocalSubmissionTime.SetTime(stringtoi(s));
341
// Extracts the n-th item (1-based, the version string being item 1) of a
// serialised job record into `attr`, for records of version "3.0.0".
// After the version has been read, n-1 further items are parsed into `attr`,
// so the last one parsed — item n — is what remains in it.
// NOTE(review): `n` is unsigned, so n == 0 makes `n-1` wrap to the largest
// unsigned value; the callers visible in this file pass 3, 7 and 9.
// NOTE(review): the declarations of `d`, `size` and `version` are on lines
// missing from this listing.
static void deserialiseNthJobAttribute(std::string& attr, const Dbt& data, unsigned n) {
345
d = (void*)data.get_data();
346
size = (uint32_t)data.get_size();
349
d = parse_string(version, d, size);
350
if (version == "3.0.0") {
351
for (unsigned i = 0; i < n-1; ++i) {
352
d = parse_string(attr, d, size);
357
// Secondary-key callback (its signature matches the one passed to
// Db::associate in the JobDB constructor): derives the job-name key from a
// primary record's data.
// NOTE(review): the declaration of `name` and the return statement are on
// lines missing from this listing.
static int getNameKey(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
359
// 3rd attribute in job record is job name.
360
deserialiseNthJobAttribute(name, *data, 3);
361
// The key buffer is allocated here by the application (strdup below);
// see the Berkeley DB documentation of DB_DBT_APPMALLOC for who frees it.
result->set_flags(DB_DBT_APPMALLOC);
362
result->set_size(name.size());
363
// NOTE(review): the strdup result is not checked for NULL, and the size set
// above comes from the std::string rather than the copied C string.
result->set_data(strdup(name.c_str()));
367
// Secondary-key callback: derives the key for the endpoint index from a
// primary record — the host part of the job management URL. Records whose URL
// yields an empty host return DB_DONOTINDEX.
// NOTE(review): the final return statement is on a line missing from this
// listing.
static int getEndpointKey(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
368
std::string endpointS;
369
// 7th attribute in job record is job management URL.
370
deserialiseNthJobAttribute(endpointS, *data, 7);
371
// Reduce the full URL to its host name.
endpointS = URL(endpointS).Host();
372
if (endpointS.empty()) {
373
return DB_DONOTINDEX;
375
// The key buffer is allocated here by the application (strdup below);
// see the Berkeley DB documentation of DB_DBT_APPMALLOC for who frees it.
result->set_flags(DB_DBT_APPMALLOC);
376
result->set_size(endpointS.size());
377
// NOTE(review): the strdup result is not checked for NULL.
result->set_data(strdup(endpointS.c_str()));
381
// Secondary-key callback: derives the key for the service-information index
// from a primary record — the host part of the service information URL.
// Records whose URL yields an empty host return DB_DONOTINDEX.
// NOTE(review): the final return statement is on a line missing from this
// listing.
static int getServiceInfoHostnameKey(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
382
std::string endpointS;
383
// 9th attribute in job record is service information URL.
384
deserialiseNthJobAttribute(endpointS, *data, 9);
385
// Reduce the full URL to its host name.
endpointS = URL(endpointS).Host();
386
if (endpointS.empty()) {
387
return DB_DONOTINDEX;
389
// The key buffer is allocated here by the application (strdup below);
// see the Berkeley DB documentation of DB_DBT_APPMALLOC for who frees it.
result->set_flags(DB_DBT_APPMALLOC);
390
result->set_size(endpointS.size());
391
// NOTE(review): the strdup result is not checked for NULL.
result->set_data(strdup(endpointS.c_str()));
395
// Logger used by the Berkeley DB job storage back-end; constructed from the
// root logger with the subsystem name "JobInformationStorageBDB".
Logger JobInformationStorageBDB::logger(Logger::getRootLogger(), "JobInformationStorageBDB");
397
// Opens the job database stored in file `name`: creates a DbEnv, a primary
// database ("job_records") and three secondary databases ("name_keys",
// "endpoint_keys", "serviceinfo_keys"), and associates each secondary with
// the primary. Every failing step logs an ERROR and throws std::exception.
// NOTE(review): the objects are created with raw `new`, and no release is
// visible before the `throw` statements; since a destructor does not run for
// an object whose constructor throws, confirm that the lines missing from
// this listing free them.
// NOTE(review): the declaration of `ret` is on a line missing from this
// listing.
JobInformationStorageBDB::JobDB::JobDB(const std::string& name, u_int32_t flags)
398
: dbEnv(NULL), jobDB(NULL), endpointSecondaryKeyDB(NULL), nameSecondaryKeyDB(NULL), serviceInfoSecondaryKeyDB(NULL)
401
// DB_BTREE is requested only when flags is exactly DB_CREATE; otherwise
// DB_UNKNOWN is passed to open().
const DBTYPE type = (flags == DB_CREATE ? DB_BTREE : DB_UNKNOWN);
402
std::string basepath = "";
404
dbEnv = new DbEnv(DB_CXX_NO_EXCEPTIONS);
405
if ((ret = dbEnv->open(NULL, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0)) != 0) {
406
JobInformationStorageBDB::logger.msg(ERROR, "Unable to create data base environment (%s)", name);
407
throw std::exception();
410
// One primary database and three secondary-index databases, all in dbEnv.
jobDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
411
nameSecondaryKeyDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
412
endpointSecondaryKeyDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
413
serviceInfoSecondaryKeyDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
415
// DB_DUPSORT is set on each secondary before it is opened.
if ((ret = nameSecondaryKeyDB->set_flags(DB_DUPSORT)) != 0) {
416
JobInformationStorageBDB::logger.msg(ERROR, "Unable to set duplicate flags for secondary key DB (%s)", name);
417
logErrorMessage(ret);
418
throw std::exception();
420
if ((ret = endpointSecondaryKeyDB->set_flags(DB_DUPSORT)) != 0) {
421
JobInformationStorageBDB::logger.msg(ERROR, "Unable to set duplicate flags for secondary key DB (%s)", name);
422
logErrorMessage(ret);
423
throw std::exception();
425
if ((ret = serviceInfoSecondaryKeyDB->set_flags(DB_DUPSORT)) != 0) {
426
JobInformationStorageBDB::logger.msg(ERROR, "Unable to set duplicate flags for secondary key DB (%s)", name);
427
logErrorMessage(ret);
428
throw std::exception();
431
// All four databases are opened from the same file `name`, each under its
// own database name.
if ((ret = jobDB->open(NULL, name.c_str(), "job_records", type, flags, 0)) != 0) {
432
JobInformationStorageBDB::logger.msg(ERROR, "Unable to create job database (%s)", name);
433
logErrorMessage(ret);
434
throw std::exception();
436
if ((ret = nameSecondaryKeyDB->open(NULL, name.c_str(), "name_keys", type, flags, 0)) != 0) {
437
JobInformationStorageBDB::logger.msg(ERROR, "Unable to create DB for secondary name keys (%s)", name);
438
logErrorMessage(ret);
439
throw std::exception();
441
if ((ret = endpointSecondaryKeyDB->open(NULL, name.c_str(), "endpoint_keys", type, flags, 0)) != 0) {
442
JobInformationStorageBDB::logger.msg(ERROR, "Unable to create DB for secondary endpoint keys (%s)", name);
443
logErrorMessage(ret);
444
throw std::exception();
446
if ((ret = serviceInfoSecondaryKeyDB->open(NULL, name.c_str(), "serviceinfo_keys", type, flags, 0)) != 0) {
447
JobInformationStorageBDB::logger.msg(ERROR, "Unable to create DB for secondary service info keys (%s)", name);
448
logErrorMessage(ret);
449
throw std::exception();
452
// Associate each secondary with the primary. The key-extraction callback is
// replaced by NULL when flags is exactly DB_RDONLY.
if ((ret = jobDB->associate(NULL, nameSecondaryKeyDB, (flags != DB_RDONLY ? getNameKey : NULL), 0)) != 0) {
453
JobInformationStorageBDB::logger.msg(ERROR, "Unable to associate secondary DB with primary DB (%s)", name);
454
logErrorMessage(ret);
455
throw std::exception();
457
if ((ret = jobDB->associate(NULL, endpointSecondaryKeyDB, (flags != DB_RDONLY ? getEndpointKey : NULL), 0)) != 0) {
458
JobInformationStorageBDB::logger.msg(ERROR, "Unable to associate secondary DB with primary DB (%s)", name);
459
logErrorMessage(ret);
460
throw std::exception();
462
if ((ret = jobDB->associate(NULL, serviceInfoSecondaryKeyDB, (flags != DB_RDONLY ? getServiceInfoHostnameKey : NULL), 0)) != 0) {
463
JobInformationStorageBDB::logger.msg(ERROR, "Unable to associate secondary DB with primary DB (%s)", name);
464
logErrorMessage(ret);
465
throw std::exception();
468
JobInformationStorageBDB::logger.msg(DEBUG, "Job database created successfully (%s)", name);
471
// Closes the secondary databases that were created, deletes all database
// objects and the environment, then constructs a fresh DbEnv solely to call
// remove() on it before deleting that as well.
// NOTE(review): original lines between the secondary closes and the deletes
// are missing from this listing (presumably the primary database and the
// environment are closed there — confirm).
JobInformationStorageBDB::JobDB::~JobDB() {
472
// Each secondary is closed only if it was actually allocated.
if (nameSecondaryKeyDB) {
473
nameSecondaryKeyDB->close(0);
475
if (endpointSecondaryKeyDB) {
476
endpointSecondaryKeyDB->close(0);
478
if (serviceInfoSecondaryKeyDB) {
479
serviceInfoSecondaryKeyDB->close(0);
488
// Free the objects and reset the members to NULL.
delete endpointSecondaryKeyDB; endpointSecondaryKeyDB = NULL;
489
delete nameSecondaryKeyDB; nameSecondaryKeyDB = NULL;
490
delete serviceInfoSecondaryKeyDB; serviceInfoSecondaryKeyDB = NULL;
491
delete jobDB; jobDB = NULL;
492
delete dbEnv; dbEnv = NULL;
494
// A new, unopened environment handle is used for the remove() call.
dbEnv = new DbEnv(DB_CXX_NO_EXCEPTIONS);
495
dbEnv->remove(NULL, 0);
496
delete dbEnv; dbEnv = NULL;
499
// Stores every job in `jobs` in the Berkeley DB job database `name`, keyed by
// JobID with the serialised job as value. An empty `jobs` list returns true
// without opening the database. Writing stops at the first failing put().
// NOTE(review): several original lines are missing from this listing (the
// `try`, the declarations of `ret`, `key`, `data`, `pdata`, the `do`, and the
// return statements); only the visible statements are described.
bool JobInformationStorageBDB::Write(const std::list<Job>& jobs) {
500
if (jobs.empty()) return true;
503
// JobDB's constructor throws std::exception on failure (caught below).
JobDB db(name, DB_CREATE);
505
std::list<Job>::const_iterator it = jobs.begin();
509
InterruptGuard guard;
512
// The key points directly at the JobID's character data (no copy is made).
key.set_size(it->JobID.size());
513
key.set_data((char*)it->JobID.c_str());
514
// serialiseJob allocates the value buffer with malloc.
serialiseJob(*it, data);
515
// Remember the buffer address — presumably so it can be freed later; the
// freeing line is not visible here, confirm.
pdata = data.get_data();
516
// Continue while put() succeeds and there are more jobs.
} while ((ret = db->put(NULL, &key, &data, 0)) == 0 && ++it != jobs.end());
521
logger.msg(ERROR, "Unable to write key/value pair to job database (%s): Key \"%s\"", name, (char*)key.get_data());
522
logErrorMessage(ret);
525
} catch (const std::exception& e) {
532
// Stores every job in `jobs` in the Berkeley DB job database `name`, after
// looking up the stored jobs that belong to the services in `prunedServices`.
// A job is appended to `newJobs` when it was not among the pruned ones and
// its JobID is not found in the database before the put().
// An empty `jobs` list returns true without opening the database.
// NOTE(review): several original lines are missing from this listing (the
// `try`, declarations of `ret`, `cursor`, `key`, `data`, `pdata`,
// `jobWasPruned`, `existingData`, both `do` keywords, and the return
// statements); only the visible statements are described.
bool JobInformationStorageBDB::Write(const std::list<Job>& jobs, const std::set<std::string>& prunedServices, std::list<const Job*>& newJobs) {
533
if (jobs.empty()) return true;
536
// JobDB's constructor throws std::exception on failure (caught below).
JobDB db(name, DB_CREATE);
539
// Primary keys (job IDs) of stored jobs that belong to pruned services.
std::set<std::string> idsOfPrunedJobs;
541
// Write cursor over the service-information secondary index.
if ((ret = db.viaServiceInfoKeys()->cursor(NULL, &cursor, DB_WRITECURSOR)) != 0) return false;
542
for (std::set<std::string>::const_iterator itPruned = prunedServices.begin();
543
itPruned != prunedServices.end(); ++itPruned) {
544
Dbt key((char *)itPruned->c_str(), itPruned->size()), pkey, data;
545
// Position on the first record for this service; skip the service if none.
if (cursor->pget(&key, &pkey, &data, DB_SET) != 0) continue;
547
idsOfPrunedJobs.insert(std::string((char *)pkey.get_data(), pkey.get_size()));
549
// Step through the remaining records sharing the same secondary key.
} while (cursor->pget(&key, &pkey, &data, DB_NEXT_DUP) == 0);
553
std::list<Job>::const_iterator it = jobs.begin();
558
InterruptGuard guard;
561
// The key points directly at the JobID's character data (no copy is made).
key.set_size(it->JobID.size());
562
key.set_data((char*)it->JobID.c_str());
563
// serialiseJob allocates the value buffer with malloc.
serialiseJob(*it, data);
564
// Remember the buffer address — presumably so it can be freed later; the
// freeing line is not visible here, confirm.
pdata = data.get_data();
565
jobWasPruned = (idsOfPrunedJobs.count(it->JobID) != 0);
566
if (!jobWasPruned) { // Check if job already exists.
568
if (db->get(NULL, &key, &existingData, 0) == DB_NOTFOUND) {
569
newJobs.push_back(&*it);
572
// Continue while put() succeeds and there are more jobs.
} while (((ret = db->put(NULL, &key, &data, 0)) == 0 && ++it != jobs.end()));
577
logger.msg(ERROR, "Unable to write key/value pair to job database (%s): Key \"%s\"", name, (char*)key.get_data());
578
logErrorMessage(ret);
581
} catch (const std::exception& e) {
588
// Walks the whole primary database with a cursor, appending one Job per
// record to `jobs` (JobID from the key, other attributes from the serialised
// value) and testing each against the patterns in `rejectEndpoints`.
// NOTE(review): several original lines are missing from this listing (the
// `try`, the JobDB/cursor/Dbt declarations, the action on a rejected job and
// the return statements); only the visible statements are described.
bool JobInformationStorageBDB::ReadAll(std::list<Job>& jobs, const std::list<std::string>& rejectEndpoints) {
596
if ((ret = db->cursor(NULL, &cursor, 0)) != 0) {
597
//dbp->err(dbp, ret, "DB->cursor");
602
// DB_NEXT advances record by record until get() returns non-zero.
while ((ret = cursor->get(&key, &data, DB_NEXT)) == 0) {
603
jobs.push_back(Job());
604
// The record key is the JobID; the value holds the other attributes.
jobs.back().JobID = std::string((char *)key.get_data(), key.get_size());
605
deserialiseJob(jobs.back(), data);
606
// NOTE(review): the action taken on a match is not visible here —
// presumably the job is dropped again; confirm.
for (std::list<std::string>::const_iterator it = rejectEndpoints.begin();
607
it != rejectEndpoints.end(); ++it) {
608
if (jobs.back().JobManagementURL.StringMatches(*it)) {
617
// Any result other than DB_NOTFOUND after the loop is treated specially;
// the handling branch is not shown.
if (ret != DB_NOTFOUND) {
618
//dbp->err(dbp, ret, "DBcursor->get");
621
} catch (const std::exception& e) {
628
// Appends the job stored in the record (`key`, `data`) to `jobs`, using
// `idsOfAddedJobs` to detect jobs that were already added and
// `rejectEndpoints` to filter on the job's JobManagementURL.
// NOTE(review): the bodies of the duplicate branch and of the reject branch
// are only partly visible in this listing.
static void addJobFromDB(const Dbt& key, const Dbt& data, std::list<Job>& jobs, std::set<std::string>& idsOfAddedJobs, const std::list<std::string>& rejectEndpoints) {
629
jobs.push_back(Job());
630
// The record key is the JobID; the value holds the other attributes.
jobs.back().JobID.assign((char *)key.get_data(), key.get_size());
631
deserialiseJob(jobs.back(), data);
632
if (idsOfAddedJobs.count(jobs.back().JobID) != 0) { // Look for duplicates and remove them.
636
idsOfAddedJobs.insert(jobs.back().JobID);
637
for (std::list<std::string>::const_iterator it = rejectEndpoints.begin();
638
it != rejectEndpoints.end(); ++it) {
639
if (jobs.back().JobManagementURL.StringMatches(*it)) {
640
// A rejected job's ID is taken out of the added set again.
idsOfAddedJobs.erase(jobs.back().JobID);
647
// Adds to `jobs` every job whose secondary key in `db` equals `key`, via
// addJobFromDB(). Returns false when no record with that key is found.
// NOTE(review): the declarations of `ret`, `cursor`, `pkey`, `data`, the
// handling of a failed cursor() call and the final return are on lines
// missing from this listing; no cursor close is visible either.
static bool addJobsFromDuplicateKeys(Db& db, Dbt& key, std::list<Job>& jobs, std::set<std::string>& idsOfAddedJobs, const std::list<std::string>& rejectEndpoints) {
651
if ((ret = db.cursor(NULL, &cursor, 0)) != 0) {
652
//dbp->err(dbp, ret, "DB->cursor");
655
// Position on the first record with this key; pkey receives the primary key.
ret = cursor->pget(&key, &pkey, &data, DB_SET);
656
if (ret != 0) return false;
658
addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
659
// Then visit the remaining records sharing the same key.
while ((ret = cursor->pget(&key, &pkey, &data, DB_NEXT_DUP)) == 0) {
660
addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
665
// Reads the jobs selected by `jobIdentifiers` (looked up first as JobID in
// the primary database, then as job name through the name index) and by
// `endpoints` (looked up by host name through the endpoint index and then
// matched against the job management URL). Identifiers that were resolved
// are erased from `jobIdentifiers`. Returns true early when `endpoints` is
// empty, and false when the endpoint cursor cannot be opened.
// NOTE(review): several original lines are missing from this listing (the
// `try`, the JobDB/`ret`/`cursor`/`u` declarations, some branches and the
// final return statements); only the visible statements are described.
bool JobInformationStorageBDB::Read(std::list<Job>& jobs, std::list<std::string>& jobIdentifiers,
666
const std::list<std::string>& endpoints,
667
const std::list<std::string>& rejectEndpoints) {
673
// Job IDs already appended to `jobs`, shared by all lookups below.
std::set<std::string> idsOfAddedJobs;
674
for (std::list<std::string>::iterator it = jobIdentifiers.begin();
675
it != jobIdentifiers.end();) {
676
// NOTE(review): this for statement has no increment expression, so this
// `continue` re-tests the same element; an empty identifier therefore
// appears to loop forever. Confirm, and advance `it` before continuing.
if (it->empty()) continue;
678
Dbt key((char *)it->c_str(), it->size()), data;
679
// First try the identifier as a JobID in the primary database.
ret = db->get(NULL, &key, &data, 0);
680
if (ret == DB_NOTFOUND) {
681
// Not a JobID: try it as a job name through the name index.
if (addJobsFromDuplicateKeys(*db.viaNameKeys(), key, jobs, idsOfAddedJobs, rejectEndpoints)) {
682
it = jobIdentifiers.erase(it);
689
addJobFromDB(key, data, jobs, idsOfAddedJobs, rejectEndpoints);
690
it = jobIdentifiers.erase(it);
692
if (endpoints.empty()) return true;
695
if ((ret = db.viaEndpointKeys()->cursor(NULL, &cursor, 0)) != 0) return false;
696
for (std::list<std::string>::const_iterator it = endpoints.begin();
697
it != endpoints.end(); ++it) {
698
// Extract hostname from iterator.
700
if (u.Protocol() == "file") {
701
u = URL("http://" + *it); // Only need to extract hostname. Prefix with "http://".
703
if (u.Host().empty()) continue;
705
Dbt key((char *)u.Host().c_str(), u.Host().size()), pkey, data;
706
// Position on the first job indexed under this host name.
ret = cursor->pget(&key, &pkey, &data, DB_SET);
710
std::string tmpEndpoint;
711
// Item 7 of the record is the job management URL; the host-name hit is
// confirmed by matching that URL against the requested endpoint.
deserialiseNthJobAttribute(tmpEndpoint, data, 7);
712
URL jobManagementURL(tmpEndpoint);
713
if (jobManagementURL.StringMatches(*it)) {
714
addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
716
// Same check for the remaining jobs indexed under the same host name.
while ((ret = cursor->pget(&key, &pkey, &data, DB_NEXT_DUP)) == 0) {
717
deserialiseNthJobAttribute(tmpEndpoint, data, 7);
718
URL jobManagementURL(tmpEndpoint);
719
if (jobManagementURL.StringMatches(*it)) {
720
addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
724
} catch (const std::exception& e) {
731
// Deletes the database file `name` with remove(). A missing file (ENOENT) is
// reported as success; any other failure is logged at ERROR level.
// NOTE(review): the return statements after the log call and at the end of
// the function are on lines missing from this listing.
bool JobInformationStorageBDB::Clean() {
732
if (remove(name.c_str()) != 0) {
733
if (errno == ENOENT) return true; // No such file. DB already cleaned.
734
logger.msg(ERROR, "Unable to truncate job database (%s)", name);
742
// Deletes the records keyed by each entry of `jobids` from the job database
// `name`.
// NOTE(review): the `try` and the return statements are on lines missing
// from this listing.
bool JobInformationStorageBDB::Remove(const std::list<std::string>& jobids) {
744
InterruptGuard guard;
745
// JobDB's constructor throws std::exception on failure (caught below).
JobDB db(name, DB_CREATE);
746
for (std::list<std::string>::const_iterator it = jobids.begin();
747
it != jobids.end(); ++it) {
748
Dbt key((char *)it->c_str(), it->size());
749
// NOTE(review): the result of del() is discarded, so a failed or
// not-found delete is not reported on this line.
db->del(NULL, &key, 0);
751
} catch (const std::exception& e) {
758
// Logs a DEBUG-level description of the error code `err`.
// NOTE(review): the construct that selects a message from `err` is on lines
// missing from this listing; presumably each message belongs to the error
// code named in its text, with the last one as the fallback — confirm.
void JobInformationStorageBDB::logErrorMessage(int err) {
761
logger.msg(DEBUG, "ENOENT: The file or directory does not exist, Or a nonexistent re_source file was specified.");
764
logger.msg(DEBUG, "DB_OLD_VERSION: The database cannot be opened without being first upgraded.");
767
logger.msg(DEBUG, "EEXIST: DB_CREATE and DB_EXCL were specified and the database exists.");
769
logger.msg(DEBUG, "EINVAL");
772
// The only message that includes the numeric code.
logger.msg(DEBUG, "Unable to determine error (%d)", err);