2
* See the file LICENSE for redistribution information.
4
* Copyright (c) 1999-2001
5
* Sleepycat Software. All rights reserved.
11
static const char revid[] = "$Id: qam.c,v 11.106 2001/07/02 16:45:54 krinsky Exp $";
14
#ifndef NO_SYSTEM_INCLUDES
15
#include <sys/types.h>
30
static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
31
static int __qam_c_close __P((DBC *, db_pgno_t, int *));
32
static int __qam_c_del __P((DBC *));
33
static int __qam_c_destroy __P((DBC *));
34
static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
35
static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
36
static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
40
* Position a queued access method cursor at a record. This returns
41
* the page locked. *exactp will be set if the record is valid.
42
* PUBLIC: int __qam_position
43
* PUBLIC: __P((DBC *, db_recno_t *, qam_position_mode, int *));
46
__qam_position(dbc, recnop, mode, exactp)
47
DBC *dbc; /* open cursor */
48
db_recno_t *recnop; /* pointer to recno to find */
49
qam_position_mode mode;/* locking: read or write */
50
int *exactp; /* indicate if it was found */
59
cp = (QUEUE_CURSOR *)dbc->internal;
61
/* Fetch the page for this recno. */
62
pg = QAM_RECNO_PAGE(dbp, *recnop);
64
if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
65
DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
69
if ((ret = __qam_fget(dbp, &pg,
70
mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
71
/* We did not fetch it, we can release the lock. */
72
(void)__LPUT(dbc, cp->lock);
73
if (mode != QAM_WRITE &&
74
(ret == DB_PAGE_NOTFOUND || ret == ENOENT))
79
cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
81
if (PGNO(cp->page) == 0) {
82
if (F_ISSET(dbp, DB_AM_RDONLY)) {
87
TYPE(cp->page) = P_QAMDATA;
90
qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
91
*exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
98
* Put an item on a queue page. Copy the data to the page and set the
99
* VALID and SET bits. If logging and the record was previously set,
100
* log that data, otherwise just log the new data.
102
* pagep must be write locked
104
* PUBLIC: int __qam_pitem
105
* PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *));
108
__qam_pitem(dbc, pagep, indx, recno, data)
116
DBT olddata, pdata, *datap;
126
t = (QUEUE *)dbp->q_internal;
128
if (data->size > t->re_len)
131
qp = QAM_GET_RECORD(dbp, pagep, indx);
135
if (F_ISSET(data, DB_DBT_PARTIAL)) {
136
if (data->doff + data->dlen > t->re_len) {
137
alloced = data->dlen;
140
if (data->size != data->dlen) {
141
len_err: __db_err(dbp->dbenv,
142
"Length improper for fixed length record %lu",
143
(u_long)(alloced ? alloced : data->size));
146
if (data->size == t->re_len)
150
* If we are logging, then we have to build the record
151
* first, otherwise, we can simply drop the change
152
* directly on the page. After this clause, make
153
* sure that datap and p are set up correctly so that
154
* copying datap into p does the right thing.
156
* Note, I am changing this so that if the existing
157
* record is not valid, we create a complete record
158
* to log so that both this and the recovery code is simpler.
161
if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
163
memset(datap, 0, sizeof(*datap));
165
if ((ret = __os_malloc(dbp->dbenv,
166
t->re_len, &datap->data)) != 0)
169
datap->size = t->re_len;
172
* Construct the record if it's valid, otherwise set it
173
* all to the pad character.
176
if (F_ISSET(qp, QAM_VALID))
177
memcpy(dest, p, t->re_len);
179
memset(dest, t->re_pad, t->re_len);
182
memcpy(dest, data->data, data->size);
190
if (DB_LOGGING(dbc)) {
192
if (F_ISSET(qp, QAM_SET)) {
193
olddata.data = qp->data;
194
olddata.size = t->re_len;
196
if ((ret = __qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep),
197
0, dbp->log_fileid, &LSN(pagep), pagep->pgno,
198
indx, recno, datap, qp->flags,
199
olddata.size == 0 ? NULL : &olddata)) != 0)
203
F_SET(qp, QAM_VALID | QAM_SET);
204
memcpy(p, datap->data, datap->size);
205
if (!F_ISSET(data, DB_DBT_PARTIAL))
206
memset(p + datap->size, t->re_pad, t->re_len - datap->size);
209
__os_free(dbp->dbenv, datap->data, t->re_len);
215
* Cursor put for queued access method.
216
* BEFORE and AFTER cannot be specified.
219
__qam_c_put(dbc, key, data, flags, pgnop)
230
db_recno_t new_cur, new_first;
232
int exact, ret, t_ret;
236
*pgnop = PGNO_INVALID;
238
cp = (QUEUE_CURSOR *)dbc->internal;
243
if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
249
/* The interface shouldn't let anything else through. */
251
return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
254
/* Write lock the record. */
255
if ((ret = __db_lget(dbc,
256
0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
259
if ((ret = __qam_position(dbc,
260
&cp->recno, QAM_WRITE, &exact)) != 0) {
261
/* We could not get the page, we can release the record lock. */
266
/* Put the item on the page. */
267
ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
269
/* Doing record locking, release the page lock */
270
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
272
if ((t_ret = __qam_fput(
273
dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
277
cp->lock_mode = DB_LOCK_WRITE;
281
/* We may need to reset the head or tail of the queue. */
282
pg = ((QUEUE *)dbp->q_internal)->q_meta;
283
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
285
if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
286
/* We did not fetch it, we can release the lock. */
287
(void)__LPUT(dbc, lock);
292
new_cur = new_first = 0;
295
* If the put address is outside the queue, adjust the head and
296
* tail of the queue. If the order is inverted we move
297
* the one which is closer. The first case is when the
298
* queue is empty, move first and current to where the new
302
if (meta->first_recno == meta->cur_recno) {
303
new_first = cp->recno;
304
new_cur = cp->recno + 1;
305
if (new_cur == RECNO_OOB)
307
opcode |= QAM_SETFIRST;
308
opcode |= QAM_SETCUR;
310
if (QAM_BEFORE_FIRST(meta, cp->recno) &&
311
(meta->first_recno <= meta->cur_recno ||
312
meta->first_recno - cp->recno <
313
cp->recno - meta->cur_recno)) {
314
new_first = cp->recno;
315
opcode |= QAM_SETFIRST;
318
if (meta->cur_recno == cp->recno ||
319
(QAM_AFTER_CURRENT(meta, cp->recno) &&
320
(meta->first_recno <= meta->cur_recno ||
321
cp->recno - meta->cur_recno <=
322
meta->first_recno - cp->recno))) {
323
new_cur = cp->recno + 1;
324
if (new_cur == RECNO_OOB)
326
opcode |= QAM_SETCUR;
330
if (opcode != 0 && DB_LOGGING(dbc)) {
331
ret = __qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn,
332
0, opcode, dbp->log_fileid, meta->first_recno, new_first,
333
meta->cur_recno, new_cur, &meta->dbmeta.lsn);
338
if (opcode & QAM_SETCUR)
339
meta->cur_recno = new_cur;
340
if (opcode & QAM_SETFIRST)
341
meta->first_recno = new_first;
344
memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 &&
348
/* Don't hold the meta page long term. */
349
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
356
* Perform a put(DB_APPEND) in queue.
358
* PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
361
__qam_append(dbc, key, data)
376
cp = (QUEUE_CURSOR *)dbc->internal;
378
/* Write lock the meta page. */
379
pg = ((QUEUE *)dbp->q_internal)->q_meta;
380
if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0)
382
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0) {
383
(void)memp_fput(dbp->mpf, meta, 0);
387
/* Get the next record number. */
388
recno = meta->cur_recno;
390
if (meta->cur_recno == RECNO_OOB)
392
if (meta->cur_recno == meta->first_recno) {
394
if (meta->cur_recno == RECNO_OOB)
396
(void)__LPUT(dbc, lock);
401
if (QAM_BEFORE_FIRST(meta, recno))
402
meta->first_recno = recno;
404
/* Lock the record and release meta page lock. */
405
if ((ret = __db_lget(dbc,
406
LCK_COUPLE, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
407
(void)__LPUT(dbc, lock);
412
* The application may modify the data based on the selected record
415
if (dbc->dbp->db_append_recno != NULL &&
416
(ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
417
(void)__LPUT(dbc, lock);
422
cp->lock_mode = DB_LOCK_WRITE;
424
pg = QAM_RECNO_PAGE(dbp, recno);
426
/* Fetch and write lock the data page. */
427
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
429
if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
430
/* We did not fetch it, we can release the lock. */
431
(void)__LPUT(dbc, lock);
435
/* See if this is a new page. */
436
if (page->pgno == 0) {
438
page->type = P_QAMDATA;
441
/* Put the item on the page and log it. */
442
ret = __qam_pitem(dbc, page,
443
QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
445
/* Doing record locking, release the page lock */
446
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
450
= __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
453
/* Return the record number to the user. */
455
ret = __db_retcopy(dbp, key,
456
&recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
458
/* Position the cursor on this record. */
461
/* See if we are leaving the extent. */
462
qp = (QUEUE *) dbp->q_internal;
463
if (qp->page_ext != 0
464
&& (recno % (qp->page_ext * qp->rec_page) == 0
465
|| recno == UINT32_T_MAX)) {
467
__db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
469
if (!QAM_AFTER_CURRENT(meta, recno))
470
ret = __qam_fclose(dbp, pg);
471
(void)__LPUT(dbc, lock);
475
/* Release the meta page. */
477
= memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
485
* Qam cursor->am_del function
499
int exact, ret, t_ret;
502
cp = (QUEUE_CURSOR *)dbc->internal;
504
pg = ((QUEUE *)dbp->q_internal)->q_meta;
505
if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0)
507
if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
508
/* We did not fetch it, we can release the lock. */
509
(void)__LPUT(dbc, lock);
513
if (QAM_NOT_VALID(meta, cp->recno))
516
/* Don't hold the meta page long term. */
517
if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
519
if ((t_ret = memp_fput(dbp->mpf, meta, 0)) != 0 && ret == 0)
525
if ((ret = __db_lget(dbc,
526
0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
529
cp->lock_mode = DB_LOCK_WRITE;
530
/* Find the record ; delete only deletes exact matches. */
531
if ((ret = __qam_position(dbc,
532
&cp->recno, QAM_WRITE, &exact)) != 0) {
542
qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
544
if (DB_LOGGING(dbc)) {
545
if (((QUEUE *)dbp->q_internal)->page_ext == 0
546
|| ((QUEUE *)dbp->q_internal)->re_len == 0) {
548
__qam_del_log(dbp->dbenv,
549
dbc->txn, &LSN(pagep), 0,
550
dbp->log_fileid, &LSN(pagep),
551
pagep->pgno, cp->indx, cp->recno)) != 0)
554
data.size = ((QUEUE *)dbp->q_internal)->re_len;
555
data.data = qp->data;
557
__qam_delext_log(dbp->dbenv, dbc->txn,
558
&LSN(pagep), 0, dbp->log_fileid, &LSN(pagep),
559
pagep->pgno, cp->indx, cp->recno, &data)) != 0)
564
F_CLR(qp, QAM_VALID);
567
if ((t_ret = __qam_fput(dbp, cp->pgno,
568
cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
572
/* Doing record locking, release the page lock */
573
if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
586
* Queue cursor->c_get function.
589
__qam_c_get(dbc, key, data, flags, pgnop)
596
DB_LOCK lock, pglock, metalock, save_lock;
605
db_lockmode_t lock_mode;
606
db_pgno_t metapno, save_page;
607
db_recno_t current, first, save_recno;
608
qam_position_mode mode;
609
u_int32_t rec_extent;
610
int exact, is_first, locked, ret, t_ret, wait, with_delete;
611
int put_mode, meta_dirty, retrying, wrapped;
613
cp = (QUEUE_CURSOR *)dbc->internal;
616
PANIC_CHECK(dbp->dbenv);
622
lock_mode = DB_LOCK_READ;
629
if (F_ISSET(dbc, DBC_RMW)) {
630
lock_mode = DB_LOCK_WRITE;
634
if (flags == DB_CONSUME_WAIT) {
638
if (flags == DB_CONSUME) {
639
DB_CHECK_TXN(dbp, dbc->txn);
642
lock_mode = DB_LOCK_WRITE;
646
DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
647
flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
651
t = (QUEUE *)dbp->q_internal;
652
/* get the meta page */
654
if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
657
if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
658
/* We did not fetch it, we can release the lock. */
659
(void)__LPUT(dbc, metalock);
665
/* Make lint and friends happy. */
668
/* Release any previous lock if not in a transaction. */
669
(void)__TLPUT(dbc, cp->lock);
671
retry: /* Update the record number. */
681
if (cp->recno != RECNO_OOB) {
683
/* Wrap around, skipping zero. */
684
if (cp->recno == RECNO_OOB)
693
/* get the first record number */
694
cp->recno = first = meta->first_recno;
699
if (cp->recno != RECNO_OOB) {
700
if (QAM_BEFORE_FIRST(meta, cp->recno)
701
|| cp->recno == meta->first_recno) {
706
/* Wrap around, skipping zero. */
707
if (cp->recno == RECNO_OOB)
713
if (meta->first_recno == meta->cur_recno) {
717
cp->recno = meta->cur_recno - 1;
718
if (cp->recno == RECNO_OOB)
724
if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
728
ret = __db_unknown_flag(dbp->dbenv, "__qam_c_get", flags);
733
* Check to see if we are out of data. Current points to
734
* the first free slot.
736
if (cp->recno == meta->cur_recno ||
737
QAM_AFTER_CURRENT(meta, cp->recno)) {
743
* If first is not set, then we skipped a
744
* locked record, go back and find it.
745
* If we find a locked record again
752
if (CDB_LOCKING(dbp->dbenv)) {
753
if ((ret = lock_get(dbp->dbenv, dbc->locker,
754
DB_LOCK_SWITCH, &dbc->lock_dbt,
755
DB_LOCK_WAIT, &dbc->mylock)) != 0)
757
if ((ret = lock_get(dbp->dbenv, dbc->locker,
758
DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
764
* Wait for someone to update the meta page.
765
* This will probably mean there is something
766
* in the queue. We then go back up and
770
if ((ret = __db_lget( dbc,
771
0, metapno, lock_mode, 0, &metalock)) != 0)
774
if (cp->recno != RECNO_OOB &&
775
!QAM_AFTER_CURRENT(meta, cp->recno))
778
if ((ret = __db_lget(dbc, 0, metapno,
779
DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
781
if ((ret = lock_get(dbp->dbenv, dbc->locker,
782
DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
792
/* Don't hold the meta page long term. */
794
if ((ret = __LPUT(dbc, metalock)) != 0)
799
/* Lock the record. */
800
if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
801
(with_delete && !retrying) ?
802
DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
803
&lock)) == DB_LOCK_NOTGRANTED && with_delete) {
805
__db_logmsg(dbp->dbenv,
806
dbc->txn, "Queue S", 0, "%x %d %d %d",
807
dbc->locker, cp->recno, first, meta->first_recno);
817
* In the DB_FIRST or DB_LAST cases we must wait and then start over
818
* since the first/last may have moved while we slept.
819
* We release our locks and try again.
821
if ((!with_delete && is_first) || flags == DB_LAST) {
823
__db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
826
(is_first ? meta->first_recno : (meta->cur_recno - 1))) {
833
/* Don't hold the meta page long term. */
834
if ((ret = __LPUT(dbc, metalock)) != 0)
838
/* Position the cursor on the record. */
839
if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
840
/* We cannot get the page, release the record lock. */
841
(void)__LPUT(dbc, lock);
848
cp->lock_mode = lock_mode;
851
if (flags == DB_NEXT || flags == DB_NEXT_NODUP
852
|| flags == DB_PREV || flags == DB_PREV_NODUP
853
|| flags == DB_LAST) {
854
/* Release locks and try again. */
856
(void)__qam_fput(dbp, cp->pgno, pg, 0);
857
cp->page = pg = NULL;
858
(void)__LPUT(dbc, pglock);
859
(void)__LPUT(dbc, cp->lock);
860
if (flags == DB_LAST)
867
/* this is for the SET and SET_RANGE cases */
872
/* Return the key if the user didn't give us one. */
873
if (key != NULL && flags != DB_SET && flags != DB_GET_BOTH &&
874
(ret = __db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
875
&dbc->rkey->data, &dbc->rkey->ulen)) != 0)
879
F_SET(key, DB_DBT_ISSET);
881
qp = QAM_GET_RECORD(dbp, pg, cp->indx);
883
/* Return the data item. */
884
if (flags == DB_GET_BOTH) {
889
tmp.size = t->re_len;
890
if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
896
&& !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY)
897
&& (ret = __db_retcopy(dbp, data,
898
qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
902
F_SET(data, DB_DBT_ISSET);
904
/* Finally, if we are doing DB_CONSUME mark the record. */
907
* Assert that we're not a secondary index. Doing a DB_CONSUME
908
* on a secondary makes very little sense, since one can't
909
* DB_APPEND there; attempting one should be forbidden by
912
DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
915
* Check and see if we *have* any secondary indices.
916
* If we do, we're a primary, so call __db_c_del_primary
917
* to delete the references to the item we're about to
920
* Note that we work on a duplicated cursor, since the
921
* __db_ret work has already been done, so it's not safe
922
* to perform any additional ops on this cursor.
924
if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
925
if ((ret = __db_c_idup(dbc,
926
&dbcdup, DB_POSITIONI)) != 0)
929
if ((ret = __db_c_del_primary(dbcdup)) != 0) {
931
* The __db_c_del_primary return is more
934
(void)dbcdup->c_close(dbcdup);
938
if ((ret = dbcdup->c_close(dbcdup)) != 0)
942
if (DB_LOGGING(dbc)) {
943
if (t->page_ext == 0 || t->re_len == 0) {
944
if ((ret = __qam_del_log(dbp->dbenv, dbc->txn,
945
&LSN(pg), 0, dbp->log_fileid, &LSN(pg),
946
pg->pgno, cp->indx, cp->recno)) != 0)
950
tmp.size = t->re_len;
952
__qam_delext_log(dbp->dbenv, dbc->txn,
953
&LSN(pg), 0, dbp->log_fileid, &LSN(pg),
954
pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
959
F_CLR(qp, QAM_VALID);
960
put_mode = DB_MPOOL_DIRTY;
962
if ((ret = __LPUT(dbc, pglock)) != 0)
966
* Now we need to update the metapage
967
* first pointer. If we have deleted
968
* the record that is pointed to by
969
* first_recno then we move it as far
970
* forward as we can without blocking.
971
* The metapage lock must be held for
972
* the whole scan otherwise someone could
973
* do a random insert behind where we are
977
if (locked == 0 && (ret = __db_lget(
978
dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
982
__db_logmsg(dbp->dbenv,
983
dbc->txn, "Queue D", 0, "%x %d %d %d",
984
dbc->locker, cp->recno, first, meta->first_recno);
987
* See if we deleted the "first" record. If
988
* first is zero then we skipped something,
989
* see if first_recno has been move passed
990
* that to the record that we deleted.
994
if (first != meta->first_recno)
997
save_page = cp->pgno;
998
save_indx = cp->indx;
999
save_recno = cp->recno;
1000
save_lock = cp->lock;
1003
* If we skipped some deleted records, we need to
1004
* reposition on the first one. Get a lock
1005
* in case someone is trying to put it back.
1007
if (first != cp->recno) {
1008
ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1009
DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1010
if (ret == DB_LOCK_NOTGRANTED) {
1017
__qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
1021
if ((ret = __qam_position(dbc,
1022
&first, QAM_READ, &exact)) != 0 || exact != 0) {
1023
(void)__LPUT(dbc, lock);
1026
if ((ret =__LPUT(dbc, lock)) != 0)
1028
if ((ret = __LPUT(dbc, cp->lock)) != 0)
1032
current = meta->cur_recno;
1034
if (first > current)
1036
rec_extent = meta->page_ext * meta->rec_page;
1038
/* Loop until we find a record or hit current */
1041
* Check to see if we are moving off the extent
1042
* and remove the extent.
1043
* If we are moving off a page we need to
1044
* get rid of the buffer.
1045
* Wait for the lagging readers to move off the
1048
if (cp->page != NULL && rec_extent != 0
1049
&& ((exact = (first % rec_extent == 0))
1050
|| first % meta->rec_page == 0
1051
|| first == UINT32_T_MAX)) {
1052
if (exact == 1 && (ret = __db_lget(dbc,
1053
0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
1057
__db_logmsg(dbp->dbenv,
1058
dbc->txn, "Queue R", 0, "%x %d %d %d",
1059
dbc->locker, cp->pgno, first, meta->first_recno);
1061
put_mode |= DB_MPOOL_DISCARD;
1062
if ((ret = __qam_fput(dbp,
1063
cp->pgno, cp->page, put_mode)) != 0)
1068
ret = __qam_fremove(dbp, cp->pgno);
1069
t_ret = __LPUT(dbc, cp->lock);
1077
} else if (cp->page != NULL && (ret =
1078
__qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
1082
if (first == RECNO_OOB) {
1088
* LOOP EXIT when we come move to the current
1091
if (!wrapped && first >= current)
1094
ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1095
DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1096
if (ret == DB_LOCK_NOTGRANTED) {
1103
if ((ret = __qam_position(dbc,
1104
&first, QAM_READ, &exact)) != 0) {
1105
(void)__LPUT(dbc, lock);
1109
if ((ret =__LPUT(dbc, lock)) != 0
1110
|| (ret = __LPUT(dbc, cp->lock)) != 0 ||exact) {
1111
if ((t_ret = __qam_fput(dbp, cp->pgno,
1112
cp->page, put_mode)) != 0 && ret == 0)
1119
cp->pgno = save_page;
1120
cp->indx = save_indx;
1121
cp->recno = save_recno;
1122
cp->lock = save_lock;
1125
* We have advanced as far as we can.
1126
* Advance first_recno to this point.
1128
if (ret == 0 && meta->first_recno != first) {
1130
__db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
1131
0, "%x %d %d %d", dbc->locker, cp->recno,
1132
first, meta->first_recno);
1134
if (DB_LOGGING(dbc))
1136
__qam_incfirst_log(dbp->dbenv,
1137
dbc->txn, &meta->dbmeta.lsn, 0,
1138
dbp->log_fileid, cp->recno)) != 0)
1140
meta->first_recno = first;
1146
err1: if (cp->page != NULL) {
1147
t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
1151
/* Doing record locking, release the page lock */
1152
t_ret = __LPUT(dbc, pglock);
1160
/* release the meta page */
1162
dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
1167
/* Don't hold the meta page long term. */
1169
t_ret = __LPUT(dbc, metalock);
1171
DB_ASSERT(!LOCK_ISSET(metalock));
1174
* There is no need to keep the record locked if we are
1175
* not in a transaction.
1178
t_ret = __TLPUT(dbc, cp->lock);
1180
return (ret ? ret : t_ret);
1184
__qam_bulk(dbc, data, flags)
1197
int32_t *endp, *offp;
1198
u_int8_t *dbuf, *dp, *np;
1199
int exact, mode, recs, re_len, ret, t_ret, valid;
1200
int is_key, need_pg, pagesize, size, space;
1204
if (F_ISSET(dbc, DBC_RMW))
1208
cp = (QUEUE_CURSOR *)dbc->internal;
1210
pagesize = dbc->dbp->pgsize;
1211
re_len = ((QUEUE *)dbc->dbp->q_internal)->re_len;
1212
recs = ((QUEUE *)dbc->dbp->q_internal)->rec_page;
1213
metapno = ((QUEUE *)dbc->dbp->q_internal)->q_meta;
1215
is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1217
if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
1219
if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
1220
/* We did not fetch it, we can release the lock. */
1221
(void)__LPUT(dbc, metalock);
1228
/* Keep track of space that is left. There is an termination entry */
1230
space -= sizeof(*offp);
1232
/* Build the offset/size table form the end up. */
1233
endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
1238
if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
1247
* If this page is a nonexistent page at the end of an
1248
* extent, pg may be NULL. A NULL page has no valid records,
1249
* so just keep looping as though qp exists and isn't QAM_VALID;
1250
* calling QAM_GET_RECORD is unsafe.
1254
qp = QAM_GET_RECORD(dbp, pg, indx);
1255
if (F_ISSET(qp, QAM_VALID)) {
1257
space -= (is_key ? 3 : 2) * sizeof(*offp);
1262
size = pagesize - sizeof(QPAGE);
1279
(char *)pg + sizeof(QPAGE), size);
1285
*offp-- = cp->recno;
1286
*offp-- = (u_int8_t*)qp - (u_int8_t*)pg -
1287
sizeof(QPAGE) + dp - dbuf +
1288
SSZA(QAMDATA, data);
1292
if (!valid && is_key == 0) {
1297
} while (++indx < recs && cp->recno < meta->cur_recno);
1299
if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
1302
if (cp->page != NULL) {
1304
__qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
1309
if (ret == 0 && indx >= recs && cp->recno < meta->cur_recno)
1315
/* release the meta page */
1316
t_ret = memp_fput(dbp->mpf, meta, 0);
1321
t_ret = __LPUT(dbc, metalock);
1328
* Close down the cursor from a single use.
1331
__qam_c_close(dbc, root_pgno, rmroot)
1333
db_pgno_t root_pgno;
1338
COMPQUIET(root_pgno, 0);
1339
COMPQUIET(rmroot, NULL);
1341
cp = (QUEUE_CURSOR *)dbc->internal;
1343
/* Discard any locks not acquired inside of a transaction. */
1344
(void)__TLPUT(dbc, cp->lock);
1345
LOCK_INIT(cp->lock);
1348
cp->pgno = PGNO_INVALID;
1350
cp->lock_mode = DB_LOCK_NG;
1351
cp->recno = RECNO_OOB;
1359
* Duplicate a queue cursor, such that the new one holds appropriate
1360
* locks for the position of the original.
1362
* PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
1365
__qam_c_dup(orig_dbc, new_dbc)
1366
DBC *orig_dbc, *new_dbc;
1368
QUEUE_CURSOR *orig, *new;
1370
orig = (QUEUE_CURSOR *)orig_dbc->internal;
1371
new = (QUEUE_CURSOR *)new_dbc->internal;
1373
new->recno = orig->recno;
1375
/* reget the long term lock if we are not in a xact */
1376
if (orig_dbc->txn != NULL ||
1377
!STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock))
1380
return (__db_lget(new_dbc,
1381
0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
1387
* PUBLIC: int __qam_c_init __P((DBC *));
1399
/* Allocate the internal structure. */
1400
cp = (QUEUE_CURSOR *)dbc->internal;
1403
__os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
1405
dbc->internal = (DBC_INTERNAL *)cp;
1408
/* Initialize methods. */
1409
dbc->c_close = __db_c_close;
1410
dbc->c_count = __db_c_count;
1411
dbc->c_del = __db_c_del;
1412
dbc->c_dup = __db_c_dup;
1413
dbc->c_get = dbc->c_real_get = __db_c_get;
1414
dbc->c_pget = __db_c_pget;
1415
dbc->c_put = __db_c_put;
1416
dbc->c_am_bulk = __qam_bulk;
1417
dbc->c_am_close = __qam_c_close;
1418
dbc->c_am_del = __qam_c_del;
1419
dbc->c_am_destroy = __qam_c_destroy;
1420
dbc->c_am_get = __qam_c_get;
1421
dbc->c_am_put = __qam_c_put;
1422
dbc->c_am_writelock = NULL;
1428
* __qam_c_destroy --
1429
* Close a single cursor -- internal version.
1432
__qam_c_destroy(dbc)
1435
/* Discard the structures. */
1436
__os_free(dbc->dbp->dbenv, dbc->internal, sizeof(QUEUE_CURSOR));
1443
* Check the user's record number.
1446
__qam_getno(dbp, key, rep)
1451
if ((*rep = *(db_recno_t *)key->data) == 0) {
1452
__db_err(dbp->dbenv, "illegal record number of 0");
1460
* Truncate a queue database
1462
* PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *));
1465
__qam_truncate(dbp, txn, countp)
1474
int count, ret, t_ret;
1476
/* Acquire a cursor. */
1477
if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
1480
/* Walk the queue, counting rows. */
1482
while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0)
1485
if (ret == DB_NOTFOUND)
1488
/* Discard the cursor. */
1489
if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1495
/* update the meta page */
1496
/* get the meta page */
1497
metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1499
__db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
1502
if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
1503
/* We did not fetch it, we can release the lock. */
1504
(void)__LPUT(dbc, metalock);
1507
if (DB_LOGGING(dbc)) {
1508
ret = __qam_mvptr_log(dbp->dbenv, dbc->txn,
1509
&meta->dbmeta.lsn, 0, QAM_SETCUR|QAM_SETFIRST|QAM_TRUNCATE,
1510
dbp->log_fileid, meta->first_recno, 1,
1511
meta->cur_recno, 1, &meta->dbmeta.lsn);
1514
meta->first_recno = meta->cur_recno = 1;
1516
if ((t_ret = memp_fput(dbp->mpf,
1517
meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
1519
if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)