~ubuntu-branches/ubuntu/utopic/gridengine/utopic

« back to all changes in this revision

Viewing changes to source/libs/spool/berkeleydb/sge_bdb.c

  • Committer: Bazaar Package Importer
  • Author(s): Mark Hymers
  • Date: 2008-06-25 22:36:13 UTC
  • Revision ID: james.westby@ubuntu.com-20080625223613-tvd9xlhuoct9kyhm
Tags: upstream-6.2~beta2
ImportĀ upstreamĀ versionĀ 6.2~beta2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*___INFO__MARK_BEGIN__*/
 
2
/*************************************************************************
 
3
 *
 
4
 *  The Contents of this file are made available subject to the terms of
 
5
 *  the Sun Industry Standards Source License Version 1.2
 
6
 *
 
7
 *  Sun Microsystems Inc., March, 2001
 
8
 *
 
9
 *
 
10
 *  Sun Industry Standards Source License Version 1.2
 
11
 *  =================================================
 
12
 *  The contents of this file are subject to the Sun Industry Standards
 
13
 *  Source License Version 1.2 (the "License"); You may not use this file
 
14
 *  except in compliance with the License. You may obtain a copy of the
 
15
 *  License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
 
16
 *
 
17
 *  Software provided under this License is provided on an "AS IS" basis,
 
18
 *  WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
 
19
 *  WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
 
20
 *  MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
 
21
 *  See the License for the specific provisions governing your rights and
 
22
 *  obligations concerning the Software.
 
23
 *
 
24
 *   The Initial Developer of the Original Code is: Sun Microsystems, Inc.
 
25
 *
 
26
 *   Copyright: 2001 by Sun Microsystems, Inc.
 
27
 *
 
28
 *   All Rights Reserved.
 
29
 *
 
30
 ************************************************************************/
 
31
/*___INFO__MARK_END__*/                                   
 
32
 
 
33
#include <errno.h>
 
34
#include <string.h>
 
35
 
 
36
#include "rmon/sgermon.h"
 
37
#include "uti/sge_log.h"
 
38
 
 
39
#include "uti/sge_profiling.h"
 
40
#include "uti/sge_string.h"
 
41
#include "uti/sge_unistd.h"
 
42
 
 
43
#include "cull/cull.h"
 
44
 
 
45
#include "sgeobj/sge_answer.h"
 
46
#include "sgeobj/sge_cqueue.h"
 
47
#include "sgeobj/sge_ja_task.h"
 
48
#include "sgeobj/sge_job.h"
 
49
#include "sgeobj/sge_object.h"
 
50
#include "sgeobj/sge_str.h"
 
51
 
 
52
/* local */
 
53
#include "msg_common.h"
 
54
#include "spool/berkeleydb/msg_spoollib_berkeleydb.h"
 
55
 
 
56
#include "spool/berkeleydb/sge_bdb.h"
 
57
 
 
58
#if 1
 
59
static const int pack_part = CULL_SPOOL | CULL_SUBLIST | CULL_SPOOL_PROJECT | 
 
60
                             CULL_SPOOL_USER;
 
61
#else
 
62
static const int pack_part = 0;
 
63
#endif
 
64
 
 
65
static void 
 
66
spool_berkeleydb_error_close(bdb_info info);
 
67
 
 
68
static void
 
69
spool_berkeleydb_handle_bdb_error(lList **answer_list, bdb_info info,
 
70
                                  int bdb_errno);
 
71
 
 
72
static bool
 
73
spool_berkeleydb_clear_log(lList **answer_list, bdb_info info);
 
74
 
 
75
static bool
 
76
spool_berkeleydb_trigger_rpc(lList **answer_list, bdb_info info);
 
77
 
 
78
static bool
 
79
spool_berkeleydb_checkpoint(lList **answer_list, bdb_info info);
 
80
 
 
81
/****** spool/berkeleydb/spool_berkeleydb_check_version() **********************
 
82
*  NAME
 
83
*     spool_berkeleydb_check_version() -- check version of shared libs 
 
84
*
 
85
*  SYNOPSIS
 
86
*     bool 
 
87
*     spool_berkeleydb_check_version(lList **answer_list) 
 
88
*
 
89
*  FUNCTION
 
90
*     Checks if major and minor version number returned by the db_version()
 
91
*     library call of Berkeley DB matches the version numbers set at compile
 
92
*     time.
 
93
*
 
94
*     The major and minor number must be equal, the patch level may differ.
 
95
*
 
96
*  INPUTS
 
97
*     lList **answer_list - used to return info and error messages
 
98
*
 
99
*  RESULT
 
100
*     bool - true, on success, else false
 
101
*
 
102
*  NOTES
 
103
*     MT-NOTE: spool_berkeleydb_check_version() is MT safe 
 
104
*
 
105
*******************************************************************************/
 
106
bool
 
107
spool_berkeleydb_check_version(lList **answer_list)
 
108
{
 
109
   bool ret = true;
 
110
   const char *version;
 
111
   int major, minor;
 
112
 
 
113
   DENTER(TOP_LAYER, "spool_berkeleydb_check_version");
 
114
   
 
115
   version = db_version(&major, &minor, NULL);
 
116
 
 
117
   answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
118
                           ANSWER_QUALITY_INFO, 
 
119
                           MSG_BERKELEY_USINGBDBVERSION_S,
 
120
                           version);
 
121
 
 
122
   if (major != DB_VERSION_MAJOR || minor != DB_VERSION_MINOR) {
 
123
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
124
                              ANSWER_QUALITY_ERROR, 
 
125
                              MSG_BERKELEY_WRONGBDBVERSIONEXPECTING_SDD,
 
126
                              version, DB_VERSION_MAJOR, DB_VERSION_MINOR);
 
127
      ret = false;
 
128
   }
 
129
  
 
130
   DEXIT;
 
131
   return ret;
 
132
}
 
133
 
 
134
/****** spool/berkeleydb/spool_berkeleydb_create_environment() *****************
 
135
*  NAME
 
136
*     spool_berkeleydb_create_environment() -- ??? 
 
137
*
 
138
*  SYNOPSIS
 
139
*     bool spool_berkeleydb_create_environment(lList **answer_list, struct 
 
140
*     bdb_info info, const char *url) 
 
141
*
 
142
*  FUNCTION
 
143
*     ??? 
 
144
*
 
145
*  INPUTS
 
146
*     lList **answer_list   - ??? 
 
147
*     bdb_info info - ??? 
 
148
*     const char *url       - ??? 
 
149
*
 
150
*  RESULT
 
151
*     bool - 
 
152
*
 
153
*  EXAMPLE
 
154
*     ??? 
 
155
*
 
156
*  NOTES
 
157
*     MT-NOTE: spool_berkeleydb_create_environment() is not MT safe 
 
158
*
 
159
*  BUGS
 
160
*     ??? 
 
161
*
 
162
*  SEE ALSO
 
163
*     ???/???
 
164
*******************************************************************************/
 
165
bool spool_berkeleydb_create_environment(lList **answer_list, 
 
166
                                         bdb_info info)
 
167
 
168
   bool ret = true;
 
169
   int dbret;
 
170
   const char *server, *path;
 
171
 
 
172
   DB_ENV *env = NULL;
 
173
 
 
174
   DENTER(TOP_LAYER, "spool_berkeleydb_create_environment");
 
175
 
 
176
   server = bdb_get_server(info);
 
177
   path   = bdb_get_path(info);
 
178
 
 
179
   /* check database directory (only in case of local spooling) */
 
180
   if (server == NULL && !sge_is_directory(path)) {
 
181
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
182
                              ANSWER_QUALITY_ERROR, 
 
183
                              MSG_BERKELEY_DATABASEDIRDOESNTEXIST_S,
 
184
                              path);
 
185
      ret = false;
 
186
   }
 
187
 
 
188
   if (ret) {
 
189
      /* we have to lock the info structure, as multiple threads might try
 
190
       * to open the env in parallel.
 
191
       */
 
192
      bdb_lock_info(info);
 
193
 
 
194
      /* check, if env has been initialized in the meantime */
 
195
      env = bdb_get_env(info);
 
196
   }
 
197
 
 
198
   /* continue only, if env isn't initialized yet */
 
199
   if (ret && env == NULL) {
 
200
      int flags = 0;
 
201
 
 
202
      if (server != NULL) {
 
203
         flags |= DB_RPCCLIENT;
 
204
      }
 
205
 
 
206
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
207
      dbret = db_env_create(&env, flags);
 
208
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
209
      if (dbret != 0) {
 
210
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
211
                                 ANSWER_QUALITY_ERROR, 
 
212
                                 MSG_BERKELEY_COULDNTCREATEENVIRONMENT_IS,
 
213
                                 dbret, db_strerror(dbret));
 
214
         ret = false;
 
215
      }
 
216
 
 
217
      /* do deadlock detection internally (only in case of local spooling) */
 
218
      if (ret && server == NULL) {
 
219
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
220
         dbret = env->set_lk_detect(env, DB_LOCK_DEFAULT);
 
221
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
222
         if (dbret != 0) {
 
223
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
224
                                    ANSWER_QUALITY_ERROR, 
 
225
                                    MSG_BERKELEY_COULDNTESETUPLOCKDETECTION_IS,
 
226
                                    dbret, db_strerror(dbret));
 
227
            ret = false;
 
228
         } 
 
229
 
 
230
         /* 
 
231
          * performance tuning 
 
232
          * Switch off flushing of transaction log for every single transaction.
 
233
          * This tuning option has huge impact on performance, but only a slight impact 
 
234
          * on database durability: In case of a server/filesystem crash, we might loose
 
235
          * the last transactions committed before the crash. Still all transactions will
 
236
          * be atomic, isolated and the database will be consistent at any time.
 
237
          */
 
238
         if (ret) {
 
239
            dbret = env->set_flags(env, DB_TXN_WRITE_NOSYNC, 1);
 
240
            if (dbret != 0) {
 
241
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
242
                                       ANSWER_QUALITY_ERROR, 
 
243
                                       MSG_BERKELEY_CANTSETENVFLAGS_IS,
 
244
                                       dbret, db_strerror(dbret));
 
245
               ret = false;
 
246
            } 
 
247
         }
 
248
 
 
249
         /* 
 
250
          * performance tuning 
 
251
          * increase the cache size
 
252
          */
 
253
         if (ret) {
 
254
            dbret = env->set_cachesize(env, 0, 4 * 1024 * 1024, 1);
 
255
            if (dbret != 0) {
 
256
               spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
257
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
258
                                       ANSWER_QUALITY_ERROR,
 
259
                                       MSG_BERKELEY_CANTSETENVCACHE_IS,
 
260
                                       dbret, db_strerror(dbret));
 
261
               ret = false;
 
262
            }
 
263
         }
 
264
      }
 
265
 
 
266
      /* if we use a RPC server, set it in the DB_ENV */
 
267
      if (ret && server != NULL) {
 
268
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
269
         dbret = env->set_rpc_server(env, NULL, server, 0, 0, 0);
 
270
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
271
         if (dbret != 0) {
 
272
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
273
                                    ANSWER_QUALITY_ERROR, 
 
274
                                    MSG_BERKELEY_COULDNTESETRPCSERVER_IS,
 
275
                                    dbret, db_strerror(dbret));
 
276
            ret = false;
 
277
         } 
 
278
      }
 
279
 
 
280
      /* the lock parameters only can be set, if we have local spooling.
 
281
       * RPC server: use DB_CONFIG file.
 
282
       */
 
283
      if (server == NULL) {
 
284
         /* worst case scenario: n lockers, all changing m objects in 
 
285
          * parallel 
 
286
          */
 
287
#if 0
 
288
         int lockers = 5;
 
289
         int objects = 5000;
 
290
         int locks = lockers * 2 * objects;
 
291
 
 
292
         /* set locking params: max lockers */
 
293
         if (ret) {
 
294
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
295
            dbret = env->set_lk_max_lockers(env, lockers);
 
296
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
297
            if (dbret != 0) {
 
298
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
299
                                       ANSWER_QUALITY_ERROR, 
 
300
                                       MSG_BERKELEY_COULDNTSETLOCKERS_IS,
 
301
                                       dbret, db_strerror(dbret));
 
302
               ret = false;
 
303
            } 
 
304
         }
 
305
         /* set locking params: max objects */
 
306
         if (ret) {
 
307
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
308
            dbret = env->set_lk_max_objects(env, objects);
 
309
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
310
            if (dbret != 0) {
 
311
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
312
                                       ANSWER_QUALITY_ERROR, 
 
313
                                       MSG_BERKELEY_COULDNTSETOBJECTS_IS,
 
314
                                       dbret, db_strerror(dbret));
 
315
               ret = false;
 
316
            } 
 
317
         }
 
318
         /* set locking params: max locks */
 
319
         if (ret) {
 
320
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
321
            dbret = env->set_lk_max_locks(env, locks);
 
322
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
323
            if (dbret != 0) {
 
324
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
325
                                       ANSWER_QUALITY_ERROR, 
 
326
                                       MSG_BERKELEY_COULDNTSETLOCKS_IS,
 
327
                                       dbret, db_strerror(dbret));
 
328
               ret = false;
 
329
            } 
 
330
         }
 
331
#endif
 
332
      }
 
333
 
 
334
      /* open the environment */
 
335
      if (ret) {
 
336
         int flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | 
 
337
                     DB_INIT_TXN;
 
338
 
 
339
         if (server == NULL) {
 
340
            flags |= DB_THREAD;
 
341
         }
 
342
 
 
343
         if (bdb_get_recover(info)) {
 
344
            flags |= DB_RECOVER;
 
345
         }
 
346
 
 
347
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
348
         dbret = env->open(env, path, flags,
 
349
                           S_IRUSR | S_IWUSR);
 
350
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
351
         if (dbret != 0){
 
352
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
353
                                    ANSWER_QUALITY_ERROR, 
 
354
                                    MSG_BERKELEY_COULDNTOPENENVIRONMENT_SSIS,
 
355
                                    server == NULL ? "local spooling" : server, 
 
356
                                    path, dbret, db_strerror(dbret));
 
357
            ret = false;
 
358
            env = NULL;
 
359
         }
 
360
         
 
361
         bdb_set_env(info, env);
 
362
      }
 
363
   }
 
364
 
 
365
   /* now unlock the info structure */
 
366
   bdb_unlock_info(info);
 
367
 
 
368
   DEXIT;
 
369
   return ret;
 
370
}
 
371
 
 
372
bool 
 
373
spool_berkeleydb_open_database(lList **answer_list, bdb_info info, 
 
374
                               bool create)
 
375
{
 
376
   bool ret = true;
 
377
   bdb_database i;
 
378
 
 
379
   DENTER(TOP_LAYER, "spool_berkeleydb_open_database");
 
380
 
 
381
   for (i = BDB_CONFIG_DB; i < BDB_ALL_DBS && ret; i++) {
 
382
      DB_ENV *env;
 
383
      DB *db;
 
384
 
 
385
      int dbret = 0;
 
386
 
 
387
      /* we have to lock info, as multiple threads might try to (re)open
 
388
       * the database connection in parallel 
 
389
       */
 
390
      bdb_lock_info(info);
 
391
       
 
392
      env = bdb_get_env(info);
 
393
 
 
394
      if (env == NULL) {
 
395
         dstring dbname_dstring = DSTRING_INIT;
 
396
         const char *dbname;
 
397
         
 
398
         dbname = bdb_get_dbname(info, &dbname_dstring);
 
399
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
400
                                 ANSWER_QUALITY_ERROR, 
 
401
                                 MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
402
                                 dbname);
 
403
         sge_dstring_free(&dbname_dstring);
 
404
         ret = false;
 
405
      }
 
406
 
 
407
      /* check db - another thread could have opened it in the meantime */
 
408
      if (ret) {
 
409
         db = bdb_get_db(info, i);
 
410
      }
 
411
 
 
412
      if (ret && db == NULL) {
 
413
         /* create a database handle */
 
414
         if (ret) {
 
415
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
416
            dbret = db_create(&db, env, 0);
 
417
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
418
            if (dbret != 0) {
 
419
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
420
                                       ANSWER_QUALITY_ERROR, 
 
421
                                       MSG_BERKELEY_COULDNTCREATEDBHANDLE_IS,
 
422
                                       dbret, db_strerror(dbret));
 
423
               ret = false;
 
424
               db = NULL;
 
425
            }
 
426
         }
 
427
 
 
428
         /* open database handle */
 
429
         if (ret) {
 
430
            int flags = 0;
 
431
            int mode  = 0;
 
432
 
 
433
            if (bdb_get_server(info) == NULL) {
 
434
               flags |= DB_THREAD;
 
435
            }
 
436
 
 
437
            /* the config db will only be created, if explicitly requested
 
438
             * (in spoolinit). DB already existing will be handled as error.
 
439
             * Other databases will be created as needed.
 
440
             */
 
441
            if (i == BDB_CONFIG_DB) {
 
442
               if (create) {
 
443
                  flags |= DB_CREATE | DB_EXCL;
 
444
                  mode =  S_IRUSR | S_IWUSR;
 
445
               }
 
446
            } else {
 
447
                  flags |= DB_CREATE;
 
448
                  mode =  S_IRUSR | S_IWUSR;
 
449
            }
 
450
 
 
451
            ret = spool_berkeleydb_start_transaction(answer_list, info);
 
452
            if (ret) {
 
453
               const char *db_name = bdb_get_database_name(i); 
 
454
               DB_TXN *txn = bdb_get_txn(info);
 
455
               PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
456
               dbret = db->open(db, txn, db_name, NULL, 
 
457
                                DB_BTREE, flags, mode);
 
458
               PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
459
               ret = spool_berkeleydb_end_transaction(answer_list, info, true);
 
460
            }
 
461
            if (dbret != 0) {
 
462
               spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
463
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
464
                                       ANSWER_QUALITY_ERROR,
 
465
                                       create ? MSG_BERKELEY_COULDNTCREATEDB_SIS
 
466
                                              : MSG_BERKELEY_COULDNTOPENDB_SIS,
 
467
                                       bdb_get_database_name(i), 
 
468
                                       dbret, db_strerror(dbret));
 
469
               ret = false;
 
470
            }
 
471
         }
 
472
 
 
473
         /* if everything is ok - set the database handle */
 
474
         if (ret) {
 
475
            bdb_set_db(info, db, i);
 
476
            DPRINTF(("opened database connection, env = %p, db = %p\n", env, db));
 
477
         }
 
478
      }
 
479
 
 
480
      bdb_unlock_info(info);
 
481
   }
 
482
 
 
483
   DEXIT;
 
484
   return ret;
 
485
}
 
486
 
 
487
bool 
 
488
spool_berkeleydb_close_database(lList **answer_list, bdb_info info)
 
489
{
 
490
   bool ret = true;
 
491
 
 
492
   DB_ENV *env;
 
493
 
 
494
   /* database name for info or error output */
 
495
   char dbname_buffer[MAX_STRING_SIZE];
 
496
   dstring dbname_dstring = DSTRING_INIT;
 
497
   const char *dbname;
 
498
 
 
499
   DENTER(TOP_LAYER, "spool_berkeleydb_close_database");
 
500
 
 
501
   sge_dstring_init(&dbname_dstring, dbname_buffer, sizeof(dbname_buffer));
 
502
   dbname = bdb_get_dbname(info, &dbname_dstring);
 
503
 
 
504
   /* lock the database info, multiple threads might try to close it */
 
505
   bdb_lock_info(info);
 
506
   env = bdb_get_env(info);
 
507
   if (env == NULL) {
 
508
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
509
                              ANSWER_QUALITY_ERROR, 
 
510
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
511
                              dbname);
 
512
      ret = false;
 
513
   } else {
 
514
      bdb_database i;
 
515
      int dbret;
 
516
      for (i = BDB_CONFIG_DB; i < BDB_ALL_DBS; i++) {
 
517
         DB *db;
 
518
 
 
519
         /* close open database */
 
520
         db = bdb_get_db(info, i);
 
521
         if (db != NULL) {
 
522
            int dbret;
 
523
 
 
524
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
525
            dbret = db->close(db, 0);
 
526
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
527
            if (dbret != 0) {
 
528
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
529
                                       ANSWER_QUALITY_ERROR, 
 
530
                                       MSG_BERKELEY_COULDNTCLOSEDB_SIS,
 
531
                                       bdb_get_database_name(i), 
 
532
                                       dbret, db_strerror(dbret));
 
533
               ret = false;
 
534
            }
 
535
 
 
536
            db = NULL;
 
537
            bdb_set_db(info, db, i);
 
538
         }
 
539
      }
 
540
 
 
541
      /* close env in any case, even if db->close failed */
 
542
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
543
      dbret = env->close(env, 0);
 
544
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
545
      if (dbret != 0) {
 
546
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
547
                                 ANSWER_QUALITY_ERROR, 
 
548
                                 MSG_BERKELEY_COULDNTCLOSEENVIRONMENT_SIS,
 
549
                                 dbname, dbret, db_strerror(dbret));
 
550
         ret = false;
 
551
      } else {
 
552
        answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
553
                                ANSWER_QUALITY_INFO, 
 
554
                                MSG_BERKELEY_CLOSEDDB_S,
 
555
                                dbname);
 
556
      }
 
557
 
 
558
      env = NULL;
 
559
      bdb_set_env(info, env);
 
560
   }
 
561
 
 
562
   bdb_unlock_info(info);
 
563
 
 
564
   DEXIT;
 
565
   return ret;
 
566
}
 
567
 
 
568
/****** sge_bdb/spool_berkeleydb_start_transaction() ***************************
 
569
*  NAME
 
570
*     spool_berkeleydb_start_transaction() -- start a transaction
 
571
*
 
572
*  SYNOPSIS
 
573
*     bool 
 
574
*     spool_berkeleydb_start_transaction(lList **answer_list, bdb_info info) 
 
575
*
 
576
*  FUNCTION
 
577
*     Starts a transaction.
 
578
*     Transactions are bound to a certain thread, multiple threads can start
 
579
*     transactions in parallel.
 
580
*
 
581
*  INPUTS
 
582
*     lList **answer_list   - used to return error messages
 
583
*     bdb_info info - database handle
 
584
*
 
585
*  RESULT
 
586
*     bool - true on success, else false
 
587
*
 
588
*  NOTES
 
589
*     MT-NOTE: spool_berkeleydb_start_transaction() is MT safe 
 
590
*
 
591
*  SEE ALSO
 
592
*     spool/berkeleydb/spool_berkeleydb_end_transaction()
 
593
*******************************************************************************/
 
594
bool
 
595
spool_berkeleydb_start_transaction(lList **answer_list, bdb_info info)
 
596
{
 
597
   bool ret = true;
 
598
 
 
599
   DB_ENV *env;
 
600
   DB_TXN *txn;
 
601
 
 
602
   DENTER(TOP_LAYER, "spool_berkeleydb_start_transaction");
 
603
 
 
604
   env = bdb_get_env(info);
 
605
   txn = bdb_get_txn(info);
 
606
 
 
607
   if (env == NULL) {
 
608
      dstring dbname_dstring = DSTRING_INIT;
 
609
      const char *dbname;
 
610
      
 
611
      dbname = bdb_get_dbname(info, &dbname_dstring);
 
612
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
613
                              ANSWER_QUALITY_ERROR, 
 
614
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
615
                              dbname);
 
616
      sge_dstring_free(&dbname_dstring);
 
617
      ret = false;
 
618
   } else {
 
619
      if (txn != NULL) {
 
620
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
621
                                 ANSWER_QUALITY_ERROR, 
 
622
                                 MSG_BERKELEY_TXNALREADYOPEN);
 
623
         ret = false;
 
624
      } else {
 
625
         int dbret;
 
626
         int flags = 0;
 
627
 
 
628
         /* 
 
629
          * RPC server does no deadlock detection - if a lock cannot be 
 
630
          * obtained, exit immediately
 
631
          */
 
632
         if (bdb_get_server(info) != NULL) {
 
633
            flags |= DB_TXN_NOWAIT;
 
634
         }
 
635
 
 
636
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
637
         dbret = env->txn_begin(env, NULL, &txn, flags);
 
638
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
639
         if (dbret != 0) {
 
640
            spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
641
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
642
                                    ANSWER_QUALITY_ERROR, 
 
643
                                    MSG_BERKELEY_ERRORSTARTINGTRANSACTION_IS,
 
644
                                    dbret, db_strerror(dbret));
 
645
            ret = false;
 
646
            txn = NULL;
 
647
         }
 
648
      }
 
649
 
 
650
      bdb_set_txn(info, txn);
 
651
      DEBUG((SGE_EVENT, "BEGIN transaction\n"));
 
652
   }
 
653
 
 
654
   DEXIT;
 
655
   return ret;
 
656
}
 
657
 
 
658
bool
 
659
spool_berkeleydb_end_transaction(lList **answer_list, bdb_info info, 
 
660
                                 bool commit)
 
661
{
 
662
   bool ret = true;
 
663
   int dbret;
 
664
 
 
665
   DB_ENV *env;
 
666
   DB_TXN *txn;
 
667
 
 
668
   DENTER(TOP_LAYER, "spool_berkeleydb_end_transaction");
 
669
 
 
670
   env = bdb_get_env(info);
 
671
   txn = bdb_get_txn(info);
 
672
 
 
673
   if (env == NULL) {
 
674
      dstring dbname_dstring = DSTRING_INIT;
 
675
      const char *dbname;
 
676
      
 
677
      dbname = bdb_get_dbname(info, &dbname_dstring);
 
678
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
679
                              ANSWER_QUALITY_ERROR, 
 
680
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
681
                              dbname);
 
682
      sge_dstring_free(&dbname_dstring);
 
683
      ret = false;
 
684
   } else {
 
685
      if (txn == NULL) {
 
686
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
687
                                 ANSWER_QUALITY_ERROR, 
 
688
                                 MSG_BERKELEY_TXNNOTOPEN);
 
689
         ret = false;
 
690
      } else {
 
691
         if (commit) {
 
692
            DEBUG((SGE_EVENT, "COMMIT transaction\n"));
 
693
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
694
            dbret = txn->commit(txn, 0);
 
695
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
696
         } else {
 
697
            DEBUG((SGE_EVENT, "ABORT transaction\n"));
 
698
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
699
                                    ANSWER_QUALITY_WARNING, 
 
700
                                    MSG_BERKELEY_ABORTINGTRANSACTION);
 
701
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
702
            dbret = txn->abort(txn);
 
703
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
704
         }
 
705
 
 
706
         if (dbret != 0) {
 
707
            spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
708
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
709
                                    ANSWER_QUALITY_ERROR, 
 
710
                                    MSG_BERKELEY_ERRORENDINGTRANSACTION_IS,
 
711
                                    dbret, db_strerror(dbret));
 
712
            ret = false;
 
713
         }
 
714
 
 
715
         txn = NULL;
 
716
         bdb_set_txn(info, txn);
 
717
      }
 
718
   }
 
719
 
 
720
   DEXIT;
 
721
   return ret;
 
722
}
 
723
 
 
724
bool 
 
725
spool_berkeleydb_trigger(lList **answer_list, bdb_info info, 
 
726
                         time_t trigger, time_t *next_trigger)
 
727
{
 
728
   bool ret = true;
 
729
 
 
730
   DENTER(TOP_LAYER, "spool_berkeleydb_trigger");
 
731
 
 
732
   if (bdb_get_next_clear(info) <= trigger) {
 
733
      /* 
 
734
       * in the clear interval, we 
 
735
       * - clear unused transaction logs for local spooling
 
736
       * - do a dummy request in case of RPC spooling to avoid timeouts
 
737
       */
 
738
      if (bdb_get_server(info) == NULL) {
 
739
         ret = spool_berkeleydb_clear_log(answer_list, info);
 
740
      } else {
 
741
         ret = spool_berkeleydb_trigger_rpc(answer_list, info);
 
742
      }
 
743
      bdb_set_next_clear(info, trigger + BERKELEYDB_CLEAR_INTERVAL);
 
744
   }
 
745
 
 
746
   if (bdb_get_next_checkpoint(info) <= trigger) {
 
747
      ret = spool_berkeleydb_checkpoint(answer_list, info);
 
748
      bdb_set_next_checkpoint(info, trigger + BERKELEYDB_CHECKPOINT_INTERVAL);
 
749
   }
 
750
 
 
751
   /* set time of next trigger */
 
752
   *next_trigger = MIN(bdb_get_next_clear(info), bdb_get_next_checkpoint(info));
 
753
 
 
754
   DEXIT;
 
755
   return ret;
 
756
}
 
757
 
 
758
bool 
 
759
spool_berkeleydb_read_list(lList **answer_list, bdb_info info,
 
760
                           const bdb_database database,
 
761
                           lList **list, const lDescr *descr,
 
762
                           const char *key)
 
763
{
 
764
   bool ret = true;
 
765
   int dbret;
 
766
 
 
767
   DB *db;
 
768
   DB_TXN *txn;
 
769
 
 
770
   DBT key_dbt, data_dbt;
 
771
   DBC *dbc;
 
772
 
 
773
   DENTER(TOP_LAYER, "spool_berkeleydb_read_list");
 
774
 
 
775
   db  = bdb_get_db(info, database);
 
776
   txn = bdb_get_txn(info);
 
777
 
 
778
   if (db == NULL) {
 
779
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
780
                              ANSWER_QUALITY_ERROR, 
 
781
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
782
                              bdb_get_database_name(database));
 
783
      spool_berkeleydb_error_close(info);
 
784
      ret = false;
 
785
   } else {
 
786
      DEBUG((SGE_EVENT, "querying objects with keys %s*\n", key));
 
787
 
 
788
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
789
      dbret = db->cursor(db, txn, &dbc, 0);
 
790
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
791
      if (dbret != 0) {
 
792
         spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
793
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
794
                                 ANSWER_QUALITY_ERROR, 
 
795
                                 MSG_BERKELEY_CANNOTCREATECURSOR_IS,
 
796
                                 dbret, db_strerror(dbret));
 
797
         ret = false;
 
798
      } else {
 
799
         bool done;
 
800
         /* initialize query to first record for this object type */
 
801
         memset(&key_dbt, 0, sizeof(key_dbt));
 
802
         memset(&data_dbt, 0, sizeof(data_dbt));
 
803
         key_dbt.data = (void *)key;
 
804
         key_dbt.size = strlen(key) + 1;
 
805
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
806
         dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_SET_RANGE);
 
807
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
808
         done = false;
 
809
         while (!done) {
 
810
            if (dbret != 0 && dbret != DB_NOTFOUND) {
 
811
               spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
812
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
813
                                       ANSWER_QUALITY_ERROR, 
 
814
                                       MSG_BERKELEY_QUERYERROR_SIS,
 
815
                                       key, dbret, db_strerror(dbret));
 
816
               ret = false;
 
817
               done = true;
 
818
               break;
 
819
            } else if (dbret == DB_NOTFOUND) {
 
820
               DPRINTF(("last record reached\n"));
 
821
               done = true;
 
822
               break;
 
823
            } else if (key_dbt.data != NULL && 
 
824
                       strncmp(key_dbt.data, key, strlen(key)) 
 
825
                       != 0) {
 
826
               DPRINTF(("current key is %s\n", key_dbt.data));
 
827
               DPRINTF(("last record of this object type reached\n"));
 
828
               done = true;
 
829
               break;
 
830
            } else {
 
831
               sge_pack_buffer pb;
 
832
               lListElem *object = NULL;
 
833
               int cull_ret;
 
834
 
 
835
               DPRINTF(("read object with key "SFQ", size %d\n", 
 
836
                        key_dbt.data, data_dbt.size));
 
837
               cull_ret = init_packbuffer_from_buffer(&pb, data_dbt.data, 
 
838
                                                      data_dbt.size);
 
839
               if (cull_ret != PACK_SUCCESS) {
 
840
                  answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
841
                                          ANSWER_QUALITY_ERROR, 
 
842
                                          MSG_BERKELEY_UNPACKINITERROR_SS,
 
843
                                          key_dbt.data,
 
844
                                          cull_pack_strerror(cull_ret));
 
845
                  ret = false;
 
846
                  done = true;
 
847
                  break;
 
848
               }
 
849
 
 
850
               cull_ret = cull_unpack_elem_partial(&pb, &object, descr, pack_part);
 
851
               if (cull_ret != PACK_SUCCESS) {
 
852
                  answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
853
                                          ANSWER_QUALITY_ERROR, 
 
854
                                          MSG_BERKELEY_UNPACKERROR_SS,
 
855
                                          key_dbt.data,
 
856
                                          cull_pack_strerror(cull_ret));
 
857
                  ret = false;
 
858
                  done = true;
 
859
                  break;
 
860
               }
 
861
               /* we may not free the packbuffer: it references the buffer
 
862
                * delivered from the database
 
863
                * clear_packbuffer(&pb);
 
864
                */
 
865
               if (object != NULL) {
 
866
                  if (*list == NULL) {
 
867
                     *list = lCreateList(key, descr);
 
868
                  }
 
869
                  lAppendElem(*list, object);
 
870
               }
 
871
 
 
872
               /* get next record */
 
873
               PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
874
               dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_NEXT);
 
875
               PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
876
            }
 
877
         }
 
878
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
879
         dbc->c_close(dbc);
 
880
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
881
      }
 
882
   }
 
883
 
 
884
   DEXIT;
 
885
   return ret;
 
886
}
 
887
 
 
888
bool 
 
889
spool_berkeleydb_write_object(lList **answer_list, bdb_info info,
 
890
                              const bdb_database database,
 
891
                              const lListElem *object, const char *key)
 
892
{
 
893
   bool ret = true;
 
894
   lList *tmp_list = NULL;
 
895
 
 
896
   DENTER(TOP_LAYER, "spool_berkeleydb_write_object");
 
897
 
 
898
   /* do not spool free elems. If a free elem is passed, put a copy 
 
899
    * into a temporary list and spool this copy.
 
900
    */
 
901
   if (object->status == FREE_ELEM) {
 
902
      tmp_list = lCreateList("tmp", object->descr);
 
903
      lAppendElem(tmp_list, (lListElem *)object);
 
904
   }
 
905
 
 
906
   {
 
907
      sge_pack_buffer pb;
 
908
      int cull_ret;
 
909
 
 
910
      cull_ret = init_packbuffer(&pb, 8192, 0);
 
911
      if (cull_ret != PACK_SUCCESS) {
 
912
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
913
                                 ANSWER_QUALITY_ERROR, 
 
914
                                 MSG_BERKELEY_PACKINITERROR_SS,
 
915
                                 key,
 
916
                                 cull_pack_strerror(cull_ret));
 
917
         ret = false;
 
918
      } else {
 
919
         cull_ret = cull_pack_elem_partial(&pb, object, NULL, pack_part);
 
920
         if (cull_ret != PACK_SUCCESS) {
 
921
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
922
                                    ANSWER_QUALITY_ERROR, 
 
923
                                    MSG_BERKELEY_PACKERROR_SS,
 
924
                                    key,
 
925
                                    cull_pack_strerror(cull_ret));
 
926
            ret = false;
 
927
         } else { 
 
928
            int dbret;
 
929
            DBT key_dbt, data_dbt;
 
930
 
 
931
            DB *db = bdb_get_db(info, database);
 
932
            DB_TXN *txn = bdb_get_txn(info);
 
933
 
 
934
            if (db == NULL) {
 
935
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
936
                                       ANSWER_QUALITY_ERROR, 
 
937
                                       MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
938
                                       bdb_get_database_name(database));
 
939
               spool_berkeleydb_error_close(info);
 
940
               ret = false;
 
941
            }
 
942
 
 
943
            if (ret) {
 
944
               memset(&key_dbt, 0, sizeof(key_dbt));
 
945
               memset(&data_dbt, 0, sizeof(data_dbt));
 
946
               key_dbt.data = (void *)key;
 
947
               key_dbt.size = strlen(key) + 1;
 
948
               data_dbt.data = pb.head_ptr;
 
949
               data_dbt.size = pb.bytes_used;
 
950
 
 
951
               DPRINTF(("storing object with key "SFQ", size = %d "
 
952
                        "to env = %p, db = %p, txn = %p, txn_id = %d\n",
 
953
                        key, data_dbt.size, bdb_get_env(info), db,
 
954
                        txn, (txn->id == NULL) ? 0 : txn->id(txn)));
 
955
 
 
956
               /* Store a key/data pair. */
 
957
               PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
958
               dbret = db->put(db, txn, &key_dbt, &data_dbt, 0);
 
959
               PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
960
 
 
961
               if (dbret != 0) {
 
962
                  spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
963
                  answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
964
                                          ANSWER_QUALITY_ERROR, 
 
965
                                          MSG_BERKELEY_PUTERROR_SIS,
 
966
                                          key, dbret, db_strerror(dbret));
 
967
                  ret = false;
 
968
               } else {
 
969
                  DEBUG((SGE_EVENT, "stored object with key "SFQ", size %d\n",
 
970
                         key, data_dbt.size));
 
971
               }
 
972
            }
 
973
         }
 
974
 
 
975
         clear_packbuffer(&pb);
 
976
      }
 
977
   }
 
978
 
 
979
   if (tmp_list != NULL) {
 
980
      lDechainElem(tmp_list, (lListElem *)object);
 
981
      lFreeList(&tmp_list);
 
982
   }
 
983
 
 
984
   DEXIT;
 
985
   return ret;
 
986
}
 
987
 
 
988
bool spool_berkeleydb_write_string(lList **answer_list, bdb_info info,
 
989
                              const bdb_database database,
 
990
                              const char *key, const char *str)
 
991
{
 
992
   bool ret = true;
 
993
 
 
994
   DENTER(TOP_LAYER, "spool_berkeleydb_write_string");
 
995
 
 
996
   {
 
997
      int dbret;
 
998
      DBT key_dbt, data_dbt;
 
999
 
 
1000
      DB *db = bdb_get_db(info, database);
 
1001
      DB_TXN *txn = bdb_get_txn(info);
 
1002
 
 
1003
      if (db == NULL) {
 
1004
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1005
                                 ANSWER_QUALITY_ERROR, 
 
1006
                                 MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1007
                                 bdb_get_database_name(database));
 
1008
         spool_berkeleydb_error_close(info);
 
1009
         ret = false;
 
1010
      } else {
 
1011
         memset(&key_dbt, 0, sizeof(key_dbt));
 
1012
         memset(&data_dbt, 0, sizeof(data_dbt));
 
1013
         key_dbt.data = (void *)key;
 
1014
         key_dbt.size = strlen(key) + 1;
 
1015
         data_dbt.data = (void *) str;
 
1016
         data_dbt.size = strlen(str) + 1;
 
1017
 
 
1018
         DPRINTF(("storing string with key "SFQ", size = %d "
 
1019
                  "to env = %p, db = %p, txn = %p, txn_id = %d\n", 
 
1020
                  key, data_dbt.size, bdb_get_env(info), db, 
 
1021
                  txn, (txn->id == NULL) ? 0 : txn->id(txn)));
 
1022
 
 
1023
         /* Store a key/data pair. */
 
1024
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1025
         dbret = db->put(db, txn, &key_dbt, &data_dbt, 0);
 
1026
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1027
 
 
1028
         if (dbret != 0) {
 
1029
            spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1030
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1031
                                    ANSWER_QUALITY_ERROR, 
 
1032
                                    MSG_BERKELEY_PUTERROR_SIS,
 
1033
                                    key, dbret, db_strerror(dbret));
 
1034
            ret = false;
 
1035
         } else {
 
1036
            DEBUG((SGE_EVENT, "stored object with key "SFQ", size %d\n",
 
1037
                   key, data_dbt.size));
 
1038
         }
 
1039
      }
 
1040
   }
 
1041
 
 
1042
   DEXIT;
 
1043
   return ret;
 
1044
}
 
1045
 
 
1046
bool
 
1047
spool_berkeleydb_write_pe_task(lList **answer_list, bdb_info info,
 
1048
                               const lListElem *object, 
 
1049
                               u_long32 job_id, u_long32 ja_task_id,
 
1050
                               const char *pe_task_id)
 
1051
{
 
1052
   bool ret = true;
 
1053
   dstring dbkey_dstring;
 
1054
   char dbkey_buffer[MAX_STRING_SIZE];
 
1055
   const char *dbkey;
 
1056
 
 
1057
   sge_dstring_init(&dbkey_dstring, 
 
1058
                    dbkey_buffer, sizeof(dbkey_buffer));
 
1059
 
 
1060
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%8d.%8d %s", 
 
1061
                               object_type_get_name(SGE_TYPE_PETASK),
 
1062
                               job_id, ja_task_id, pe_task_id);
 
1063
 
 
1064
   ret = spool_berkeleydb_write_object(answer_list, info, BDB_JOB_DB,
 
1065
                                       object, dbkey);
 
1066
 
 
1067
   return ret;
 
1068
}
 
1069
 
 
1070
bool
 
1071
spool_berkeleydb_write_ja_task(lList **answer_list, bdb_info info,
 
1072
                               const lListElem *object, 
 
1073
                               u_long32 job_id, u_long32 ja_task_id)
 
1074
{
 
1075
   bool ret = true;
 
1076
   dstring dbkey_dstring;
 
1077
   char dbkey_buffer[MAX_STRING_SIZE];
 
1078
   const char *dbkey;
 
1079
   lList *tmp_list = NULL;
 
1080
 
 
1081
   sge_dstring_init(&dbkey_dstring,
 
1082
                    dbkey_buffer, sizeof(dbkey_buffer));
 
1083
 
 
1084
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%8d.%8d", 
 
1085
                               object_type_get_name(SGE_TYPE_JATASK),
 
1086
                               job_id, ja_task_id);
 
1087
 
 
1088
   lXchgList((lListElem *)object, JAT_task_list, &tmp_list);
 
1089
   ret = spool_berkeleydb_write_object(answer_list, info, BDB_JOB_DB,
 
1090
                                       object, dbkey);
 
1091
   lXchgList((lListElem *)object, JAT_task_list, &tmp_list);
 
1092
 
 
1093
   return ret;
 
1094
}
 
1095
 
 
1096
bool
 
1097
spool_berkeleydb_write_job(lList **answer_list, bdb_info info,
 
1098
                           const lListElem *object, 
 
1099
                           u_long32 job_id, bool only_job)
 
1100
{
 
1101
   bool ret = true;
 
1102
   dstring dbkey_dstring;
 
1103
   char dbkey_buffer[MAX_STRING_SIZE];
 
1104
   const char *dbkey;
 
1105
   lList *tmp_list = NULL;
 
1106
 
 
1107
   sge_dstring_init(&dbkey_dstring,
 
1108
                    dbkey_buffer, sizeof(dbkey_buffer));
 
1109
 
 
1110
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%8d", 
 
1111
                               object_type_get_name(SGE_TYPE_JOB), 
 
1112
                               job_id);
 
1113
 
 
1114
   lXchgList((lListElem *)object, JB_ja_tasks, &tmp_list);
 
1115
   
 
1116
   ret = spool_berkeleydb_write_object(answer_list, info, BDB_JOB_DB,
 
1117
                                       object, dbkey);
 
1118
 
 
1119
   lXchgList((lListElem *)object, JB_ja_tasks, &tmp_list);
 
1120
 
 
1121
   if (ret && !only_job) {
 
1122
      lListElem *ja_task;
 
1123
      for_each(ja_task, lGetList(object, JB_ja_tasks)) {
 
1124
         ret = spool_berkeleydb_write_ja_task(answer_list, info,
 
1125
                                              ja_task,
 
1126
                                              job_id, 
 
1127
                                              lGetUlong(ja_task, 
 
1128
                                                        JAT_task_number));
 
1129
         if (!ret) {
 
1130
            break;
 
1131
         }
 
1132
      }
 
1133
   }
 
1134
 
 
1135
   return ret;
 
1136
}
 
1137
 
 
1138
bool
 
1139
spool_berkeleydb_write_cqueue(lList **answer_list, bdb_info info, 
 
1140
                              const lListElem *object, const char *key)
 
1141
{
 
1142
   bool ret = true;
 
1143
   dstring dbkey_dstring;
 
1144
   char dbkey_buffer[MAX_STRING_SIZE];
 
1145
   const char *dbkey;
 
1146
   lList *tmp_list = NULL;
 
1147
 
 
1148
   sge_dstring_init(&dbkey_dstring,
 
1149
                    dbkey_buffer, sizeof(dbkey_buffer));
 
1150
 
 
1151
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", 
 
1152
                               object_type_get_name(SGE_TYPE_CQUEUE), 
 
1153
                               key);
 
1154
 
 
1155
   lXchgList((lListElem *)object, CQ_qinstances, &tmp_list);
 
1156
   
 
1157
   ret = spool_berkeleydb_write_object(answer_list, info, BDB_CONFIG_DB,
 
1158
                                       object, dbkey);
 
1159
 
 
1160
   lXchgList((lListElem *)object, CQ_qinstances, &tmp_list);
 
1161
 
 
1162
   return ret;
 
1163
}
 
1164
 
 
1165
/****** spool/berkeleydb/spool_berkeleydb_delete_object() **********************
 
1166
*  NAME
 
1167
*     spool_berkeleydb_delete_object() -- delete one or multiple objects
 
1168
*
 
1169
*  SYNOPSIS
 
1170
*     bool 
 
1171
*     spool_berkeleydb_delete_object(lList **answer_list, bdb_info info,
 
1172
*                                    const char *key, bool sub_objects) 
 
1173
*
 
1174
*  FUNCTION
 
1175
*     If sub_objects = false, deletes the object specified by key.
 
1176
*     If sub_objects = true, key will be used as pattern to delete multiple
 
1177
*     objects.
 
1178
*
 
1179
*  INPUTS
 
1180
*     lList **answer_list   - used to return error messages
 
1181
*     bdb_info info - database handle
 
1182
*     const char *key       - key
 
1183
*     bool sub_objects      - use key as pattern?
 
1184
*
 
1185
*  RESULT
 
1186
*     bool - true on success, else false
 
1187
*
 
1188
*  NOTES
 
1189
*     MT-NOTE: spool_berkeleydb_delete_object() is MT safe 
 
1190
*******************************************************************************/
 
1191
bool
 
1192
spool_berkeleydb_delete_object(lList **answer_list, bdb_info info, 
 
1193
                               const bdb_database database,
 
1194
                               const char *key, bool sub_objects)
 
1195
{
 
1196
   bool ret = true;
 
1197
 
 
1198
   int dbret;
 
1199
 
 
1200
   DB *db;
 
1201
   DB_TXN *txn;
 
1202
 
 
1203
   DENTER(TOP_LAYER, "spool_berkeleydb_delete_object");
 
1204
 
 
1205
   db = bdb_get_db(info, database);
 
1206
   txn = bdb_get_txn(info);
 
1207
 
 
1208
   if (db == NULL) {
 
1209
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1210
                              ANSWER_QUALITY_ERROR, 
 
1211
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1212
                              bdb_get_database_name(database));
 
1213
      spool_berkeleydb_error_close(info);
 
1214
      ret = false;
 
1215
   } else {
 
1216
      if (sub_objects) {
 
1217
         DBC *dbc;
 
1218
 
 
1219
         DPRINTF(("querying objects with keys %s*\n", key));
 
1220
 
 
1221
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1222
         dbret = db->cursor(db, txn, &dbc, 0);
 
1223
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1224
         if (dbret != 0) {
 
1225
            spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1226
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1227
                                    ANSWER_QUALITY_ERROR, 
 
1228
                                    MSG_BERKELEY_CANNOTCREATECURSOR_IS,
 
1229
                                    dbret, db_strerror(dbret));
 
1230
            ret = false;
 
1231
         } else {
 
1232
            bool done;
 
1233
            DBT cursor_dbt, data_dbt;
 
1234
            /* initialize query to first record for this object type */
 
1235
            memset(&cursor_dbt, 0, sizeof(cursor_dbt));
 
1236
            memset(&data_dbt, 0, sizeof(data_dbt));
 
1237
            cursor_dbt.data = (void *)key;
 
1238
            cursor_dbt.size = strlen(key) + 1;
 
1239
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1240
            dbret = dbc->c_get(dbc, &cursor_dbt, &data_dbt, DB_SET_RANGE);
 
1241
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1242
            done = false;
 
1243
            while (!done) {
 
1244
               if (dbret != 0 && dbret != DB_NOTFOUND) {
 
1245
                  spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1246
                  answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1247
                                          ANSWER_QUALITY_ERROR, 
 
1248
                                          MSG_BERKELEY_QUERYERROR_SIS,
 
1249
                                          key, dbret, db_strerror(dbret));
 
1250
                  ret = false;
 
1251
                  done = true;
 
1252
                  break;
 
1253
               } else if (dbret == DB_NOTFOUND) {
 
1254
                  DPRINTF(("last record reached\n"));
 
1255
                  done = true;
 
1256
                  break;
 
1257
               } else if (cursor_dbt.data != NULL && 
 
1258
                          strncmp(cursor_dbt.data, key, strlen(key)) 
 
1259
                          != 0) {
 
1260
                  DPRINTF(("current key is %s\n", cursor_dbt.data));
 
1261
                  DPRINTF(("last record of this object type reached\n"));
 
1262
                  done = true;
 
1263
                  break;
 
1264
               } else {
 
1265
                  int delete_ret;
 
1266
                  DBT delete_dbt;
 
1267
 
 
1268
                  /* remember key of record to delete */
 
1269
                  memset(&delete_dbt, 0, sizeof(delete_dbt));
 
1270
                  delete_dbt.data = strdup(cursor_dbt.data);
 
1271
                  delete_dbt.size = cursor_dbt.size;
 
1272
 
 
1273
                  /* switch cursor to next position */
 
1274
                  memset(&cursor_dbt, 0, sizeof(cursor_dbt));
 
1275
                  memset(&data_dbt, 0, sizeof(data_dbt));
 
1276
                  PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1277
                  dbret = dbc->c_get(dbc, &cursor_dbt, &data_dbt, DB_NEXT);
 
1278
                  PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1279
 
 
1280
                  /* delete record with stored key */
 
1281
                  PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1282
                  delete_ret = db->del(db, txn, &delete_dbt, 0);
 
1283
                  PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1284
                  if (delete_ret != 0) {
 
1285
                     answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1286
                                             ANSWER_QUALITY_ERROR, 
 
1287
                                             MSG_BERKELEY_DELETEERROR_SIS,
 
1288
                                             delete_dbt.data,
 
1289
                                             delete_ret, db_strerror(delete_ret));
 
1290
                     ret = false;
 
1291
                     free(delete_dbt.data);
 
1292
                     done = true;
 
1293
                     break;
 
1294
                  } else {
 
1295
                     DEBUG((SGE_EVENT, "deleted record with key "SFQ"\n", (char *)delete_dbt.data));
 
1296
                  }
 
1297
                  free(delete_dbt.data);
 
1298
               }
 
1299
            }
 
1300
            PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1301
            dbc->c_close(dbc);
 
1302
            PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1303
         }
 
1304
      } else {
 
1305
         DBT delete_dbt;
 
1306
         memset(&delete_dbt, 0, sizeof(delete_dbt));
 
1307
         delete_dbt.data = (void *)key;
 
1308
         delete_dbt.size = strlen(key) + 1;
 
1309
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1310
         dbret = db->del(db, txn, &delete_dbt, 0);
 
1311
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1312
         if (dbret != 0 /* && dbret != DB_NOTFOUND */) {
 
1313
            spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1314
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1315
                                    ANSWER_QUALITY_ERROR, 
 
1316
                                    MSG_BERKELEY_DELETEERROR_SIS,
 
1317
                                    key, dbret, db_strerror(dbret));
 
1318
            ret = false;
 
1319
         } else {
 
1320
            DEBUG((SGE_EVENT, "deleted record with key "SFQ"\n", key));
 
1321
         }
 
1322
      }
 
1323
   }
 
1324
 
 
1325
   DEXIT;
 
1326
   return ret;
 
1327
}
 
1328
 
 
1329
/****** spool/berkeleydb/spool_berkeleydb_delete_pe_task() *********************
 
1330
*  NAME
 
1331
*     spool_berkeleydb_delete_pe_task() -- delete one or multiple pe task(s)
 
1332
*
 
1333
*  SYNOPSIS
 
1334
*     bool 
 
1335
*     spool_berkeleydb_delete_pe_task(lList **answer_list, bdb_info info,
 
1336
*                                     const char *key, bool sub_objects) 
 
1337
*
 
1338
*  FUNCTION
 
1339
*     Deletes one or multiple pe_tasks specified by key.
 
1340
*  
 
1341
*     The key has the form "<job_id>.<ja_task_id> <pe_task_id>" formatted as 
 
1342
*     "%8d.%8d %s".
 
1343
*     If sub_objects = true, it can be used as pattern, typically used to 
 
1344
*     delete all pe_tasks of a certain ja_task by setting key to 
 
1345
*     "<job_id>.<ja_task_id>" or just "<job_id>" to delete all pe_tasks
 
1346
*     dependent on a certain job.
 
1347
*
 
1348
*  INPUTS
 
1349
*     lList **answer_list   - used to return error messages
 
1350
*     bdb_info info - database handle
 
1351
*     const char *key       - key
 
1352
*     bool sub_objects      - interpret key as pattern?
 
1353
*
 
1354
*  RESULT
 
1355
*     bool - true on success, else false
 
1356
*
 
1357
*  NOTES
 
1358
*     MT-NOTE: spool_berkeleydb_delete_pe_task() is MT safe 
 
1359
*
 
1360
*  SEE ALSO
 
1361
*     spool/berkeleydb/spool_berkeleydb_delete_object()
 
1362
*******************************************************************************/
 
1363
bool
 
1364
spool_berkeleydb_delete_pe_task(lList **answer_list, bdb_info info,
 
1365
                                const char *key, bool sub_objects)
 
1366
{
 
1367
   bool ret = true;
 
1368
 
 
1369
   dstring dbkey_dstring;
 
1370
   char dbkey_buffer[MAX_STRING_SIZE];
 
1371
   const char *dbkey;
 
1372
   const char *table_name;
 
1373
 
 
1374
   sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
 
1375
   table_name = object_type_get_name(SGE_TYPE_PETASK);
 
1376
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
 
1377
   ret = spool_berkeleydb_delete_object(answer_list, info, BDB_JOB_DB, 
 
1378
                                        dbkey, sub_objects);
 
1379
 
 
1380
   return ret;
 
1381
}
 
1382
 
 
1383
/****** spool/berkeleydb/spool_berkeleydb_delete_ja_task() *********************
 
1384
*  NAME
 
1385
*     spool_berkeleydb_delete_ja_task() -- delete ja_task(s)
 
1386
*
 
1387
*  SYNOPSIS
 
1388
*     bool 
 
1389
*     spool_berkeleydb_delete_ja_task(lList **answer_list, bdb_info info,
 
1390
*                                     const char *key, bool sub_objects) 
 
1391
*
 
1392
*  FUNCTION
 
1393
*     Deletes one or multiple ja_tasks specified by key.
 
1394
*     The ja_task(s) and all dependent pe_tasks are deleted.
 
1395
*  
 
1396
*     The key has the form "<job_id>.<ja_task_id>" formatted as "%8d.%8d".
 
1397
*     If sub_objects = true, it can be used as pattern, typically used to 
 
1398
*     delete all ja_tasks of a certain job by setting key to "<job_id>.".
 
1399
*
 
1400
*  INPUTS
 
1401
*     lList **answer_list   - used to return error messages
 
1402
*     bdb_info info - database handle
 
1403
*     const char *key       - key
 
1404
*     bool sub_objects      - use key as pattern?
 
1405
*
 
1406
*  RESULT
 
1407
*     bool - true on success, else false
 
1408
*
 
1409
*  NOTES
 
1410
*     MT-NOTE: spool_berkeleydb_delete_ja_task() is MT safe 
 
1411
*
 
1412
*  SEE ALSO
 
1413
*     spool/berkeleydb/spool_berkeleydb_delete_object()
 
1414
*     spool/berkeleydb/spool_berkeleydb_delete_pe_task()
 
1415
*******************************************************************************/
 
1416
bool
 
1417
spool_berkeleydb_delete_ja_task(lList **answer_list, bdb_info info,
 
1418
                                const char *key, bool sub_objects)
 
1419
{
 
1420
   bool ret = true;
 
1421
 
 
1422
   dstring dbkey_dstring;
 
1423
   char dbkey_buffer[MAX_STRING_SIZE];
 
1424
   const char *dbkey;
 
1425
   const char *table_name;
 
1426
 
 
1427
   sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
 
1428
   table_name = object_type_get_name(SGE_TYPE_JATASK);
 
1429
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
 
1430
   ret = spool_berkeleydb_delete_object(answer_list, info, BDB_JOB_DB, 
 
1431
                                        dbkey, sub_objects);
 
1432
 
 
1433
   if (ret) {
 
1434
      ret = spool_berkeleydb_delete_pe_task(answer_list, info, key, true);
 
1435
   }
 
1436
 
 
1437
   return ret;
 
1438
}
 
1439
 
 
1440
/****** spool/berkeleydb/spool_berkeleydb_delete_job() *************************
 
1441
*  NAME
 
1442
*     spool_berkeleydb_delete_job() -- delete a job
 
1443
*
 
1444
*  SYNOPSIS
 
1445
*     bool 
 
1446
*     spool_berkeleydb_delete_job(lList **answer_list, bdb_info info, 
 
1447
*                                 const char *key, bool sub_objects) 
 
1448
*
 
1449
*  FUNCTION
 
1450
*     Deletes the given job and all its ja_tasks.
 
1451
*     Key usually will be the unique job id formatted with %8d, but the function
 
1452
*     allows for some sort of pattern matching by specifying only parts of the
 
1453
*     jobid, e.g. the key "00001" will delete all jobs from 1000 to 1999, 
 
1454
*     an empty string will mean "delete all jobs", if sub_objects = true.
 
1455
*
 
1456
*  INPUTS
 
1457
*     lList **answer_list   - used to return error messages
 
1458
*     bdb_info info - database handle
 
1459
*     const char *key       - key (job_number)
 
1460
*     bool sub_objects      - is the given key a pattern?
 
1461
*
 
1462
*  RESULT
 
1463
*     bool - true on success, else false
 
1464
*
 
1465
*  NOTES
 
1466
*     MT-NOTE: spool_berkeleydb_delete_job() is MT safe 
 
1467
*
 
1468
*  SEE ALSO
 
1469
*     spool/berkeleydb/spool_berkeleydb_delete_object()
 
1470
*     spool/berkeleydb/spool_berkeleydb_delete_ja_task()
 
1471
*******************************************************************************/
 
1472
bool
 
1473
spool_berkeleydb_delete_job(lList **answer_list, bdb_info info,
 
1474
                            const char *key, bool sub_objects)
 
1475
{
 
1476
   bool ret = true;
 
1477
 
 
1478
   dstring dbkey_dstring;
 
1479
   char dbkey_buffer[MAX_STRING_SIZE];
 
1480
   const char *dbkey;
 
1481
   const char *table_name;
 
1482
 
 
1483
   sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
 
1484
   table_name = object_type_get_name(SGE_TYPE_JOB);
 
1485
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
 
1486
   ret = spool_berkeleydb_delete_object(answer_list, info, BDB_JOB_DB, 
 
1487
                                        dbkey, sub_objects);
 
1488
 
 
1489
   if (ret) {
 
1490
      ret = spool_berkeleydb_delete_ja_task(answer_list, info, key, true);
 
1491
   }
 
1492
 
 
1493
   return ret;
 
1494
}
 
1495
 
 
1496
/****** spool/berkeleydb/spool_berkeleydb_delete_cqueue() **********************
 
1497
*  NAME
 
1498
*     spool_berkeleydb_delete_cqueue() -- delete a cluster queue
 
1499
*
 
1500
*  SYNOPSIS
 
1501
*     bool 
 
1502
*     spool_berkeleydb_delete_cqueue(lList **answer_list, bdb_info info,
 
1503
*                                    const char *key) 
 
1504
*
 
1505
*  FUNCTION
 
1506
*     Deletes a cluster queue and all its queue instances.
 
1507
*
 
1508
*  INPUTS
 
1509
*     lList **answer_list   - used to return error messages
 
1510
*     bdb_info info - database handle
 
1511
*     const char *key       - key (name) of cluster queue to delete
 
1512
*
 
1513
*  RESULT
 
1514
*     bool - true on success, else false
 
1515
*
 
1516
*  NOTES
 
1517
*     MT-NOTE: spool_berkeleydb_delete_cqueue() is MT safe 
 
1518
*
 
1519
*  SEE ALSO
 
1520
*     spool/berkeleydb/spool_berkeleydb_delete_object()
 
1521
*******************************************************************************/
 
1522
bool
 
1523
spool_berkeleydb_delete_cqueue(lList **answer_list, bdb_info info,
 
1524
                               const char *key)
 
1525
{
 
1526
   bool ret = true;
 
1527
 
 
1528
   dstring dbkey_dstring;
 
1529
   char dbkey_buffer[MAX_STRING_SIZE];
 
1530
   const char *dbkey;
 
1531
   const char *table_name;
 
1532
 
 
1533
   sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
 
1534
   table_name = object_type_get_name(SGE_TYPE_CQUEUE);
 
1535
   dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
 
1536
   ret = spool_berkeleydb_delete_object(answer_list, info, BDB_CONFIG_DB,
 
1537
                                        dbkey, false);
 
1538
 
 
1539
   if (ret) {
 
1540
      table_name = object_type_get_name(SGE_TYPE_QINSTANCE);
 
1541
      dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s@", table_name, key);
 
1542
      ret = spool_berkeleydb_delete_object(answer_list, info, BDB_CONFIG_DB,
 
1543
                                           dbkey, true);
 
1544
   }
 
1545
 
 
1546
   return ret;
 
1547
}
 
1548
 
 
1549
 
 
1550
/* ---- static functions ---- */
 
1551
 
 
1552
static void 
 
1553
spool_berkeleydb_error_close(bdb_info info)
 
1554
{
 
1555
   DB_ENV *env;
 
1556
   DB     *db;
 
1557
   DB_TXN *txn;
 
1558
   bdb_database i;
 
1559
 
 
1560
   /* try to shutdown all open resources */
 
1561
   txn = bdb_get_txn(info);
 
1562
   if (txn != NULL) {
 
1563
      txn->abort(txn);
 
1564
      bdb_set_txn(info, NULL);
 
1565
   }
 
1566
 
 
1567
   for (i = BDB_CONFIG_DB; i < BDB_ALL_DBS; i++) {
 
1568
      db = bdb_get_db(info, i);
 
1569
      if (db != NULL) {
 
1570
         db->close(db, 0);
 
1571
         bdb_set_db(info, NULL, i);
 
1572
      }
 
1573
   }
 
1574
 
 
1575
   env = bdb_get_env(info);
 
1576
   if (env != NULL) {
 
1577
      env->close(env, 0);
 
1578
      bdb_set_env(info, NULL);
 
1579
   }
 
1580
}
 
1581
 
 
1582
static void
 
1583
spool_berkeleydb_handle_bdb_error(lList **answer_list, bdb_info info, 
 
1584
                                  int bdb_errno)
 
1585
{
 
1586
   /* we lost the connection to a RPC server */
 
1587
   if (bdb_errno == DB_NOSERVER || bdb_errno == DB_NOSERVER_ID) {
 
1588
      const char *server = bdb_get_server(info);
 
1589
      const char *path   = bdb_get_path(info);
 
1590
 
 
1591
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1592
                              ANSWER_QUALITY_ERROR, 
 
1593
                              MSG_BERKELEY_CONNECTION_LOST_SS,
 
1594
                              server != NULL ? server : "no server defined",
 
1595
                              path != NULL ? path : "no database path defined");
 
1596
 
 
1597
      spool_berkeleydb_error_close(info);
 
1598
   } else if (bdb_errno == DB_NOSERVER_HOME) {
 
1599
      const char *server = bdb_get_server(info);
 
1600
      const char *path   = bdb_get_path(info);
 
1601
 
 
1602
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1603
                              ANSWER_QUALITY_ERROR, 
 
1604
                              MSG_BERKELEY_RPCSERVERLOSTHOME_SS,
 
1605
                              server != NULL ? server : "no server defined",
 
1606
                              path != NULL ? path : "no database path defined");
 
1607
 
 
1608
      spool_berkeleydb_error_close(info);
 
1609
   } else if (bdb_errno == DB_RUNRECOVERY) {
 
1610
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1611
                              ANSWER_QUALITY_ERROR, 
 
1612
                              MSG_BERKELEY_RUNRECOVERY);
 
1613
 
 
1614
      spool_berkeleydb_error_close(info);
 
1615
   }
 
1616
}
 
1617
 
 
1618
bool 
 
1619
spool_berkeleydb_check_reopen_database(lList **answer_list, 
 
1620
                                       bdb_info info)
 
1621
{
 
1622
   bool ret = true;
 
1623
   DB_ENV *env;
 
1624
 
 
1625
   DENTER(TOP_LAYER, "spool_berkeleydb_check_reopen_database");
 
1626
 
 
1627
   env = bdb_get_env(info);
 
1628
 
 
1629
   /*
 
1630
    * if environment is not set, it was either
 
1631
    * - closed due to an error condition
 
1632
    * - never open for this thread
 
1633
    * try to open it.
 
1634
    */
 
1635
   if (env == NULL) {
 
1636
      ret = spool_berkeleydb_create_environment(answer_list, info);
 
1637
 
 
1638
      if (ret) {
 
1639
         ret = spool_berkeleydb_open_database(answer_list, info, false);
 
1640
      }
 
1641
   }
 
1642
 
 
1643
   DEXIT;
 
1644
   return ret;
 
1645
}
 
1646
 
 
1647
bool 
 
1648
spool_berkeleydb_read_keys(lList **answer_list, bdb_info info,
 
1649
                           const bdb_database database,
 
1650
                           lList **list, const char *key)
 
1651
{
 
1652
   bool ret = true;
 
1653
   int dbret;
 
1654
 
 
1655
   DB *db;
 
1656
   DB_TXN *txn;
 
1657
 
 
1658
   DBT key_dbt, data_dbt;
 
1659
   DBC *dbc;
 
1660
 
 
1661
   DENTER(TOP_LAYER, "spool_berkeleydb_read_keys");
 
1662
 
 
1663
   db  = bdb_get_db(info, database);
 
1664
   txn = bdb_get_txn(info);
 
1665
 
 
1666
   if (db == NULL) {
 
1667
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1668
                              ANSWER_QUALITY_ERROR, 
 
1669
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1670
                              bdb_get_database_name(database));
 
1671
      ret = false;
 
1672
   } else {
 
1673
      DPRINTF(("querying objects with keys %s*\n", key));
 
1674
 
 
1675
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1676
      dbret = db->cursor(db, txn, &dbc, 0);
 
1677
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1678
      if (dbret != 0) {
 
1679
         spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1680
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1681
                                 ANSWER_QUALITY_ERROR, 
 
1682
                                 MSG_BERKELEY_CANNOTCREATECURSOR_IS,
 
1683
                                 dbret, db_strerror(dbret));
 
1684
         ret = false;
 
1685
      } else {
 
1686
         bool done;
 
1687
         /* initialize query to first record for this object type */
 
1688
         memset(&key_dbt, 0, sizeof(key_dbt));
 
1689
         memset(&data_dbt, 0, sizeof(data_dbt));
 
1690
         key_dbt.data = (void *)key;
 
1691
         key_dbt.size = strlen(key) + 1;
 
1692
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1693
         dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_SET_RANGE);
 
1694
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1695
         done = false;
 
1696
         while (!done) {
 
1697
            if (dbret != 0 && dbret != DB_NOTFOUND) {
 
1698
               spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1699
               answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1700
                                       ANSWER_QUALITY_ERROR, 
 
1701
                                       MSG_BERKELEY_QUERYERROR_SIS,
 
1702
                                       key, dbret, db_strerror(dbret));
 
1703
               ret = false;
 
1704
               done = true;
 
1705
               break;
 
1706
            } else if (dbret == DB_NOTFOUND) {
 
1707
               DPRINTF(("last record reached\n"));
 
1708
               done = true;
 
1709
               break;
 
1710
            } else if (key_dbt.data != NULL && 
 
1711
                       strncmp(key_dbt.data, key, strlen(key)) 
 
1712
                       != 0) {
 
1713
               DPRINTF(("current key is %s\n", key_dbt.data));
 
1714
               DPRINTF(("last record of this object type reached\n"));
 
1715
               done = true;
 
1716
               break;
 
1717
            } else {
 
1718
               DPRINTF(("read object with key "SFQ", size %d\n", 
 
1719
                        key_dbt.data, data_dbt.size));
 
1720
               lAddElemStr(list, STU_name, key_dbt.data, STU_Type);
 
1721
 
 
1722
               PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1723
               dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_NEXT);
 
1724
               PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1725
            }
 
1726
         }
 
1727
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1728
         dbc->c_close(dbc);
 
1729
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1730
      }
 
1731
   }
 
1732
 
 
1733
   DEXIT;
 
1734
   return ret;
 
1735
}
 
1736
 
 
1737
lListElem *
 
1738
spool_berkeleydb_read_object(lList **answer_list, bdb_info info,
 
1739
                             const bdb_database database,
 
1740
                             const char *key)
 
1741
{
 
1742
   lListElem *ret = NULL;
 
1743
   int dbret;
 
1744
 
 
1745
   DB *db;
 
1746
   DB_TXN *txn;
 
1747
 
 
1748
   DBT key_dbt, data_dbt;
 
1749
 
 
1750
   DENTER(TOP_LAYER, "spool_berkeleydb_read_object");
 
1751
 
 
1752
   db  = bdb_get_db(info, database);
 
1753
   txn = bdb_get_txn(info);
 
1754
 
 
1755
   if (db == NULL) {
 
1756
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1757
                              ANSWER_QUALITY_ERROR, 
 
1758
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1759
                              bdb_get_database_name(database));
 
1760
   } else {
 
1761
      DPRINTF(("querying object with key %s\n", key));
 
1762
 
 
1763
      /* initialize query to first record for this object type */
 
1764
      memset(&key_dbt, 0, sizeof(key_dbt));
 
1765
      key_dbt.data = (void *)key;
 
1766
      key_dbt.size = strlen(key) + 1;
 
1767
      memset(&data_dbt, 0, sizeof(data_dbt));
 
1768
      data_dbt.flags = DB_DBT_MALLOC;
 
1769
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1770
      dbret = db->get(db, txn, &key_dbt, &data_dbt, 0);
 
1771
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1772
      if (dbret != 0) {
 
1773
         spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1774
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1775
                                 ANSWER_QUALITY_ERROR, 
 
1776
                                 MSG_BERKELEY_QUERYERROR_SIS,
 
1777
                                 key, dbret, db_strerror(dbret));
 
1778
      } else {
 
1779
         sge_pack_buffer pb;
 
1780
         int cull_ret;
 
1781
         const lDescr *descr;
 
1782
 
 
1783
         DPRINTF(("read object with key "SFQ", size %d\n", 
 
1784
                  key_dbt.data, data_dbt.size));
 
1785
         cull_ret = init_packbuffer_from_buffer(&pb, data_dbt.data, 
 
1786
                                                data_dbt.size);
 
1787
         if (cull_ret != PACK_SUCCESS) {
 
1788
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1789
                                    ANSWER_QUALITY_ERROR, 
 
1790
                                    MSG_BERKELEY_UNPACKINITERROR_SS,
 
1791
                                    key_dbt.data,
 
1792
                                    cull_pack_strerror(cull_ret));
 
1793
            ret = NULL;
 
1794
         }
 
1795
         DPRINTF(("init_packbuffer succeeded\n"));
 
1796
 
 
1797
         descr = object_type_get_descr(object_name_get_type(key_dbt.data));
 
1798
         cull_ret = cull_unpack_elem_partial(&pb, &ret, descr, pack_part);
 
1799
         if (cull_ret != PACK_SUCCESS) {
 
1800
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1801
                                    ANSWER_QUALITY_ERROR, 
 
1802
                                    MSG_BERKELEY_UNPACKERROR_SS,
 
1803
                                    key_dbt.data,
 
1804
                                    cull_pack_strerror(cull_ret));
 
1805
            ret = NULL;
 
1806
         }
 
1807
 
 
1808
         /* We specified DB_DBT_MALLOC - BDB will malloc memory for each
 
1809
          * object found and we have to free it.
 
1810
          */
 
1811
         if (data_dbt.data != NULL) {
 
1812
            FREE(data_dbt.data);
 
1813
         }
 
1814
      }
 
1815
   }
 
1816
 
 
1817
   DRETURN(ret);
 
1818
}
 
1819
 
 
1820
char *
 
1821
spool_berkeleydb_read_string(lList **answer_list, bdb_info info,
 
1822
                             const bdb_database database,
 
1823
                             const char *key)
 
1824
{
 
1825
   char *ret = NULL;
 
1826
   int dbret;
 
1827
 
 
1828
   DB *db;
 
1829
   DB_TXN *txn;
 
1830
 
 
1831
   DBT key_dbt, data_dbt;
 
1832
 
 
1833
   DENTER(TOP_LAYER, "spool_berkeleydb_read_string");
 
1834
 
 
1835
   db  = bdb_get_db(info, database);
 
1836
   txn = bdb_get_txn(info);
 
1837
 
 
1838
   if (db == NULL) {
 
1839
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1840
                              ANSWER_QUALITY_ERROR, 
 
1841
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1842
                              bdb_get_database_name(database));
 
1843
   } else {
 
1844
      DPRINTF(("querying string with key %s\n", key));
 
1845
 
 
1846
      /* initialize query to first record for this object type */
 
1847
      memset(&key_dbt, 0, sizeof(key_dbt));
 
1848
      key_dbt.data = (void *)key;
 
1849
      key_dbt.size = strlen(key) + 1;
 
1850
      memset(&data_dbt, 0, sizeof(data_dbt));
 
1851
      data_dbt.flags = DB_DBT_MALLOC;
 
1852
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1853
      dbret = db->get(db, txn, &key_dbt, &data_dbt, 0);
 
1854
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1855
      if (dbret != 0) {
 
1856
         spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1857
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1858
                                 ANSWER_QUALITY_ERROR, 
 
1859
                                 MSG_BERKELEY_QUERYERROR_SIS,
 
1860
                                 key, dbret, db_strerror(dbret));
 
1861
      } else {
 
1862
         ret = (char *) data_dbt.data;
 
1863
      }
 
1864
   }
 
1865
 
 
1866
   DRETURN(ret);
 
1867
}
 
1868
 
 
1869
static bool
 
1870
spool_berkeleydb_clear_log(lList **answer_list, bdb_info info)
 
1871
{
 
1872
   bool ret = true;
 
1873
   DB_ENV *env;
 
1874
 
 
1875
   DENTER(TOP_LAYER, "spool_berkeleydb_clear_log");
 
1876
 
 
1877
   /* check connection */
 
1878
   env = bdb_get_env(info);
 
1879
   if (env == NULL) {
 
1880
      dstring dbname_dstring = DSTRING_INIT;
 
1881
      const char *dbname;
 
1882
   
 
1883
      dbname = bdb_get_dbname(info, &dbname_dstring);
 
1884
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1885
                              ANSWER_QUALITY_ERROR, 
 
1886
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1887
                              dbname);
 
1888
      sge_dstring_free(&dbname_dstring);
 
1889
      ret = false;
 
1890
   }
 
1891
 
 
1892
   if (ret) {
 
1893
      int dbret;
 
1894
      char **list = NULL;
 
1895
 
 
1896
      PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1897
      dbret = env->log_archive(env, &list, DB_ARCH_ABS);
 
1898
      PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
1899
      if (dbret != 0) {
 
1900
         spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
1901
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1902
                                 ANSWER_QUALITY_ERROR, 
 
1903
                                 MSG_BERKELEY_CANNOTRETRIEVELOGARCHIVE_IS,
 
1904
                                 dbret, db_strerror(dbret));
 
1905
         ret = false;
 
1906
      }
 
1907
 
 
1908
      if (ret && list != NULL) {
 
1909
         char **file;
 
1910
 
 
1911
         for (file = list; *file != NULL; file++) {
 
1912
            if (remove(*file) != 0) {
 
1913
               dstring error_dstring = DSTRING_INIT;
 
1914
 
 
1915
               answer_list_add_sprintf(answer_list, STATUS_EDISK, 
 
1916
                                       ANSWER_QUALITY_ERROR, 
 
1917
                                       MSG_ERRORDELETINGFILE_SS,
 
1918
                                       *file,
 
1919
                                       sge_strerror(errno, &error_dstring));
 
1920
               sge_dstring_free(&error_dstring);
 
1921
               ret = false;
 
1922
               break;
 
1923
            }
 
1924
         }
 
1925
 
 
1926
         free(list);
 
1927
      }
 
1928
   }
 
1929
 
 
1930
   DEXIT;
 
1931
   return ret;
 
1932
}
 
1933
 
 
1934
static bool
 
1935
spool_berkeleydb_trigger_rpc(lList **answer_list, bdb_info info)
 
1936
{
 
1937
   bool ret = true;
 
1938
   DB_ENV *env;
 
1939
 
 
1940
   DENTER(TOP_LAYER, "spool_berkeleydb_trigger_rpc");
 
1941
 
 
1942
   /* check connection */
 
1943
   env = bdb_get_env(info);
 
1944
   if (env == NULL) {
 
1945
      dstring dbname_dstring = DSTRING_INIT;
 
1946
      const char *dbname;
 
1947
   
 
1948
      dbname = bdb_get_dbname(info, &dbname_dstring);
 
1949
      answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1950
                              ANSWER_QUALITY_ERROR, 
 
1951
                              MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1952
                              dbname);
 
1953
      sge_dstring_free(&dbname_dstring);
 
1954
      ret = false;
 
1955
   }
 
1956
 
 
1957
   if (ret) {
 
1958
      lList *local_answer_list = NULL;
 
1959
      lListElem *ep;
 
1960
 
 
1961
      ep = spool_berkeleydb_read_object(&local_answer_list, info, BDB_CONFIG_DB,
 
1962
                                        "..trigger_bdb_rpc_server..");
 
1963
      lFreeElem(&ep);
 
1964
      lFreeList(&local_answer_list);
 
1965
   }
 
1966
 
 
1967
   DEXIT;
 
1968
   return ret;
 
1969
}
 
1970
 
 
1971
static bool
 
1972
spool_berkeleydb_checkpoint(lList **answer_list, bdb_info info)
 
1973
{
 
1974
   bool ret = true;
 
1975
 
 
1976
   DENTER(TOP_LAYER, "spool_berkeleydb_checkpoint");
 
1977
 
 
1978
   /* only necessary for local spooling */
 
1979
   if (bdb_get_server(info) == NULL) {
 
1980
      DB_ENV *env;
 
1981
 
 
1982
      env = bdb_get_env(info);
 
1983
      if (env == NULL) {
 
1984
      dstring dbname_dstring = DSTRING_INIT;
 
1985
      const char *dbname;
 
1986
      
 
1987
         dbname = bdb_get_dbname(info, &dbname_dstring);
 
1988
         answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
1989
                                 ANSWER_QUALITY_ERROR, 
 
1990
                                 MSG_BERKELEY_NOCONNECTIONOPEN_S,
 
1991
                                 dbname);
 
1992
         sge_dstring_free(&dbname_dstring);
 
1993
         ret = false;
 
1994
      }
 
1995
 
 
1996
      if (ret) {
 
1997
         int dbret;
 
1998
 
 
1999
         PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
2000
         dbret = env->txn_checkpoint(env, 0, 0, 0);
 
2001
         PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
 
2002
         if (dbret != 0) {
 
2003
            spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
 
2004
            answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN, 
 
2005
                                    ANSWER_QUALITY_ERROR, 
 
2006
                                    MSG_BERKELEY_CANNOTCHECKPOINT_IS,
 
2007
                                    dbret, db_strerror(dbret));
 
2008
            ret = false;
 
2009
         } 
 
2010
      }
 
2011
   }
 
2012
 
 
2013
   DEXIT;
 
2014
   return ret;
 
2015
}
 
2016