1
/*___INFO__MARK_BEGIN__*/
2
/*************************************************************************
4
* The Contents of this file are made available subject to the terms of
5
* the Sun Industry Standards Source License Version 1.2
7
* Sun Microsystems Inc., March, 2001
10
* Sun Industry Standards Source License Version 1.2
11
* =================================================
12
* The contents of this file are subject to the Sun Industry Standards
13
* Source License Version 1.2 (the "License"); You may not use this file
14
* except in compliance with the License. You may obtain a copy of the
15
* License at http://gridengine.sunsource.net/Gridengine_SISSL_license.html
17
* Software provided under this License is provided on an "AS IS" basis,
18
* WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
19
* WITHOUT LIMITATION, WARRANTIES THAT THE SOFTWARE IS FREE OF DEFECTS,
20
* MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE, OR NON-INFRINGING.
21
* See the License for the specific provisions governing your rights and
22
* obligations concerning the Software.
24
* The Initial Developer of the Original Code is: Sun Microsystems, Inc.
26
* Copyright: 2001 by Sun Microsystems, Inc.
28
* All Rights Reserved.
30
************************************************************************/
31
/*___INFO__MARK_END__*/
36
#include "rmon/sgermon.h"
37
#include "uti/sge_log.h"
39
#include "uti/sge_profiling.h"
40
#include "uti/sge_string.h"
41
#include "uti/sge_unistd.h"
43
#include "cull/cull.h"
45
#include "sgeobj/sge_answer.h"
46
#include "sgeobj/sge_cqueue.h"
47
#include "sgeobj/sge_ja_task.h"
48
#include "sgeobj/sge_job.h"
49
#include "sgeobj/sge_object.h"
50
#include "sgeobj/sge_str.h"
53
#include "msg_common.h"
54
#include "spool/berkeleydb/msg_spoollib_berkeleydb.h"
56
#include "spool/berkeleydb/sge_bdb.h"
59
static const int pack_part = CULL_SPOOL | CULL_SUBLIST | CULL_SPOOL_PROJECT |
62
static const int pack_part = 0;
66
spool_berkeleydb_error_close(bdb_info info);
69
spool_berkeleydb_handle_bdb_error(lList **answer_list, bdb_info info,
73
spool_berkeleydb_clear_log(lList **answer_list, bdb_info info);
76
spool_berkeleydb_trigger_rpc(lList **answer_list, bdb_info info);
79
spool_berkeleydb_checkpoint(lList **answer_list, bdb_info info);
81
/****** spool/berkeleydb/spool_berkeleydb_check_version() **********************
83
* spool_berkeleydb_check_version() -- check version of shared libs
87
* spool_berkeleydb_check_version(lList **answer_list)
90
* Checks if major and minor version number returned by the db_version()
91
* library call of Berkeley DB matches the version numbers set at compile
94
* The major and minor number must be equal, the patch level may differ.
97
* lList **answer_list - used to return info and error messages
100
* bool - true, on success, else false
103
* MT-NOTE: spool_berkeleydb_check_version() is MT safe
105
*******************************************************************************/
107
spool_berkeleydb_check_version(lList **answer_list)
113
DENTER(TOP_LAYER, "spool_berkeleydb_check_version");
115
version = db_version(&major, &minor, NULL);
117
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
119
MSG_BERKELEY_USINGBDBVERSION_S,
122
if (major != DB_VERSION_MAJOR || minor != DB_VERSION_MINOR) {
123
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
124
ANSWER_QUALITY_ERROR,
125
MSG_BERKELEY_WRONGBDBVERSIONEXPECTING_SDD,
126
version, DB_VERSION_MAJOR, DB_VERSION_MINOR);
134
/****** spool/berkeleydb/spool_berkeleydb_create_environment() *****************
136
* spool_berkeleydb_create_environment() -- create and open the database environment
139
* bool spool_berkeleydb_create_environment(lList **answer_list, struct
140
* bdb_info info, const char *url)
146
* lList **answer_list - used to return error messages
147
* bdb_info info - database handle
148
* const char *url - ???
157
* MT-NOTE: spool_berkeleydb_create_environment() is not MT safe
164
*******************************************************************************/
165
bool spool_berkeleydb_create_environment(lList **answer_list,
170
const char *server, *path;
174
DENTER(TOP_LAYER, "spool_berkeleydb_create_environment");
176
server = bdb_get_server(info);
177
path = bdb_get_path(info);
179
/* check database directory (only in case of local spooling) */
180
if (server == NULL && !sge_is_directory(path)) {
181
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
182
ANSWER_QUALITY_ERROR,
183
MSG_BERKELEY_DATABASEDIRDOESNTEXIST_S,
189
/* we have to lock the info structure, as multiple threads might try
190
* to open the env in parallel.
194
/* check, if env has been initialized in the meantime */
195
env = bdb_get_env(info);
198
/* continue only, if env isn't initialized yet */
199
if (ret && env == NULL) {
202
if (server != NULL) {
203
flags |= DB_RPCCLIENT;
206
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
207
dbret = db_env_create(&env, flags);
208
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
210
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
211
ANSWER_QUALITY_ERROR,
212
MSG_BERKELEY_COULDNTCREATEENVIRONMENT_IS,
213
dbret, db_strerror(dbret));
217
/* do deadlock detection internally (only in case of local spooling) */
218
if (ret && server == NULL) {
219
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
220
dbret = env->set_lk_detect(env, DB_LOCK_DEFAULT);
221
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
223
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
224
ANSWER_QUALITY_ERROR,
225
MSG_BERKELEY_COULDNTESETUPLOCKDETECTION_IS,
226
dbret, db_strerror(dbret));
232
* Switch off flushing of transaction log for every single transaction.
233
* This tuning option has huge impact on performance, but only a slight impact
234
* on database durability: In case of a server/filesystem crash, we might lose
235
* the last transactions committed before the crash. Still all transactions will
236
* be atomic, isolated and the database will be consistent at any time.
239
dbret = env->set_flags(env, DB_TXN_WRITE_NOSYNC, 1);
241
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
242
ANSWER_QUALITY_ERROR,
243
MSG_BERKELEY_CANTSETENVFLAGS_IS,
244
dbret, db_strerror(dbret));
251
* increase the cache size
254
dbret = env->set_cachesize(env, 0, 4 * 1024 * 1024, 1);
256
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
257
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
258
ANSWER_QUALITY_ERROR,
259
MSG_BERKELEY_CANTSETENVCACHE_IS,
260
dbret, db_strerror(dbret));
266
/* if we use a RPC server, set it in the DB_ENV */
267
if (ret && server != NULL) {
268
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
269
dbret = env->set_rpc_server(env, NULL, server, 0, 0, 0);
270
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
272
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
273
ANSWER_QUALITY_ERROR,
274
MSG_BERKELEY_COULDNTESETRPCSERVER_IS,
275
dbret, db_strerror(dbret));
280
/* the lock parameters can only be set if we have local spooling.
281
* RPC server: use DB_CONFIG file.
283
if (server == NULL) {
284
/* worst case scenario: n lockers, all changing m objects in
290
int locks = lockers * 2 * objects;
292
/* set locking params: max lockers */
294
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
295
dbret = env->set_lk_max_lockers(env, lockers);
296
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
298
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
299
ANSWER_QUALITY_ERROR,
300
MSG_BERKELEY_COULDNTSETLOCKERS_IS,
301
dbret, db_strerror(dbret));
305
/* set locking params: max objects */
307
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
308
dbret = env->set_lk_max_objects(env, objects);
309
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
311
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
312
ANSWER_QUALITY_ERROR,
313
MSG_BERKELEY_COULDNTSETOBJECTS_IS,
314
dbret, db_strerror(dbret));
318
/* set locking params: max locks */
320
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
321
dbret = env->set_lk_max_locks(env, locks);
322
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
324
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
325
ANSWER_QUALITY_ERROR,
326
MSG_BERKELEY_COULDNTSETLOCKS_IS,
327
dbret, db_strerror(dbret));
334
/* open the environment */
336
int flags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL |
339
if (server == NULL) {
343
if (bdb_get_recover(info)) {
347
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
348
dbret = env->open(env, path, flags,
350
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
352
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
353
ANSWER_QUALITY_ERROR,
354
MSG_BERKELEY_COULDNTOPENENVIRONMENT_SSIS,
355
server == NULL ? "local spooling" : server,
356
path, dbret, db_strerror(dbret));
361
bdb_set_env(info, env);
365
/* now unlock the info structure */
366
bdb_unlock_info(info);
373
spool_berkeleydb_open_database(lList **answer_list, bdb_info info,
379
DENTER(TOP_LAYER, "spool_berkeleydb_open_database");
381
for (i = BDB_CONFIG_DB; i < BDB_ALL_DBS && ret; i++) {
387
/* we have to lock info, as multiple threads might try to (re)open
388
* the database connection in parallel
392
env = bdb_get_env(info);
395
dstring dbname_dstring = DSTRING_INIT;
398
dbname = bdb_get_dbname(info, &dbname_dstring);
399
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
400
ANSWER_QUALITY_ERROR,
401
MSG_BERKELEY_NOCONNECTIONOPEN_S,
403
sge_dstring_free(&dbname_dstring);
407
/* check db - another thread could have opened it in the meantime */
409
db = bdb_get_db(info, i);
412
if (ret && db == NULL) {
413
/* create a database handle */
415
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
416
dbret = db_create(&db, env, 0);
417
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
419
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
420
ANSWER_QUALITY_ERROR,
421
MSG_BERKELEY_COULDNTCREATEDBHANDLE_IS,
422
dbret, db_strerror(dbret));
428
/* open database handle */
433
if (bdb_get_server(info) == NULL) {
437
/* the config db will only be created, if explicitly requested
438
* (in spoolinit). DB already existing will be handled as error.
439
* Other databases will be created as needed.
441
if (i == BDB_CONFIG_DB) {
443
flags |= DB_CREATE | DB_EXCL;
444
mode = S_IRUSR | S_IWUSR;
448
mode = S_IRUSR | S_IWUSR;
451
ret = spool_berkeleydb_start_transaction(answer_list, info);
453
const char *db_name = bdb_get_database_name(i);
454
DB_TXN *txn = bdb_get_txn(info);
455
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
456
dbret = db->open(db, txn, db_name, NULL,
457
DB_BTREE, flags, mode);
458
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
459
ret = spool_berkeleydb_end_transaction(answer_list, info, true);
462
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
463
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
464
ANSWER_QUALITY_ERROR,
465
create ? MSG_BERKELEY_COULDNTCREATEDB_SIS
466
: MSG_BERKELEY_COULDNTOPENDB_SIS,
467
bdb_get_database_name(i),
468
dbret, db_strerror(dbret));
473
/* if everything is ok - set the database handle */
475
bdb_set_db(info, db, i);
476
DPRINTF(("opened database connection, env = %p, db = %p\n", env, db));
480
bdb_unlock_info(info);
488
spool_berkeleydb_close_database(lList **answer_list, bdb_info info)
494
/* database name for info or error output */
495
char dbname_buffer[MAX_STRING_SIZE];
496
dstring dbname_dstring = DSTRING_INIT;
499
DENTER(TOP_LAYER, "spool_berkeleydb_close_database");
501
sge_dstring_init(&dbname_dstring, dbname_buffer, sizeof(dbname_buffer));
502
dbname = bdb_get_dbname(info, &dbname_dstring);
504
/* lock the database info, multiple threads might try to close it */
506
env = bdb_get_env(info);
508
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
509
ANSWER_QUALITY_ERROR,
510
MSG_BERKELEY_NOCONNECTIONOPEN_S,
516
for (i = BDB_CONFIG_DB; i < BDB_ALL_DBS; i++) {
519
/* close open database */
520
db = bdb_get_db(info, i);
524
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
525
dbret = db->close(db, 0);
526
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
528
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
529
ANSWER_QUALITY_ERROR,
530
MSG_BERKELEY_COULDNTCLOSEDB_SIS,
531
bdb_get_database_name(i),
532
dbret, db_strerror(dbret));
537
bdb_set_db(info, db, i);
541
/* close env in any case, even if db->close failed */
542
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
543
dbret = env->close(env, 0);
544
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
546
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
547
ANSWER_QUALITY_ERROR,
548
MSG_BERKELEY_COULDNTCLOSEENVIRONMENT_SIS,
549
dbname, dbret, db_strerror(dbret));
552
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
554
MSG_BERKELEY_CLOSEDDB_S,
559
bdb_set_env(info, env);
562
bdb_unlock_info(info);
568
/****** sge_bdb/spool_berkeleydb_start_transaction() ***************************
570
* spool_berkeleydb_start_transaction() -- start a transaction
574
* spool_berkeleydb_start_transaction(lList **answer_list, bdb_info info)
577
* Starts a transaction.
578
* Transactions are bound to a certain thread, multiple threads can start
579
* transactions in parallel.
582
* lList **answer_list - used to return error messages
583
* bdb_info info - database handle
586
* bool - true on success, else false
589
* MT-NOTE: spool_berkeleydb_start_transaction() is MT safe
592
* spool/berkeleydb/spool_berkeleydb_end_transaction()
593
*******************************************************************************/
595
spool_berkeleydb_start_transaction(lList **answer_list, bdb_info info)
602
DENTER(TOP_LAYER, "spool_berkeleydb_start_transaction");
604
env = bdb_get_env(info);
605
txn = bdb_get_txn(info);
608
dstring dbname_dstring = DSTRING_INIT;
611
dbname = bdb_get_dbname(info, &dbname_dstring);
612
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
613
ANSWER_QUALITY_ERROR,
614
MSG_BERKELEY_NOCONNECTIONOPEN_S,
616
sge_dstring_free(&dbname_dstring);
620
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
621
ANSWER_QUALITY_ERROR,
622
MSG_BERKELEY_TXNALREADYOPEN);
629
* RPC server does no deadlock detection - if a lock cannot be
630
* obtained, exit immediately
632
if (bdb_get_server(info) != NULL) {
633
flags |= DB_TXN_NOWAIT;
636
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
637
dbret = env->txn_begin(env, NULL, &txn, flags);
638
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
640
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
641
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
642
ANSWER_QUALITY_ERROR,
643
MSG_BERKELEY_ERRORSTARTINGTRANSACTION_IS,
644
dbret, db_strerror(dbret));
650
bdb_set_txn(info, txn);
651
DEBUG((SGE_EVENT, "BEGIN transaction\n"));
659
spool_berkeleydb_end_transaction(lList **answer_list, bdb_info info,
668
DENTER(TOP_LAYER, "spool_berkeleydb_end_transaction");
670
env = bdb_get_env(info);
671
txn = bdb_get_txn(info);
674
dstring dbname_dstring = DSTRING_INIT;
677
dbname = bdb_get_dbname(info, &dbname_dstring);
678
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
679
ANSWER_QUALITY_ERROR,
680
MSG_BERKELEY_NOCONNECTIONOPEN_S,
682
sge_dstring_free(&dbname_dstring);
686
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
687
ANSWER_QUALITY_ERROR,
688
MSG_BERKELEY_TXNNOTOPEN);
692
DEBUG((SGE_EVENT, "COMMIT transaction\n"));
693
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
694
dbret = txn->commit(txn, 0);
695
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
697
DEBUG((SGE_EVENT, "ABORT transaction\n"));
698
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
699
ANSWER_QUALITY_WARNING,
700
MSG_BERKELEY_ABORTINGTRANSACTION);
701
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
702
dbret = txn->abort(txn);
703
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
707
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
708
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
709
ANSWER_QUALITY_ERROR,
710
MSG_BERKELEY_ERRORENDINGTRANSACTION_IS,
711
dbret, db_strerror(dbret));
716
bdb_set_txn(info, txn);
725
spool_berkeleydb_trigger(lList **answer_list, bdb_info info,
726
time_t trigger, time_t *next_trigger)
730
DENTER(TOP_LAYER, "spool_berkeleydb_trigger");
732
if (bdb_get_next_clear(info) <= trigger) {
734
* in the clear interval, we
735
* - clear unused transaction logs for local spooling
736
* - do a dummy request in case of RPC spooling to avoid timeouts
738
if (bdb_get_server(info) == NULL) {
739
ret = spool_berkeleydb_clear_log(answer_list, info);
741
ret = spool_berkeleydb_trigger_rpc(answer_list, info);
743
bdb_set_next_clear(info, trigger + BERKELEYDB_CLEAR_INTERVAL);
746
if (bdb_get_next_checkpoint(info) <= trigger) {
747
ret = spool_berkeleydb_checkpoint(answer_list, info);
748
bdb_set_next_checkpoint(info, trigger + BERKELEYDB_CHECKPOINT_INTERVAL);
751
/* set time of next trigger */
752
*next_trigger = MIN(bdb_get_next_clear(info), bdb_get_next_checkpoint(info));
759
spool_berkeleydb_read_list(lList **answer_list, bdb_info info,
760
const bdb_database database,
761
lList **list, const lDescr *descr,
770
DBT key_dbt, data_dbt;
773
DENTER(TOP_LAYER, "spool_berkeleydb_read_list");
775
db = bdb_get_db(info, database);
776
txn = bdb_get_txn(info);
779
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
780
ANSWER_QUALITY_ERROR,
781
MSG_BERKELEY_NOCONNECTIONOPEN_S,
782
bdb_get_database_name(database));
783
spool_berkeleydb_error_close(info);
786
DEBUG((SGE_EVENT, "querying objects with keys %s*\n", key));
788
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
789
dbret = db->cursor(db, txn, &dbc, 0);
790
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
792
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
793
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
794
ANSWER_QUALITY_ERROR,
795
MSG_BERKELEY_CANNOTCREATECURSOR_IS,
796
dbret, db_strerror(dbret));
800
/* initialize query to first record for this object type */
801
memset(&key_dbt, 0, sizeof(key_dbt));
802
memset(&data_dbt, 0, sizeof(data_dbt));
803
key_dbt.data = (void *)key;
804
key_dbt.size = strlen(key) + 1;
805
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
806
dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_SET_RANGE);
807
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
810
if (dbret != 0 && dbret != DB_NOTFOUND) {
811
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
812
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
813
ANSWER_QUALITY_ERROR,
814
MSG_BERKELEY_QUERYERROR_SIS,
815
key, dbret, db_strerror(dbret));
819
} else if (dbret == DB_NOTFOUND) {
820
DPRINTF(("last record reached\n"));
823
} else if (key_dbt.data != NULL &&
824
strncmp(key_dbt.data, key, strlen(key))
826
DPRINTF(("current key is %s\n", key_dbt.data));
827
DPRINTF(("last record of this object type reached\n"));
832
lListElem *object = NULL;
835
DPRINTF(("read object with key "SFQ", size %d\n",
836
key_dbt.data, data_dbt.size));
837
cull_ret = init_packbuffer_from_buffer(&pb, data_dbt.data,
839
if (cull_ret != PACK_SUCCESS) {
840
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
841
ANSWER_QUALITY_ERROR,
842
MSG_BERKELEY_UNPACKINITERROR_SS,
844
cull_pack_strerror(cull_ret));
850
cull_ret = cull_unpack_elem_partial(&pb, &object, descr, pack_part);
851
if (cull_ret != PACK_SUCCESS) {
852
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
853
ANSWER_QUALITY_ERROR,
854
MSG_BERKELEY_UNPACKERROR_SS,
856
cull_pack_strerror(cull_ret));
861
/* we may not free the packbuffer: it references the buffer
862
* delivered from the database
863
* clear_packbuffer(&pb);
865
if (object != NULL) {
867
*list = lCreateList(key, descr);
869
lAppendElem(*list, object);
872
/* get next record */
873
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
874
dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_NEXT);
875
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
878
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
880
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
889
spool_berkeleydb_write_object(lList **answer_list, bdb_info info,
890
const bdb_database database,
891
const lListElem *object, const char *key)
894
lList *tmp_list = NULL;
896
DENTER(TOP_LAYER, "spool_berkeleydb_write_object");
898
/* do not spool free elems. If a free elem is passed, put a copy
899
* into a temporary list and spool this copy.
901
if (object->status == FREE_ELEM) {
902
tmp_list = lCreateList("tmp", object->descr);
903
lAppendElem(tmp_list, (lListElem *)object);
910
cull_ret = init_packbuffer(&pb, 8192, 0);
911
if (cull_ret != PACK_SUCCESS) {
912
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
913
ANSWER_QUALITY_ERROR,
914
MSG_BERKELEY_PACKINITERROR_SS,
916
cull_pack_strerror(cull_ret));
919
cull_ret = cull_pack_elem_partial(&pb, object, NULL, pack_part);
920
if (cull_ret != PACK_SUCCESS) {
921
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
922
ANSWER_QUALITY_ERROR,
923
MSG_BERKELEY_PACKERROR_SS,
925
cull_pack_strerror(cull_ret));
929
DBT key_dbt, data_dbt;
931
DB *db = bdb_get_db(info, database);
932
DB_TXN *txn = bdb_get_txn(info);
935
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
936
ANSWER_QUALITY_ERROR,
937
MSG_BERKELEY_NOCONNECTIONOPEN_S,
938
bdb_get_database_name(database));
939
spool_berkeleydb_error_close(info);
944
memset(&key_dbt, 0, sizeof(key_dbt));
945
memset(&data_dbt, 0, sizeof(data_dbt));
946
key_dbt.data = (void *)key;
947
key_dbt.size = strlen(key) + 1;
948
data_dbt.data = pb.head_ptr;
949
data_dbt.size = pb.bytes_used;
951
DPRINTF(("storing object with key "SFQ", size = %d "
952
"to env = %p, db = %p, txn = %p, txn_id = %d\n",
953
key, data_dbt.size, bdb_get_env(info), db,
954
txn, (txn->id == NULL) ? 0 : txn->id(txn)));
956
/* Store a key/data pair. */
957
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
958
dbret = db->put(db, txn, &key_dbt, &data_dbt, 0);
959
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
962
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
963
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
964
ANSWER_QUALITY_ERROR,
965
MSG_BERKELEY_PUTERROR_SIS,
966
key, dbret, db_strerror(dbret));
969
DEBUG((SGE_EVENT, "stored object with key "SFQ", size %d\n",
970
key, data_dbt.size));
975
clear_packbuffer(&pb);
979
if (tmp_list != NULL) {
980
lDechainElem(tmp_list, (lListElem *)object);
981
lFreeList(&tmp_list);
988
bool spool_berkeleydb_write_string(lList **answer_list, bdb_info info,
989
const bdb_database database,
990
const char *key, const char *str)
994
DENTER(TOP_LAYER, "spool_berkeleydb_write_string");
998
DBT key_dbt, data_dbt;
1000
DB *db = bdb_get_db(info, database);
1001
DB_TXN *txn = bdb_get_txn(info);
1004
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1005
ANSWER_QUALITY_ERROR,
1006
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1007
bdb_get_database_name(database));
1008
spool_berkeleydb_error_close(info);
1011
memset(&key_dbt, 0, sizeof(key_dbt));
1012
memset(&data_dbt, 0, sizeof(data_dbt));
1013
key_dbt.data = (void *)key;
1014
key_dbt.size = strlen(key) + 1;
1015
data_dbt.data = (void *) str;
1016
data_dbt.size = strlen(str) + 1;
1018
DPRINTF(("storing string with key "SFQ", size = %d "
1019
"to env = %p, db = %p, txn = %p, txn_id = %d\n",
1020
key, data_dbt.size, bdb_get_env(info), db,
1021
txn, (txn->id == NULL) ? 0 : txn->id(txn)));
1023
/* Store a key/data pair. */
1024
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1025
dbret = db->put(db, txn, &key_dbt, &data_dbt, 0);
1026
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1029
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1030
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1031
ANSWER_QUALITY_ERROR,
1032
MSG_BERKELEY_PUTERROR_SIS,
1033
key, dbret, db_strerror(dbret));
1036
DEBUG((SGE_EVENT, "stored object with key "SFQ", size %d\n",
1037
key, data_dbt.size));
1047
spool_berkeleydb_write_pe_task(lList **answer_list, bdb_info info,
1048
const lListElem *object,
1049
u_long32 job_id, u_long32 ja_task_id,
1050
const char *pe_task_id)
1053
dstring dbkey_dstring;
1054
char dbkey_buffer[MAX_STRING_SIZE];
1057
sge_dstring_init(&dbkey_dstring,
1058
dbkey_buffer, sizeof(dbkey_buffer));
1060
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%8d.%8d %s",
1061
object_type_get_name(SGE_TYPE_PETASK),
1062
job_id, ja_task_id, pe_task_id);
1064
ret = spool_berkeleydb_write_object(answer_list, info, BDB_JOB_DB,
1071
spool_berkeleydb_write_ja_task(lList **answer_list, bdb_info info,
1072
const lListElem *object,
1073
u_long32 job_id, u_long32 ja_task_id)
1076
dstring dbkey_dstring;
1077
char dbkey_buffer[MAX_STRING_SIZE];
1079
lList *tmp_list = NULL;
1081
sge_dstring_init(&dbkey_dstring,
1082
dbkey_buffer, sizeof(dbkey_buffer));
1084
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%8d.%8d",
1085
object_type_get_name(SGE_TYPE_JATASK),
1086
job_id, ja_task_id);
1088
lXchgList((lListElem *)object, JAT_task_list, &tmp_list);
1089
ret = spool_berkeleydb_write_object(answer_list, info, BDB_JOB_DB,
1091
lXchgList((lListElem *)object, JAT_task_list, &tmp_list);
1097
spool_berkeleydb_write_job(lList **answer_list, bdb_info info,
1098
const lListElem *object,
1099
u_long32 job_id, bool only_job)
1102
dstring dbkey_dstring;
1103
char dbkey_buffer[MAX_STRING_SIZE];
1105
lList *tmp_list = NULL;
1107
sge_dstring_init(&dbkey_dstring,
1108
dbkey_buffer, sizeof(dbkey_buffer));
1110
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%8d",
1111
object_type_get_name(SGE_TYPE_JOB),
1114
lXchgList((lListElem *)object, JB_ja_tasks, &tmp_list);
1116
ret = spool_berkeleydb_write_object(answer_list, info, BDB_JOB_DB,
1119
lXchgList((lListElem *)object, JB_ja_tasks, &tmp_list);
1121
if (ret && !only_job) {
1123
for_each(ja_task, lGetList(object, JB_ja_tasks)) {
1124
ret = spool_berkeleydb_write_ja_task(answer_list, info,
1139
spool_berkeleydb_write_cqueue(lList **answer_list, bdb_info info,
1140
const lListElem *object, const char *key)
1143
dstring dbkey_dstring;
1144
char dbkey_buffer[MAX_STRING_SIZE];
1146
lList *tmp_list = NULL;
1148
sge_dstring_init(&dbkey_dstring,
1149
dbkey_buffer, sizeof(dbkey_buffer));
1151
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s",
1152
object_type_get_name(SGE_TYPE_CQUEUE),
1155
lXchgList((lListElem *)object, CQ_qinstances, &tmp_list);
1157
ret = spool_berkeleydb_write_object(answer_list, info, BDB_CONFIG_DB,
1160
lXchgList((lListElem *)object, CQ_qinstances, &tmp_list);
1165
/****** spool/berkeleydb/spool_berkeleydb_delete_object() **********************
1167
* spool_berkeleydb_delete_object() -- delete one or multiple objects
1171
* spool_berkeleydb_delete_object(lList **answer_list, bdb_info info,
1172
* const bdb_database database,
* const char *key, bool sub_objects)
1175
* If sub_objects = false, deletes the object specified by key.
1176
* If sub_objects = true, key will be used as pattern to delete multiple
1180
* lList **answer_list - used to return error messages
1181
* bdb_info info - database handle
* const bdb_database database - database the object(s) are deleted from
1182
* const char *key - key
1183
* bool sub_objects - use key as pattern?
1186
* bool - true on success, else false
1189
* MT-NOTE: spool_berkeleydb_delete_object() is MT safe
1190
*******************************************************************************/
1192
spool_berkeleydb_delete_object(lList **answer_list, bdb_info info,
1193
const bdb_database database,
1194
const char *key, bool sub_objects)
1203
DENTER(TOP_LAYER, "spool_berkeleydb_delete_object");
1205
db = bdb_get_db(info, database);
1206
txn = bdb_get_txn(info);
1209
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1210
ANSWER_QUALITY_ERROR,
1211
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1212
bdb_get_database_name(database));
1213
spool_berkeleydb_error_close(info);
1219
DPRINTF(("querying objects with keys %s*\n", key));
1221
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1222
dbret = db->cursor(db, txn, &dbc, 0);
1223
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1225
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1226
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1227
ANSWER_QUALITY_ERROR,
1228
MSG_BERKELEY_CANNOTCREATECURSOR_IS,
1229
dbret, db_strerror(dbret));
1233
DBT cursor_dbt, data_dbt;
1234
/* initialize query to first record for this object type */
1235
memset(&cursor_dbt, 0, sizeof(cursor_dbt));
1236
memset(&data_dbt, 0, sizeof(data_dbt));
1237
cursor_dbt.data = (void *)key;
1238
cursor_dbt.size = strlen(key) + 1;
1239
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1240
dbret = dbc->c_get(dbc, &cursor_dbt, &data_dbt, DB_SET_RANGE);
1241
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1244
if (dbret != 0 && dbret != DB_NOTFOUND) {
1245
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1246
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1247
ANSWER_QUALITY_ERROR,
1248
MSG_BERKELEY_QUERYERROR_SIS,
1249
key, dbret, db_strerror(dbret));
1253
} else if (dbret == DB_NOTFOUND) {
1254
DPRINTF(("last record reached\n"));
1257
} else if (cursor_dbt.data != NULL &&
1258
strncmp(cursor_dbt.data, key, strlen(key))
1260
DPRINTF(("current key is %s\n", cursor_dbt.data));
1261
DPRINTF(("last record of this object type reached\n"));
1268
/* remember key of record to delete */
1269
memset(&delete_dbt, 0, sizeof(delete_dbt));
1270
delete_dbt.data = strdup(cursor_dbt.data);
1271
delete_dbt.size = cursor_dbt.size;
1273
/* switch cursor to next position */
1274
memset(&cursor_dbt, 0, sizeof(cursor_dbt));
1275
memset(&data_dbt, 0, sizeof(data_dbt));
1276
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1277
dbret = dbc->c_get(dbc, &cursor_dbt, &data_dbt, DB_NEXT);
1278
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1280
/* delete record with stored key */
1281
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1282
delete_ret = db->del(db, txn, &delete_dbt, 0);
1283
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1284
if (delete_ret != 0) {
1285
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1286
ANSWER_QUALITY_ERROR,
1287
MSG_BERKELEY_DELETEERROR_SIS,
1289
delete_ret, db_strerror(delete_ret));
1291
free(delete_dbt.data);
1295
DEBUG((SGE_EVENT, "deleted record with key "SFQ"\n", (char *)delete_dbt.data));
1297
free(delete_dbt.data);
1300
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1302
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1306
memset(&delete_dbt, 0, sizeof(delete_dbt));
1307
delete_dbt.data = (void *)key;
1308
delete_dbt.size = strlen(key) + 1;
1309
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1310
dbret = db->del(db, txn, &delete_dbt, 0);
1311
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1312
if (dbret != 0 /* && dbret != DB_NOTFOUND */) {
1313
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1314
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1315
ANSWER_QUALITY_ERROR,
1316
MSG_BERKELEY_DELETEERROR_SIS,
1317
key, dbret, db_strerror(dbret));
1320
DEBUG((SGE_EVENT, "deleted record with key "SFQ"\n", key));
1329
/****** spool/berkeleydb/spool_berkeleydb_delete_pe_task() *********************
*  NAME
*     spool_berkeleydb_delete_pe_task() -- delete one or multiple pe task(s)
*
*  SYNOPSIS
*     spool_berkeleydb_delete_pe_task(lList **answer_list, bdb_info info,
*                                     const char *key, bool sub_objects)
*
*  FUNCTION
*     Deletes one or multiple pe_tasks specified by key.
*
*     The key has the form "<job_id>.<ja_task_id> <pe_task_id>".
*     NOTE(review): the printf style format used to build this key is not
*     stated here - confirm it against the code that creates the key.
*
*     If sub_objects = true, the key can be used as pattern, typically used
*     to delete all pe_tasks of a certain ja_task by setting key to
*     "<job_id>.<ja_task_id>" or just "<job_id>" to delete all pe_tasks
*     dependent on a certain job.
*
*     The database key that is actually deleted is
*     "<name of SGE_TYPE_PETASK>:<key>" in the job database (BDB_JOB_DB).
*
*  INPUTS
*     lList **answer_list - used to return error messages
*     bdb_info info       - database handle
*     const char *key     - key
*     bool sub_objects    - interpret key as pattern?
*
*  RESULT
*     bool - true on success, else false
*
*  NOTES
*     MT-NOTE: spool_berkeleydb_delete_pe_task() is MT safe
*
*  SEE ALSO
*     spool/berkeleydb/spool_berkeleydb_delete_object()
*******************************************************************************/
1364
spool_berkeleydb_delete_pe_task(lList **answer_list, bdb_info info,
1365
const char *key, bool sub_objects)
1369
dstring dbkey_dstring;
1370
/* storage handed to sge_dstring_init() below for building the database key */
char dbkey_buffer[MAX_STRING_SIZE];
1372
const char *table_name;
1374
/* set up dbkey_dstring on top of the local buffer dbkey_buffer */
sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
1375
table_name = object_type_get_name(SGE_TYPE_PETASK);
1376
/* database key: "<object type name>:<key>" */
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
1377
/* pe tasks are stored in the job database; sub_objects is passed through */
ret = spool_berkeleydb_delete_object(answer_list, info, BDB_JOB_DB,
1378
dbkey, sub_objects);
1383
/****** spool/berkeleydb/spool_berkeleydb_delete_ja_task() *********************
*  NAME
*     spool_berkeleydb_delete_ja_task() -- delete ja_task(s)
*
*  SYNOPSIS
*     spool_berkeleydb_delete_ja_task(lList **answer_list, bdb_info info,
*                                     const char *key, bool sub_objects)
*
*  FUNCTION
*     Deletes one or multiple ja_tasks specified by key.
*     The ja_task(s) and all dependent pe_tasks are deleted.
*
*     The key has the form "<job_id>.<ja_task_id>" formatted as "%8d.%8d".
*     If sub_objects = true, the key can be used as pattern, typically used
*     to delete all ja_tasks of a certain job by setting key to "<job_id>.".
*
*     The database key that is actually deleted is
*     "<name of SGE_TYPE_JATASK>:<key>" in the job database (BDB_JOB_DB).
*     The pe_tasks are deleted by calling spool_berkeleydb_delete_pe_task()
*     with the same key and sub_objects = true.
*
*  INPUTS
*     lList **answer_list - used to return error messages
*     bdb_info info       - database handle
*     const char *key     - key
*     bool sub_objects    - use key as pattern?
*
*  RESULT
*     bool - true on success, else false
*
*  NOTES
*     MT-NOTE: spool_berkeleydb_delete_ja_task() is MT safe
*
*  SEE ALSO
*     spool/berkeleydb/spool_berkeleydb_delete_object()
*     spool/berkeleydb/spool_berkeleydb_delete_pe_task()
*******************************************************************************/
1417
spool_berkeleydb_delete_ja_task(lList **answer_list, bdb_info info,
1418
const char *key, bool sub_objects)
1422
dstring dbkey_dstring;
1423
/* storage handed to sge_dstring_init() below for building the database key */
char dbkey_buffer[MAX_STRING_SIZE];
1425
const char *table_name;
1427
/* set up dbkey_dstring on top of the local buffer dbkey_buffer */
sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
1428
table_name = object_type_get_name(SGE_TYPE_JATASK);
1429
/* database key: "<object type name>:<key>" */
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
1430
/* remove the ja_task record(s) from the job database */
ret = spool_berkeleydb_delete_object(answer_list, info, BDB_JOB_DB,
1431
dbkey, sub_objects);
1434
/*
 * remove the dependent pe_tasks: the unprefixed key is reused and is always
 * treated as a pattern (sub_objects = true)
 */
ret = spool_berkeleydb_delete_pe_task(answer_list, info, key, true);
1440
/****** spool/berkeleydb/spool_berkeleydb_delete_job() *************************
*  NAME
*     spool_berkeleydb_delete_job() -- delete a job
*
*  SYNOPSIS
*     spool_berkeleydb_delete_job(lList **answer_list, bdb_info info,
*                                 const char *key, bool sub_objects)
*
*  FUNCTION
*     Deletes the given job and all its ja_tasks.
*     Key usually will be the unique job id formatted with %8d, but the
*     function allows for some sort of pattern matching by specifying only
*     parts of the jobid, e.g. the key "00001" will delete all jobs from
*     1000 to 1999, an empty string will mean "delete all jobs",
*     if sub_objects = true.
*     NOTE(review): the "00001" example implies zero padded job ids, while
*     "%8d" pads with blanks - confirm the real key format.
*
*     The database key that is actually deleted is
*     "<name of SGE_TYPE_JOB>:<key>" in the job database (BDB_JOB_DB).
*     The ja_tasks are deleted by calling spool_berkeleydb_delete_ja_task()
*     with the same key and sub_objects = true.
*
*  INPUTS
*     lList **answer_list - used to return error messages
*     bdb_info info       - database handle
*     const char *key     - key (job_number)
*     bool sub_objects    - is the given key a pattern?
*
*  RESULT
*     bool - true on success, else false
*
*  NOTES
*     MT-NOTE: spool_berkeleydb_delete_job() is MT safe
*
*  SEE ALSO
*     spool/berkeleydb/spool_berkeleydb_delete_object()
*     spool/berkeleydb/spool_berkeleydb_delete_ja_task()
*******************************************************************************/
1473
spool_berkeleydb_delete_job(lList **answer_list, bdb_info info,
1474
const char *key, bool sub_objects)
1478
dstring dbkey_dstring;
1479
/* storage handed to sge_dstring_init() below for building the database key */
char dbkey_buffer[MAX_STRING_SIZE];
1481
const char *table_name;
1483
/* set up dbkey_dstring on top of the local buffer dbkey_buffer */
sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
1484
table_name = object_type_get_name(SGE_TYPE_JOB);
1485
/* database key: "<object type name>:<key>" */
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
1486
/* remove the job record(s) from the job database */
ret = spool_berkeleydb_delete_object(answer_list, info, BDB_JOB_DB,
1487
dbkey, sub_objects);
1490
/*
 * remove the ja_tasks of the job: the unprefixed key is reused and is always
 * treated as a pattern (sub_objects = true)
 */
ret = spool_berkeleydb_delete_ja_task(answer_list, info, key, true);
1496
/****** spool/berkeleydb/spool_berkeleydb_delete_cqueue() **********************
*  NAME
*     spool_berkeleydb_delete_cqueue() -- delete a cluster queue
*
*  SYNOPSIS
*     spool_berkeleydb_delete_cqueue(lList **answer_list, bdb_info info,
*                                    const char *key)
*
*  FUNCTION
*     Deletes a cluster queue and all its queue instances.
*
*     Two delete requests are sent to the configuration database
*     (BDB_CONFIG_DB): one with the key "<name of SGE_TYPE_CQUEUE>:<key>"
*     for the cluster queue itself and one with the key
*     "<name of SGE_TYPE_QINSTANCE>:<key>@" for its queue instances.
*
*  INPUTS
*     lList **answer_list - used to return error messages
*     bdb_info info       - database handle
*     const char *key     - key (name) of cluster queue to delete
*
*  RESULT
*     bool - true on success, else false
*
*  NOTES
*     MT-NOTE: spool_berkeleydb_delete_cqueue() is MT safe
*
*  SEE ALSO
*     spool/berkeleydb/spool_berkeleydb_delete_object()
*******************************************************************************/
1523
spool_berkeleydb_delete_cqueue(lList **answer_list, bdb_info info,
1528
dstring dbkey_dstring;
1529
/* storage handed to sge_dstring_init() below for building the database keys */
char dbkey_buffer[MAX_STRING_SIZE];
1531
const char *table_name;
1533
/* set up dbkey_dstring on top of the local buffer dbkey_buffer */
sge_dstring_init(&dbkey_dstring, dbkey_buffer, sizeof(dbkey_buffer));
1534
table_name = object_type_get_name(SGE_TYPE_CQUEUE);
1535
/* key of the cluster queue record: "<object type name>:<key>" */
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s", table_name, key);
1536
ret = spool_berkeleydb_delete_object(answer_list, info, BDB_CONFIG_DB,
1540
table_name = object_type_get_name(SGE_TYPE_QINSTANCE);
1541
/*
 * key for the queue instances: "<object type name>:<key>@"
 * presumably used as a prefix matching every "<cqueue>@<host>" instance -
 * NOTE(review): confirm the sub_objects argument of the call below
 */
dbkey = sge_dstring_sprintf(&dbkey_dstring, "%s:%s@", table_name, key);
1542
ret = spool_berkeleydb_delete_object(answer_list, info, BDB_CONFIG_DB,
1550
/* ---- static functions ---- */
1553
/*
 * spool_berkeleydb_error_close() -- drop all Berkeley DB handles held in info
 *
 * Fetches the current transaction, every database handle in the range
 * BDB_CONFIG_DB .. BDB_ALL_DBS - 1 and the environment from info and resets
 * each of them in info to NULL.
 * Called by spool_berkeleydb_handle_bdb_error() for errors after which the
 * handles cannot be used any longer.
 *
 * NOTE(review): what is done with the fetched txn / db / env handles before
 * they are reset is not documented here - confirm against the full function.
 *
 * info - database handle whose transaction, databases and environment are
 *        reset
 */
spool_berkeleydb_error_close(bdb_info info)
1560
/* try to shutdown all open resources */
1561
txn = bdb_get_txn(info);
1564
/* forget the transaction */
bdb_set_txn(info, NULL);
1567
/* walk over all database handles stored in info */
for (i = BDB_CONFIG_DB; i < BDB_ALL_DBS; i++) {
1568
db = bdb_get_db(info, i);
1571
/* forget the handle of database i */
bdb_set_db(info, NULL, i);
1575
env = bdb_get_env(info);
1578
/* forget the environment */
bdb_set_env(info, NULL);
1583
/*
 * spool_berkeleydb_handle_bdb_error() -- react to fatal Berkeley DB errors
 *
 * Inspects the Berkeley DB error code bdb_errno. For the codes
 *    DB_NOSERVER, DB_NOSERVER_ID  (connection to the RPC server lost)
 *    DB_NOSERVER_HOME
 *    DB_RUNRECOVERY
 * an error message is added to answer_list and all handles stored in info
 * are dropped through spool_berkeleydb_error_close().
 * The branches shown here do not handle any other error code.
 *
 * answer_list - receives the error message
 * info        - database handle; provides server name and database path for
 *               the messages and is reset on the errors listed above
 */
spool_berkeleydb_handle_bdb_error(lList **answer_list, bdb_info info,
1586
/* we lost the connection to a RPC server */
1587
if (bdb_errno == DB_NOSERVER || bdb_errno == DB_NOSERVER_ID) {
1588
const char *server = bdb_get_server(info);
1589
const char *path = bdb_get_path(info);
1591
/* server and path may be NULL - fixed placeholder texts are printed then */
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1592
ANSWER_QUALITY_ERROR,
1593
MSG_BERKELEY_CONNECTION_LOST_SS,
1594
server != NULL ? server : "no server defined",
1595
path != NULL ? path : "no database path defined");
1597
spool_berkeleydb_error_close(info);
1598
} else if (bdb_errno == DB_NOSERVER_HOME) {
1599
const char *server = bdb_get_server(info);
1600
const char *path = bdb_get_path(info);
1602
/* same placeholders as above, different message */
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1603
ANSWER_QUALITY_ERROR,
1604
MSG_BERKELEY_RPCSERVERLOSTHOME_SS,
1605
server != NULL ? server : "no server defined",
1606
path != NULL ? path : "no database path defined");
1608
spool_berkeleydb_error_close(info);
1609
} else if (bdb_errno == DB_RUNRECOVERY) {
1610
/* the message for DB_RUNRECOVERY takes no arguments */
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1611
ANSWER_QUALITY_ERROR,
1612
MSG_BERKELEY_RUNRECOVERY);
1614
spool_berkeleydb_error_close(info);
1619
/*
 * spool_berkeleydb_check_reopen_database() -- (re)open the database if needed
 *
 * Looks at the environment stored in info. The statements below create the
 * environment with spool_berkeleydb_create_environment() and open the
 * database with spool_berkeleydb_open_database() (third argument false).
 * NOTE(review): presumably both calls are only made when no environment is
 * set, and the second only if the first succeeded - confirm the conditions.
 *
 * answer_list - receives error messages of the called functions
 */
spool_berkeleydb_check_reopen_database(lList **answer_list,
1625
DENTER(TOP_LAYER, "spool_berkeleydb_check_reopen_database");
1627
env = bdb_get_env(info);
1630
* if environment is not set, it was either
1631
* - closed due to an error condition
1632
* - never open for this thread
1636
ret = spool_berkeleydb_create_environment(answer_list, info);
1639
ret = spool_berkeleydb_open_database(answer_list, info, false);
1648
/*
 * spool_berkeleydb_read_keys() -- collect the keys starting with a prefix
 *
 * Opens a cursor on the given database, positions it with DB_SET_RANGE at
 * the given key and then steps through the records with DB_NEXT.
 * The key of each visited record is compared with key over the first
 * strlen(key) characters; keys are appended to *list as elements of type
 * STU_Type (string field STU_name).
 * The scan stops on a database error, on DB_NOTFOUND or - presumably - as
 * soon as a key no longer starts with the given prefix.
 *
 * answer_list - receives error messages
 * info        - database handle, provides the DB handle and the transaction
 * database    - database to scan
 * list        - list the matching keys are added to
 * key         - prefix the keys have to start with
 */
spool_berkeleydb_read_keys(lList **answer_list, bdb_info info,
1649
const bdb_database database,
1650
lList **list, const char *key)
1658
DBT key_dbt, data_dbt;
1661
DENTER(TOP_LAYER, "spool_berkeleydb_read_keys");
1663
db = bdb_get_db(info, database);
1664
txn = bdb_get_txn(info);
1667
/* error reported when there is no open connection to the database */
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1668
ANSWER_QUALITY_ERROR,
1669
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1670
bdb_get_database_name(database));
1673
DPRINTF(("querying objects with keys %s*\n", key));
1675
/* create a cursor within the current transaction; the call is profiled */
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1676
dbret = db->cursor(db, txn, &dbc, 0);
1677
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1679
/* cursor creation failed: generic error handling plus specific message */
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1680
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1681
ANSWER_QUALITY_ERROR,
1682
MSG_BERKELEY_CANNOTCREATECURSOR_IS,
1683
dbret, db_strerror(dbret));
1687
/* initialize query to first record for this object type */
1688
memset(&key_dbt, 0, sizeof(key_dbt));
1689
memset(&data_dbt, 0, sizeof(data_dbt));
1690
key_dbt.data = (void *)key;
1691
/* the key size includes the terminating '\0' */
key_dbt.size = strlen(key) + 1;
1692
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1693
/*
 * DB_SET_RANGE: position on the first record whose key is greater than or
 * equal to the given key (see Berkeley DB cursor documentation)
 */
dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_SET_RANGE);
1694
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1697
/* real database error - DB_NOTFOUND is handled separately below */
if (dbret != 0 && dbret != DB_NOTFOUND) {
1698
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1699
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1700
ANSWER_QUALITY_ERROR,
1701
MSG_BERKELEY_QUERYERROR_SIS,
1702
key, dbret, db_strerror(dbret));
1706
} else if (dbret == DB_NOTFOUND) {
1707
DPRINTF(("last record reached\n"));
1710
/* prefix check: compare only the first strlen(key) characters */
} else if (key_dbt.data != NULL &&
1711
strncmp(key_dbt.data, key, strlen(key))
1713
DPRINTF(("current key is %s\n", key_dbt.data));
1714
DPRINTF(("last record of this object type reached\n"));
1718
DPRINTF(("read object with key "SFQ", size %d\n",
1719
key_dbt.data, data_dbt.size));
1720
/* store the key of the current record in the result list */
lAddElemStr(list, STU_name, key_dbt.data, STU_Type);
1722
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1723
/* advance the cursor to the next record */
dbret = dbc->c_get(dbc, &key_dbt, &data_dbt, DB_NEXT);
1724
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1727
/* NOTE(review): the profiled call is not shown - presumably closes the cursor */
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1729
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1738
/*
 * spool_berkeleydb_read_object() -- read and unpack a single object
 *
 * Fetches the record stored under key from the given database with
 * db->get() and unpacks the stored data into a CULL element:
 *    - a pack buffer is initialized from the returned data,
 *    - the descriptor is looked up from the object type encoded in the key
 *      (object_name_get_type() / object_type_get_descr()),
 *    - the element is unpacked with cull_unpack_elem_partial() using the
 *      file wide pack_part setting.
 * Errors of the database and of the unpack steps are added to answer_list.
 * The data buffer allocated by Berkeley DB (DB_DBT_MALLOC) is freed at the
 * end of the function.
 *
 * answer_list - receives error messages
 * info        - database handle, provides the DB handle and the transaction
 * database    - database to read from
 *
 * The result is kept in ret, which is initialized to NULL.
 */
spool_berkeleydb_read_object(lList **answer_list, bdb_info info,
1739
const bdb_database database,
1742
lListElem *ret = NULL;
1748
DBT key_dbt, data_dbt;
1750
DENTER(TOP_LAYER, "spool_berkeleydb_read_object");
1752
db = bdb_get_db(info, database);
1753
txn = bdb_get_txn(info);
1756
/* error reported when there is no open connection to the database */
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1757
ANSWER_QUALITY_ERROR,
1758
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1759
bdb_get_database_name(database));
1761
DPRINTF(("querying object with key %s\n", key));
1763
/* set up an exact lookup of the record stored under key */
1764
memset(&key_dbt, 0, sizeof(key_dbt));
1765
key_dbt.data = (void *)key;
1766
/* the key size includes the terminating '\0' */
key_dbt.size = strlen(key) + 1;
1767
memset(&data_dbt, 0, sizeof(data_dbt));
1768
/* let Berkeley DB allocate the buffer for the returned data */
data_dbt.flags = DB_DBT_MALLOC;
1769
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1770
dbret = db->get(db, txn, &key_dbt, &data_dbt, 0);
1771
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1773
/* the query failed: generic error handling plus specific message */
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1774
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1775
ANSWER_QUALITY_ERROR,
1776
MSG_BERKELEY_QUERYERROR_SIS,
1777
key, dbret, db_strerror(dbret));
1781
const lDescr *descr;
1783
DPRINTF(("read object with key "SFQ", size %d\n",
1784
key_dbt.data, data_dbt.size));
1785
/* wrap the data returned by the database into a pack buffer */
cull_ret = init_packbuffer_from_buffer(&pb, data_dbt.data,
1787
if (cull_ret != PACK_SUCCESS) {
1788
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1789
ANSWER_QUALITY_ERROR,
1790
MSG_BERKELEY_UNPACKINITERROR_SS,
1792
cull_pack_strerror(cull_ret));
1795
DPRINTF(("init_packbuffer succeeded\n"));
1797
/* the object type, and with it the descriptor, is derived from the key */
descr = object_type_get_descr(object_name_get_type(key_dbt.data));
1798
cull_ret = cull_unpack_elem_partial(&pb, &ret, descr, pack_part);
1799
if (cull_ret != PACK_SUCCESS) {
1800
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1801
ANSWER_QUALITY_ERROR,
1802
MSG_BERKELEY_UNPACKERROR_SS,
1804
cull_pack_strerror(cull_ret));
1808
/* We specified DB_DBT_MALLOC - BDB will malloc memory for each
1809
* object found and we have to free it.
1811
if (data_dbt.data != NULL) {
1812
FREE(data_dbt.data);
1821
spool_berkeleydb_read_string(lList **answer_list, bdb_info info,
1822
const bdb_database database,
1831
DBT key_dbt, data_dbt;
1833
DENTER(TOP_LAYER, "spool_berkeleydb_read_string");
1835
db = bdb_get_db(info, database);
1836
txn = bdb_get_txn(info);
1839
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1840
ANSWER_QUALITY_ERROR,
1841
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1842
bdb_get_database_name(database));
1844
DPRINTF(("querying string with key %s\n", key));
1846
/*
 * spool_berkeleydb_read_string: look up the record stored under exactly
 * this key (db->get(), no cursor) in the given database and hand back its
 * raw data as a string.
 * The key size includes the terminating '\0'. DB_DBT_MALLOC makes Berkeley
 * DB allocate the data buffer; that buffer itself is stored in ret, it is
 * not copied.
 * NOTE(review): presumably ret is returned and the caller has to free it -
 * confirm with the callers.
 */
1847
memset(&key_dbt, 0, sizeof(key_dbt));
1848
key_dbt.data = (void *)key;
1849
key_dbt.size = strlen(key) + 1;
1850
memset(&data_dbt, 0, sizeof(data_dbt));
1851
data_dbt.flags = DB_DBT_MALLOC;
1852
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1853
dbret = db->get(db, txn, &key_dbt, &data_dbt, 0);
1854
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1856
/* the query failed: generic error handling plus specific message */
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1857
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1858
ANSWER_QUALITY_ERROR,
1859
MSG_BERKELEY_QUERYERROR_SIS,
1860
key, dbret, db_strerror(dbret));
1862
/* result is the buffer allocated by Berkeley DB */
ret = (char *) data_dbt.data;
1870
/*
 * spool_berkeleydb_clear_log() -- remove log files reported by log_archive
 *
 * Asks the Berkeley DB environment stored in info for its list of log files
 * (env->log_archive() with DB_ARCH_ABS) and removes each listed file with
 * remove(). A file that cannot be removed is reported through answer_list
 * with status STATUS_EDISK.
 * If there is no environment, or log_archive() fails, an error is added to
 * answer_list.
 *
 * answer_list - receives error messages
 * info        - database handle providing the environment
 */
spool_berkeleydb_clear_log(lList **answer_list, bdb_info info)
1875
DENTER(TOP_LAYER, "spool_berkeleydb_clear_log");
1877
/* check connection */
1878
env = bdb_get_env(info);
1880
/* no connection: report it using the database name taken from info */
dstring dbname_dstring = DSTRING_INIT;
1883
dbname = bdb_get_dbname(info, &dbname_dstring);
1884
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1885
ANSWER_QUALITY_ERROR,
1886
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1888
sge_dstring_free(&dbname_dstring);
1896
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1897
/* DB_ARCH_ABS requests absolute path names (see DB_ENV->log_archive docs) */
dbret = env->log_archive(env, &list, DB_ARCH_ABS);
1898
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
1900
/* retrieving the list failed: generic error handling plus message */
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
1901
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1902
ANSWER_QUALITY_ERROR,
1903
MSG_BERKELEY_CANNOTRETRIEVELOGARCHIVE_IS,
1904
dbret, db_strerror(dbret));
1908
/* list may be NULL even on success; only a non NULL list is processed */
if (ret && list != NULL) {
1911
/* list is a NULL terminated array of file names */
for (file = list; *file != NULL; file++) {
1912
if (remove(*file) != 0) {
1913
/* error_dstring receives the text for errno and is freed below */
dstring error_dstring = DSTRING_INIT;
1915
answer_list_add_sprintf(answer_list, STATUS_EDISK,
1916
ANSWER_QUALITY_ERROR,
1917
MSG_ERRORDELETINGFILE_SS,
1919
sge_strerror(errno, &error_dstring));
1920
sge_dstring_free(&error_dstring);
1935
/*
 * spool_berkeleydb_trigger_rpc() -- issue a dummy read request
 *
 * Reads the object with the fixed key "..trigger_bdb_rpc_server.." from the
 * configuration database (BDB_CONFIG_DB). The read uses a local answer list
 * that is freed right afterwards, so messages produced by the read itself
 * do not reach the caller's answer_list.
 * If there is no environment, an error is added to answer_list instead.
 * NOTE(review): judging by the function name and the key, the read serves
 * to keep the connection to the RPC server in use - confirm.
 *
 * answer_list - receives the "no connection open" error
 * info        - database handle
 */
spool_berkeleydb_trigger_rpc(lList **answer_list, bdb_info info)
1940
DENTER(TOP_LAYER, "spool_berkeleydb_trigger_rpc");
1942
/* check connection */
1943
env = bdb_get_env(info);
1945
/* no connection: report it using the database name taken from info */
dstring dbname_dstring = DSTRING_INIT;
1948
dbname = bdb_get_dbname(info, &dbname_dstring);
1949
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1950
ANSWER_QUALITY_ERROR,
1951
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1953
sge_dstring_free(&dbname_dstring);
1958
/* separate answer list: messages of the dummy read are discarded */
lList *local_answer_list = NULL;
1961
ep = spool_berkeleydb_read_object(&local_answer_list, info, BDB_CONFIG_DB,
1962
"..trigger_bdb_rpc_server..");
1964
lFreeList(&local_answer_list);
1972
spool_berkeleydb_checkpoint(lList **answer_list, bdb_info info)
1976
DENTER(TOP_LAYER, "spool_berkeleydb_checkpoint");
1978
/* only necessary for local spooling */
1979
if (bdb_get_server(info) == NULL) {
1982
env = bdb_get_env(info);
1984
dstring dbname_dstring = DSTRING_INIT;
1987
dbname = bdb_get_dbname(info, &dbname_dstring);
1988
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
1989
ANSWER_QUALITY_ERROR,
1990
MSG_BERKELEY_NOCONNECTIONOPEN_S,
1992
sge_dstring_free(&dbname_dstring);
1999
PROF_START_MEASUREMENT(SGE_PROF_SPOOLINGIO);
2000
dbret = env->txn_checkpoint(env, 0, 0, 0);
2001
PROF_STOP_MEASUREMENT(SGE_PROF_SPOOLINGIO);
2003
spool_berkeleydb_handle_bdb_error(answer_list, info, dbret);
2004
answer_list_add_sprintf(answer_list, STATUS_EUNKNOWN,
2005
ANSWER_QUALITY_ERROR,
2006
MSG_BERKELEY_CANNOTCHECKPOINT_IS,
2007
dbret, db_strerror(dbret));