5
5
* BerkeleyDB API, but much simplified.
8
* Copyright 2011 Howard Chu, Symas Corp.
8
* Copyright 2011-2012 Howard Chu, Symas Corp.
9
9
* All rights reserved.
11
11
* Redistribution and use in source and binary forms, with or without
72
#include <valgrind/memcheck.h>
73
#define VGMEMP_CREATE(h,r,z) VALGRIND_CREATE_MEMPOOL(h,r,z)
74
#define VGMEMP_ALLOC(h,a,s) VALGRIND_MEMPOOL_ALLOC(h,a,s)
75
#define VGMEMP_FREE(h,a) VALGRIND_MEMPOOL_FREE(h,a)
76
#define VGMEMP_DESTROY(h) VALGRIND_DESTROY_MEMPOOL(h)
77
#define VGMEMP_DEFINED(a,s) VALGRIND_MAKE_MEM_DEFINED(a,s)
79
#define VGMEMP_CREATE(h,r,z)
80
#define VGMEMP_ALLOC(h,a,s)
81
#define VGMEMP_FREE(h,a)
82
#define VGMEMP_DESTROY(h)
83
#define VGMEMP_DEFINED(a,s)
68
87
# if (defined(_LITTLE_ENDIAN) || defined(_BIG_ENDIAN)) && !(defined(_LITTLE_ENDIAN) && defined(_BIG_ENDIAN))
69
88
/* Solaris just defines one or the other */
238
259
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
239
260
# define DPRINTF (void) /* Vararg macros may be unsupported */
262
static int mdb_debug;
263
static int mdb_debug_start;
241
265
/** Print a debug message with printf formatting. */
242
266
# define DPRINTF(fmt, ...) /**< Requires 2 or more args */ \
243
fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__)
267
if (mdb_debug) fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__)
245
269
# define DPRINTF(fmt, ...) ((void) 0)
313
337
/** @defgroup lazylock Lazy Locking
314
* Macros for locks that aren't actually needed.
338
* Macros for locks that aren't actually needed.
315
339
* The DB view is always consistent because all writes are wrapped in
316
340
* the wmutex. Finer-grained locks aren't necessary.
473
497
* unlikely. If a collision occurs, the results are unpredictable.
475
499
typedef struct MDB_txbody {
476
/** Stamp identifying this as an MDB lock file. It must be set
500
/** Stamp identifying this as an MDB file. It must be set
477
501
* to #MDB_MAGIC. */
478
502
uint32_t mtb_magic;
479
503
/** Version number of this lock file. Must be set to #MDB_VERSION. */
524
548
pthread_mutex_t mt2_wmutex;
525
549
#define mti_wmutex mt2.mt2_wmutex
527
char pad[(sizeof(pthread_mutex_t)+CACHELINE-1) & ~(CACHELINE-1)];
551
char pad[(MNAME_LEN+CACHELINE-1) & ~(CACHELINE-1)];
529
553
MDB_reader mti_readers[1];
730
754
/** Meta page content. */
731
755
typedef struct MDB_meta {
732
/** Stamp identifying this as an MDB data file. It must be set
756
/** Stamp identifying this as an MDB file. It must be set
733
757
* to #MDB_MAGIC. */
734
758
uint32_t mm_magic;
735
759
/** Version number of this data file. Must be set to #MDB_VERSION. */
745
769
txnid_t mm_txnid; /**< txnid that committed this page */
772
/** Buffer for a stack-allocated dirty page.
773
* The members define size and alignment, and silence type
774
* aliasing warnings. They are not used directly; that could
775
* mean incorrectly using several union members in parallel.
777
typedef union MDB_pagebuf {
778
char mb_raw[MDB_PAGESIZE];
781
char mm_pad[PAGEHDRSZ];
748
786
/** Auxiliary DB info.
749
787
* The information here is mostly static/read-only. There is
750
788
* only a single copy of this record in the environment.
838
876
/** The @ref mt_dbflag for this database */
839
877
unsigned char *mc_dbflag;
840
878
unsigned short mc_snum; /**< number of pushed pages */
841
unsigned short mc_top; /**< index of top page, mc_snum-1 */
879
unsigned short mc_top; /**< index of top page, normally mc_snum-1 */
842
880
/** @defgroup mdb_cursor Cursor Flags
843
881
* @ingroup internal
844
882
* Cursor state flags.
906
944
unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
907
945
unsigned int me_db_toggle; /**< which DB table is current */
908
946
txnid_t me_wtxnid; /**< ID of last txn we committed */
947
txnid_t me_pgfirst; /**< ID of first old page record we used */
948
txnid_t me_pglast; /**< ID of last old page record we used */
909
949
MDB_dbx *me_dbxs; /**< array of static DB info */
910
950
MDB_db *me_dbs[2]; /**< two arrays of MDB_db info */
911
951
MDB_oldpages *me_pghead; /**< list of old page records */
929
969
/** max number of pages to commit in one writev() call */
930
970
#define MDB_COMMIT_PAGES 64
971
#if defined(IOV_MAX) && IOV_MAX < MDB_COMMIT_PAGES
972
#undef MDB_COMMIT_PAGES
973
#define MDB_COMMIT_PAGES IOV_MAX
932
976
static MDB_page *mdb_page_alloc(MDB_cursor *mc, int num);
933
977
static MDB_page *mdb_page_new(MDB_cursor *mc, uint32_t flags, int num);
1094
/** Display all the keys in the page. */
1096
mdb_page_keys(MDB_page *mp)
1099
unsigned int i, nkeys;
1103
nkeys = NUMKEYS(mp);
1104
DPRINTF("numkeys %d", nkeys);
1105
for (i=0; i<nkeys; i++) {
1106
node = NODEPTR(mp, i);
1107
key.mv_size = node->mn_ksize;
1108
key.mv_data = node->mn_data;
1109
DPRINTF("key %d: %s", i, DKEY(&key));
1115
/** Count all the pages in each DB and in the freelist
1116
* and make sure it matches the actual number of pages
1119
static void mdb_audit(MDB_txn *txn)
1124
ID freecount, count;
1127
mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
1128
while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
1129
freecount += *(ID *)data.mv_data;
1130
freecount += txn->mt_dbs[0].md_branch_pages + txn->mt_dbs[0].md_leaf_pages +
1131
txn->mt_dbs[0].md_overflow_pages;
1134
for (i = 0; i<txn->mt_numdbs; i++) {
1135
count += txn->mt_dbs[i].md_branch_pages +
1136
txn->mt_dbs[i].md_leaf_pages +
1137
txn->mt_dbs[i].md_overflow_pages;
1138
if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) {
1140
mdb_cursor_init(&mc, txn, i, &mx);
1141
mdb_page_search(&mc, NULL, 0);
1145
mp = mc.mc_pg[mc.mc_top];
1146
for (j=0; j<NUMKEYS(mp); j++) {
1147
MDB_node *leaf = NODEPTR(mp, j);
1148
if (leaf->mn_flags & F_SUBDATA) {
1150
memcpy(&db, NODEDATA(leaf), sizeof(db));
1151
count += db.md_branch_pages + db.md_leaf_pages +
1152
db.md_overflow_pages;
1156
while (mdb_cursor_sibling(&mc, 1) == 0);
1159
assert(freecount + count + 2 >= txn->mt_next_pgno - 1);
1068
1181
static MDB_page *
1069
1182
mdb_page_malloc(MDB_cursor *mc) {
1071
if (mc->mc_txn->mt_env->me_dpages) {
1072
ret = mc->mc_txn->mt_env->me_dpages;
1184
size_t sz = mc->mc_txn->mt_env->me_psize;
1185
if ((ret = mc->mc_txn->mt_env->me_dpages) != NULL) {
1186
VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
1187
VGMEMP_DEFINED(ret, sizeof(ret->mp_next));
1073
1188
mc->mc_txn->mt_env->me_dpages = ret->mp_next;
1075
ret = malloc(mc->mc_txn->mt_env->me_psize);
1189
} else if ((ret = malloc(sz)) != NULL) {
1190
VGMEMP_ALLOC(mc->mc_txn->mt_env, ret, sz);
1097
1212
if (txn->mt_txnid > 2) {
1099
if (!txn->mt_env->me_pghead && mc->mc_dbi != FREE_DBI &&
1214
if (!txn->mt_env->me_pghead &&
1100
1215
txn->mt_dbs[FREE_DBI].md_root != P_INVALID) {
1101
1216
/* See if there's anything in the free DB */
1103
1218
MDB_node *leaf;
1104
txnid_t *kptr, oldest;
1220
txnid_t *kptr, oldest, last;
1106
1222
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
1107
mdb_page_search(&m2, NULL, 0);
1108
leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
1109
kptr = (txnid_t *)NODEKEY(leaf);
1223
if (!txn->mt_env->me_pgfirst) {
1224
mdb_page_search(&m2, NULL, 0);
1225
leaf = NODEPTR(m2.mc_pg[m2.mc_top], 0);
1226
kptr = (txnid_t *)NODEKEY(leaf);
1233
last = txn->mt_env->me_pglast + 1;
1235
key.mv_data = &last;
1236
key.mv_size = sizeof(last);
1237
rc = mdb_cursor_set(&m2, &key, &data, MDB_SET, &exact);
1240
last = *(txnid_t *)key.mv_data;
1112
1244
unsigned int i;
1121
if (oldest > *kptr) {
1253
if (oldest > last) {
1122
1254
/* It's usable, grab it.
1124
1256
MDB_oldpages *mop;
1128
mdb_node_read(txn, leaf, &data);
1259
if (!txn->mt_env->me_pgfirst) {
1260
mdb_node_read(txn, leaf, &data);
1262
txn->mt_env->me_pglast = last;
1263
if (!txn->mt_env->me_pgfirst)
1264
txn->mt_env->me_pgfirst = last;
1129
1265
idl = (ID *) data.mv_data;
1266
/* We might have a zero-length IDL due to freelist growth
1267
* during a prior commit
1269
if (!idl[0]) goto again;
1130
1270
mop = malloc(sizeof(MDB_oldpages) + MDB_IDL_SIZEOF(idl) - sizeof(pgno_t));
1131
1271
mop->mo_next = txn->mt_env->me_pghead;
1132
mop->mo_txnid = *kptr;
1272
mop->mo_txnid = last;
1133
1273
txn->mt_env->me_pghead = mop;
1134
1274
memcpy(mop->mo_pages, idl, MDB_IDL_SIZEOF(idl));
1185
1322
if (txn->mt_env->me_dpages && num == 1) {
1186
1323
np = txn->mt_env->me_dpages;
1324
VGMEMP_ALLOC(txn->mt_env, np, txn->mt_env->me_psize);
1325
VGMEMP_DEFINED(np, sizeof(np->mp_next));
1187
1326
txn->mt_env->me_dpages = np->mp_next;
1189
if ((np = malloc(txn->mt_env->me_psize * num )) == NULL)
1328
size_t sz = txn->mt_env->me_psize * num;
1329
if ((np = malloc(sz)) == NULL)
1331
VGMEMP_ALLOC(txn->mt_env, np, sz);
1192
1333
if (pgno == P_INVALID) {
1193
1334
np->mp_pgno = txn->mt_next_pgno;
1234
1375
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
1235
1376
if (m2 == mc) continue;
1236
1377
m3 = &m2->mc_xcursor->mx_cursor;
1378
if (m3->mc_snum < mc->mc_snum) continue;
1237
1379
if (m3->mc_pg[mc->mc_top] == mc->mc_pg[mc->mc_top]) {
1238
1380
m3->mc_pg[mc->mc_top] = mp;
1589
1735
dp = txn->mt_u.dirty_list[i].mptr;
1590
1736
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
1591
1737
dp->mp_next = txn->mt_env->me_dpages;
1738
VGMEMP_FREE(txn->mt_env, dp);
1592
1739
txn->mt_env->me_dpages = dp;
1594
1741
/* large pages just get freed directly */
1742
VGMEMP_FREE(txn->mt_env, dp);
1766
1918
/* make sure first page of freeDB is touched and on freelist */
1767
1919
mdb_page_search(&mc, NULL, 1);
1922
/* Delete IDLs we used from the free list */
1923
if (env->me_pgfirst) {
1928
key.mv_size = sizeof(cur);
1929
for (cur = env->me_pgfirst; cur <= env->me_pglast; cur++) {
1932
mdb_cursor_set(&mc, &key, NULL, MDB_SET, &exact);
1933
mdb_cursor_del(&mc, 0);
1935
env->me_pgfirst = 0;
1769
1939
/* save to free list */
1941
freecnt = txn->mt_free_pgs[0];
1770
1942
if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
1771
1943
MDB_val key, data;
1774
1945
/* make sure last page of freeDB is touched and on freelist */
1775
1946
key.mv_size = MAXKEYSIZE+1;
1797
1968
* and make sure the entire thing got written.
1800
i = txn->mt_free_pgs[0];
1971
freecnt = txn->mt_free_pgs[0];
1801
1972
data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
1802
1973
rc = mdb_cursor_put(&mc, &key, &data, 0);
1804
1975
mdb_txn_abort(txn);
1807
} while (i != txn->mt_free_pgs[0]);
1808
if (mdb_midl_shrink(&txn->mt_free_pgs))
1809
env->me_free_pgs = txn->mt_free_pgs;
1978
} while (freecnt != txn->mt_free_pgs[0]);
1811
1980
/* should only be one record now */
1812
1982
if (env->me_pghead) {
1813
1983
MDB_val key, data;
1814
1984
MDB_oldpages *mop;
1816
1988
mop = env->me_pghead;
1817
env->me_pghead = NULL;
1818
key.mv_size = sizeof(pgno_t);
1819
key.mv_data = &mop->mo_txnid;
1990
key.mv_size = sizeof(id);
1820
1992
data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
1821
1993
data.mv_data = mop->mo_pages;
1994
orig = mop->mo_pages[0];
1995
/* These steps may grow the freelist again
1996
* due to freed overflow pages...
1822
1998
mdb_cursor_put(&mc, &key, &data, 0);
1999
if (mop == env->me_pghead && env->me_pghead->mo_txnid == id) {
2000
/* could have been used again here */
2001
if (mop->mo_pages[0] != orig) {
2002
data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
2003
data.mv_data = mop->mo_pages;
2005
mdb_cursor_put(&mc, &key, &data, 0);
2007
env->me_pghead = NULL;
2010
/* was completely used up */
2011
mdb_cursor_del(&mc, 0);
2015
env->me_pgfirst = 0;
2018
/* Check for growth of freelist again */
2019
if (freecnt != txn->mt_free_pgs[0])
2022
if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
2023
if (mdb_midl_shrink(&txn->mt_free_pgs))
2024
env->me_free_pgs = txn->mt_free_pgs;
1826
2027
/* Update DB root pointers. Their pages have already been
1885
2089
dp = txn->mt_u.dirty_list[i].mptr;
1886
2090
if (dp->mp_pgno != next) {
1888
DPRINTF("committing %u dirty pages", n);
1889
2092
rc = writev(env->me_fd, iov, n);
1890
2093
if (rc != size) {
1940
2142
dp = txn->mt_u.dirty_list[i].mptr;
1941
2143
if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) {
1942
2144
dp->mp_next = txn->mt_env->me_dpages;
2145
VGMEMP_FREE(txn->mt_env, dp);
1943
2146
txn->mt_env->me_dpages = dp;
2148
VGMEMP_FREE(txn->mt_env, dp);
1947
2151
txn->mt_u.dirty_list[i].mid = 0;
2004
if (!ReadFile(env->me_fd, page, MDB_PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
2208
if (!ReadFile(env->me_fd, &pbuf, MDB_PAGESIZE, (DWORD *)&rc, NULL) || rc == 0)
2006
if ((rc = read(env->me_fd, page, MDB_PAGESIZE)) == 0)
2210
if ((rc = read(env->me_fd, &pbuf, MDB_PAGESIZE)) == 0)
2581
2788
rc = ErrCode();
2586
env->me_txns = (MDB_txninfo *)mmap(0, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
2588
if (env->me_txns == MAP_FAILED) {
2792
void *m = mmap(NULL, rsize, PROT_READ|PROT_WRITE, MAP_SHARED,
2794
if (m == MAP_FAILED) {
2795
env->me_txns = NULL;
2595
2804
char hexbuf[17];
4031
4242
/* create a fake page for the dup items */
4032
4243
memcpy(dbuf, dkey.mv_data, dkey.mv_size);
4033
4244
dkey.mv_data = dbuf;
4034
fp = (MDB_page *)pbuf;
4245
fp = (MDB_page *)&pbuf;
4035
4246
fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
4036
4247
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
4037
4248
fp->mp_lower = PAGEHDRSZ;
4129
/* same size, just replace it */
4130
if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
4131
NODEDSZ(leaf) == data->mv_size) {
4340
/* overflow page overwrites need special handling */
4341
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
4344
int ovpages, dpages;
4346
ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
4347
dpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
4348
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
4349
mdb_page_get(mc->mc_txn, pg, &omp);
4350
/* Is the ov page writable and large enough? */
4351
if ((omp->mp_flags & P_DIRTY) && ovpages >= dpages) {
4352
/* yes, overwrite it. Note in this case we don't
4353
* bother to try shrinking the node if the new data
4354
* is smaller than the overflow threshold.
4356
if (F_ISSET(flags, MDB_RESERVE))
4357
data->mv_data = METADATA(omp);
4359
memcpy(METADATA(omp), data->mv_data, data->mv_size);
4362
/* no, free ovpages */
4364
mc->mc_db->md_overflow_pages -= ovpages;
4365
for (i=0; i<ovpages; i++) {
4366
DPRINTF("freed ov page %zu", pg);
4367
mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
4371
} else if (NODEDSZ(leaf) == data->mv_size) {
4372
/* same size, just replace it. Note that we could
4373
* also reuse this node if the new data is smaller,
4374
* but instead we opt to shrink the node in that case.
4132
4376
if (F_ISSET(flags, MDB_RESERVE))
4133
4377
data->mv_data = NODEDATA(leaf);
4208
4452
MDB_page *mp = mc->mc_pg[i];
4210
4454
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
4211
if (m2 == mc) continue;
4455
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
4212
4456
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
4213
4457
mdb_xcursor_init1(m2, leaf);
4218
4462
xflags |= (flags & MDB_APPEND);
4219
4463
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
4220
4464
if (flags & F_SUBDATA) {
4221
db = NODEDATA(leaf);
4465
void *db = NODEDATA(leaf);
4222
4466
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
4268
4512
if (mc->mc_xcursor->mx_db.md_entries) {
4269
4513
if (leaf->mn_flags & F_SUBDATA) {
4270
4514
/* update subDB info */
4271
MDB_db *db = NODEDATA(leaf);
4515
void *db = NODEDATA(leaf);
4272
4516
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
4274
4518
/* shrink fake page */
4344
4588
sz = LEAFSIZE(key, data);
4345
if (data->mv_size >= env->me_psize / MDB_MINKEYS) {
4589
if (sz >= env->me_psize / MDB_MINKEYS) {
4346
4590
/* put on overflow page */
4347
4591
sz -= data->mv_size - sizeof(pgno_t);
4435
4679
if (F_ISSET(flags, F_BIGDATA)) {
4436
4680
/* Data already on overflow page. */
4437
4681
node_size += sizeof(pgno_t);
4438
} else if (data->mv_size >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
4682
} else if (node_size + data->mv_size >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
4439
4683
int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize);
4440
4684
/* Put data on overflow page. */
4441
DPRINTF("data size is %zu, put on overflow page",
4685
DPRINTF("data size is %zu, node would be %zu, put data on overflow page",
4686
data->mv_size, node_size+data->mv_size);
4443
4687
node_size += sizeof(pgno_t);
4444
4688
if ((ofp = mdb_page_new(mc, P_OVERFLOW, ovpages)) == NULL)
4642
4886
mx->mx_cursor.mc_dbi = mc->mc_dbi+1;
4643
4887
mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
4644
4888
mx->mx_cursor.mc_snum = 0;
4889
mx->mx_cursor.mc_top = 0;
4645
4890
mx->mx_cursor.mc_flags = C_SUB;
4646
4891
mx->mx_dbx.md_cmp = mc->mc_dbx->md_dcmp;
4647
4892
mx->mx_dbx.md_dcmp = NULL;
4660
4905
MDB_xcursor *mx = mc->mc_xcursor;
4662
4907
if (node->mn_flags & F_SUBDATA) {
4663
MDB_db *db = NODEDATA(node);
4908
memcpy(&mx->mx_db, NODEDATA(node), sizeof(MDB_db));
4665
4909
mx->mx_cursor.mc_snum = 0;
4666
4910
mx->mx_cursor.mc_flags = C_SUB;
4730
4975
MDB_xcursor *mx = NULL;
4731
4976
size_t size = sizeof(MDB_cursor);
4733
if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
4978
if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs)
4981
/* Allow read access to the freelist */
4982
if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
4736
4985
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
4819
5068
mdb_update_key(MDB_page *mp, indx_t indx, MDB_val *key)
4821
indx_t ptr, i, numkeys;
4824
5070
MDB_node *node;
5074
indx_t ptr, i, numkeys;
4828
5077
node = NODEPTR(mp, indx);
4829
5078
ptr = mp->mp_ptrs[indx];
4830
DPRINTF("update key %u (ofs %u) [%.*s] to [%s] on page %zu",
4832
(int)node->mn_ksize, (char *)NODEKEY(node),
4836
delta = key->mv_size - node->mn_ksize;
5082
char kbuf2[(MAXKEYSIZE*2+1)];
5083
k2.mv_data = NODEKEY(node);
5084
k2.mv_size = node->mn_ksize;
5085
DPRINTF("update key %u (ofs %u) [%s] to [%s] on page %zu",
5087
mdb_dkey(&k2, kbuf2),
5093
delta0 = delta = key->mv_size - node->mn_ksize;
5095
/* Must be 2-byte aligned. If new key is
5096
* shorter by 1, the shift will be skipped.
5098
delta += (delta & 1);
4838
5100
if (delta > 0 && SIZELEFT(mp) < delta) {
4839
5101
DPRINTF("OUCH! Not enough room, delta = %d", delta);
4852
5114
mp->mp_upper -= delta;
4854
5116
node = NODEPTR(mp, indx);
5119
/* But even if no shift was needed, update ksize */
4855
5121
node->mn_ksize = key->mv_size;
4858
memcpy(NODEKEY(node), key->mv_data, key->mv_size);
5124
memcpy(NODEKEY(node), key->mv_data, key->mv_size);
4860
5126
return MDB_SUCCESS;
4881
5150
key.mv_data = LEAF2KEY(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top], key.mv_size);
4882
5151
data.mv_size = 0;
4883
5152
data.mv_data = NULL;
4885
5156
srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], csrc->mc_ki[csrc->mc_top]);
5157
assert(!((long)srcnode&1));
5158
srcpg = NODEPGNO(srcnode);
5159
flags = srcnode->mn_flags;
4886
5160
if (csrc->mc_ki[csrc->mc_top] == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
4887
5161
unsigned int snum = csrc->mc_snum;
4900
5174
data.mv_size = NODEDSZ(srcnode);
4901
5175
data.mv_data = NODEDATA(srcnode);
5177
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top]) && cdst->mc_ki[cdst->mc_top] == 0) {
5178
unsigned int snum = cdst->mc_snum;
5181
/* must find the lowest key below dst */
5182
mdb_page_search_root(cdst, NULL, 0);
5183
s2 = NODEPTR(cdst->mc_pg[cdst->mc_top], 0);
5184
bkey.mv_size = NODEKSZ(s2);
5185
bkey.mv_data = NODEKEY(s2);
5186
cdst->mc_snum = snum--;
5187
cdst->mc_top = snum;
5188
rc = mdb_update_key(cdst->mc_pg[cdst->mc_top], 0, &bkey);
4903
5191
DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu",
4904
5192
IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch",
4905
5193
csrc->mc_ki[csrc->mc_top],
4910
5198
/* Add the node to the destination page.
4912
rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, NODEPGNO(srcnode),
5200
rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg, flags);
4914
5201
if (rc != MDB_SUCCESS)
5034
5321
for (i = 0; i < NUMKEYS(csrc->mc_pg[csrc->mc_top]); i++, j++) {
5035
5322
srcnode = NODEPTR(csrc->mc_pg[csrc->mc_top], i);
5323
if (i == 0 && IS_BRANCH(csrc->mc_pg[csrc->mc_top])) {
5324
unsigned int snum = csrc->mc_snum;
5326
/* must find the lowest key below src */
5327
mdb_page_search_root(csrc, NULL, 0);
5328
s2 = NODEPTR(csrc->mc_pg[csrc->mc_top], 0);
5329
key.mv_size = NODEKSZ(s2);
5330
key.mv_data = NODEKEY(s2);
5331
csrc->mc_snum = snum--;
5332
csrc->mc_top = snum;
5334
key.mv_size = srcnode->mn_ksize;
5335
key.mv_data = NODEKEY(srcnode);
5037
key.mv_size = srcnode->mn_ksize;
5038
key.mv_data = NODEKEY(srcnode);
5039
5338
data.mv_size = NODEDSZ(srcnode);
5040
5339
data.mv_data = NODEDATA(srcnode);
5041
5340
rc = mdb_node_add(cdst, j, &key, &data, NODEPGNO(srcnode), srcnode->mn_flags);
5073
5372
for (m2 = csrc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
5074
if (m2 == csrc) continue;
5075
5373
if (csrc->mc_flags & C_SUB)
5076
5374
m3 = &m2->mc_xcursor->mx_cursor;
5377
if (m3 == csrc) continue;
5378
if (m3->mc_snum < csrc->mc_snum) continue;
5079
5379
if (m3->mc_pg[csrc->mc_top] == csrc->mc_pg[csrc->mc_top]) {
5080
5380
m3->mc_pg[csrc->mc_top] = mp;
5081
5381
m3->mc_ki[csrc->mc_top] += nkeys;
5277
5579
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
5278
5580
ovpages = OVPAGES(NODEDSZ(leaf), mc->mc_txn->mt_env->me_psize);
5581
mc->mc_db->md_overflow_pages -= ovpages;
5279
5582
for (i=0; i<ovpages; i++) {
5280
5583
DPRINTF("freed ov page %zu", pg);
5281
5584
mdb_midl_append(&mc->mc_txn->mt_free_pgs, pg);
5348
5651
unsigned int nflags)
5350
5653
unsigned int flags;
5351
int rc = MDB_SUCCESS, ins_new = 0, new_root = 0;
5654
int rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1;
5352
5655
indx_t newindx;
5353
5656
pgno_t pgno = 0;
5354
5657
unsigned int i, j, split_indx, nkeys, pmax;
5367
5670
IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno,
5368
5671
DKEY(newkey), mc->mc_ki[mc->mc_top]);
5673
/* Create a right sibling. */
5674
if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
5676
DPRINTF("new right sibling: page %zu", rp->mp_pgno);
5370
5678
if (mc->mc_snum < 2) {
5371
5679
if ((pp = mdb_page_new(mc, P_BRANCH, 1)) == NULL)
5397
5705
DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
5400
/* Create a right sibling. */
5401
if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
5403
DPRINTF("new right sibling: page %zu", rp->mp_pgno);
5405
5708
mdb_cursor_copy(mc, &mn);
5406
5709
mn.mc_pg[mn.mc_top] = rp;
5407
5710
mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
5463
5766
/* For leaf pages, check the split point based on what
5464
* fits where, since otherwise add_node can fail.
5767
* fits where, since otherwise mdb_node_add can fail.
5769
* This check is only needed when the data items are
5770
* relatively large, such that being off by one will
5771
* make the difference between success or failure.
5772
* When the size of the data items is much smaller than
5773
* one-half of a page, this check is irrelevant.
5466
5775
if (IS_LEAF(mp)) {
5467
5776
unsigned int psize, nsize;
5468
5777
/* Maximum free space in an empty page */
5469
5778
pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ;
5470
5779
nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata);
5471
if (newindx < split_indx) {
5473
for (i=0; i<split_indx; i++) {
5474
node = NODEPTR(mp, i);
5475
psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
5476
if (F_ISSET(node->mn_flags, F_BIGDATA))
5477
psize += sizeof(pgno_t);
5479
psize += NODEDSZ(node);
5780
if ((nkeys < 20) || (nsize > pmax/4)) {
5781
if (newindx <= split_indx) {
5784
for (i=0; i<split_indx; i++) {
5785
node = NODEPTR(mp, i);
5786
psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
5787
if (F_ISSET(node->mn_flags, F_BIGDATA))
5788
psize += sizeof(pgno_t);
5790
psize += NODEDSZ(node);
5793
if (i == split_indx - 1 && newindx == split_indx)
5488
for (i=nkeys-1; i>=split_indx; i--) {
5489
node = NODEPTR(mp, i);
5490
psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
5491
if (F_ISSET(node->mn_flags, F_BIGDATA))
5492
psize += sizeof(pgno_t);
5494
psize += NODEDSZ(node);
5802
for (i=nkeys-1; i>=split_indx; i--) {
5803
node = NODEPTR(mp, i);
5804
psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
5805
if (F_ISSET(node->mn_flags, F_BIGDATA))
5806
psize += sizeof(pgno_t);
5808
psize += NODEDSZ(node);
5504
5819
/* First find the separating key between the split pages.
5820
* The case where newindx == split_indx is ambiguous; the
5821
* new item could go to the new page or stay on the original
5822
* page. If newpos == 1 it goes to the new page.
5506
if (newindx == split_indx) {
5824
if (newindx == split_indx && newpos) {
5507
5825
sepkey.mv_size = newkey->mv_size;
5508
5826
sepkey.mv_data = newkey->mv_data;
5541
5859
if (nflags & MDB_APPEND) {
5542
5860
mc->mc_pg[mc->mc_top] = rp;
5543
5861
mc->mc_ki[mc->mc_top] = 0;
5544
return mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
5862
rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
5546
5867
if (IS_LEAF2(rp)) {
5563
5884
if (i == split_indx) {
5564
5885
/* Insert in right sibling. */
5565
5886
/* Reset insert index for right sibling. */
5566
j = (i == newindx && ins_new);
5567
mc->mc_pg[mc->mc_top] = rp;
5887
if (i != newindx || (newpos ^ ins_new)) {
5889
mc->mc_pg[mc->mc_top] = rp;
5570
5893
if (i == newindx && !ins_new) {
5615
5939
mc->mc_txn->mt_env->me_psize - copy->mp_upper);
5617
5941
/* reset back to original page */
5618
if (newindx < split_indx) {
5942
if (newindx < split_indx || (!newpos && newindx == split_indx)) {
5619
5943
mc->mc_pg[mc->mc_top] = mp;
5620
5944
if (nflags & MDB_RESERVE) {
5621
5945
node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
5645
5970
if (!(m3->mc_flags & C_INITIALIZED))
5647
5972
if (new_root) {
5648
5974
/* root split */
5649
for (i=m3->mc_top; i>0; i--) {
5650
m3->mc_ki[i+1] = m3->mc_ki[i];
5651
m3->mc_pg[i+1] = m3->mc_pg[i];
5975
for (k=m3->mc_top; k>=0; k--) {
5976
m3->mc_ki[k+1] = m3->mc_ki[k];
5977
m3->mc_pg[k+1] = m3->mc_pg[k];
5653
5979
m3->mc_ki[0] = mc->mc_ki[0];
5654
5980
m3->mc_pg[0] = mc->mc_pg[0];