2
* jabberd - Jabber Open Source Server
3
* Copyright (c) 2002 Jeremie Miller, Thomas Muldowney,
4
* Ryan Eatmon, Robert Norris
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
/** @file sm/storage_db.c
22
* @brief berkeley db storage module
23
* @author Robert Norris
24
* $Date: 2005/03/23 19:59:01 $
25
* $Revision: 1.19.2.10 $
29
* !!! we must catch DB_RUNRECOVERY and call _st_db_panic(). I would argue that
30
* Berkeley should do this for all cases, not just for the process that
31
* caused the fault, but I'm not sure they see it that way. (I have asked,
32
* just waiting for a reply)
34
* Sleepycat SR#7019 resolved this. There is an unreleased patch available
35
* (I have a copy) that will be in 4.2 (due in June).
44
/**
 * internal structure, holds our data
 *
 * One instance is allocated by st_db_init() and stored in drv->private.
 * Members referenced elsewhere in this file:
 *   env     - the Berkeley DB environment handle
 *   path    - passed to env->remove() when the driver is freed
 *   dbs     - hash of per-type handles, keyed by type name
 *   filters - hash of compiled filters, keyed by filter string
 *
 * NOTE(review): the member declarations themselves are not visible in this
 * listing; the list above is taken from how the members are used.
 */
45
typedef struct drvdata_st {
56
/**
 * internal structure, holds a single db handle
 *
 * One instance is created per storage type by _st_db_add_type() and stored
 * in the driver's dbs hash. Members referenced elsewhere in this file:
 *   db   - the open Berkeley DB handle for this type
 *   data - the driver data; read as dbd->data->env when a transaction is begun
 *
 * NOTE(review): the member declarations themselves are not visible in this
 * listing.
 */
57
typedef struct dbdata_st {
63
/* union for strict alias rules in gcc3 */
69
/**
 * Open a database handle for one storage type and register it with the driver.
 *
 * @param drv   storage driver; drv->private holds the drvdata_t
 * @param type  storage type name; used both as the database name inside
 *              "sm.db" and as the key in data->dbs
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): this listing omits several original lines (local
 * declarations, closing braces, return statements). The comments below
 * describe only the statements that are visible.
 */
static st_ret_t _st_db_add_type(st_driver_t drv, const char *type) {
70
drvdata_t data = (drvdata_t) drv->private;
74
/* allocate the per-type record and zero it */
/* NOTE(review): the malloc result is handed straight to memset with no NULL check */
dbd = (dbdata_t) malloc(sizeof(struct dbdata_st));
75
memset(dbd, 0, sizeof(struct dbdata_st));
79
/* create the handle inside the driver's shared environment */
if((err = db_create(&(dbd->db), data->env, 0)) != 0) {
80
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't create db handle: %s", db_strerror(err));
85
/* DB_DUP: allow several data items under one key (must be set before open) */
if((err = dbd->db->set_flags(dbd->db, DB_DUP)) != 0) {
86
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't set database for duplicate storage: %s", db_strerror(err));
87
dbd->db->close(dbd->db, 0);
92
/* every type lives in the same file, "sm.db", as a database named after the type;
 * it is a hash database, created on demand, opened with auto-commit */
if((err = dbd->db->open(dbd->db, NULL, "sm.db", type, DB_HASH, DB_AUTO_COMMIT | DB_CREATE, 0)) != 0) {
93
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't open storage db: %s", db_strerror(err));
94
dbd->db->close(dbd->db, 0);
99
/* make the handle findable by type name for put/get/delete/replace */
xhash_put(data->dbs, type, dbd);
104
/**
 * make a new cursor (optionally wrapped in a txn)
 *
 * @param drv     storage driver, used for logging
 * @param dbd     per-type handle the cursor is opened on
 * @param cursor  receives the new cursor
 * @param txnid   receives the new transaction handle
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): the conditions that choose between the two cursor() calls
 * below, and that guard the error handling, are on lines missing from this
 * listing.
 */
105
static st_ret_t _st_db_cursor_new(st_driver_t drv, dbdata_t dbd, DBC **cursor, DB_TXN **txnid) {
109
/* start a top-level (no parent) transaction in the driver's environment */
if((err = dbd->data->env->txn_begin(dbd->data->env, NULL, txnid, DB_TXN_SYNC)) != 0) {
110
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't begin new transaction: %s", db_strerror(err));
115
/* cursor without a transaction */
err = dbd->db->cursor(dbd->db, NULL, cursor, 0);
117
/* cursor running inside the transaction just begun */
err = dbd->db->cursor(dbd->db, *txnid, cursor, 0);
120
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't create cursor: %s", db_strerror(err));
122
/* abort the transaction begun above */
(*txnid)->abort(*txnid);
129
/**
 * close down a cursor
 *
 * Closes the cursor and commits the transaction it was running in.
 *
 * @param drv     storage driver, used for logging
 * @param dbd     per-type handle (not referenced by the visible statements)
 * @param cursor  cursor to close
 * @param txnid   transaction to commit; callers in this file pass NULL on
 *                their error paths
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): callers pass txnid == NULL, so the commit is presumably
 * guarded by a NULL check on a line missing from this listing - confirm.
 */
130
static st_ret_t _st_db_cursor_free(st_driver_t drv, dbdata_t dbd, DBC *cursor, DB_TXN *txnid) {
133
if((err = cursor->c_close(cursor)) != 0) {
134
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't close cursor: %s", db_strerror(err));
141
/* synchronous commit of the work done through the cursor */
if((err = txnid->commit(txnid, DB_TXN_SYNC)) != 0) {
142
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't commit transaction: %s", db_strerror(err));
149
/**
 * Serialise one storage object into a flat buffer.
 *
 * For every field of the object, three items are written in order: the key
 * (string), the field type (int) and the value. Booleans and integers are
 * written as ints, strings as strings, and NADs as their printed XML string.
 *
 * @param o    object whose fields are serialised
 * @param buf  passed to the ser_*_set() helpers together with len; presumably
 *             receives the (re)allocated output buffer - confirm in ser_*_set
 * @param len  passed alongside buf; presumably the buffer length - confirm
 *
 * NOTE(review): this listing omits several original lines (some local
 * declarations, the "do {" and "switch", the STRING and NAD case labels,
 * break statements). The comments describe only the visible statements.
 */
static void _st_db_object_serialise(os_object_t o, char **buf, int *len) {
150
char *key, *xml, *xmlstr;
155
log_debug(ZONE, "serialising object");
160
/* nothing is written for an object that has no fields */
if(os_object_iter_first(o))
162
/* fetch the current field: its key, its value and its type */
os_object_iter_get(o, &key, &val, &ot);
164
log_debug(ZONE, "serialising key %s", key);
166
/* field header: key, then type tag */
ser_string_set(key, &cur, buf, len);
167
ser_int_set(ot, &cur, buf, len);
170
case os_type_BOOLEAN:
171
/* normalise to 0/1 before writing */
ser_int_set(((int) val) != 0, &cur, buf, len);
174
case os_type_INTEGER:
175
/* the integer is carried in val itself and recovered by a cast */
ser_int_set((int) val, &cur, buf, len);
179
/* string value: val is the string */
ser_string_set((char *) val, &cur, buf, len);
183
/* NAD value: print it to XML text of xlen bytes ... */
nad_print((nad_t) val, 0, &xml, &xlen);
184
/* ... and copy exactly xlen bytes into a NUL-terminated scratch buffer */
/* NOTE(review): the malloc result is used by sprintf with no NULL check */
xmlstr = (char *) malloc(sizeof(char) * (xlen + 1));
185
sprintf(xmlstr, "%.*s", xlen, xml);
186
ser_string_set(xmlstr, &cur, buf, len);
190
case os_type_UNKNOWN:
193
} while(os_object_iter_next(o));
198
/**
 * Rebuild a storage object from a buffer written by _st_db_object_serialise().
 *
 * Reads (key, type, value) triples from the buffer and adds each one as a
 * field of a new object created in os.
 *
 * @param drv  storage driver; used for logging and to reach the NAD cache
 * @param os   object set the new object is created in
 * @param buf  serialised data
 * @param len  number of bytes in buf
 * @return an os_object_t (the return statements are not visible in this listing)
 *
 * NOTE(review): this listing omits several original lines (local
 * declarations, the loop header, the STRING and NAD case labels, break and
 * return statements). The comments describe only the visible statements.
 */
static os_object_t _st_db_object_deserialise(st_driver_t drv, os_t os, const char *buf, int len) {
206
log_debug(ZONE, "deserialising object");
208
o = os_object_new(os);
212
/* field header: key string followed by the type tag; either read can fail */
if(ser_string_get(&key, &cur, buf, len) != 0 || ser_int_get(&ot, &cur, buf, len) != 0) {
213
log_debug(ZONE, "ran off the end of the buffer");
217
log_debug(ZONE, "deserialising key %s", key);
219
switch((os_type_t) ot) {
220
case os_type_BOOLEAN:
221
/* NOTE(review): unlike the header read, the value reads in this switch do not check the ser_*_get result */
ser_int_get(&ival, &cur, buf, len);
223
os_object_put(o, key, &ival, os_type_BOOLEAN);
226
case os_type_INTEGER:
227
ser_int_get(&ival, &cur, buf, len);
228
os_object_put(o, key, &ival, os_type_INTEGER);
232
/* string value */
ser_string_get(&sval, &cur, buf, len);
233
os_object_put(o, key, sval, os_type_STRING);
238
/* NAD value: stored as XML text, parsed back into a NAD */
/* NOTE(review): sval goes to strlen() on the next line even if this read failed */
ser_string_get(&sval, &cur, buf, len);
239
nad = nad_parse(drv->st->sm->router->nad_cache, sval, strlen(sval));
242
log_write(drv->st->sm->log, LOG_ERR, "db: unable to parse stored XML - database corruption?");
245
os_object_put(o, key, nad, os_type_NAD);
249
case os_type_UNKNOWN:
259
/**
 * Store every object of an object set under one owner, through an open cursor.
 *
 * Shared by _st_db_put() and _st_db_replace(), which own the cursor and the
 * transaction.
 *
 * @param drv    storage driver, used for logging
 * @param type   storage type name (used only in the log message here)
 * @param owner  owner string; becomes the record key
 * @param os     objects to store; one record is written per object
 * @param dbd    per-type handle (not referenced by the visible statements)
 * @param c      open cursor on the type's database
 * @param t      enclosing transaction (not referenced by the visible statements)
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): this listing omits several original lines (local
 * declarations, the "do {", the assignment of the serialised buffer to val,
 * return statements).
 */
static st_ret_t _st_db_put_guts(st_driver_t drv, const char *type, const char *owner, os_t os, dbdata_t dbd, DBC *c, DB_TXN *t) {
265
/* DBTs must be zeroed before use */
memset(&key, 0, sizeof(DBT));
266
memset(&val, 0, sizeof(DBT));
268
/* the key is the owner string without its terminating NUL */
key.data = (char *) owner;
269
key.size = strlen(owner);
271
if(os_iter_first(os))
273
o = os_iter_object(os);
274
/* flatten the object into buf/len */
_st_db_object_serialise(o, &buf, &len);
279
/* DB_KEYLAST: add the record as the last duplicate for this key */
if((err = c->c_put(c, &key, &val, DB_KEYLAST)) != 0) {
280
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't store value for type %s owner %s in storage db: %s", type, owner, db_strerror(err));
287
} while(os_iter_next(os));
292
/**
 * Driver "put" entry point: add the objects in os to the records of owner.
 *
 * Opens a cursor and transaction, delegates the writes to _st_db_put_guts(),
 * then closes the cursor and commits.
 *
 * @param drv    storage driver
 * @param type   storage type; selects the database handle from data->dbs
 * @param owner  owner the objects are stored under
 * @param os     objects to store
 * @return the result of _st_db_cursor_free() on the success path; the other
 *         return statements are not visible in this listing
 */
static st_ret_t _st_db_put(st_driver_t drv, const char *type, const char *owner, os_t os) {
293
drvdata_t data = (drvdata_t) drv->private;
294
dbdata_t dbd = xhash_get(data->dbs, type);
299
/* empty object set (the statement this guards is not visible in this listing) */
if(os_count(os) == 0)
302
ret = _st_db_cursor_new(drv, dbd, &c, &t);
303
if(ret != st_SUCCESS)
306
ret = _st_db_put_guts(drv, type, owner, os, dbd, c, t);
307
if(ret != st_SUCCESS) {
309
/* failure: close the cursor only; the NULL txn means no commit is requested */
_st_db_cursor_free(drv, dbd, c, NULL);
313
/* success: close the cursor and commit */
return _st_db_cursor_free(drv, dbd, c, t);
316
/**
 * Driver "get" entry point: load the objects stored for owner that match filter.
 *
 * Walks every duplicate record stored under the owner key, deserialises each
 * into *os and tests it against the compiled filter.
 *
 * @param drv     storage driver
 * @param type    storage type; selects the database handle from data->dbs
 * @param owner   owner whose records are read
 * @param filter  filter expression; compiled filters are cached in data->filters
 * @param os      object set that receives the deserialised objects
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): this listing omits many original lines (local declarations,
 * creation of *os, the conditions around the filter cache, the loop header,
 * the action taken on a non-matching object, return statements). The
 * comments describe only the visible statements.
 */
static st_ret_t _st_db_get(st_driver_t drv, const char *type, const char *owner, const char *filter, os_t *os) {
317
drvdata_t data = (drvdata_t) drv->private;
318
dbdata_t dbd = xhash_get(data->dbs, type);
328
ret = _st_db_cursor_new(drv, dbd, &c, &t);
329
if(ret != st_SUCCESS)
334
/* look for an already-compiled filter for this expression */
f = xhash_get(data->filters, filter);
336
/* compile the filter ... */
f = storage_filter(filter);
337
/* ... and cache it under a copy of the expression made in the hash's own pool */
cfilter = pstrdup(xhash_pool(data->filters), filter);
338
xhash_put(data->filters, cfilter, (void *) f);
339
/* register pool_free(f->p) as a cleanup on the hash's pool */
pool_cleanup(xhash_pool(data->filters), (pool_cleaner) pool_free, f->p);
343
/* DBTs must be zeroed before use */
memset(&key, 0, sizeof(DBT));
344
memset(&val, 0, sizeof(DBT));
346
/* the key is the owner string without its terminating NUL */
key.data = (char *) owner;
347
key.size = strlen(owner);
351
/* position on the first record for this owner */
err = c->c_get(c, &key, &val, DB_SET);
353
o = _st_db_object_deserialise(drv, *os, val.data, val.size);
355
/* the object was created inside *os; this tests for one that does NOT match the filter */
if(o != NULL && !storage_match(f, o, *os))
358
/* advance to the next duplicate stored under the same key */
err = c->c_get(c, &key, &val, DB_NEXT_DUP);
361
/* DB_NOTFOUND is excluded from this error check */
if(err != 0 && err != DB_NOTFOUND) {
362
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't move cursor for type %s owner %s in storage db: %s", type, owner, db_strerror(err));
364
/* failure: close the cursor only; the NULL txn means no commit is requested */
_st_db_cursor_free(drv, dbd, c, NULL);
369
/* success: close the cursor and commit */
ret = _st_db_cursor_free(drv, dbd, c, t);
370
if(ret != st_SUCCESS) {
375
/* nothing was loaded (the handling of this case is not visible in this listing) */
if(os_count(*os) == 0) {
383
/**
 * Delete the records of owner that match filter, through an open cursor.
 *
 * Shared by _st_db_delete() and _st_db_replace(), which own the cursor and
 * the transaction. Each duplicate under the owner key is deserialised and
 * tested against the compiled filter; matching records are deleted at the
 * cursor position.
 *
 * @param drv     storage driver
 * @param type    storage type name (used only in the log message here)
 * @param owner   owner whose records are examined
 * @param filter  filter expression; compiled filters are cached in data->filters
 * @param dbd     per-type handle (not referenced by the visible statements)
 * @param c       open cursor on the type's database
 * @param t       enclosing transaction (not referenced by the visible statements)
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): this listing omits many original lines (local declarations,
 * including the scratch object set "os", the conditions around the filter
 * cache, the loop header, return statements).
 */
static st_ret_t _st_db_delete_guts(st_driver_t drv, const char *type, const char *owner, const char *filter, dbdata_t dbd, DBC *c, DB_TXN *t) {
384
drvdata_t data = (drvdata_t) drv->private;
394
/* look for an already-compiled filter for this expression */
f = xhash_get(data->filters, filter);
396
/* compile the filter ... */
f = storage_filter(filter);
397
/* ... and cache it under a copy of the expression made in the hash's own pool */
cfilter = pstrdup(xhash_pool(data->filters), filter);
398
xhash_put(data->filters, cfilter, (void *) f);
399
/* register pool_free(f->p) as a cleanup on the hash's pool */
pool_cleanup(xhash_pool(data->filters), (pool_cleaner) pool_free, f->p);
403
/* DBTs must be zeroed before use */
memset(&key, 0, sizeof(DBT));
404
memset(&val, 0, sizeof(DBT));
406
/* the key is the owner string without its terminating NUL */
key.data = (char *) owner;
407
key.size = strlen(owner);
411
/* position on the first record for this owner */
err = c->c_get(c, &key, &val, DB_SET);
413
/* the record has to be rebuilt as an object before the filter can be applied */
o = _st_db_object_deserialise(drv, os, val.data, val.size);
415
/* delete the record under the cursor when it matches */
if(o != NULL && storage_match(f, o, os))
416
err = c->c_del(c, 0);
419
/* advance to the next duplicate stored under the same key */
err = c->c_get(c, &key, &val, DB_NEXT_DUP);
424
/* DB_NOTFOUND is excluded from this error check */
if(err != 0 && err != DB_NOTFOUND) {
425
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't move cursor for type %s owner %s in storage db: %s", type, owner, db_strerror(err));
432
/**
 * Driver "delete" entry point: remove the records of owner that match filter.
 *
 * Opens a cursor and transaction, delegates to _st_db_delete_guts(), then
 * closes the cursor and commits.
 *
 * @param drv     storage driver
 * @param type    storage type; selects the database handle from data->dbs
 * @param owner   owner whose records are examined
 * @param filter  filter expression selecting the records to delete
 * @return the result of _st_db_cursor_free() on the success path; the other
 *         return statements are not visible in this listing
 */
static st_ret_t _st_db_delete(st_driver_t drv, const char *type, const char *owner, const char *filter) {
433
drvdata_t data = (drvdata_t) drv->private;
434
dbdata_t dbd = xhash_get(data->dbs, type);
439
ret = _st_db_cursor_new(drv, dbd, &c, &t);
440
if(ret != st_SUCCESS)
443
ret = _st_db_delete_guts(drv, type, owner, filter, dbd, c, t);
444
if(ret != st_SUCCESS) {
446
/* failure: close the cursor only; the NULL txn means no commit is requested */
_st_db_cursor_free(drv, dbd, c, NULL);
450
/* success: close the cursor and commit */
return _st_db_cursor_free(drv, dbd, c, t);
453
/**
 * Driver "replace" entry point: delete the records of owner that match
 * filter, then store the objects in os, using one cursor and one transaction
 * for both steps.
 *
 * @param drv     storage driver
 * @param type    storage type; selects the database handle from data->dbs
 * @param owner   owner whose records are replaced
 * @param filter  filter expression selecting the records to delete
 * @param os      objects to store afterwards; may be empty, in which case
 *                only the delete is performed
 * @return the result of _st_db_cursor_free() on the success paths; the other
 *         return statements are not visible in this listing
 */
static st_ret_t _st_db_replace(st_driver_t drv, const char *type, const char *owner, const char *filter, os_t os) {
454
drvdata_t data = (drvdata_t) drv->private;
455
dbdata_t dbd = xhash_get(data->dbs, type);
460
ret = _st_db_cursor_new(drv, dbd, &c, &t);
461
if(ret != st_SUCCESS)
464
/* step 1: remove the matching records */
ret = _st_db_delete_guts(drv, type, owner, filter, dbd, c, t);
465
if(ret != st_SUCCESS) {
467
/* failure: close the cursor only; the NULL txn means no commit is requested */
_st_db_cursor_free(drv, dbd, c, NULL);
471
/* nothing to store: commit the delete on its own */
if(os_count(os) == 0)
472
return _st_db_cursor_free(drv, dbd, c, t);
474
/* step 2: write the new objects through the same cursor */
ret = _st_db_put_guts(drv, type, owner, os, dbd, c, t);
475
if(ret != st_SUCCESS) {
477
_st_db_cursor_free(drv, dbd, c, NULL);
481
/* success: close the cursor and commit delete and put together */
return _st_db_cursor_free(drv, dbd, c, t);
484
/**
 * Driver "free" entry point: close every per-type database, release the
 * driver's hashes, close the environment and remove its files.
 *
 * @param drv  storage driver; drv->private holds the drvdata_t
 *
 * NOTE(review): this listing omits several original lines (local
 * declarations, the "do {", any free() calls for the per-type records or the
 * driver data). The comments describe only the visible statements.
 */
static void _st_db_free(st_driver_t drv) {
485
drvdata_t data = (drvdata_t) drv->private;
492
/* walk every registered per-type handle */
if(xhash_iter_first(data->dbs))
494
/* the value is fetched through the xhv union's void** member */
xhash_iter_get(data->dbs, &key, xhv.val);
496
log_debug(ZONE, "closing %s db", key);
498
dbd->db->close(dbd->db, 0);
500
} while(xhash_iter_next(data->dbs));
502
xhash_free(data->dbs);
504
xhash_free(data->filters);
506
data->env->close(data->env, 0);
508
/* remove db environment files if no longer in use */
/* the handle closed above is not reused; a fresh one is created just for the remove call */
509
if (db_env_create(&env, 0) == 0)
510
env->remove(env, data->path, 0);
515
/**
 * panic function
 *
 * Installed on the environment with set_paniccall() in st_db_init(). Logs a
 * critical message through the log context that st_db_init() stored in
 * env->app_private.
 *
 * @param env     environment the callback is invoked with
 * @param errval  error value supplied by the caller of the callback (not used
 *                by the visible statements)
 *
 * NOTE(review): lines after the log call are missing from this listing.
 */
516
static void _st_db_panic(DB_ENV *env, int errval) {
517
log_t log = (log_t) env->app_private;
519
log_write(log, LOG_CRIT, "db: corruption detected! close all jabberd processes and run db_recover");
524
/**
 * Initialise the Berkeley DB storage driver.
 *
 * Reads the environment path from the "storage.db.path" config key, creates
 * and opens a transactional environment there, allocates the driver data and
 * installs this file's callbacks on the driver.
 *
 * @param drv  storage driver to initialise
 * @return an st_ret_t (the return statements are not visible in this listing)
 *
 * NOTE(review): this listing omits several original lines (local
 * declarations, the conditions guarding the error logs, assignments into the
 * driver data, return statements) and ends before the function's closing
 * brace. The comments describe only the visible statements.
 */
st_ret_t st_db_init(st_driver_t drv) {
530
path = config_get_one(drv->st->sm->config, "storage.db.path", 0);
532
log_write(drv->st->sm->log, LOG_ERR, "db: no path specified in config file");
536
if((err = db_env_create(&env, 0)) != 0) {
537
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't create environment: %s", db_strerror(err));
541
/* route Berkeley DB panic notifications to _st_db_panic() */
if((err = env->set_paniccall(env, _st_db_panic)) != 0) {
542
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't set panic call: %s", db_strerror(err));
546
/* store the log context in case we panic */
547
env->app_private = drv->st->sm->log;
549
/* open (creating if needed) with locking, memory pool, logging and transactions enabled */
if((err = env->open(env, path, DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_LOG | DB_INIT_TXN | DB_CREATE, 0)) != 0) {
550
log_write(drv->st->sm->log, LOG_ERR, "db: couldn't open environment: %s", db_strerror(err));
555
/* allocate and zero the driver data */
/* NOTE(review): the malloc result is handed straight to memset with no NULL check */
data = (drvdata_t) malloc(sizeof(struct drvdata_st));
556
memset(data, 0, sizeof(struct drvdata_st));
561
/* optional "storage.db.sync" setting (the statement this guards is not visible in this listing) */
if(config_get_one(drv->st->sm->config, "storage.db.sync", 0) != NULL)
564
/* per-type handles, filled by _st_db_add_type() */
data->dbs = xhash_new(101);
566
/* cache of compiled filters, filled by _st_db_get() and _st_db_delete_guts() */
data->filters = xhash_new(17);
568
drv->private = (void *) data;
570
/* install the driver callbacks */
drv->add_type = _st_db_add_type;
571
drv->put = _st_db_put;
572
drv->get = _st_db_get;
573
drv->replace = _st_db_replace;
574
drv->delete = _st_db_delete;
575
drv->free = _st_db_free;