~ubuntu-branches/ubuntu/trusty/nordugrid-arc/trusty

« back to all changes in this revision

Viewing changes to src/hed/libs/compute/JobInformationStorage.cpp

  • Committer: Package Import Robot
  • Author(s): Mattias Ellert
  • Date: 2013-11-29 13:39:10 UTC
  • mfrom: (3.1.16 sid)
  • Revision ID: package-import@ubuntu.com-20131129133910-sy6ayoavphc5hozs
Tags: 4.0.0-1
4.0.0 Release (Closes: #715131) (LP: #1049798)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// -*- indent-tabs-mode: nil -*-
2
 
 
3
 
#ifdef HAVE_CONFIG_H
4
 
#include <config.h>
5
 
#endif
6
 
 
7
 
#include <errno.h>
8
 
 
9
 
#include <arc/ArcConfig.h>
10
 
#include <arc/FileLock.h>
11
 
#include <arc/Logger.h>
12
 
#include <arc/StringConv.h>
13
 
#include <arc/Utils.h>
14
 
 
15
 
#include "JobInformationStorage.h"
16
 
 
17
 
namespace Arc {
18
 
 
19
 
  Logger JobInformationStorageXML::logger(Logger::getRootLogger(), "JobInformationStorageXML");
20
 
 
21
 
  bool JobInformationStorageXML::ReadAll(std::list<Job>& jobs, const std::list<std::string>& rEndpoints) {
22
 
    jobs.clear();
23
 
    Config jobstorage;
24
 
 
25
 
    FileLock lock(name);
26
 
    bool acquired = false;
27
 
    for (int tries = (int)nTries; tries > 0; --tries) {
28
 
      acquired = lock.acquire();
29
 
      if (acquired) {
30
 
        if (!jobstorage.ReadFromFile(name)) {
31
 
          lock.release();
32
 
          return false;
33
 
        }
34
 
        lock.release();
35
 
        break;
36
 
      }
37
 
 
38
 
      if (tries == 6) {
39
 
        logger.msg(VERBOSE, "Waiting for lock on job list file %s", name);
40
 
      }
41
 
 
42
 
      Glib::usleep(tryInterval);
43
 
    }
44
 
 
45
 
    if (!acquired) {
46
 
      return false;
47
 
    }
48
 
    
49
 
    XMLNodeList xmljobs = jobstorage.Path("Job");
50
 
    for (XMLNodeList::iterator xit = xmljobs.begin(); xit != xmljobs.end(); ++xit) {
51
 
      jobs.push_back(*xit);
52
 
      for (std::list<std::string>::const_iterator rEIt = rEndpoints.begin();
53
 
           rEIt != rEndpoints.end(); ++rEIt) {
54
 
        if (jobs.back().JobManagementURL.StringMatches(*rEIt)) {
55
 
          jobs.pop_back();
56
 
          break;
57
 
        }
58
 
      }
59
 
    }
60
 
 
61
 
    return true;
62
 
  }
63
 
 
64
 
  bool JobInformationStorageXML::Read(std::list<Job>& jobs, std::list<std::string>& jobIdentifiers, const std::list<std::string>& endpoints, const std::list<std::string>& rEndpoints) {
65
 
    if (!ReadAll(jobs, rEndpoints)) { return false; }
66
 
 
67
 
    std::list<std::string> jobIdentifiersCopy = jobIdentifiers;
68
 
    for (std::list<Job>::iterator itJ = jobs.begin();
69
 
         itJ != jobs.end();) {
70
 
      // Check if the job (itJ) is selected by the job identifies, either by job ID or Name.
71
 
      std::list<std::string>::iterator itJIdentifier = jobIdentifiers.begin();
72
 
      for (;itJIdentifier != jobIdentifiers.end(); ++itJIdentifier) {
73
 
        if ((!itJ->Name.empty() && itJ->Name == *itJIdentifier) ||
74
 
            (itJ->JobID == *itJIdentifier)) {
75
 
          break;
76
 
        }
77
 
      }
78
 
      if (itJIdentifier != jobIdentifiers.end()) {
79
 
        // Job explicitly specified. Remove id from the copy list, in order to keep track of used identifiers.
80
 
        std::list<std::string>::iterator itJIdentifierCopy;
81
 
        while ((itJIdentifierCopy = std::find(jobIdentifiersCopy.begin(), jobIdentifiersCopy.end(), *itJIdentifier))
82
 
               != jobIdentifiersCopy.end()) {
83
 
          jobIdentifiersCopy.erase(itJIdentifierCopy);
84
 
        }
85
 
        ++itJ;
86
 
        continue;
87
 
      }
88
 
 
89
 
      // Check if the job (itJ) is selected by endpoints.
90
 
      std::list<std::string>::const_iterator itC = endpoints.begin();
91
 
      for (; itC != endpoints.end(); ++itC) {
92
 
        if (itJ->JobManagementURL.StringMatches(*itC)) {
93
 
          break;
94
 
        }
95
 
      }
96
 
      if (itC != endpoints.end()) {
97
 
        // Cluster on which job reside is explicitly specified.
98
 
        ++itJ;
99
 
        continue;
100
 
      }
101
 
 
102
 
      // Job is not selected - remove it.
103
 
      itJ = jobs.erase(itJ);
104
 
    }
105
 
 
106
 
    jobIdentifiers = jobIdentifiersCopy;
107
 
 
108
 
    return true;
109
 
  }
110
 
 
111
 
  bool JobInformationStorageXML::Clean() {
112
 
    Config jobfile;
113
 
    FileLock lock(name);
114
 
    for (int tries = (int)nTries; tries > 0; --tries) {
115
 
      if (lock.acquire()) {
116
 
        if (!jobfile.SaveToFile(name)) {
117
 
          lock.release();
118
 
          return false;
119
 
        }
120
 
        lock.release();
121
 
        return true;
122
 
      }
123
 
 
124
 
      if (tries == 6) {
125
 
        logger.msg(WARNING, "Waiting for lock on job list file %s", name);
126
 
      }
127
 
 
128
 
      Glib::usleep(tryInterval);
129
 
    }
130
 
 
131
 
    return false;
132
 
  }
133
 
 
134
 
  bool JobInformationStorageXML::Write(const std::list<Job>& jobs, const std::set<std::string>& prunedServices, std::list<const Job*>& newJobs) {
135
 
    FileLock lock(name);
136
 
    for (int tries = (int)nTries; tries > 0; --tries) {
137
 
      if (lock.acquire()) {
138
 
        Config jobfile;
139
 
        jobfile.ReadFromFile(name);
140
 
 
141
 
        // Use std::map to store job IDs to be searched for duplicates.
142
 
        std::map<std::string, XMLNode> jobIDXMLMap;
143
 
        std::map<std::string, XMLNode> jobsToRemove;
144
 
        for (Arc::XMLNode j = jobfile["Job"]; j; ++j) {
145
 
          if (!((std::string)j["JobID"]).empty()) {
146
 
            std::string serviceName = URL(j["ServiceInformationURL"]).Host();
147
 
            if (!serviceName.empty() && prunedServices.count(serviceName)) {
148
 
              logger.msg(DEBUG, "Will remove %s on service %s.",
149
 
                         ((std::string)j["JobID"]).c_str(), serviceName);
150
 
              jobsToRemove[(std::string)j["JobID"]] = j;
151
 
            }
152
 
            else {
153
 
              jobIDXMLMap[(std::string)j["JobID"]] = j;
154
 
            }
155
 
          }
156
 
        }
157
 
 
158
 
        // Remove jobs which belong to our list of endpoints to prune.
159
 
        for (std::map<std::string, XMLNode>::iterator it = jobsToRemove.begin();
160
 
             it != jobsToRemove.end(); ++it) {
161
 
          it->second.Destroy();
162
 
        }
163
 
 
164
 
        std::map<std::string, const Job*> newJobsMap;
165
 
        for (std::list<Job>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
166
 
          std::map<std::string, XMLNode>::iterator itJobXML = jobIDXMLMap.find(it->JobID);
167
 
          if (itJobXML == jobIDXMLMap.end()) {
168
 
            XMLNode xJob = jobfile.NewChild("Job");
169
 
            it->ToXML(xJob);
170
 
            jobIDXMLMap[it->JobID] = xJob;
171
 
 
172
 
            std::map<std::string, XMLNode>::iterator itRemovedJobs = jobsToRemove.find(it->JobID);
173
 
            if (itRemovedJobs == jobsToRemove.end()) {
174
 
              newJobsMap[it->JobID] = &(*it);
175
 
            }
176
 
          }
177
 
          else {
178
 
            // Duplicate found, replace it.
179
 
            itJobXML->second.Replace(XMLNode(NS(), "Job"));
180
 
            it->ToXML(itJobXML->second);
181
 
 
182
 
            // Only add to newJobsMap if this is a new job, i.e. not previous present in jobfile.
183
 
            std::map<std::string, const Job*>::iterator itNewJobsMap = newJobsMap.find(it->JobID);
184
 
            if (itNewJobsMap != newJobsMap.end()) {
185
 
              itNewJobsMap->second = &(*it);
186
 
            }
187
 
          }
188
 
        }
189
 
 
190
 
        // Add pointers to new Job objects to the newJobs list.
191
 
        for (std::map<std::string, const Job*>::const_iterator it = newJobsMap.begin();
192
 
             it != newJobsMap.end(); ++it) {
193
 
          newJobs.push_back(it->second);
194
 
        }
195
 
 
196
 
        if (!jobfile.SaveToFile(name)) {
197
 
          lock.release();
198
 
          return false;
199
 
        }
200
 
        lock.release();
201
 
        return true;
202
 
      }
203
 
 
204
 
      if (tries == 6) {
205
 
        logger.msg(WARNING, "Waiting for lock on job list file %s", name);
206
 
      }
207
 
 
208
 
      Glib::usleep(tryInterval);
209
 
    }
210
 
 
211
 
    return false;
212
 
  }
213
 
 
214
 
  bool JobInformationStorageXML::Remove(const std::list<std::string>& jobids) {
215
 
    if (jobids.empty()) {
216
 
      return true;
217
 
    }
218
 
 
219
 
    FileLock lock(name);
220
 
    for (int tries = nTries; tries > 0; --tries) {
221
 
      if (lock.acquire()) {
222
 
        Config jobstorage;
223
 
        if (!jobstorage.ReadFromFile(name)) {
224
 
          lock.release();
225
 
          return false;
226
 
        }
227
 
 
228
 
        XMLNodeList xmlJobs = jobstorage.Path("Job");
229
 
        for (std::list<std::string>::const_iterator it = jobids.begin(); it != jobids.end(); ++it) {
230
 
          for (XMLNodeList::iterator xJIt = xmlJobs.begin(); xJIt != xmlJobs.end(); ++xJIt) {
231
 
            if ((*xJIt)["JobID"] == *it ||
232
 
                (*xJIt)["IDFromEndpoint"] == *it // Included for backwards compatibility.
233
 
                ) {
234
 
              xJIt->Destroy(); // Do not break, since for some reason there might be multiple identical jobs in the file.
235
 
            }
236
 
          }
237
 
        }
238
 
 
239
 
        if (!jobstorage.SaveToFile(name)) {
240
 
          lock.release();
241
 
          return false;
242
 
        }
243
 
        lock.release();
244
 
        return true;
245
 
      }
246
 
 
247
 
      if (tries == 6) {
248
 
        logger.msg(VERBOSE, "Waiting for lock on job list file %s", name);
249
 
      }
250
 
      Glib::usleep(tryInterval);
251
 
    }
252
 
 
253
 
    return false;
254
 
  }
255
 
 
256
 
 
257
 
#ifdef DBJSTORE_ENABLED
258
 
  static void* store_string(const std::string& str, void* buf) {
259
 
    uint32_t l = str.length();
260
 
    unsigned char* p = (unsigned char*)buf;
261
 
    *p = (unsigned char)l; l >>= 8; ++p;
262
 
    *p = (unsigned char)l; l >>= 8; ++p;
263
 
    *p = (unsigned char)l; l >>= 8; ++p;
264
 
    *p = (unsigned char)l; l >>= 8; ++p;
265
 
    ::memcpy(p,str.c_str(),str.length());
266
 
    p += str.length();
267
 
    return (void*)p;
268
 
  }
269
 
 
270
 
  static void* parse_string(std::string& str, const void* buf, uint32_t& size) {
271
 
    uint32_t l = 0;
272
 
    const unsigned char* p = (unsigned char*)buf;
273
 
    if(size < 4) { p += size; size = 0; return (void*)p; };
274
 
    l |= ((uint32_t)(*p)) << 0; ++p; --size;
275
 
    l |= ((uint32_t)(*p)) << 8; ++p; --size;
276
 
    l |= ((uint32_t)(*p)) << 16; ++p; --size;
277
 
    l |= ((uint32_t)(*p)) << 24; ++p; --size;
278
 
    if(l > size) l = size;
279
 
    // TODO: sanity check
280
 
    str.assign((const char*)p,l);
281
 
    p += l; size -= l;
282
 
    return (void*)p;
283
 
  }
284
 
  
285
 
  static void serialiseJob(const Job& j, Dbt& data) {
286
 
    const std::string version = "3.0.0";
287
 
    const unsigned nItems = 14;
288
 
    const std::string dataItems[] =
289
 
      {version, j.IDFromEndpoint, j.Name,
290
 
       j.JobStatusInterfaceName, j.JobStatusURL.fullstr(),
291
 
       j.JobManagementInterfaceName, j.JobManagementURL.fullstr(),
292
 
       j.ServiceInformationInterfaceName, j.ServiceInformationURL.fullstr(),
293
 
       j.SessionDir.fullstr(), j.StageInDir.fullstr(), j.StageOutDir.fullstr(),
294
 
       j.JobDescriptionDocument, tostring(j.LocalSubmissionTime.GetTime())};
295
 
      
296
 
    data.set_data(NULL); data.set_size(0);
297
 
    uint32_t l = 0;
298
 
    for (unsigned i = 0; i < nItems; ++i) l += 4 + dataItems[i].length();
299
 
    void* d = (void*)::malloc(l);
300
 
    if(!d) return;
301
 
    data.set_data(d); data.set_size(l);
302
 
    
303
 
    for (unsigned i = 0; i < nItems; ++i) d = store_string(dataItems[i], d);
304
 
  }
305
 
 
306
 
  static void deserialiseJob(Job& j, const Dbt& data) {
307
 
    uint32_t size = 0;
308
 
    void* d = NULL;
309
 
 
310
 
    d = (void*)data.get_data();
311
 
    size = (uint32_t)data.get_size();
312
 
    
313
 
    std::string version;
314
 
    d = parse_string(version, d, size);
315
 
    if (version == "3.0.0") {
316
 
      /* Order of items in record. Version 3.0.0
317
 
          {version, j.IDFromEndpoint, j.Name,
318
 
           j.JobStatusInterfaceName, j.JobStatusURL.fullstr(),
319
 
           j.JobManagementInterfaceName, j.JobManagementURL.fullstr(),
320
 
           j.ServiceInformationInterfaceName, j.ServiceInformationURL.fullstr(),
321
 
           j.SessionDir.fullstr(), j.StageInDir.fullstr(), j.StageOutDir.fullstr(),
322
 
           j.JobDescriptionDocument, tostring(j.LocalSubmissionTime.GetTime())};
323
 
       */
324
 
      std::string s;
325
 
      d = parse_string(j.IDFromEndpoint, d, size);
326
 
      d = parse_string(j.Name, d, size);
327
 
      d = parse_string(j.JobStatusInterfaceName, d, size);
328
 
      d = parse_string(s, d, size); j.JobStatusURL = URL(s);
329
 
      d = parse_string(j.JobManagementInterfaceName, d, size);
330
 
      d = parse_string(s, d, size); j.JobManagementURL = URL(s);
331
 
      d = parse_string(j.ServiceInformationInterfaceName, d, size);
332
 
      d = parse_string(s, d, size); j.ServiceInformationURL = URL(s);
333
 
      d = parse_string(s, d, size); j.SessionDir = URL(s);
334
 
      d = parse_string(s, d, size); j.StageInDir = URL(s);
335
 
      d = parse_string(s, d, size); j.StageOutDir = URL(s);
336
 
      d = parse_string(j.JobDescriptionDocument, d, size);
337
 
      d = parse_string(s, d, size); j.LocalSubmissionTime.SetTime(stringtoi(s));
338
 
    }
339
 
  }
340
 
  
341
 
  static void deserialiseNthJobAttribute(std::string& attr, const Dbt& data, unsigned n) {
342
 
    uint32_t size = 0;
343
 
    void* d = NULL;
344
 
 
345
 
    d = (void*)data.get_data();
346
 
    size = (uint32_t)data.get_size();
347
 
    
348
 
    std::string version;
349
 
    d = parse_string(version, d, size);
350
 
    if (version == "3.0.0") {
351
 
      for (unsigned i = 0; i < n-1; ++i) {
352
 
        d = parse_string(attr, d, size);
353
 
      }
354
 
    }
355
 
  }
356
 
  
357
 
  static int getNameKey(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
358
 
    std::string name;
359
 
    // 3rd attribute in job record is job name.
360
 
    deserialiseNthJobAttribute(name, *data, 3);
361
 
    result->set_flags(DB_DBT_APPMALLOC);
362
 
    result->set_size(name.size());
363
 
    result->set_data(strdup(name.c_str()));
364
 
    return 0;
365
 
  }
366
 
  
367
 
  static int getEndpointKey(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
368
 
    std::string endpointS;
369
 
    // 7th attribute in job record is job management URL.
370
 
    deserialiseNthJobAttribute(endpointS, *data, 7);
371
 
    endpointS = URL(endpointS).Host();
372
 
    if (endpointS.empty()) {
373
 
      return DB_DONOTINDEX;
374
 
    }
375
 
    result->set_flags(DB_DBT_APPMALLOC);
376
 
    result->set_size(endpointS.size());
377
 
    result->set_data(strdup(endpointS.c_str()));
378
 
    return 0;
379
 
  }
380
 
  
381
 
  static int getServiceInfoHostnameKey(Db *secondary, const Dbt *key, const Dbt *data, Dbt *result) {
382
 
    std::string endpointS;
383
 
    // 9th attribute in job record is service information URL.
384
 
    deserialiseNthJobAttribute(endpointS, *data, 9);
385
 
    endpointS = URL(endpointS).Host();
386
 
    if (endpointS.empty()) {
387
 
      return DB_DONOTINDEX;
388
 
    }
389
 
    result->set_flags(DB_DBT_APPMALLOC);
390
 
    result->set_size(endpointS.size());
391
 
    result->set_data(strdup(endpointS.c_str()));
392
 
    return 0;
393
 
  }
394
 
  
395
 
  Logger JobInformationStorageBDB::logger(Logger::getRootLogger(), "JobInformationStorageBDB");
396
 
 
397
 
  JobInformationStorageBDB::JobDB::JobDB(const std::string& name, u_int32_t flags)
398
 
    : dbEnv(NULL), jobDB(NULL), endpointSecondaryKeyDB(NULL), nameSecondaryKeyDB(NULL), serviceInfoSecondaryKeyDB(NULL)
399
 
  {
400
 
    int ret;
401
 
    const DBTYPE type = (flags == DB_CREATE ? DB_BTREE : DB_UNKNOWN);
402
 
    std::string basepath = "";
403
 
    
404
 
    dbEnv = new DbEnv(DB_CXX_NO_EXCEPTIONS);
405
 
    if ((ret = dbEnv->open(NULL, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0)) != 0) {
406
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to create data base environment (%s)", name);
407
 
      throw std::exception();
408
 
    }
409
 
 
410
 
    jobDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
411
 
    nameSecondaryKeyDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
412
 
    endpointSecondaryKeyDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
413
 
    serviceInfoSecondaryKeyDB = new Db(dbEnv, DB_CXX_NO_EXCEPTIONS);
414
 
 
415
 
    if ((ret = nameSecondaryKeyDB->set_flags(DB_DUPSORT)) != 0) {
416
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to set duplicate flags for secondary key DB (%s)", name);
417
 
      logErrorMessage(ret);
418
 
      throw std::exception();
419
 
    }
420
 
    if ((ret = endpointSecondaryKeyDB->set_flags(DB_DUPSORT)) != 0) {
421
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to set duplicate flags for secondary key DB (%s)", name);
422
 
      logErrorMessage(ret);
423
 
      throw std::exception();
424
 
    }
425
 
    if ((ret = serviceInfoSecondaryKeyDB->set_flags(DB_DUPSORT)) != 0) {
426
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to set duplicate flags for secondary key DB (%s)", name);
427
 
      logErrorMessage(ret);
428
 
      throw std::exception();
429
 
    }
430
 
 
431
 
    if ((ret = jobDB->open(NULL, name.c_str(), "job_records", type, flags, 0)) != 0) {
432
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to create job database (%s)", name);
433
 
      logErrorMessage(ret);
434
 
      throw std::exception();
435
 
    }
436
 
    if ((ret = nameSecondaryKeyDB->open(NULL, name.c_str(), "name_keys", type, flags, 0)) != 0) {
437
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to create DB for secondary name keys (%s)", name);
438
 
      logErrorMessage(ret);
439
 
      throw std::exception();
440
 
    }
441
 
    if ((ret = endpointSecondaryKeyDB->open(NULL, name.c_str(), "endpoint_keys", type, flags, 0)) != 0) {
442
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to create DB for secondary endpoint keys (%s)", name);
443
 
      logErrorMessage(ret);
444
 
      throw std::exception();
445
 
    }
446
 
    if ((ret = serviceInfoSecondaryKeyDB->open(NULL, name.c_str(), "serviceinfo_keys", type, flags, 0)) != 0) {
447
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to create DB for secondary service info keys (%s)", name);
448
 
      logErrorMessage(ret);
449
 
      throw std::exception();
450
 
    }
451
 
 
452
 
    if ((ret = jobDB->associate(NULL, nameSecondaryKeyDB, (flags != DB_RDONLY ? getNameKey : NULL), 0)) != 0) {
453
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to associate secondary DB with primary DB (%s)", name);
454
 
      logErrorMessage(ret);
455
 
      throw std::exception();
456
 
    }
457
 
    if ((ret = jobDB->associate(NULL, endpointSecondaryKeyDB, (flags != DB_RDONLY ? getEndpointKey : NULL), 0)) != 0) {
458
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to associate secondary DB with primary DB (%s)", name);
459
 
      logErrorMessage(ret);
460
 
      throw std::exception();
461
 
    }
462
 
    if ((ret = jobDB->associate(NULL, serviceInfoSecondaryKeyDB, (flags != DB_RDONLY ? getServiceInfoHostnameKey : NULL), 0)) != 0) {
463
 
      JobInformationStorageBDB::logger.msg(ERROR, "Unable to associate secondary DB with primary DB (%s)", name);
464
 
      logErrorMessage(ret);
465
 
      throw std::exception();
466
 
    }
467
 
 
468
 
    JobInformationStorageBDB::logger.msg(DEBUG, "Job database created successfully (%s)", name);
469
 
  }
470
 
 
471
 
  JobInformationStorageBDB::JobDB::~JobDB() {
472
 
    if (nameSecondaryKeyDB) {
473
 
      nameSecondaryKeyDB->close(0);
474
 
    }
475
 
    if (endpointSecondaryKeyDB) {
476
 
      endpointSecondaryKeyDB->close(0);
477
 
    }
478
 
    if (serviceInfoSecondaryKeyDB) {
479
 
      serviceInfoSecondaryKeyDB->close(0);
480
 
    }
481
 
    if (jobDB) {
482
 
      jobDB->close(0);
483
 
    }
484
 
    if (dbEnv) {
485
 
      dbEnv->close(0);
486
 
    }
487
 
 
488
 
    delete endpointSecondaryKeyDB; endpointSecondaryKeyDB = NULL; 
489
 
    delete nameSecondaryKeyDB; nameSecondaryKeyDB = NULL;
490
 
    delete serviceInfoSecondaryKeyDB; serviceInfoSecondaryKeyDB = NULL;
491
 
    delete jobDB; jobDB = NULL;
492
 
    delete dbEnv; dbEnv = NULL;
493
 
 
494
 
    dbEnv = new DbEnv(DB_CXX_NO_EXCEPTIONS);
495
 
    dbEnv->remove(NULL, 0);
496
 
    delete dbEnv; dbEnv = NULL;
497
 
  }
498
 
 
499
 
  bool JobInformationStorageBDB::Write(const std::list<Job>& jobs) {
500
 
    if (jobs.empty()) return true;
501
 
    
502
 
    try {
503
 
      JobDB db(name, DB_CREATE);
504
 
      int ret;
505
 
      std::list<Job>::const_iterator it = jobs.begin();
506
 
      void* pdata = NULL;
507
 
      Dbt key, data;
508
 
      {
509
 
        InterruptGuard guard;
510
 
        do {
511
 
          ::free(pdata);
512
 
          key.set_size(it->JobID.size());
513
 
          key.set_data((char*)it->JobID.c_str());
514
 
          serialiseJob(*it, data);
515
 
          pdata = data.get_data();
516
 
        } while ((ret = db->put(NULL, &key, &data, 0)) == 0 && ++it != jobs.end());
517
 
        ::free(pdata);
518
 
      };
519
 
      
520
 
      if (ret != 0) {
521
 
        logger.msg(ERROR, "Unable to write key/value pair to job database (%s): Key \"%s\"", name, (char*)key.get_data());
522
 
        logErrorMessage(ret);
523
 
        return false;
524
 
      }
525
 
    } catch (const std::exception& e) {
526
 
      return false;
527
 
    }
528
 
 
529
 
    return true;
530
 
  }
531
 
 
532
 
  bool JobInformationStorageBDB::Write(const std::list<Job>& jobs, const std::set<std::string>& prunedServices, std::list<const Job*>& newJobs) {
533
 
    if (jobs.empty()) return true;
534
 
    
535
 
    try {
536
 
      JobDB db(name, DB_CREATE);
537
 
      int ret = 0;
538
 
      
539
 
      std::set<std::string> idsOfPrunedJobs;
540
 
      Dbc *cursor = NULL;
541
 
      if ((ret = db.viaServiceInfoKeys()->cursor(NULL, &cursor, DB_WRITECURSOR)) != 0) return false;
542
 
      for (std::set<std::string>::const_iterator itPruned = prunedServices.begin();
543
 
           itPruned != prunedServices.end(); ++itPruned) {
544
 
        Dbt key((char *)itPruned->c_str(), itPruned->size()), pkey, data;
545
 
        if (cursor->pget(&key, &pkey, &data, DB_SET) != 0) continue;
546
 
        do {
547
 
          idsOfPrunedJobs.insert(std::string((char *)pkey.get_data(), pkey.get_size()));
548
 
          cursor->del(0);
549
 
        } while (cursor->pget(&key, &pkey, &data, DB_NEXT_DUP) == 0);
550
 
      }
551
 
      cursor->close();
552
 
      
553
 
      std::list<Job>::const_iterator it = jobs.begin();
554
 
      void* pdata = NULL;
555
 
      Dbt key, data;
556
 
      bool jobWasPruned;
557
 
      {
558
 
        InterruptGuard guard;
559
 
        do {
560
 
          ::free(pdata);
561
 
          key.set_size(it->JobID.size());
562
 
          key.set_data((char*)it->JobID.c_str());
563
 
          serialiseJob(*it, data);
564
 
          pdata = data.get_data();
565
 
          jobWasPruned = (idsOfPrunedJobs.count(it->JobID) != 0);
566
 
          if (!jobWasPruned) { // Check if job already exist.
567
 
            Dbt existingData;
568
 
            if (db->get(NULL, &key, &existingData, 0) == DB_NOTFOUND) {
569
 
              newJobs.push_back(&*it);
570
 
            }
571
 
          }
572
 
        } while (((ret = db->put(NULL, &key, &data, 0)) == 0 && ++it != jobs.end()));
573
 
        ::free(pdata);
574
 
      };
575
 
      
576
 
      if (ret != 0) {
577
 
        logger.msg(ERROR, "Unable to write key/value pair to job database (%s): Key \"%s\"", name, (char*)key.get_data());
578
 
        logErrorMessage(ret);
579
 
        return false;
580
 
      }
581
 
    } catch (const std::exception& e) {
582
 
      return false;
583
 
    }
584
 
 
585
 
    return true;
586
 
  }
587
 
 
588
 
  bool JobInformationStorageBDB::ReadAll(std::list<Job>& jobs, const std::list<std::string>& rejectEndpoints) {
589
 
    jobs.clear();
590
 
 
591
 
    try {
592
 
      int ret;
593
 
      JobDB db(name);
594
 
  
595
 
      Dbc *cursor;
596
 
      if ((ret = db->cursor(NULL, &cursor, 0)) != 0) {
597
 
        //dbp->err(dbp, ret, "DB->cursor");
598
 
        return false;
599
 
      }
600
 
      
601
 
      Dbt key, data;
602
 
      while ((ret = cursor->get(&key, &data, DB_NEXT)) == 0) {
603
 
        jobs.push_back(Job());
604
 
        jobs.back().JobID = std::string((char *)key.get_data(), key.get_size());
605
 
        deserialiseJob(jobs.back(), data);
606
 
        for (std::list<std::string>::const_iterator it = rejectEndpoints.begin();
607
 
             it != rejectEndpoints.end(); ++it) {
608
 
          if (jobs.back().JobManagementURL.StringMatches(*it)) {
609
 
            jobs.pop_back();
610
 
            break;
611
 
          }
612
 
        }
613
 
      }
614
 
  
615
 
      cursor->close();
616
 
      
617
 
      if (ret != DB_NOTFOUND) {
618
 
        //dbp->err(dbp, ret, "DBcursor->get");
619
 
        return false;
620
 
      }
621
 
    } catch (const std::exception& e) {
622
 
      return false;
623
 
    }
624
 
    
625
 
    return true;
626
 
  }
627
 
  
628
 
  static void addJobFromDB(const Dbt& key, const Dbt& data, std::list<Job>& jobs, std::set<std::string>& idsOfAddedJobs, const std::list<std::string>& rejectEndpoints) {
629
 
    jobs.push_back(Job());
630
 
    jobs.back().JobID.assign((char *)key.get_data(), key.get_size());
631
 
    deserialiseJob(jobs.back(), data);
632
 
    if (idsOfAddedJobs.count(jobs.back().JobID) != 0) { // Look for duplicates and remove them.
633
 
      jobs.pop_back();
634
 
      return;
635
 
    }
636
 
    idsOfAddedJobs.insert(jobs.back().JobID);
637
 
    for (std::list<std::string>::const_iterator it = rejectEndpoints.begin();
638
 
         it != rejectEndpoints.end(); ++it) {
639
 
      if (jobs.back().JobManagementURL.StringMatches(*it)) {
640
 
        idsOfAddedJobs.erase(jobs.back().JobID);
641
 
        jobs.pop_back();
642
 
        return;
643
 
      }
644
 
    }
645
 
  }
646
 
 
647
 
  static bool addJobsFromDuplicateKeys(Db& db, Dbt& key, std::list<Job>& jobs, std::set<std::string>& idsOfAddedJobs, const std::list<std::string>& rejectEndpoints) {
648
 
    int ret;
649
 
    Dbt pkey, data;
650
 
    Dbc *cursor;
651
 
    if ((ret = db.cursor(NULL, &cursor, 0)) != 0) {
652
 
      //dbp->err(dbp, ret, "DB->cursor");
653
 
      return false;
654
 
    }
655
 
    ret = cursor->pget(&key, &pkey, &data, DB_SET);
656
 
    if (ret != 0) return false;
657
 
    
658
 
    addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
659
 
    while ((ret = cursor->pget(&key, &pkey, &data, DB_NEXT_DUP)) == 0) {
660
 
       addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
661
 
    }
662
 
    return true;
663
 
  }
664
 
  
665
 
  bool JobInformationStorageBDB::Read(std::list<Job>& jobs, std::list<std::string>& jobIdentifiers,
666
 
                                      const std::list<std::string>& endpoints,
667
 
                                      const std::list<std::string>& rejectEndpoints) {
668
 
    jobs.clear();
669
 
    
670
 
    try {
671
 
      JobDB db(name);
672
 
      int ret;
673
 
      std::set<std::string> idsOfAddedJobs;
674
 
      for (std::list<std::string>::iterator it = jobIdentifiers.begin();
675
 
           it != jobIdentifiers.end();) {
676
 
        if (it->empty()) continue;
677
 
        
678
 
        Dbt key((char *)it->c_str(), it->size()), data;
679
 
        ret = db->get(NULL, &key, &data, 0);
680
 
        if (ret == DB_NOTFOUND) {
681
 
          if (addJobsFromDuplicateKeys(*db.viaNameKeys(), key, jobs, idsOfAddedJobs, rejectEndpoints)) {
682
 
            it = jobIdentifiers.erase(it);
683
 
          }
684
 
          else {
685
 
            ++it;
686
 
          }
687
 
          continue;
688
 
        }
689
 
        addJobFromDB(key, data, jobs, idsOfAddedJobs, rejectEndpoints);
690
 
        it = jobIdentifiers.erase(it);
691
 
      }
692
 
      if (endpoints.empty()) return true;
693
 
      
694
 
      Dbc *cursor;
695
 
      if ((ret = db.viaEndpointKeys()->cursor(NULL, &cursor, 0)) != 0) return false;
696
 
      for (std::list<std::string>::const_iterator it = endpoints.begin();
697
 
           it != endpoints.end(); ++it) {
698
 
        // Extract hostname from iterator.
699
 
        URL u(*it);
700
 
        if (u.Protocol() == "file") {
701
 
          u = URL("http://" + *it); // Only need to extract hostname. Prefix with "http://".
702
 
        }
703
 
        if (u.Host().empty()) continue;
704
 
 
705
 
        Dbt key((char *)u.Host().c_str(), u.Host().size()), pkey, data;
706
 
        ret = cursor->pget(&key, &pkey, &data, DB_SET);
707
 
        if (ret != 0) {
708
 
          continue;
709
 
        }
710
 
        std::string tmpEndpoint;
711
 
        deserialiseNthJobAttribute(tmpEndpoint, data, 7);
712
 
        URL jobManagementURL(tmpEndpoint);
713
 
        if (jobManagementURL.StringMatches(*it)) {
714
 
          addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
715
 
        }
716
 
        while ((ret = cursor->pget(&key, &pkey, &data, DB_NEXT_DUP)) == 0) {
717
 
          deserialiseNthJobAttribute(tmpEndpoint, data, 7);
718
 
          URL jobManagementURL(tmpEndpoint);
719
 
          if (jobManagementURL.StringMatches(*it)) {
720
 
           addJobFromDB(pkey, data, jobs, idsOfAddedJobs, rejectEndpoints);
721
 
         }
722
 
        }
723
 
      }
724
 
    } catch (const std::exception& e) {
725
 
      return false;
726
 
    }
727
 
 
728
 
    return true;
729
 
  }
730
 
  
731
 
  bool JobInformationStorageBDB::Clean() {
732
 
    if (remove(name.c_str()) != 0) {
733
 
      if (errno == ENOENT) return true; // No such file. DB already cleaned.
734
 
      logger.msg(ERROR, "Unable to truncate job database (%s)", name);
735
 
      perror("Error");
736
 
      return false;
737
 
    }
738
 
    
739
 
    return true;
740
 
  }
741
 
  
742
 
  bool JobInformationStorageBDB::Remove(const std::list<std::string>& jobids) {
743
 
    try {
744
 
      InterruptGuard guard;
745
 
      JobDB db(name, DB_CREATE);
746
 
      for (std::list<std::string>::const_iterator it = jobids.begin();
747
 
           it != jobids.end(); ++it) {
748
 
        Dbt key((char *)it->c_str(), it->size());
749
 
        db->del(NULL, &key, 0);
750
 
      }
751
 
    } catch (const std::exception& e) {
752
 
      return false;
753
 
    }
754
 
    
755
 
    return true;
756
 
  }
757
 
  
758
 
  void JobInformationStorageBDB::logErrorMessage(int err) {
759
 
    switch (err) {
760
 
    case ENOENT:
761
 
      logger.msg(DEBUG, "ENOENT: The file or directory does not exist, Or a nonexistent re_source file was specified.");
762
 
      break;
763
 
    case DB_OLD_VERSION:
764
 
      logger.msg(DEBUG, "DB_OLD_VERSION: The database cannot be opened without being first upgraded.");
765
 
      break;
766
 
    case EEXIST:
767
 
      logger.msg(DEBUG, "EEXIST: DB_CREATE and DB_EXCL were specified and the database exists.");
768
 
    case EINVAL:
769
 
      logger.msg(DEBUG, "EINVAL");
770
 
      break;
771
 
    default:
772
 
      logger.msg(DEBUG, "Unable to determine error (%d)", err);
773
 
    }
774
 
  }
775
 
#endif
776
 
 
777
 
} // namespace Arc