2
* See the file LICENSE for redistribution information.
4
* Copyright (c) 1999-2002
5
* Sleepycat Software. All rights reserved.
11
static const char revid[] = "$Id$";
14
#ifndef NO_SYSTEM_INCLUDES
15
#include <sys/types.h>
21
#include "dbinc/db_page.h"
22
#include "dbinc/db_shash.h"
23
#include "dbinc/btree.h"
24
#include "dbinc/lock.h"
25
#include "dbinc/log.h"
26
#include "dbinc/qam.h"
28
static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
29
static int __qam_c_close __P((DBC *, db_pgno_t, int *));
30
static int __qam_c_del __P((DBC *));
31
static int __qam_c_destroy __P((DBC *));
32
static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
33
static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
34
static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
35
static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
39
* Position a queued access method cursor at a record. This returns
40
* the page locked. *exactp will be set if the record is valid.
41
* PUBLIC: int __qam_position
42
* PUBLIC: __P((DBC *, db_recno_t *, qam_position_mode, int *));
45
__qam_position(dbc, recnop, mode, exactp)
46
DBC *dbc; /* open cursor */
47
db_recno_t *recnop; /* pointer to recno to find */
48
qam_position_mode mode;/* locking: read or write */
49
int *exactp; /* indicate if it was found */
58
cp = (QUEUE_CURSOR *)dbc->internal;
60
/* Fetch the page for this recno. */
61
pg = QAM_RECNO_PAGE(dbp, *recnop);
63
if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
64
DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
68
if ((ret = __qam_fget(dbp, &pg,
69
mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
70
/* We did not fetch it, we can release the lock. */
71
(void)__LPUT(dbc, cp->lock);
72
if (mode != QAM_WRITE &&
73
(ret == DB_PAGE_NOTFOUND || ret == ENOENT))
78
cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
80
if (PGNO(cp->page) == 0) {
81
if (F_ISSET(dbp, DB_AM_RDONLY)) {
86
TYPE(cp->page) = P_QAMDATA;
89
qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
90
*exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
97
* Put an item on a queue page. Copy the data to the page and set the
98
* VALID and SET bits. If logging and the record was previously set,
99
* log that data, otherwise just log the new data.
101
* pagep must be write locked
103
* PUBLIC: int __qam_pitem
104
* PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *));
107
__qam_pitem(dbc, pagep, indx, recno, data)
115
DBT olddata, pdata, *datap;
125
t = (QUEUE *)dbp->q_internal;
127
if (data->size > t->re_len)
130
qp = QAM_GET_RECORD(dbp, pagep, indx);
134
if (F_ISSET(data, DB_DBT_PARTIAL)) {
135
if (data->doff + data->dlen > t->re_len) {
136
alloced = data->dlen;
139
if (data->size != data->dlen) {
140
len_err: __db_err(dbp->dbenv,
141
"Length improper for fixed length record %lu",
142
(u_long)(alloced ? alloced : data->size));
145
if (data->size == t->re_len)
149
* If we are logging, then we have to build the record
150
* first, otherwise, we can simply drop the change
151
* directly on the page. After this clause, make
152
* sure that datap and p are set up correctly so that
153
* copying datap into p does the right thing.
155
* Note, I am changing this so that if the existing
156
* record is not valid, we create a complete record
157
* to log so that both this and the recovery code is simpler.
160
if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
162
memset(datap, 0, sizeof(*datap));
164
if ((ret = __os_malloc(dbp->dbenv,
165
t->re_len, &datap->data)) != 0)
168
datap->size = t->re_len;
171
* Construct the record if it's valid, otherwise set it
172
* all to the pad character.
175
if (F_ISSET(qp, QAM_VALID))
176
memcpy(dest, p, t->re_len);
178
memset(dest, t->re_pad, t->re_len);
181
memcpy(dest, data->data, data->size);
189
if (DBC_LOGGING(dbc)) {
191
if (F_ISSET(qp, QAM_SET)) {
192
olddata.data = qp->data;
193
olddata.size = t->re_len;
195
if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
196
0, &LSN(pagep), pagep->pgno,
197
indx, recno, datap, qp->flags,
198
olddata.size == 0 ? NULL : &olddata)) != 0)
202
F_SET(qp, QAM_VALID | QAM_SET);
203
memcpy(p, datap->data, datap->size);
204
if (!F_ISSET(data, DB_DBT_PARTIAL))
205
memset(p + datap->size, t->re_pad, t->re_len - datap->size);
208
__os_free(dbp->dbenv, datap->data);
214
* Cursor put for queued access method.
215
* BEFORE and AFTER cannot be specified.
218
__qam_c_put(dbc, key, data, flags, pgnop)
230
db_recno_t new_cur, new_first;
232
int exact, ret, t_ret;
237
*pgnop = PGNO_INVALID;
239
cp = (QUEUE_CURSOR *)dbc->internal;
244
if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
250
/* The interface shouldn't let anything else through. */
252
return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
255
/* Write lock the record. */
256
if ((ret = __db_lget(dbc,
257
0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
260
if ((ret = __qam_position(dbc,
261
&cp->recno, QAM_WRITE, &exact)) != 0) {
262
/* We could not get the page, we can release the record lock. */
267
/* Put the item on the page. */
268
ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
270
/* Doing record locking, release the page lock */
271
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
273
if ((t_ret = __qam_fput(
274
dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
278
cp->lock_mode = DB_LOCK_WRITE;
282
/* We may need to reset the head or tail of the queue. */
283
pg = ((QUEUE *)dbp->q_internal)->q_meta;
286
* Get the meta page first, we don't want to write lock it while
289
if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
291
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
292
(void)mpf->put(mpf, meta, 0);
297
new_cur = new_first = 0;
300
* If the put address is outside the queue, adjust the head and
301
* tail of the queue. If the order is inverted we move
302
* the one which is closer. The first case is when the
303
* queue is empty, move first and current to where the new
307
if (meta->first_recno == meta->cur_recno) {
308
new_first = cp->recno;
309
new_cur = cp->recno + 1;
310
if (new_cur == RECNO_OOB)
312
opcode |= QAM_SETFIRST;
313
opcode |= QAM_SETCUR;
315
if (QAM_BEFORE_FIRST(meta, cp->recno) &&
316
(meta->first_recno <= meta->cur_recno ||
317
meta->first_recno - cp->recno <
318
cp->recno - meta->cur_recno)) {
319
new_first = cp->recno;
320
opcode |= QAM_SETFIRST;
323
if (meta->cur_recno == cp->recno ||
324
(QAM_AFTER_CURRENT(meta, cp->recno) &&
325
(meta->first_recno <= meta->cur_recno ||
326
cp->recno - meta->cur_recno <=
327
meta->first_recno - cp->recno))) {
328
new_cur = cp->recno + 1;
329
if (new_cur == RECNO_OOB)
331
opcode |= QAM_SETCUR;
335
if (opcode != 0 && DBC_LOGGING(dbc)) {
336
ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn,
337
0, opcode, meta->first_recno, new_first,
338
meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD);
343
if (opcode & QAM_SETCUR)
344
meta->cur_recno = new_cur;
345
if (opcode & QAM_SETFIRST)
346
meta->first_recno = new_first;
348
if ((t_ret = mpf->put(
349
mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
352
/* Don't hold the meta page long term. */
353
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
360
* Perform a put(DB_APPEND) in queue.
362
* PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
365
__qam_append(dbc, key, data)
382
cp = (QUEUE_CURSOR *)dbc->internal;
384
pg = ((QUEUE *)dbp->q_internal)->q_meta;
386
* Get the meta page first, we don't want to write lock it while
389
if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
391
/* Write lock the meta page. */
392
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
393
(void)mpf->put(mpf, meta, 0);
397
/* Get the next record number. */
398
recno = meta->cur_recno;
400
if (meta->cur_recno == RECNO_OOB)
402
if (meta->cur_recno == meta->first_recno) {
404
if (meta->cur_recno == RECNO_OOB)
406
(void)__LPUT(dbc, lock);
411
if (QAM_BEFORE_FIRST(meta, recno))
412
meta->first_recno = recno;
414
/* Lock the record and release meta page lock. */
415
if ((ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
416
recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
417
(void)__LPUT(dbc, lock);
422
* The application may modify the data based on the selected record
425
if (dbc->dbp->db_append_recno != NULL &&
426
(ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
427
(void)__LPUT(dbc, lock);
432
cp->lock_mode = DB_LOCK_WRITE;
434
pg = QAM_RECNO_PAGE(dbp, recno);
436
/* Fetch and write lock the data page. */
437
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
439
if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
440
/* We did not fetch it, we can release the lock. */
441
(void)__LPUT(dbc, lock);
445
/* See if this is a new page. */
446
if (page->pgno == 0) {
448
page->type = P_QAMDATA;
451
/* Put the item on the page and log it. */
452
ret = __qam_pitem(dbc, page,
453
QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
455
/* Doing record locking, release the page lock */
456
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
460
= __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
463
/* Return the record number to the user. */
465
ret = __db_retcopy(dbp->dbenv, key,
466
&recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
468
/* Position the cursor on this record. */
471
/* See if we are leaving the extent. */
472
qp = (QUEUE *) dbp->q_internal;
473
if (qp->page_ext != 0 &&
474
(recno % (qp->page_ext * qp->rec_page) == 0 ||
475
recno == UINT32_T_MAX)) {
476
if ((ret = __db_lget(dbc,
477
0, ((QUEUE *)dbp->q_internal)->q_meta,
478
DB_LOCK_WRITE, 0, &lock)) != 0)
480
if (!QAM_AFTER_CURRENT(meta, recno))
481
ret = __qam_fclose(dbp, pg);
482
(void)__LPUT(dbc, lock);
486
/* Release the meta page. */
487
if ((t_ret = mpf->put(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
495
* Qam cursor->am_del function
511
int exact, ret, t_ret;
515
cp = (QUEUE_CURSOR *)dbc->internal;
517
pg = ((QUEUE *)dbp->q_internal)->q_meta;
519
* Get the meta page first, we don't want to write lock it while
522
if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
524
/* Write lock the meta page. */
525
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0) {
526
(void)mpf->put(mpf, meta, 0);
530
if (QAM_NOT_VALID(meta, cp->recno))
533
first = meta->first_recno;
535
/* Don't hold the meta page long term. */
536
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
542
if ((ret = __db_lget(dbc,
543
0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
546
cp->lock_mode = DB_LOCK_WRITE;
547
/* Find the record ; delete only deletes exact matches. */
548
if ((ret = __qam_position(dbc,
549
&cp->recno, QAM_WRITE, &exact)) != 0) {
559
qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
561
if (DBC_LOGGING(dbc)) {
562
if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
563
((QUEUE *)dbp->q_internal)->re_len == 0) {
564
if ((ret = __qam_del_log(dbp,
565
dbc->txn, &LSN(pagep), 0, &LSN(pagep),
566
pagep->pgno, cp->indx, cp->recno)) != 0)
569
data.size = ((QUEUE *)dbp->q_internal)->re_len;
570
data.data = qp->data;
571
if ((ret = __qam_delext_log(dbp,
572
dbc->txn, &LSN(pagep), 0, &LSN(pagep),
573
pagep->pgno, cp->indx, cp->recno, &data)) != 0)
578
F_CLR(qp, QAM_VALID);
580
if (cp->recno == first) {
581
pg = ((QUEUE *)dbp->q_internal)->q_meta;
583
__db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
585
ret = __qam_consume(dbc, meta, first);
586
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
591
if ((t_ret = mpf->put(mpf, meta, 0)) != 0 && ret == 0)
593
if (cp->page != NULL && (t_ret = __qam_fput(dbp, cp->pgno,
594
cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
598
/* Doing record locking, release the page lock */
599
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
612
* Queue cursor->c_get function.
615
__qam_c_get(dbc, key, data, flags, pgnop)
625
DB_LOCK lock, pglock, metalock;
632
db_lockmode_t lock_mode;
635
qam_position_mode mode;
636
int exact, is_first, locked, ret, t_ret, wait, with_delete;
637
int put_mode, meta_dirty, retrying;
642
cp = (QUEUE_CURSOR *)dbc->internal;
649
lock_mode = DB_LOCK_READ;
656
if (F_ISSET(dbc, DBC_RMW)) {
657
lock_mode = DB_LOCK_WRITE;
661
if (flags == DB_CONSUME_WAIT) {
665
if (flags == DB_CONSUME) {
666
if ((ret = __db_check_txn(dbp, dbc->txn, dbc->locker, 0)) != 0)
671
lock_mode = DB_LOCK_WRITE;
675
DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
676
flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
678
/* Make lint and friends happy. */
684
t = (QUEUE *)dbp->q_internal;
688
* Get the meta page first, we don't want to write lock it while
689
* trying to pin it. This is because someone my have it pinned
692
if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0)
694
if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
700
/* Release any previous lock if not in a transaction. */
701
(void)__TLPUT(dbc, cp->lock);
703
retry: /* Update the record number. */
713
if (cp->recno != RECNO_OOB) {
715
/* Wrap around, skipping zero. */
716
if (cp->recno == RECNO_OOB)
725
/* get the first record number */
726
cp->recno = first = meta->first_recno;
731
if (cp->recno != RECNO_OOB) {
732
if (QAM_BEFORE_FIRST(meta, cp->recno) ||
733
cp->recno == meta->first_recno) {
738
/* Wrap around, skipping zero. */
739
if (cp->recno == RECNO_OOB)
745
if (meta->first_recno == meta->cur_recno) {
749
cp->recno = meta->cur_recno - 1;
750
if (cp->recno == RECNO_OOB)
756
case DB_GET_BOTH_RANGE:
757
if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
761
ret = __db_unknown_flag(dbenv, "__qam_c_get", flags);
766
* Check to see if we are out of data. Current points to
767
* the first free slot.
769
if (cp->recno == meta->cur_recno ||
770
QAM_AFTER_CURRENT(meta, cp->recno)) {
776
* If first is not set, then we skipped a
777
* locked record, go back and find it.
778
* If we find a locked record again
785
if (CDB_LOCKING(dbenv)) {
786
if ((ret = dbenv->lock_get(
788
DB_LOCK_SWITCH, &dbc->lock_dbt,
789
DB_LOCK_WAIT, &dbc->mylock)) != 0)
791
if ((ret = dbenv->lock_get(
793
DB_LOCK_UPGRADE, &dbc->lock_dbt,
794
DB_LOCK_WRITE, &dbc->mylock)) != 0)
799
* Wait for someone to update the meta page.
800
* This will probably mean there is something
801
* in the queue. We then go back up and
805
if ((ret = __db_lget( dbc,
806
0, metapno, lock_mode, 0, &metalock)) != 0)
809
if (cp->recno != RECNO_OOB &&
810
!QAM_AFTER_CURRENT(meta, cp->recno))
813
if ((ret = __db_lget(dbc, 0, metapno,
814
DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
816
if ((ret = dbenv->lock_get(dbenv, dbc->locker,
817
DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
827
/* Don't hold the meta page long term. */
829
if ((ret = __LPUT(dbc, metalock)) != 0)
834
/* Lock the record. */
835
if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
836
(with_delete && !retrying) ?
837
DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
838
&lock)) == DB_LOCK_NOTGRANTED && with_delete) {
841
dbc->txn, "Queue S", 0, "%x %d %d %d",
842
dbc->locker, cp->recno, first, meta->first_recno);
846
__db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
856
* In the DB_FIRST or DB_LAST cases we must wait and then start over
857
* since the first/last may have moved while we slept.
858
* We release our locks and try again.
860
if ((!with_delete && is_first) || flags == DB_LAST) {
862
__db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
865
(is_first ? meta->first_recno : (meta->cur_recno - 1))) {
872
/* Don't hold the meta page long term. */
873
if ((ret = __LPUT(dbc, metalock)) != 0)
877
/* Position the cursor on the record. */
878
if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
879
/* We cannot get the page, release the record lock. */
880
(void)__LPUT(dbc, lock);
887
cp->lock_mode = lock_mode;
890
if (flags == DB_NEXT || flags == DB_NEXT_NODUP ||
891
flags == DB_PREV || flags == DB_PREV_NODUP ||
893
/* Release locks and try again. */
895
(void)__qam_fput(dbp, cp->pgno, pg, 0);
896
cp->page = pg = NULL;
897
(void)__LPUT(dbc, pglock);
898
(void)__LPUT(dbc, cp->lock);
899
if (flags == DB_LAST)
906
/* this is for the SET and SET_RANGE cases */
911
/* Return the key if the user didn't give us one. */
913
if (flags != DB_GET_BOTH && flags != DB_GET_BOTH_RANGE &&
914
flags != DB_SET && flags != DB_SET_RANGE &&
915
(ret = __db_retcopy(dbp->dbenv,
916
key, &cp->recno, sizeof(cp->recno),
917
&dbc->rkey->data, &dbc->rkey->ulen)) != 0)
919
F_SET(key, DB_DBT_ISSET);
922
qp = QAM_GET_RECORD(dbp, pg, cp->indx);
924
/* Return the data item. */
925
if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
930
tmp.size = t->re_len;
931
if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
937
!F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
938
(ret = __db_retcopy(dbp->dbenv, data,
939
qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
943
F_SET(data, DB_DBT_ISSET);
945
/* Finally, if we are doing DB_CONSUME mark the record. */
948
* Assert that we're not a secondary index. Doing a DB_CONSUME
949
* on a secondary makes very little sense, since one can't
950
* DB_APPEND there; attempting one should be forbidden by
953
DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
956
* Check and see if we *have* any secondary indices.
957
* If we do, we're a primary, so call __db_c_del_primary
958
* to delete the references to the item we're about to
961
* Note that we work on a duplicated cursor, since the
962
* __db_ret work has already been done, so it's not safe
963
* to perform any additional ops on this cursor.
965
if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
966
if ((ret = __db_c_idup(dbc,
967
&dbcdup, DB_POSITIONI)) != 0)
970
if ((ret = __db_c_del_primary(dbcdup)) != 0) {
972
* The __db_c_del_primary return is more
975
(void)dbcdup->c_close(dbcdup);
979
if ((ret = dbcdup->c_close(dbcdup)) != 0)
983
if (DBC_LOGGING(dbc)) {
984
if (t->page_ext == 0 || t->re_len == 0) {
985
if ((ret = __qam_del_log(dbp, dbc->txn,
986
&LSN(pg), 0, &LSN(pg),
987
pg->pgno, cp->indx, cp->recno)) != 0)
991
tmp.size = t->re_len;
992
if ((ret = __qam_delext_log(dbp,
993
dbc->txn, &LSN(pg), 0, &LSN(pg),
994
pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
999
F_CLR(qp, QAM_VALID);
1000
put_mode = DB_MPOOL_DIRTY;
1002
if ((ret = __LPUT(dbc, pglock)) != 0)
1006
* Now we need to update the metapage
1007
* first pointer. If we have deleted
1008
* the record that is pointed to by
1009
* first_recno then we move it as far
1010
* forward as we can without blocking.
1011
* The metapage lock must be held for
1012
* the whole scan otherwise someone could
1013
* do a random insert behind where we are
1017
if (locked == 0 && (ret = __db_lget(
1018
dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
1024
dbc->txn, "Queue D", 0, "%x %d %d %d",
1025
dbc->locker, cp->recno, first, meta->first_recno);
1028
* See if we deleted the "first" record. If
1029
* first is zero then we skipped something,
1030
* see if first_recno has been move passed
1031
* that to the record that we deleted.
1035
if (first != meta->first_recno)
1038
if ((ret = __qam_consume(dbc, meta, first)) != 0)
1043
err1: if (cp->page != NULL) {
1044
t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
1048
/* Doing record locking, release the page lock */
1049
t_ret = __LPUT(dbc, pglock);
1057
/* release the meta page */
1058
t_ret = mpf->put(mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
1063
/* Don't hold the meta page long term. */
1065
t_ret = __LPUT(dbc, metalock);
1067
DB_ASSERT(!LOCK_ISSET(metalock));
1070
* There is no need to keep the record locked if we are
1071
* not in a transaction.
1074
t_ret = __TLPUT(dbc, cp->lock);
1076
return (ret ? ret : t_ret);
1080
* __qam_consume -- try to reset the head of the queue.
1085
__qam_consume(dbc, meta, first)
1091
DB_LOCK lock, save_lock;
1094
db_indx_t save_indx;
1095
db_pgno_t save_page;
1096
db_recno_t current, save_recno;
1097
u_int32_t rec_extent;
1098
int exact, put_mode, ret, t_ret, wrapped;
1102
cp = (QUEUE_CURSOR *)dbc->internal;
1103
put_mode = DB_MPOOL_DIRTY;
1106
save_page = cp->pgno;
1107
save_indx = cp->indx;
1108
save_recno = cp->recno;
1109
save_lock = cp->lock;
1112
* If we skipped some deleted records, we need to
1113
* reposition on the first one. Get a lock
1114
* in case someone is trying to put it back.
1116
if (first != cp->recno) {
1117
ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1118
DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1119
if (ret == DB_LOCK_NOTGRANTED) {
1126
__qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
1130
if ((ret = __qam_position(dbc,
1131
&first, QAM_READ, &exact)) != 0 || exact != 0) {
1132
(void)__LPUT(dbc, lock);
1135
if ((ret =__LPUT(dbc, lock)) != 0)
1137
if ((ret = __LPUT(dbc, cp->lock)) != 0)
1141
current = meta->cur_recno;
1143
if (first > current)
1145
rec_extent = meta->page_ext * meta->rec_page;
1147
/* Loop until we find a record or hit current */
1150
* Check to see if we are moving off the extent
1151
* and remove the extent.
1152
* If we are moving off a page we need to
1153
* get rid of the buffer.
1154
* Wait for the lagging readers to move off the
1157
if (cp->page != NULL && rec_extent != 0 &&
1158
((exact = (first % rec_extent == 0)) ||
1159
first % meta->rec_page == 0 ||
1160
first == UINT32_T_MAX)) {
1161
if (exact == 1 && (ret = __db_lget(dbc,
1162
0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
1166
__db_logmsg(dbp->dbenv,
1167
dbc->txn, "Queue R", 0, "%x %d %d %d",
1168
dbc->locker, cp->pgno, first, meta->first_recno);
1170
put_mode |= DB_MPOOL_DISCARD;
1171
if ((ret = __qam_fput(dbp,
1172
cp->pgno, cp->page, put_mode)) != 0)
1177
ret = __qam_fremove(dbp, cp->pgno);
1178
t_ret = __LPUT(dbc, cp->lock);
1186
} else if (cp->page != NULL && (ret =
1187
__qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
1191
if (first == RECNO_OOB) {
1197
* LOOP EXIT when we come move to the current
1200
if (!wrapped && first >= current)
1203
ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1204
DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1205
if (ret == DB_LOCK_NOTGRANTED) {
1212
if ((ret = __qam_position(dbc,
1213
&first, QAM_READ, &exact)) != 0) {
1214
(void)__LPUT(dbc, lock);
1218
if ((ret =__LPUT(dbc, lock)) != 0 ||
1219
(ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
1220
if ((t_ret = __qam_fput(dbp, cp->pgno,
1221
cp->page, put_mode)) != 0 && ret == 0)
1228
cp->pgno = save_page;
1229
cp->indx = save_indx;
1230
cp->recno = save_recno;
1231
cp->lock = save_lock;
1234
* We have advanced as far as we can.
1235
* Advance first_recno to this point.
1237
if (ret == 0 && meta->first_recno != first) {
1239
__db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
1240
0, "%x %d %d %d", dbc->locker, cp->recno,
1241
first, meta->first_recno);
1243
if (DBC_LOGGING(dbc))
1244
if ((ret = __qam_incfirst_log(dbp,
1245
dbc->txn, &meta->dbmeta.lsn, 0,
1246
cp->recno, PGNO_BASE_MD)) != 0)
1248
meta->first_recno = first;
1249
(void)mpf->set(mpf, meta, DB_MPOOL_DIRTY);
1257
__qam_bulk(dbc, data, flags)
1271
qam_position_mode mode;
1272
int32_t *endp, *offp;
1273
u_int8_t *dbuf, *dp, *np;
1274
int exact, recs, re_len, ret, t_ret, valid;
1275
int is_key, need_pg, pagesize, size, space;
1279
cp = (QUEUE_CURSOR *)dbc->internal;
1282
if (F_ISSET(dbc, DBC_RMW))
1285
pagesize = dbp->pgsize;
1286
re_len = ((QUEUE *)dbp->q_internal)->re_len;
1287
recs = ((QUEUE *)dbp->q_internal)->rec_page;
1288
metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1290
is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1293
if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
1295
if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
1296
/* We did not fetch it, we can release the lock. */
1297
(void)__LPUT(dbc, metalock);
1304
/* Keep track of space that is left. There is an termination entry */
1306
space -= sizeof(*offp);
1308
/* Build the offset/size table form the end up. */
1309
endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
1314
if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
1323
* If this page is a nonexistent page at the end of an
1324
* extent, pg may be NULL. A NULL page has no valid records,
1325
* so just keep looping as though qp exists and isn't QAM_VALID;
1326
* calling QAM_GET_RECORD is unsafe.
1330
/* Wrap around, skipping zero. */
1331
if (cp->recno == RECNO_OOB)
1334
qp = QAM_GET_RECORD(dbp, pg, indx);
1335
if (F_ISSET(qp, QAM_VALID)) {
1337
space -= (is_key ? 3 : 2) * sizeof(*offp);
1342
size = pagesize - QPAGE_SZ(dbp);
1359
(char *)pg + QPAGE_SZ(dbp), size);
1365
*offp-- = cp->recno;
1366
*offp-- = (int32_t)((u_int8_t*)qp -
1367
(u_int8_t*)pg - QPAGE_SZ(dbp) +
1368
dp - dbuf + SSZA(QAMDATA, data));
1372
if (!valid && is_key == 0) {
1377
} while (++indx < recs && indx != RECNO_OOB
1378
&& cp->recno != meta->cur_recno
1379
&& !QAM_AFTER_CURRENT(meta, cp->recno));
1381
if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
1384
if (cp->page != NULL) {
1386
__qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
1392
&& (indx >= recs || indx == RECNO_OOB)
1393
&& cp->recno != meta->cur_recno
1394
&& !QAM_AFTER_CURRENT(meta, cp->recno))
1403
/* release the meta page */
1404
t_ret = mpf->put(mpf, meta, 0);
1409
t_ret = __LPUT(dbc, metalock);
1416
* Close down the cursor from a single use.
1419
__qam_c_close(dbc, root_pgno, rmroot)
1421
db_pgno_t root_pgno;
1426
COMPQUIET(root_pgno, 0);
1427
COMPQUIET(rmroot, NULL);
1429
cp = (QUEUE_CURSOR *)dbc->internal;
1431
/* Discard any locks not acquired inside of a transaction. */
1432
(void)__TLPUT(dbc, cp->lock);
1433
LOCK_INIT(cp->lock);
1436
cp->pgno = PGNO_INVALID;
1438
cp->lock_mode = DB_LOCK_NG;
1439
cp->recno = RECNO_OOB;
1447
* Duplicate a queue cursor, such that the new one holds appropriate
1448
* locks for the position of the original.
1450
* PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
1453
__qam_c_dup(orig_dbc, new_dbc)
1454
DBC *orig_dbc, *new_dbc;
1456
QUEUE_CURSOR *orig, *new;
1458
orig = (QUEUE_CURSOR *)orig_dbc->internal;
1459
new = (QUEUE_CURSOR *)new_dbc->internal;
1461
new->recno = orig->recno;
1463
/* reget the long term lock if we are not in a xact */
1464
if (orig_dbc->txn != NULL ||
1465
!STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock))
1468
return (__db_lget(new_dbc,
1469
0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
1475
* PUBLIC: int __qam_c_init __P((DBC *));
1487
/* Allocate the internal structure. */
1488
cp = (QUEUE_CURSOR *)dbc->internal;
1491
__os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
1493
dbc->internal = (DBC_INTERNAL *)cp;
1496
/* Initialize methods. */
1497
dbc->c_close = __db_c_close;
1498
dbc->c_count = __db_c_count;
1499
dbc->c_del = __db_c_del;
1500
dbc->c_dup = __db_c_dup;
1501
dbc->c_get = dbc->c_real_get = __db_c_get;
1502
dbc->c_pget = __db_c_pget;
1503
dbc->c_put = __db_c_put;
1504
dbc->c_am_bulk = __qam_bulk;
1505
dbc->c_am_close = __qam_c_close;
1506
dbc->c_am_del = __qam_c_del;
1507
dbc->c_am_destroy = __qam_c_destroy;
1508
dbc->c_am_get = __qam_c_get;
1509
dbc->c_am_put = __qam_c_put;
1510
dbc->c_am_writelock = NULL;
1516
* __qam_c_destroy --
1517
* Close a single cursor -- internal version.
1520
__qam_c_destroy(dbc)
1523
/* Discard the structures. */
1524
__os_free(dbc->dbp->dbenv, dbc->internal);
1531
* Check the user's record number.
1534
__qam_getno(dbp, key, rep)
1539
if ((*rep = *(db_recno_t *)key->data) == 0) {
1540
__db_err(dbp->dbenv, "illegal record number of 0");
1548
* Truncate a queue database
1550
* PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *));
1553
__qam_truncate(dbp, txn, countp)
1563
int count, ret, t_ret;
1567
/* Acquire a cursor. */
1568
if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
1571
/* Walk the queue, counting rows. */
1573
while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0)
1576
if (ret == DB_NOTFOUND)
1579
/* Discard the cursor. */
1580
if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1586
/* update the meta page */
1587
/* get the meta page */
1588
metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1590
__db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
1593
if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
1594
/* We did not fetch it, we can release the lock. */
1595
(void)__LPUT(dbc, metalock);
1598
if (DBC_LOGGING(dbc)) {
1599
ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
1600
QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
1601
1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
1604
meta->first_recno = meta->cur_recno = 1;
1607
mpf->put(mpf, meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
1609
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)