~ubuntu-branches/ubuntu/edgy/rpm/edgy

« back to all changes in this revision

Viewing changes to db/qam/qam.c

  • Committer: Bazaar Package Importer
  • Author(s): Joey Hess
  • Date: 2002-01-22 20:56:57 UTC
  • Revision ID: james.westby@ubuntu.com-20020122205657-l74j50mr9z8ofcl5
Tags: upstream-4.0.3
ImportĀ upstreamĀ versionĀ 4.0.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-
 
2
 * See the file LICENSE for redistribution information.
 
3
 *
 
4
 * Copyright (c) 1999-2001
 
5
 *      Sleepycat Software.  All rights reserved.
 
6
 */
 
7
 
 
8
#include "db_config.h"
 
9
 
 
10
#ifndef lint
 
11
static const char revid[] = "$Id: qam.c,v 11.106 2001/07/02 16:45:54 krinsky Exp $";
 
12
#endif /* not lint */
 
13
 
 
14
#ifndef NO_SYSTEM_INCLUDES
 
15
#include <sys/types.h>
 
16
 
 
17
#include <string.h>
 
18
#endif
 
19
 
 
20
#include "db_int.h"
 
21
#include "db_page.h"
 
22
#include "db_shash.h"
 
23
#include "db_am.h"
 
24
#include "mp.h"
 
25
#include "lock.h"
 
26
#include "log.h"
 
27
#include "btree.h"
 
28
#include "qam.h"
 
29
 
 
30
static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
 
31
static int __qam_c_close __P((DBC *, db_pgno_t, int *));
 
32
static int __qam_c_del __P((DBC *));
 
33
static int __qam_c_destroy __P((DBC *));
 
34
static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
 
35
static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
 
36
static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
 
37
 
 
38
/*
 
39
 * __qam_position --
 
40
 *      Position a queued access method cursor at a record.  This returns
 
41
 *      the page locked.  *exactp will be set if the record is valid.
 
42
 * PUBLIC: int __qam_position
 
43
 * PUBLIC:       __P((DBC *, db_recno_t *, qam_position_mode, int *));
 
44
 */
 
45
int
 
46
__qam_position(dbc, recnop, mode, exactp)
 
47
        DBC *dbc;               /* open cursor */
 
48
        db_recno_t *recnop;     /* pointer to recno to find */
 
49
        qam_position_mode mode;/* locking: read or write */
 
50
        int *exactp;            /* indicate if it was found */
 
51
{
 
52
        QUEUE_CURSOR *cp;
 
53
        DB *dbp;
 
54
        QAMDATA  *qp;
 
55
        db_pgno_t pg;
 
56
        int ret;
 
57
 
 
58
        dbp = dbc->dbp;
 
59
        cp = (QUEUE_CURSOR *)dbc->internal;
 
60
 
 
61
        /* Fetch the page for this recno. */
 
62
        pg = QAM_RECNO_PAGE(dbp, *recnop);
 
63
 
 
64
        if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
 
65
             DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
 
66
                return (ret);
 
67
        cp->page = NULL;
 
68
        *exactp = 0;
 
69
        if ((ret = __qam_fget(dbp, &pg,
 
70
            mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
 
71
                /* We did not fetch it, we can release the lock. */
 
72
                (void)__LPUT(dbc, cp->lock);
 
73
                if (mode != QAM_WRITE &&
 
74
                    (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
 
75
                        return (0);
 
76
                return (ret);
 
77
        }
 
78
        cp->pgno = pg;
 
79
        cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
 
80
 
 
81
        if (PGNO(cp->page) == 0) {
 
82
                if (F_ISSET(dbp, DB_AM_RDONLY)) {
 
83
                        *exactp = 0;
 
84
                        return (0);
 
85
                }
 
86
                PGNO(cp->page) = pg;
 
87
                TYPE(cp->page) = P_QAMDATA;
 
88
        }
 
89
 
 
90
        qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
 
91
        *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
 
92
 
 
93
        return (ret);
 
94
}
 
95
 
 
96
/*
 
97
 * __qam_pitem --
 
98
 *      Put an item on a queue page.  Copy the data to the page and set the
 
99
 *      VALID and SET bits.  If logging and the record was previously set,
 
100
 *      log that data, otherwise just log the new data.
 
101
 *
 
102
 *   pagep must be write locked
 
103
 *
 
104
 * PUBLIC: int __qam_pitem
 
105
 * PUBLIC:     __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
 
106
 */
 
107
int
 
108
__qam_pitem(dbc, pagep, indx, recno, data)
 
109
        DBC *dbc;
 
110
        QPAGE *pagep;
 
111
        u_int32_t indx;
 
112
        db_recno_t recno;
 
113
        DBT *data;
 
114
{
 
115
        DB *dbp;
 
116
        DBT olddata, pdata, *datap;
 
117
        QAMDATA *qp;
 
118
        QUEUE *t;
 
119
        u_int32_t alloced;
 
120
        u_int8_t *dest, *p;
 
121
        int ret;
 
122
 
 
123
        alloced = ret = 0;
 
124
 
 
125
        dbp = dbc->dbp;
 
126
        t = (QUEUE *)dbp->q_internal;
 
127
 
 
128
        if (data->size > t->re_len)
 
129
                goto len_err;
 
130
 
 
131
        qp = QAM_GET_RECORD(dbp, pagep, indx);
 
132
 
 
133
        p = qp->data;
 
134
        datap = data;
 
135
        if (F_ISSET(data, DB_DBT_PARTIAL)) {
 
136
                if (data->doff + data->dlen > t->re_len) {
 
137
                        alloced = data->dlen;
 
138
                        goto len_err;
 
139
                }
 
140
                if (data->size != data->dlen) {
 
141
len_err:                __db_err(dbp->dbenv,
 
142
                            "Length improper for fixed length record %lu",
 
143
                            (u_long)(alloced ? alloced : data->size));
 
144
                        return (EINVAL);
 
145
                }
 
146
                if (data->size == t->re_len)
 
147
                        goto no_partial;
 
148
 
 
149
                /*
 
150
                 * If we are logging, then we have to build the record
 
151
                 * first, otherwise, we can simply drop the change
 
152
                 * directly on the page.  After this clause, make
 
153
                 * sure that datap and p are set up correctly so that
 
154
                 * copying datap into p does the right thing.
 
155
                 *
 
156
                 * Note, I am changing this so that if the existing
 
157
                 * record is not valid, we create a complete record
 
158
                 * to log so that both this and the recovery code is simpler.
 
159
                 */
 
160
 
 
161
                if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
 
162
                        datap = &pdata;
 
163
                        memset(datap, 0, sizeof(*datap));
 
164
 
 
165
                        if ((ret = __os_malloc(dbp->dbenv,
 
166
                            t->re_len, &datap->data)) != 0)
 
167
                                return (ret);
 
168
                        alloced = 1;
 
169
                        datap->size = t->re_len;
 
170
 
 
171
                        /*
 
172
                         * Construct the record if it's valid, otherwise set it
 
173
                         * all to the pad character.
 
174
                         */
 
175
                        dest = datap->data;
 
176
                        if (F_ISSET(qp, QAM_VALID))
 
177
                                memcpy(dest, p, t->re_len);
 
178
                        else
 
179
                                memset(dest, t->re_pad, t->re_len);
 
180
 
 
181
                        dest += data->doff;
 
182
                        memcpy(dest, data->data, data->size);
 
183
                } else {
 
184
                        datap = data;
 
185
                        p += data->doff;
 
186
                }
 
187
        }
 
188
 
 
189
no_partial:
 
190
        if (DB_LOGGING(dbc)) {
 
191
                olddata.size = 0;
 
192
                if (F_ISSET(qp, QAM_SET)) {
 
193
                        olddata.data = qp->data;
 
194
                        olddata.size = t->re_len;
 
195
                }
 
196
                if ((ret = __qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep),
 
197
                    0, dbp->log_fileid, &LSN(pagep), pagep->pgno,
 
198
                    indx, recno, datap, qp->flags,
 
199
                    olddata.size == 0 ? NULL : &olddata)) != 0)
 
200
                        goto err;
 
201
        }
 
202
 
 
203
        F_SET(qp, QAM_VALID | QAM_SET);
 
204
        memcpy(p, datap->data, datap->size);
 
205
        if (!F_ISSET(data, DB_DBT_PARTIAL))
 
206
                memset(p + datap->size,  t->re_pad, t->re_len - datap->size);
 
207
 
 
208
err:    if (alloced)
 
209
                __os_free(dbp->dbenv, datap->data, t->re_len);
 
210
 
 
211
        return (ret);
 
212
}
 
213
/*
 
214
 * __qam_c_put
 
215
 *      Cursor put for queued access method.
 
216
 *      BEFORE and AFTER cannot be specified.
 
217
 */
 
218
static int
 
219
__qam_c_put(dbc, key, data, flags, pgnop)
 
220
        DBC *dbc;
 
221
        DBT *key, *data;
 
222
        u_int32_t flags;
 
223
        db_pgno_t *pgnop;
 
224
{
 
225
        QUEUE_CURSOR *cp;
 
226
        DB *dbp;
 
227
        DB_LOCK lock;
 
228
        QMETA *meta;
 
229
        db_pgno_t pg;
 
230
        db_recno_t new_cur, new_first;
 
231
        u_int32_t opcode;
 
232
        int exact, ret, t_ret;
 
233
 
 
234
        dbp = dbc->dbp;
 
235
        if (pgnop != NULL)
 
236
                *pgnop = PGNO_INVALID;
 
237
 
 
238
        cp = (QUEUE_CURSOR *)dbc->internal;
 
239
 
 
240
        switch (flags) {
 
241
        case DB_KEYFIRST:
 
242
        case DB_KEYLAST:
 
243
                if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
 
244
                        return (ret);
 
245
                /* FALLTHROUGH */
 
246
        case DB_CURRENT:
 
247
                break;
 
248
        default:
 
249
                /* The interface shouldn't let anything else through. */
 
250
                DB_ASSERT(0);
 
251
                return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
 
252
        }
 
253
 
 
254
        /* Write lock the record. */
 
255
        if ((ret = __db_lget(dbc,
 
256
            0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
 
257
                return (ret);
 
258
 
 
259
        if ((ret = __qam_position(dbc,
 
260
            &cp->recno, QAM_WRITE, &exact)) != 0) {
 
261
                /* We could not get the page, we can release the record lock. */
 
262
                __LPUT(dbc, lock);
 
263
                return (ret);
 
264
        }
 
265
 
 
266
        /* Put the item on the page. */
 
267
        ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
 
268
 
 
269
        /* Doing record locking, release the page lock */
 
270
        if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
 
271
                ret = t_ret;
 
272
        if ((t_ret = __qam_fput(
 
273
            dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
 
274
                ret = t_ret;
 
275
        cp->page = NULL;
 
276
        cp->lock = lock;
 
277
        cp->lock_mode = DB_LOCK_WRITE;
 
278
        if (ret != 0)
 
279
                return (ret);
 
280
 
 
281
        /* We may need to reset the head or tail of the queue. */
 
282
        pg = ((QUEUE *)dbp->q_internal)->q_meta;
 
283
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
 
284
                return (ret);
 
285
        if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
 
286
                /* We did not fetch it, we can release the lock. */
 
287
                (void)__LPUT(dbc, lock);
 
288
                return (ret);
 
289
        }
 
290
 
 
291
        opcode = 0;
 
292
        new_cur = new_first = 0;
 
293
 
 
294
        /*
 
295
         * If the put address is outside the queue, adjust the head and
 
296
         * tail of the queue.  If the order is inverted we move
 
297
         * the one which is closer.  The first case is when the
 
298
         * queue is empty, move first and current to where the new
 
299
         * insert is.
 
300
         */
 
301
 
 
302
        if (meta->first_recno == meta->cur_recno) {
 
303
                new_first = cp->recno;
 
304
                new_cur = cp->recno + 1;
 
305
                if (new_cur == RECNO_OOB)
 
306
                        new_cur++;
 
307
                opcode |= QAM_SETFIRST;
 
308
                opcode |= QAM_SETCUR;
 
309
        } else {
 
310
                if (QAM_BEFORE_FIRST(meta, cp->recno) &&
 
311
                    (meta->first_recno <= meta->cur_recno ||
 
312
                    meta->first_recno - cp->recno <
 
313
                    cp->recno - meta->cur_recno)) {
 
314
                        new_first = cp->recno;
 
315
                        opcode |= QAM_SETFIRST;
 
316
                }
 
317
 
 
318
                if (meta->cur_recno == cp->recno ||
 
319
                    (QAM_AFTER_CURRENT(meta, cp->recno) &&
 
320
                    (meta->first_recno <= meta->cur_recno ||
 
321
                    cp->recno - meta->cur_recno <=
 
322
                    meta->first_recno - cp->recno))) {
 
323
                        new_cur = cp->recno + 1;
 
324
                        if (new_cur == RECNO_OOB)
 
325
                                new_cur++;
 
326
                        opcode |= QAM_SETCUR;
 
327
                }
 
328
        }
 
329
 
 
330
        if (opcode != 0 && DB_LOGGING(dbc)) {
 
331
                ret = __qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn,
 
332
                    0, opcode, dbp->log_fileid, meta->first_recno, new_first,
 
333
                    meta->cur_recno, new_cur, &meta->dbmeta.lsn);
 
334
                if (ret != 0)
 
335
                        opcode = 0;
 
336
        }
 
337
 
 
338
        if (opcode & QAM_SETCUR)
 
339
                meta->cur_recno = new_cur;
 
340
        if (opcode & QAM_SETFIRST)
 
341
                meta->first_recno = new_first;
 
342
 
 
343
        if ((t_ret =
 
344
            memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 &&
 
345
            ret == 0)
 
346
                ret = t_ret;
 
347
 
 
348
        /* Don't hold the meta page long term. */
 
349
        if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
 
350
                ret = t_ret;
 
351
        return (ret);
 
352
}
 
353
 
 
354
/*
 
355
 * __qam_append --
 
356
 *      Perform a put(DB_APPEND) in queue.
 
357
 *
 
358
 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
 
359
 */
 
360
int
 
361
__qam_append(dbc, key, data)
 
362
        DBC *dbc;
 
363
        DBT *key, *data;
 
364
{
 
365
        DB *dbp;
 
366
        DB_LOCK lock;
 
367
        QMETA *meta;
 
368
        QPAGE *page;
 
369
        QUEUE *qp;
 
370
        QUEUE_CURSOR *cp;
 
371
        db_pgno_t pg;
 
372
        db_recno_t recno;
 
373
        int ret, t_ret;
 
374
 
 
375
        dbp = dbc->dbp;
 
376
        cp = (QUEUE_CURSOR *)dbc->internal;
 
377
 
 
378
        /* Write lock the meta page. */
 
379
        pg = ((QUEUE *)dbp->q_internal)->q_meta;
 
380
        if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0)
 
381
                return (ret);
 
382
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
 
383
                (void)memp_fput(dbp->mpf, meta, 0);
 
384
                return (ret);
 
385
        }
 
386
 
 
387
        /* Get the next record number. */
 
388
        recno = meta->cur_recno;
 
389
        meta->cur_recno++;
 
390
        if (meta->cur_recno == RECNO_OOB)
 
391
                meta->cur_recno++;
 
392
        if (meta->cur_recno == meta->first_recno) {
 
393
                meta->cur_recno--;
 
394
                if (meta->cur_recno == RECNO_OOB)
 
395
                        meta->cur_recno--;
 
396
                (void)__LPUT(dbc, lock);
 
397
                ret = EFBIG;
 
398
                goto err;
 
399
        }
 
400
 
 
401
        if (QAM_BEFORE_FIRST(meta, recno))
 
402
                meta->first_recno = recno;
 
403
 
 
404
        /* Lock the record and release meta page lock. */
 
405
        if ((ret = __db_lget(dbc,
 
406
            LCK_COUPLE, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
 
407
                (void)__LPUT(dbc, lock);
 
408
                goto err;
 
409
        }
 
410
 
 
411
        /*
 
412
         * The application may modify the data based on the selected record
 
413
         * number.
 
414
         */
 
415
        if (dbc->dbp->db_append_recno != NULL &&
 
416
            (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
 
417
                (void)__LPUT(dbc, lock);
 
418
                goto err;
 
419
        }
 
420
 
 
421
        cp->lock = lock;
 
422
        cp->lock_mode = DB_LOCK_WRITE;
 
423
 
 
424
        pg = QAM_RECNO_PAGE(dbp, recno);
 
425
 
 
426
        /* Fetch and write lock the data page. */
 
427
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
 
428
                goto err;
 
429
        if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
 
430
                /* We did not fetch it, we can release the lock. */
 
431
                (void)__LPUT(dbc, lock);
 
432
                goto err;
 
433
        }
 
434
 
 
435
        /* See if this is a new page. */
 
436
        if (page->pgno == 0) {
 
437
                page->pgno = pg;
 
438
                page->type = P_QAMDATA;
 
439
        }
 
440
 
 
441
        /* Put the item on the page and log it. */
 
442
        ret = __qam_pitem(dbc, page,
 
443
            QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
 
444
 
 
445
        /* Doing record locking, release the page lock */
 
446
        if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
 
447
                ret = t_ret;
 
448
 
 
449
        if ((t_ret
 
450
            = __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
 
451
                ret = t_ret;
 
452
 
 
453
        /* Return the record number to the user. */
 
454
        if (ret == 0)
 
455
                ret = __db_retcopy(dbp, key,
 
456
                    &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
 
457
 
 
458
        /* Position the cursor on this record. */
 
459
        cp->recno = recno;
 
460
 
 
461
        /* See if we are leaving the extent. */
 
462
        qp = (QUEUE *) dbp->q_internal;
 
463
        if (qp->page_ext != 0
 
464
            && (recno % (qp->page_ext * qp->rec_page) == 0
 
465
            || recno == UINT32_T_MAX)) {
 
466
                if ((ret =
 
467
                    __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
 
468
                        goto err;
 
469
                if (!QAM_AFTER_CURRENT(meta, recno))
 
470
                        ret = __qam_fclose(dbp, pg);
 
471
                (void)__LPUT(dbc, lock);
 
472
        }
 
473
 
 
474
err:
 
475
        /* Release the meta page. */
 
476
        if ((t_ret
 
477
             = memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
 
478
                ret = t_ret;
 
479
 
 
480
        return (ret);
 
481
}
 
482
 
 
483
/*
 
484
 * __qam_c_del --
 
485
 *      Qam cursor->am_del function
 
486
 */
 
487
static int
 
488
__qam_c_del(dbc)
 
489
        DBC *dbc;
 
490
{
 
491
        QUEUE_CURSOR *cp;
 
492
        DB *dbp;
 
493
        DBT data;
 
494
        DB_LOCK lock;
 
495
        PAGE *pagep;
 
496
        QAMDATA *qp;
 
497
        QMETA *meta;
 
498
        db_pgno_t pg;
 
499
        int exact, ret, t_ret;
 
500
 
 
501
        dbp = dbc->dbp;
 
502
        cp = (QUEUE_CURSOR *)dbc->internal;
 
503
 
 
504
        pg = ((QUEUE *)dbp->q_internal)->q_meta;
 
505
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &lock)) != 0)
 
506
                return (ret);
 
507
        if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
 
508
                /* We did not fetch it, we can release the lock. */
 
509
                (void)__LPUT(dbc, lock);
 
510
                return (ret);
 
511
        }
 
512
 
 
513
        if (QAM_NOT_VALID(meta, cp->recno))
 
514
                ret = DB_NOTFOUND;
 
515
 
 
516
        /* Don't hold the meta page long term. */
 
517
        if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
 
518
                ret = t_ret;
 
519
        if ((t_ret = memp_fput(dbp->mpf, meta, 0)) != 0 && ret == 0)
 
520
                ret = t_ret;
 
521
 
 
522
        if (ret != 0)
 
523
                return (ret);
 
524
 
 
525
        if ((ret = __db_lget(dbc,
 
526
            0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
 
527
                return (ret);
 
528
 
 
529
        cp->lock_mode = DB_LOCK_WRITE;
 
530
        /* Find the record ; delete only deletes exact matches. */
 
531
        if ((ret = __qam_position(dbc,
 
532
            &cp->recno, QAM_WRITE, &exact)) != 0) {
 
533
                cp->lock = lock;
 
534
                return (ret);
 
535
        }
 
536
        if (!exact) {
 
537
                ret = DB_NOTFOUND;
 
538
                goto err1;
 
539
        }
 
540
 
 
541
        pagep = cp->page;
 
542
        qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
 
543
 
 
544
        if (DB_LOGGING(dbc)) {
 
545
                if (((QUEUE *)dbp->q_internal)->page_ext == 0
 
546
                    || ((QUEUE *)dbp->q_internal)->re_len == 0) {
 
547
                        if ((ret =
 
548
                            __qam_del_log(dbp->dbenv,
 
549
                            dbc->txn, &LSN(pagep), 0,
 
550
                            dbp->log_fileid, &LSN(pagep),
 
551
                            pagep->pgno, cp->indx, cp->recno)) != 0)
 
552
                                goto err1;
 
553
                } else {
 
554
                        data.size = ((QUEUE *)dbp->q_internal)->re_len;
 
555
                        data.data = qp->data;
 
556
                        if ((ret =
 
557
                            __qam_delext_log(dbp->dbenv, dbc->txn,
 
558
                            &LSN(pagep), 0, dbp->log_fileid, &LSN(pagep),
 
559
                            pagep->pgno, cp->indx, cp->recno, &data)) != 0)
 
560
                                goto err1;
 
561
                }
 
562
        }
 
563
 
 
564
        F_CLR(qp, QAM_VALID);
 
565
 
 
566
err1:
 
567
        if ((t_ret = __qam_fput(dbp, cp->pgno,
 
568
            cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
 
569
                ret = t_ret;
 
570
        cp->page = NULL;
 
571
 
 
572
        /* Doing record locking, release the page lock */
 
573
        if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
 
574
                ret = t_ret;
 
575
        cp->lock = lock;
 
576
 
 
577
        return (ret);
 
578
}
 
579
 
 
580
#ifdef  DEBUG_WOP
 
581
#define QDEBUG
 
582
#endif
 
583
 
 
584
/*
 
585
 * __qam_c_get --
 
586
 *      Queue cursor->c_get function.
 
587
 */
 
588
static int
 
589
__qam_c_get(dbc, key, data, flags, pgnop)
 
590
        DBC *dbc;
 
591
        DBT *key, *data;
 
592
        u_int32_t flags;
 
593
        db_pgno_t *pgnop;
 
594
{
 
595
        DB *dbp;
 
596
        DB_LOCK lock, pglock, metalock, save_lock;
 
597
        DBC *dbcdup;
 
598
        DBT tmp;
 
599
        PAGE *pg;
 
600
        QAMDATA *qp;
 
601
        QMETA *meta;
 
602
        QUEUE *t;
 
603
        QUEUE_CURSOR *cp;
 
604
        db_indx_t save_indx;
 
605
        db_lockmode_t lock_mode;
 
606
        db_pgno_t metapno, save_page;
 
607
        db_recno_t current, first, save_recno;
 
608
        qam_position_mode mode;
 
609
        u_int32_t rec_extent;
 
610
        int exact, is_first, locked, ret, t_ret, wait, with_delete;
 
611
        int put_mode, meta_dirty, retrying, wrapped;
 
612
 
 
613
        cp = (QUEUE_CURSOR *)dbc->internal;
 
614
        dbp = dbc->dbp;
 
615
 
 
616
        PANIC_CHECK(dbp->dbenv);
 
617
 
 
618
        wait = 0;
 
619
        with_delete = 0;
 
620
        retrying = 0;
 
621
        rec_extent = 0;
 
622
        lock_mode = DB_LOCK_READ;
 
623
        put_mode = 0;
 
624
        t_ret = 0;
 
625
        *pgnop = 0;
 
626
        pg = NULL;
 
627
 
 
628
        mode = QAM_READ;
 
629
        if (F_ISSET(dbc, DBC_RMW)) {
 
630
                lock_mode = DB_LOCK_WRITE;
 
631
                mode = QAM_WRITE;
 
632
        }
 
633
 
 
634
        if (flags == DB_CONSUME_WAIT) {
 
635
                wait = 1;
 
636
                flags = DB_CONSUME;
 
637
        }
 
638
        if (flags == DB_CONSUME) {
 
639
                DB_CHECK_TXN(dbp, dbc->txn);
 
640
                with_delete = 1;
 
641
                flags = DB_FIRST;
 
642
                lock_mode = DB_LOCK_WRITE;
 
643
                mode = QAM_CONSUME;
 
644
        }
 
645
 
 
646
        DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
 
647
            flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
 
648
 
 
649
        is_first = 0;
 
650
 
 
651
        t = (QUEUE *)dbp->q_internal;
 
652
        /* get the meta page */
 
653
        metapno = t->q_meta;
 
654
        if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
 
655
                return (ret);
 
656
        locked = 1;
 
657
        if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
 
658
                /* We did not fetch it, we can release the lock. */
 
659
                (void)__LPUT(dbc, metalock);
 
660
                return (ret);
 
661
        }
 
662
 
 
663
        first = 0;
 
664
 
 
665
        /* Make lint and friends happy. */
 
666
        meta_dirty = 0;
 
667
 
 
668
        /* Release any previous lock if not in a transaction. */
 
669
        (void)__TLPUT(dbc, cp->lock);
 
670
 
 
671
retry:  /* Update the record number. */
 
672
        switch (flags) {
 
673
        case DB_CURRENT:
 
674
                break;
 
675
        case DB_NEXT_DUP:
 
676
                ret = DB_NOTFOUND;
 
677
                goto err;
 
678
                /* NOTREACHED */
 
679
        case DB_NEXT:
 
680
        case DB_NEXT_NODUP:
 
681
                if (cp->recno != RECNO_OOB) {
 
682
                        ++cp->recno;
 
683
                        /* Wrap around, skipping zero. */
 
684
                        if (cp->recno == RECNO_OOB)
 
685
                                cp->recno++;
 
686
                        break;
 
687
                }
 
688
                /* FALLTHROUGH */
 
689
        case DB_FIRST:
 
690
                flags = DB_NEXT;
 
691
                is_first = 1;
 
692
 
 
693
                /* get the first record number */
 
694
                cp->recno = first = meta->first_recno;
 
695
 
 
696
                break;
 
697
        case DB_PREV:
 
698
        case DB_PREV_NODUP:
 
699
                if (cp->recno != RECNO_OOB) {
 
700
                        if (QAM_BEFORE_FIRST(meta, cp->recno)
 
701
                            || cp->recno == meta->first_recno) {
 
702
                                ret = DB_NOTFOUND;
 
703
                                goto err;
 
704
                        }
 
705
                        --cp->recno;
 
706
                        /* Wrap around, skipping zero. */
 
707
                        if (cp->recno == RECNO_OOB)
 
708
                                --cp->recno;
 
709
                        break;
 
710
                }
 
711
                /* FALLTHROUGH */
 
712
        case DB_LAST:
 
713
                if (meta->first_recno == meta->cur_recno) {
 
714
                        ret = DB_NOTFOUND;
 
715
                        goto err;
 
716
                }
 
717
                cp->recno = meta->cur_recno - 1;
 
718
                if (cp->recno == RECNO_OOB)
 
719
                        cp->recno--;
 
720
                break;
 
721
        case DB_GET_BOTH:
 
722
        case DB_SET:
 
723
        case DB_SET_RANGE:
 
724
                if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
 
725
                        goto err;
 
726
                break;
 
727
        default:
 
728
                ret = __db_unknown_flag(dbp->dbenv, "__qam_c_get", flags);
 
729
                goto err;
 
730
        }
 
731
 
 
732
        /*
 
733
         * Check to see if we are out of data.  Current points to
 
734
         * the first free slot.
 
735
         */
 
736
        if (cp->recno == meta->cur_recno ||
 
737
            QAM_AFTER_CURRENT(meta, cp->recno)) {
 
738
                ret = DB_NOTFOUND;
 
739
                pg = NULL;
 
740
                if (wait) {
 
741
                        flags = DB_FIRST;
 
742
                        /*
 
743
                         * If first is not set, then we skipped a
 
744
                         * locked record, go back and find it.
 
745
                         * If we find a locked record again
 
746
                         * wait for it.
 
747
                         */
 
748
                        if (first == 0) {
 
749
                                retrying = 1;
 
750
                                goto retry;
 
751
                        }
 
752
                        if (CDB_LOCKING(dbp->dbenv)) {
 
753
                                if ((ret = lock_get(dbp->dbenv, dbc->locker,
 
754
                                    DB_LOCK_SWITCH, &dbc->lock_dbt,
 
755
                                    DB_LOCK_WAIT, &dbc->mylock)) != 0)
 
756
                                        goto err;
 
757
                                if ((ret = lock_get(dbp->dbenv, dbc->locker,
 
758
                                    DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
 
759
                                    &dbc->mylock)) != 0)
 
760
                                        goto err;
 
761
                                goto retry;
 
762
                        }
 
763
                        /*
 
764
                         * Wait for someone to update the meta page.
 
765
                         * This will probably mean there is something
 
766
                         * in the queue.  We then go back up and
 
767
                         * try again.
 
768
                         */
 
769
                        if (locked == 0) {
 
770
                                if ((ret = __db_lget( dbc,
 
771
                                    0, metapno, lock_mode, 0, &metalock)) != 0)
 
772
                                        goto err;
 
773
                                locked = 1;
 
774
                                if (cp->recno != RECNO_OOB &&
 
775
                                    !QAM_AFTER_CURRENT(meta, cp->recno))
 
776
                                        goto retry;
 
777
                        }
 
778
                        if ((ret = __db_lget(dbc, 0, metapno,
 
779
                            DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
 
780
                                goto err;
 
781
                        if ((ret = lock_get(dbp->dbenv, dbc->locker,
 
782
                            DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
 
783
                            &metalock)) != 0)
 
784
                                goto err;
 
785
                        locked = 1;
 
786
                        goto retry;
 
787
                }
 
788
 
 
789
                goto err;
 
790
        }
 
791
 
 
792
        /* Don't hold the meta page long term. */
 
793
        if (locked) {
 
794
                if ((ret = __LPUT(dbc, metalock)) != 0)
 
795
                        goto err;
 
796
                locked = 0;
 
797
        }
 
798
 
 
799
        /* Lock the record. */
 
800
        if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
 
801
            (with_delete && !retrying) ?
 
802
            DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
 
803
            &lock)) == DB_LOCK_NOTGRANTED && with_delete) {
 
804
#ifdef QDEBUG
 
805
                __db_logmsg(dbp->dbenv,
 
806
                    dbc->txn, "Queue S", 0, "%x %d %d %d",
 
807
                    dbc->locker, cp->recno, first, meta->first_recno);
 
808
#endif
 
809
                first = 0;
 
810
                goto retry;
 
811
        }
 
812
 
 
813
        if (ret != 0)
 
814
                goto err;
 
815
 
 
816
        /*
 
817
         * In the DB_FIRST or DB_LAST cases we must wait and then start over
 
818
         * since the first/last may have moved while we slept.
 
819
         * We release our locks and try again.
 
820
         */
 
821
        if ((!with_delete && is_first) || flags == DB_LAST) {
 
822
                if ((ret =
 
823
                    __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
 
824
                        goto err;
 
825
                if (cp->recno !=
 
826
                    (is_first ? meta->first_recno : (meta->cur_recno - 1))) {
 
827
                        __LPUT(dbc, lock);
 
828
                        if (is_first)
 
829
                                flags = DB_FIRST;
 
830
                        locked = 1;
 
831
                        goto retry;
 
832
                }
 
833
                /* Don't hold the meta page long term. */
 
834
                if ((ret = __LPUT(dbc, metalock)) != 0)
 
835
                        goto err;
 
836
        }
 
837
 
 
838
        /* Position the cursor on the record. */
 
839
        if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
 
840
                /* We cannot get the page, release the record lock. */
 
841
                (void)__LPUT(dbc, lock);
 
842
                goto err;
 
843
        }
 
844
 
 
845
        pg = cp->page;
 
846
        pglock = cp->lock;
 
847
        cp->lock = lock;
 
848
        cp->lock_mode = lock_mode;
 
849
 
 
850
        if (!exact) {
 
851
                if (flags == DB_NEXT || flags == DB_NEXT_NODUP
 
852
                    || flags == DB_PREV || flags == DB_PREV_NODUP
 
853
                    || flags == DB_LAST) {
 
854
                        /* Release locks and try again. */
 
855
                        if (pg != NULL)
 
856
                                (void)__qam_fput(dbp, cp->pgno, pg, 0);
 
857
                        cp->page = pg = NULL;
 
858
                        (void)__LPUT(dbc, pglock);
 
859
                        (void)__LPUT(dbc, cp->lock);
 
860
                        if (flags == DB_LAST)
 
861
                                flags = DB_PREV;
 
862
                        if (!with_delete)
 
863
                                is_first = 0;
 
864
                        retrying = 0;
 
865
                        goto retry;
 
866
                }
 
867
                /* this is for the SET and SET_RANGE cases */
 
868
                ret = DB_KEYEMPTY;
 
869
                goto err1;
 
870
        }
 
871
 
 
872
        /* Return the key if the user didn't give us one. */
 
873
        if (key != NULL && flags != DB_SET && flags != DB_GET_BOTH &&
 
874
            (ret = __db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
 
875
            &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
 
876
                goto err1;
 
877
 
 
878
        if (key != NULL)
 
879
                F_SET(key, DB_DBT_ISSET);
 
880
 
 
881
        qp = QAM_GET_RECORD(dbp, pg, cp->indx);
 
882
 
 
883
        /* Return the data item. */
 
884
        if (flags == DB_GET_BOTH) {
 
885
                /*
 
886
                 * Need to compare
 
887
                 */
 
888
                tmp.data = qp->data;
 
889
                tmp.size = t->re_len;
 
890
                if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
 
891
                        ret = DB_NOTFOUND;
 
892
                        goto err1;
 
893
                }
 
894
        }
 
895
        if (data != NULL
 
896
            && !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY)
 
897
            && (ret = __db_retcopy(dbp, data,
 
898
            qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
 
899
                goto err1;
 
900
 
 
901
        if (data != NULL)
 
902
                F_SET(data, DB_DBT_ISSET);
 
903
 
 
904
        /* Finally, if we are doing DB_CONSUME mark the record. */
 
905
        if (with_delete) {
 
906
                /*
 
907
                 * Assert that we're not a secondary index.  Doing a DB_CONSUME
 
908
                 * on a secondary makes very little sense, since one can't
 
909
                 * DB_APPEND there;  attempting one should be forbidden by
 
910
                 * the interface.
 
911
                 */
 
912
                DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
 
913
 
 
914
                /*
 
915
                 * Check and see if we *have* any secondary indices.
 
916
                 * If we do, we're a primary, so call __db_c_del_primary
 
917
                 * to delete the references to the item we're about to
 
918
                 * delete.
 
919
                 *
 
920
                 * Note that we work on a duplicated cursor, since the
 
921
                 * __db_ret work has already been done, so it's not safe
 
922
                 * to perform any additional ops on this cursor.
 
923
                 */
 
924
                if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
 
925
                        if ((ret = __db_c_idup(dbc,
 
926
                            &dbcdup, DB_POSITIONI)) != 0)
 
927
                                goto err1;
 
928
 
 
929
                        if ((ret = __db_c_del_primary(dbcdup)) != 0) {
 
930
                                /*
 
931
                                 * The __db_c_del_primary return is more
 
932
                                 * interesting.
 
933
                                 */
 
934
                                (void)dbcdup->c_close(dbcdup);
 
935
                                goto err1;
 
936
                        }
 
937
 
 
938
                        if ((ret = dbcdup->c_close(dbcdup)) != 0)
 
939
                                goto err1;
 
940
                }
 
941
 
 
942
                if (DB_LOGGING(dbc)) {
 
943
                        if (t->page_ext == 0 || t->re_len == 0) {
 
944
                                if ((ret = __qam_del_log(dbp->dbenv, dbc->txn,
 
945
                                    &LSN(pg), 0, dbp->log_fileid, &LSN(pg),
 
946
                                    pg->pgno, cp->indx, cp->recno)) != 0)
 
947
                                        goto err1;
 
948
                        } else {
 
949
                                tmp.data = qp->data;
 
950
                                tmp.size = t->re_len;
 
951
                                if ((ret =
 
952
                                   __qam_delext_log(dbp->dbenv, dbc->txn,
 
953
                                   &LSN(pg), 0, dbp->log_fileid, &LSN(pg),
 
954
                                   pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
 
955
                                        goto err1;
 
956
                        }
 
957
                }
 
958
 
 
959
                F_CLR(qp, QAM_VALID);
 
960
                put_mode = DB_MPOOL_DIRTY;
 
961
 
 
962
                if ((ret = __LPUT(dbc, pglock)) != 0)
 
963
                        goto err1;
 
964
 
 
965
                /*
 
966
                 * Now we need to update the metapage
 
967
                 * first pointer. If we have deleted
 
968
                 * the record that is pointed to by
 
969
                 * first_recno then we move it as far
 
970
                 * forward as we can without blocking.
 
971
                 * The metapage lock must be held for
 
972
                 * the whole scan otherwise someone could
 
973
                 * do a random insert behind where we are
 
974
                 * looking.
 
975
                 */
 
976
 
 
977
                if (locked == 0 && (ret = __db_lget(
 
978
                    dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
 
979
                        goto err1;
 
980
                locked = 1;
 
981
#ifdef QDEBUG
 
982
                __db_logmsg(dbp->dbenv,
 
983
                    dbc->txn, "Queue D", 0, "%x %d %d %d",
 
984
                    dbc->locker, cp->recno, first, meta->first_recno);
 
985
#endif
 
986
                /*
 
987
                 * See if we deleted the "first" record.  If
 
988
                 * first is zero then we skipped something,
 
989
                 * see if first_recno has been move passed
 
990
                 * that to the record that we deleted.
 
991
                 */
 
992
                if (first == 0)
 
993
                        first = cp->recno;
 
994
                if (first != meta->first_recno)
 
995
                        goto done;
 
996
 
 
997
                save_page = cp->pgno;
 
998
                save_indx = cp->indx;
 
999
                save_recno = cp->recno;
 
1000
                save_lock = cp->lock;
 
1001
 
 
1002
                /*
 
1003
                 * If we skipped some deleted records, we need to
 
1004
                 * reposition on the first one.  Get a lock
 
1005
                 * in case someone is trying to put it back.
 
1006
                 */
 
1007
                if (first != cp->recno) {
 
1008
                        ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
 
1009
                            DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
 
1010
                        if (ret == DB_LOCK_NOTGRANTED) {
 
1011
                                ret = 0;
 
1012
                                goto done;
 
1013
                        }
 
1014
                        if (ret != 0)
 
1015
                                goto err1;
 
1016
                        if ((ret =
 
1017
                            __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
 
1018
                                goto err1;
 
1019
                        cp->page = NULL;
 
1020
                        put_mode = 0;
 
1021
                        if ((ret = __qam_position(dbc,
 
1022
                            &first, QAM_READ, &exact)) != 0 || exact != 0) {
 
1023
                                (void)__LPUT(dbc, lock);
 
1024
                                goto err1;
 
1025
                        }
 
1026
                        if ((ret =__LPUT(dbc, lock)) != 0)
 
1027
                                goto err1;
 
1028
                        if ((ret = __LPUT(dbc, cp->lock)) != 0)
 
1029
                                goto err1;
 
1030
                }
 
1031
 
 
1032
                current = meta->cur_recno;
 
1033
                wrapped = 0;
 
1034
                if (first > current)
 
1035
                        wrapped = 1;
 
1036
                rec_extent = meta->page_ext * meta->rec_page;
 
1037
 
 
1038
                /* Loop until we find a record or hit current */
 
1039
                for (;;) {
 
1040
                        /*
 
1041
                         * Check to see if we are moving off the extent
 
1042
                         * and remove the extent.
 
1043
                         * If we are moving off a page we need to
 
1044
                         * get rid of the buffer.
 
1045
                         * Wait for the lagging readers to move off the
 
1046
                         * page.
 
1047
                         */
 
1048
                        if (cp->page != NULL && rec_extent != 0
 
1049
                            && ((exact = (first % rec_extent == 0))
 
1050
                            || first % meta->rec_page == 0
 
1051
                            || first == UINT32_T_MAX)) {
 
1052
                                if (exact == 1 && (ret = __db_lget(dbc,
 
1053
                                    0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
 
1054
                                        break;
 
1055
 
 
1056
#ifdef QDEBUG
 
1057
                                __db_logmsg(dbp->dbenv,
 
1058
                                    dbc->txn, "Queue R", 0, "%x %d %d %d",
 
1059
                                    dbc->locker, cp->pgno, first, meta->first_recno);
 
1060
#endif
 
1061
                                put_mode |= DB_MPOOL_DISCARD;
 
1062
                                if ((ret = __qam_fput(dbp,
 
1063
                                    cp->pgno, cp->page, put_mode)) != 0)
 
1064
                                        break;
 
1065
                                cp->page = NULL;
 
1066
 
 
1067
                                if (exact == 1) {
 
1068
                                        ret = __qam_fremove(dbp, cp->pgno);
 
1069
                                        t_ret = __LPUT(dbc, cp->lock);
 
1070
                                }
 
1071
                                if (ret != 0)
 
1072
                                        break;
 
1073
                                if (t_ret != 0) {
 
1074
                                        ret = t_ret;
 
1075
                                        break;
 
1076
                                }
 
1077
                        } else if (cp->page != NULL && (ret =
 
1078
                            __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
 
1079
                                break;
 
1080
                        cp->page = NULL;
 
1081
                        first++;
 
1082
                        if (first == RECNO_OOB) {
 
1083
                                wrapped = 0;
 
1084
                                first++;
 
1085
                        }
 
1086
 
 
1087
                        /*
 
1088
                         * LOOP EXIT when we come move to the current
 
1089
                         * pointer.
 
1090
                         */
 
1091
                        if (!wrapped && first >= current)
 
1092
                                break;
 
1093
 
 
1094
                        ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
 
1095
                            DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
 
1096
                        if (ret == DB_LOCK_NOTGRANTED) {
 
1097
                                ret = 0;
 
1098
                                break;
 
1099
                        }
 
1100
                        if (ret != 0)
 
1101
                                break;
 
1102
 
 
1103
                        if ((ret = __qam_position(dbc,
 
1104
                            &first, QAM_READ, &exact)) != 0) {
 
1105
                                (void)__LPUT(dbc, lock);
 
1106
                                break;
 
1107
                        }
 
1108
                        put_mode = 0;
 
1109
                        if ((ret =__LPUT(dbc, lock)) != 0
 
1110
                            || (ret = __LPUT(dbc, cp->lock)) != 0 ||exact) {
 
1111
                                if ((t_ret = __qam_fput(dbp, cp->pgno,
 
1112
                                    cp->page, put_mode)) != 0 && ret == 0)
 
1113
                                        ret = t_ret;
 
1114
                                cp->page = NULL;
 
1115
                                break;
 
1116
                        }
 
1117
                }
 
1118
 
 
1119
                cp->pgno = save_page;
 
1120
                cp->indx = save_indx;
 
1121
                cp->recno = save_recno;
 
1122
                cp->lock = save_lock;
 
1123
 
 
1124
                /*
 
1125
                 * We have advanced as far as we can.
 
1126
                 * Advance first_recno to this point.
 
1127
                 */
 
1128
                if (ret == 0 && meta->first_recno != first) {
 
1129
#ifdef QDEBUG
 
1130
                __db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
 
1131
                    0, "%x %d %d %d", dbc->locker, cp->recno,
 
1132
                    first, meta->first_recno);
 
1133
#endif
 
1134
                        if (DB_LOGGING(dbc))
 
1135
                                if ((ret =
 
1136
                                     __qam_incfirst_log(dbp->dbenv,
 
1137
                                     dbc->txn, &meta->dbmeta.lsn, 0,
 
1138
                                     dbp->log_fileid, cp->recno)) != 0)
 
1139
                                        goto err;
 
1140
                        meta->first_recno = first;
 
1141
                        meta_dirty = 1;
 
1142
                }
 
1143
        }
 
1144
 
 
1145
done:
 
1146
err1:   if (cp->page != NULL) {
 
1147
                t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
 
1148
 
 
1149
                if (!ret)
 
1150
                        ret = t_ret;
 
1151
                /* Doing record locking, release the page lock */
 
1152
                t_ret = __LPUT(dbc, pglock);
 
1153
                cp->page = NULL;
 
1154
        }
 
1155
 
 
1156
err:    if (!ret)
 
1157
                ret = t_ret;
 
1158
        if (meta) {
 
1159
 
 
1160
                /* release the meta page */
 
1161
                t_ret = memp_fput(
 
1162
                    dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
 
1163
 
 
1164
                if (!ret)
 
1165
                        ret = t_ret;
 
1166
 
 
1167
                /* Don't hold the meta page long term. */
 
1168
                if (locked)
 
1169
                        t_ret = __LPUT(dbc, metalock);
 
1170
        }
 
1171
        DB_ASSERT(!LOCK_ISSET(metalock));
 
1172
 
 
1173
        /*
 
1174
         * There is no need to keep the record locked if we are
 
1175
         * not in a transaction.
 
1176
         */
 
1177
        if (t_ret == 0)
 
1178
                t_ret = __TLPUT(dbc, cp->lock);
 
1179
 
 
1180
        return (ret ? ret : t_ret);
 
1181
}
 
1182
 
 
1183
static int
 
1184
__qam_bulk(dbc, data, flags)
 
1185
        DBC *dbc;
 
1186
        DBT *data;
 
1187
        u_int32_t flags;
 
1188
{
 
1189
        DB *dbp;
 
1190
        DB_LOCK metalock;
 
1191
        PAGE *pg;
 
1192
        QMETA *meta;
 
1193
        QAMDATA *qp;
 
1194
        QUEUE_CURSOR *cp;
 
1195
        db_indx_t indx;
 
1196
        db_pgno_t metapno;
 
1197
        int32_t  *endp, *offp;
 
1198
        u_int8_t *dbuf, *dp, *np;
 
1199
        int exact, mode, recs, re_len, ret, t_ret, valid;
 
1200
        int is_key, need_pg, pagesize, size, space;
 
1201
 
 
1202
        size = 0;
 
1203
        mode = QAM_READ;
 
1204
        if (F_ISSET(dbc, DBC_RMW))
 
1205
                mode = QAM_WRITE;
 
1206
 
 
1207
        dbp = dbc->dbp;
 
1208
        cp = (QUEUE_CURSOR *)dbc->internal;
 
1209
 
 
1210
        pagesize = dbc->dbp->pgsize;
 
1211
        re_len = ((QUEUE *)dbc->dbp->q_internal)->re_len;
 
1212
        recs = ((QUEUE *)dbc->dbp->q_internal)->rec_page;
 
1213
        metapno = ((QUEUE *)dbc->dbp->q_internal)->q_meta;
 
1214
 
 
1215
        is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
 
1216
 
 
1217
        if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
 
1218
                return (ret);
 
1219
        if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
 
1220
                /* We did not fetch it, we can release the lock. */
 
1221
                (void)__LPUT(dbc, metalock);
 
1222
                return (ret);
 
1223
        }
 
1224
 
 
1225
        dbuf = data->data;
 
1226
        np = dp = dbuf;
 
1227
 
 
1228
        /* Keep track of space that is left.  There is an termination entry */
 
1229
        space = data->ulen;
 
1230
        space -= sizeof(*offp);
 
1231
 
 
1232
        /* Build the offset/size table form the end up. */
 
1233
        endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
 
1234
        endp--;
 
1235
        offp = endp;
 
1236
 
 
1237
next_pg:
 
1238
        if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
 
1239
                goto done;
 
1240
 
 
1241
        pg = cp->page;
 
1242
        indx = cp->indx;
 
1243
        need_pg = 1;
 
1244
 
 
1245
        do {
 
1246
                /*
 
1247
                 * If this page is a nonexistent page at the end of an
 
1248
                 * extent, pg may be NULL.  A NULL page has no valid records,
 
1249
                 * so just keep looping as though qp exists and isn't QAM_VALID;
 
1250
                 * calling QAM_GET_RECORD is unsafe.
 
1251
                 */
 
1252
                valid = 0;
 
1253
                if (pg != NULL) {
 
1254
                        qp = QAM_GET_RECORD(dbp, pg, indx);
 
1255
                        if (F_ISSET(qp, QAM_VALID)) {
 
1256
                                valid = 1;
 
1257
                                space -= (is_key ? 3 : 2) * sizeof(*offp);
 
1258
                                if (space < 0)
 
1259
                                        goto get_space;
 
1260
                                if (need_pg) {
 
1261
                                        dp = np;
 
1262
                                        size = pagesize - sizeof(QPAGE);
 
1263
                                        if (space < size) {
 
1264
get_space:
 
1265
                                                if (offp == endp) {
 
1266
                                                        data->size =
 
1267
                                                            ALIGN(size +
 
1268
                                                            pagesize,
 
1269
                                                            sizeof(u_int32_t));
 
1270
                                                        ret = ENOMEM;
 
1271
                                                        break;
 
1272
                                                }
 
1273
                                                if (indx != 0)
 
1274
                                                        indx--;
 
1275
                                                cp->recno--;
 
1276
                                                break;
 
1277
                                        }
 
1278
                                        memcpy(dp,
 
1279
                                            (char *)pg + sizeof(QPAGE), size);
 
1280
                                        need_pg = 0;
 
1281
                                        space -= size;
 
1282
                                        np += size;
 
1283
                                }
 
1284
                                if (is_key)
 
1285
                                        *offp-- = cp->recno;
 
1286
                                *offp-- = (u_int8_t*)qp - (u_int8_t*)pg -
 
1287
                                    sizeof(QPAGE) + dp - dbuf +
 
1288
                                    SSZA(QAMDATA, data);
 
1289
                                *offp-- = re_len;
 
1290
                        }
 
1291
                }
 
1292
                if (!valid && is_key == 0) {
 
1293
                        *offp-- = 0;
 
1294
                        *offp-- = 0;
 
1295
                }
 
1296
                cp->recno++;
 
1297
        } while (++indx < recs && cp->recno < meta->cur_recno);
 
1298
 
 
1299
        if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
 
1300
                ret = t_ret;
 
1301
 
 
1302
        if (cp->page != NULL) {
 
1303
                if ((t_ret =
 
1304
                    __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
 
1305
                        ret = t_ret;
 
1306
                cp->page = NULL;
 
1307
        }
 
1308
 
 
1309
        if (ret == 0 && indx >= recs && cp->recno < meta->cur_recno)
 
1310
                goto next_pg;
 
1311
 
 
1312
        *offp = -1;
 
1313
 
 
1314
done:
 
1315
        /* release the meta page */
 
1316
        t_ret = memp_fput(dbp->mpf, meta, 0);
 
1317
 
 
1318
        if (!ret)
 
1319
                ret = t_ret;
 
1320
 
 
1321
        t_ret = __LPUT(dbc, metalock);
 
1322
 
 
1323
        return (ret);
 
1324
}
 
1325
 
 
1326
/*
 
1327
 * __qam_c_close --
 
1328
 *      Close down the cursor from a single use.
 
1329
 */
 
1330
static int
 
1331
__qam_c_close(dbc, root_pgno, rmroot)
 
1332
        DBC *dbc;
 
1333
        db_pgno_t root_pgno;
 
1334
        int *rmroot;
 
1335
{
 
1336
        QUEUE_CURSOR *cp;
 
1337
 
 
1338
        COMPQUIET(root_pgno, 0);
 
1339
        COMPQUIET(rmroot, NULL);
 
1340
 
 
1341
        cp = (QUEUE_CURSOR *)dbc->internal;
 
1342
 
 
1343
        /* Discard any locks not acquired inside of a transaction. */
 
1344
        (void)__TLPUT(dbc, cp->lock);
 
1345
        LOCK_INIT(cp->lock);
 
1346
 
 
1347
        cp->page = NULL;
 
1348
        cp->pgno = PGNO_INVALID;
 
1349
        cp->indx = 0;
 
1350
        cp->lock_mode = DB_LOCK_NG;
 
1351
        cp->recno = RECNO_OOB;
 
1352
        cp->flags = 0;
 
1353
 
 
1354
        return (0);
 
1355
}
 
1356
 
 
1357
/*
 
1358
 * __qam_c_dup --
 
1359
 *      Duplicate a queue cursor, such that the new one holds appropriate
 
1360
 *      locks for the position of the original.
 
1361
 *
 
1362
 * PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
 
1363
 */
 
1364
int
 
1365
__qam_c_dup(orig_dbc, new_dbc)
 
1366
        DBC *orig_dbc, *new_dbc;
 
1367
{
 
1368
        QUEUE_CURSOR *orig, *new;
 
1369
 
 
1370
        orig = (QUEUE_CURSOR *)orig_dbc->internal;
 
1371
        new = (QUEUE_CURSOR *)new_dbc->internal;
 
1372
 
 
1373
        new->recno = orig->recno;
 
1374
 
 
1375
        /* reget the long term lock if we are not in a xact */
 
1376
        if (orig_dbc->txn != NULL ||
 
1377
            !STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock))
 
1378
                return (0);
 
1379
 
 
1380
        return (__db_lget(new_dbc,
 
1381
            0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
 
1382
}
 
1383
 
 
1384
/*
 
1385
 * __qam_c_init
 
1386
 *
 
1387
 * PUBLIC: int __qam_c_init __P((DBC *));
 
1388
 */
 
1389
int
 
1390
__qam_c_init(dbc)
 
1391
        DBC *dbc;
 
1392
{
 
1393
        QUEUE_CURSOR *cp;
 
1394
        DB *dbp;
 
1395
        int ret;
 
1396
 
 
1397
        dbp = dbc->dbp;
 
1398
 
 
1399
        /* Allocate the internal structure. */
 
1400
        cp = (QUEUE_CURSOR *)dbc->internal;
 
1401
        if (cp == NULL) {
 
1402
                if ((ret =
 
1403
                    __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
 
1404
                        return (ret);
 
1405
                dbc->internal = (DBC_INTERNAL *)cp;
 
1406
        }
 
1407
 
 
1408
        /* Initialize methods. */
 
1409
        dbc->c_close = __db_c_close;
 
1410
        dbc->c_count = __db_c_count;
 
1411
        dbc->c_del = __db_c_del;
 
1412
        dbc->c_dup = __db_c_dup;
 
1413
        dbc->c_get = dbc->c_real_get = __db_c_get;
 
1414
        dbc->c_pget = __db_c_pget;
 
1415
        dbc->c_put = __db_c_put;
 
1416
        dbc->c_am_bulk = __qam_bulk;
 
1417
        dbc->c_am_close = __qam_c_close;
 
1418
        dbc->c_am_del = __qam_c_del;
 
1419
        dbc->c_am_destroy = __qam_c_destroy;
 
1420
        dbc->c_am_get = __qam_c_get;
 
1421
        dbc->c_am_put = __qam_c_put;
 
1422
        dbc->c_am_writelock = NULL;
 
1423
 
 
1424
        return (0);
 
1425
}
 
1426
 
 
1427
/*
 
1428
 * __qam_c_destroy --
 
1429
 *      Close a single cursor -- internal version.
 
1430
 */
 
1431
static int
 
1432
__qam_c_destroy(dbc)
 
1433
        DBC *dbc;
 
1434
{
 
1435
        /* Discard the structures. */
 
1436
        __os_free(dbc->dbp->dbenv, dbc->internal, sizeof(QUEUE_CURSOR));
 
1437
 
 
1438
        return (0);
 
1439
}
 
1440
 
 
1441
/*
 
1442
 * __qam_getno --
 
1443
 *      Check the user's record number.
 
1444
 */
 
1445
static int
 
1446
__qam_getno(dbp, key, rep)
 
1447
        DB *dbp;
 
1448
        const DBT *key;
 
1449
        db_recno_t *rep;
 
1450
{
 
1451
        if ((*rep = *(db_recno_t *)key->data) == 0) {
 
1452
                __db_err(dbp->dbenv, "illegal record number of 0");
 
1453
                return (EINVAL);
 
1454
        }
 
1455
        return (0);
 
1456
}
 
1457
 
 
1458
/*
 
1459
 * __qam_truncate --
 
1460
 *      Truncate a queue database
 
1461
 *
 
1462
 * PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *));
 
1463
 */
 
1464
int
 
1465
__qam_truncate(dbp, txn, countp)
 
1466
        DB *dbp;
 
1467
        DB_TXN *txn;
 
1468
        u_int32_t *countp;
 
1469
{
 
1470
        DB_LOCK metalock;
 
1471
        DBC *dbc;
 
1472
        QMETA *meta;
 
1473
        db_pgno_t metapno;
 
1474
        int count, ret, t_ret;
 
1475
 
 
1476
        /* Acquire a cursor. */
 
1477
        if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
 
1478
                return (ret);
 
1479
 
 
1480
        /* Walk the queue, counting rows. */
 
1481
        count = 0;
 
1482
        while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0)
 
1483
                count++;
 
1484
 
 
1485
        if (ret == DB_NOTFOUND)
 
1486
                ret = 0;
 
1487
 
 
1488
        /* Discard the cursor. */
 
1489
        if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
 
1490
                ret = t_ret;
 
1491
 
 
1492
        if (ret != 0)
 
1493
                return (ret);
 
1494
 
 
1495
        /* update the meta page */
 
1496
        /* get the meta page */
 
1497
        metapno = ((QUEUE *)dbp->q_internal)->q_meta;
 
1498
        if ((ret =
 
1499
            __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
 
1500
                return (ret);
 
1501
 
 
1502
        if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
 
1503
                /* We did not fetch it, we can release the lock. */
 
1504
                (void)__LPUT(dbc, metalock);
 
1505
                return (ret);
 
1506
        }
 
1507
        if (DB_LOGGING(dbc)) {
 
1508
                ret = __qam_mvptr_log(dbp->dbenv, dbc->txn,
 
1509
                    &meta->dbmeta.lsn, 0, QAM_SETCUR|QAM_SETFIRST|QAM_TRUNCATE,
 
1510
                    dbp->log_fileid, meta->first_recno, 1,
 
1511
                    meta->cur_recno, 1, &meta->dbmeta.lsn);
 
1512
        }
 
1513
        if (ret == 0)
 
1514
                meta->first_recno = meta->cur_recno = 1;
 
1515
 
 
1516
        if ((t_ret = memp_fput(dbp->mpf,
 
1517
            meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
 
1518
                ret = t_ret;
 
1519
        if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
 
1520
                ret = t_ret;
 
1521
 
 
1522
        *countp = count;
 
1523
 
 
1524
        return (ret);
 
1525
}