~ubuntu-branches/ubuntu/maverick/evolution-data-server/maverick-proposed

« back to all changes in this revision

Viewing changes to libdb/qam/qam.c

  • Committer: Bazaar Package Importer
  • Author(s): Didier Roche
  • Date: 2010-05-17 17:02:06 UTC
  • mfrom: (1.1.79 upstream) (1.6.12 experimental)
  • Revision ID: james.westby@ubuntu.com-20100517170206-4ufr52vwrhh26yh0
Tags: 2.30.1-1ubuntu1
* Merge from debian experimental. Remaining change:
  (LP: #42199, #229669, #173703, #360344, #508494)
  + debian/control:
    - add Vcs-Bzr tag
    - don't use libgnome
    - Use Breaks instead of Conflicts against evolution 2.25 and earlier.
  + debian/evolution-data-server.install,
    debian/patches/45_libcamel_providers_version.patch:
    - use the upstream versioning, not a Debian-specific one 
  + debian/libedata-book1.2-dev.install, debian/libebackend-1.2-dev.install,
    debian/libcamel1.2-dev.install, debian/libedataserverui1.2-dev.install:
    - install html documentation
  + debian/rules:
    - don't build documentation it's shipped with the tarball

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*-
2
 
 * See the file LICENSE for redistribution information.
3
 
 *
4
 
 * Copyright (c) 1999-2002
5
 
 *      Sleepycat Software.  All rights reserved.
6
 
 */
7
 
 
8
 
#include "db_config.h"
9
 
 
10
 
#ifndef lint
11
 
static const char revid[] = "$Id$";
12
 
#endif /* not lint */
13
 
 
14
 
#ifndef NO_SYSTEM_INCLUDES
15
 
#include <sys/types.h>
16
 
 
17
 
#include <string.h>
18
 
#endif
19
 
 
20
 
#include "db_int.h"
21
 
#include "dbinc/db_page.h"
22
 
#include "dbinc/db_shash.h"
23
 
#include "dbinc/btree.h"
24
 
#include "dbinc/lock.h"
25
 
#include "dbinc/log.h"
26
 
#include "dbinc/qam.h"
27
 
 
28
 
static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
29
 
static int __qam_c_close __P((DBC *, db_pgno_t, int *));
30
 
static int __qam_c_del __P((DBC *));
31
 
static int __qam_c_destroy __P((DBC *));
32
 
static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
33
 
static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
34
 
static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
35
 
static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
36
 
 
37
 
/*
38
 
 * __qam_position --
39
 
 *      Position a queued access method cursor at a record.  This returns
40
 
 *      the page locked.  *exactp will be set if the record is valid.
41
 
 * PUBLIC: int __qam_position
42
 
 * PUBLIC:       __P((DBC *, db_recno_t *, qam_position_mode, int *));
43
 
 */
44
 
int
45
 
__qam_position(dbc, recnop, mode, exactp)
46
 
        DBC *dbc;               /* open cursor */
47
 
        db_recno_t *recnop;     /* pointer to recno to find */
48
 
        qam_position_mode mode;/* locking: read or write */
49
 
        int *exactp;            /* indicate if it was found */
50
 
{
51
 
        QUEUE_CURSOR *cp;
52
 
        DB *dbp;
53
 
        QAMDATA  *qp;
54
 
        db_pgno_t pg;
55
 
        int ret;
56
 
 
57
 
        dbp = dbc->dbp;
58
 
        cp = (QUEUE_CURSOR *)dbc->internal;
59
 
 
60
 
        /* Fetch the page for this recno. */
61
 
        pg = QAM_RECNO_PAGE(dbp, *recnop);
62
 
 
63
 
        if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
64
 
            DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
65
 
                return (ret);
66
 
        cp->page = NULL;
67
 
        *exactp = 0;
68
 
        if ((ret = __qam_fget(dbp, &pg,
69
 
            mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
70
 
                /* We did not fetch it, we can release the lock. */
71
 
                (void)__LPUT(dbc, cp->lock);
72
 
                if (mode != QAM_WRITE &&
73
 
                    (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
74
 
                        return (0);
75
 
                return (ret);
76
 
        }
77
 
        cp->pgno = pg;
78
 
        cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
79
 
 
80
 
        if (PGNO(cp->page) == 0) {
81
 
                if (F_ISSET(dbp, DB_AM_RDONLY)) {
82
 
                        *exactp = 0;
83
 
                        return (0);
84
 
                }
85
 
                PGNO(cp->page) = pg;
86
 
                TYPE(cp->page) = P_QAMDATA;
87
 
        }
88
 
 
89
 
        qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
90
 
        *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
91
 
 
92
 
        return (ret);
93
 
}
94
 
 
95
 
/*
96
 
 * __qam_pitem --
97
 
 *      Put an item on a queue page.  Copy the data to the page and set the
98
 
 *      VALID and SET bits.  If logging and the record was previously set,
99
 
 *      log that data, otherwise just log the new data.
100
 
 *
101
 
 *   pagep must be write locked
102
 
 *
103
 
 * PUBLIC: int __qam_pitem
104
 
 * PUBLIC:     __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
105
 
 */
106
 
int
107
 
__qam_pitem(dbc, pagep, indx, recno, data)
108
 
        DBC *dbc;
109
 
        QPAGE *pagep;
110
 
        u_int32_t indx;
111
 
        db_recno_t recno;
112
 
        DBT *data;
113
 
{
114
 
        DB *dbp;
115
 
        DBT olddata, pdata, *datap;
116
 
        QAMDATA *qp;
117
 
        QUEUE *t;
118
 
        u_int32_t alloced;
119
 
        u_int8_t *dest, *p;
120
 
        int ret;
121
 
 
122
 
        alloced = ret = 0;
123
 
 
124
 
        dbp = dbc->dbp;
125
 
        t = (QUEUE *)dbp->q_internal;
126
 
 
127
 
        if (data->size > t->re_len)
128
 
                goto len_err;
129
 
 
130
 
        qp = QAM_GET_RECORD(dbp, pagep, indx);
131
 
 
132
 
        p = qp->data;
133
 
        datap = data;
134
 
        if (F_ISSET(data, DB_DBT_PARTIAL)) {
135
 
                if (data->doff + data->dlen > t->re_len) {
136
 
                        alloced = data->dlen;
137
 
                        goto len_err;
138
 
                }
139
 
                if (data->size != data->dlen) {
140
 
len_err:                __db_err(dbp->dbenv,
141
 
                            "Length improper for fixed length record %lu",
142
 
                            (u_long)(alloced ? alloced : data->size));
143
 
                        return (EINVAL);
144
 
                }
145
 
                if (data->size == t->re_len)
146
 
                        goto no_partial;
147
 
 
148
 
                /*
149
 
                 * If we are logging, then we have to build the record
150
 
                 * first, otherwise, we can simply drop the change
151
 
                 * directly on the page.  After this clause, make
152
 
                 * sure that datap and p are set up correctly so that
153
 
                 * copying datap into p does the right thing.
154
 
                 *
155
 
                 * Note, I am changing this so that if the existing
156
 
                 * record is not valid, we create a complete record
157
 
                 * to log so that both this and the recovery code is simpler.
158
 
                 */
159
 
 
160
 
                if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
161
 
                        datap = &pdata;
162
 
                        memset(datap, 0, sizeof(*datap));
163
 
 
164
 
                        if ((ret = __os_malloc(dbp->dbenv,
165
 
                            t->re_len, &datap->data)) != 0)
166
 
                                return (ret);
167
 
                        alloced = 1;
168
 
                        datap->size = t->re_len;
169
 
 
170
 
                        /*
171
 
                         * Construct the record if it's valid, otherwise set it
172
 
                         * all to the pad character.
173
 
                         */
174
 
                        dest = datap->data;
175
 
                        if (F_ISSET(qp, QAM_VALID))
176
 
                                memcpy(dest, p, t->re_len);
177
 
                        else
178
 
                                memset(dest, t->re_pad, t->re_len);
179
 
 
180
 
                        dest += data->doff;
181
 
                        memcpy(dest, data->data, data->size);
182
 
                } else {
183
 
                        datap = data;
184
 
                        p += data->doff;
185
 
                }
186
 
        }
187
 
 
188
 
no_partial:
189
 
        if (DBC_LOGGING(dbc)) {
190
 
                olddata.size = 0;
191
 
                if (F_ISSET(qp, QAM_SET)) {
192
 
                        olddata.data = qp->data;
193
 
                        olddata.size = t->re_len;
194
 
                }
195
 
                if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
196
 
                    0, &LSN(pagep), pagep->pgno,
197
 
                    indx, recno, datap, qp->flags,
198
 
                    olddata.size == 0 ? NULL : &olddata)) != 0)
199
 
                        goto err;
200
 
        }
201
 
 
202
 
        F_SET(qp, QAM_VALID | QAM_SET);
203
 
        memcpy(p, datap->data, datap->size);
204
 
        if (!F_ISSET(data, DB_DBT_PARTIAL))
205
 
                memset(p + datap->size,  t->re_pad, t->re_len - datap->size);
206
 
 
207
 
err:    if (alloced)
208
 
                __os_free(dbp->dbenv, datap->data);
209
 
 
210
 
        return (ret);
211
 
}
212
 
/*
213
 
 * __qam_c_put
214
 
 *      Cursor put for queued access method.
215
 
 *      BEFORE and AFTER cannot be specified.
216
 
 */
217
 
static int
218
 
__qam_c_put(dbc, key, data, flags, pgnop)
219
 
        DBC *dbc;
220
 
        DBT *key, *data;
221
 
        u_int32_t flags;
222
 
        db_pgno_t *pgnop;
223
 
{
224
 
        DB *dbp;
225
 
        DB_LOCK lock;
226
 
        DB_MPOOLFILE *mpf;
227
 
        QMETA *meta;
228
 
        QUEUE_CURSOR *cp;
229
 
        db_pgno_t pg;
230
 
        db_recno_t new_cur, new_first;
231
 
        u_int32_t opcode;
232
 
        int exact, ret, t_ret;
233
 
 
234
 
        dbp = dbc->dbp;
235
 
        mpf = dbp->mpf;
236
 
        if (pgnop != NULL)
237
 
                *pgnop = PGNO_INVALID;
238
 
 
239
 
        cp = (QUEUE_CURSOR *)dbc->internal;
240
 
 
241
 
        switch (flags) {
242
 
        case DB_KEYFIRST:
243
 
        case DB_KEYLAST:
244
 
                if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
245
 
                        return (ret);
246
 
                /* FALLTHROUGH */
247
 
        case DB_CURRENT:
248
 
                break;
249
 
        default:
250
 
                /* The interface shouldn't let anything else through. */
251
 
                DB_ASSERT(0);
252
 
                return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
253
 
        }
254
 
 
255
 
        /* Write lock the record. */
256
 
        if ((ret = __db_lget(dbc,
257
 
            0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
258
 
                return (ret);
259
 
 
260
 
        if ((ret = __qam_position(dbc,
261
 
            &cp->recno, QAM_WRITE, &exact)) != 0) {
262
 
                /* We could not get the page, we can release the record lock. */
263
 
                __LPUT(dbc, lock);
264
 
                return (ret);
265
 
        }
266
 
 
267
 
        /* Put the item on the page. */
268
 
        ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
269
 
 
270
 
        /* Doing record locking, release the page lock */
271
 
        if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
272
 
                ret = t_ret;
273
 
        if ((t_ret = __qam_fput(
274
 
            dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
275
 
                ret = t_ret;
276
 
        cp->page = NULL;
277
 
        cp->lock = lock;
278
 
        cp->lock_mode = DB_LOCK_WRITE;
279
 
        if (ret != 0)
280
 
                return (ret);
281
 
 
282
 
        /* We may need to reset the head or tail of the queue. */
283
 
        pg = ((QUEUE *)dbp->q_internal)->q_meta;
284
 
 
285
 
        /*
286
 
         * Get the meta page first, we don't want to write lock it while
287
 
         * trying to pin it.
288
 
         */
289
 
        if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
290
 
                return (ret);
291
 
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
292
 
                (void)mpf->put(mpf, meta, 0);
293
 
                return (ret);
294
 
        }
295
 
 
296
 
        opcode = 0;
297
 
        new_cur = new_first = 0;
298
 
 
299
 
        /*
300
 
         * If the put address is outside the queue, adjust the head and
301
 
         * tail of the queue.  If the order is inverted we move
302
 
         * the one which is closer.  The first case is when the
303
 
         * queue is empty, move first and current to where the new
304
 
         * insert is.
305
 
         */
306
 
 
307
 
        if (meta->first_recno == meta->cur_recno) {
308
 
                new_first = cp->recno;
309
 
                new_cur = cp->recno + 1;
310
 
                if (new_cur == RECNO_OOB)
311
 
                        new_cur++;
312
 
                opcode |= QAM_SETFIRST;
313
 
                opcode |= QAM_SETCUR;
314
 
        } else {
315
 
                if (QAM_BEFORE_FIRST(meta, cp->recno) &&
316
 
                    (meta->first_recno <= meta->cur_recno ||
317
 
                    meta->first_recno - cp->recno <
318
 
                    cp->recno - meta->cur_recno)) {
319
 
                        new_first = cp->recno;
320
 
                        opcode |= QAM_SETFIRST;
321
 
                }
322
 
 
323
 
                if (meta->cur_recno == cp->recno ||
324
 
                    (QAM_AFTER_CURRENT(meta, cp->recno) &&
325
 
                    (meta->first_recno <= meta->cur_recno ||
326
 
                    cp->recno - meta->cur_recno <=
327
 
                    meta->first_recno - cp->recno))) {
328
 
                        new_cur = cp->recno + 1;
329
 
                        if (new_cur == RECNO_OOB)
330
 
                                new_cur++;
331
 
                        opcode |= QAM_SETCUR;
332
 
                }
333
 
        }
334
 
 
335
 
        if (opcode != 0 && DBC_LOGGING(dbc)) {
336
 
                ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn,
337
 
                    0, opcode, meta->first_recno, new_first,
338
 
                    meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD);
339
 
                if (ret != 0)
340
 
                        opcode = 0;
341
 
        }
342
 
 
343
 
        if (opcode & QAM_SETCUR)
344
 
                meta->cur_recno = new_cur;
345
 
        if (opcode & QAM_SETFIRST)
346
 
                meta->first_recno = new_first;
347
 
 
348
 
        if ((t_ret = mpf->put(
349
 
            mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
350
 
                ret = t_ret;
351
 
 
352
 
        /* Don't hold the meta page long term. */
353
 
        if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
354
 
                ret = t_ret;
355
 
        return (ret);
356
 
}
357
 
 
358
 
/*
359
 
 * __qam_append --
360
 
 *      Perform a put(DB_APPEND) in queue.
361
 
 *
362
 
 * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
363
 
 */
364
 
int
365
 
__qam_append(dbc, key, data)
366
 
        DBC *dbc;
367
 
        DBT *key, *data;
368
 
{
369
 
        DB *dbp;
370
 
        DB_LOCK lock;
371
 
        DB_MPOOLFILE *mpf;
372
 
        QMETA *meta;
373
 
        QPAGE *page;
374
 
        QUEUE *qp;
375
 
        QUEUE_CURSOR *cp;
376
 
        db_pgno_t pg;
377
 
        db_recno_t recno;
378
 
        int ret, t_ret;
379
 
 
380
 
        dbp = dbc->dbp;
381
 
        mpf = dbp->mpf;
382
 
        cp = (QUEUE_CURSOR *)dbc->internal;
383
 
 
384
 
        pg = ((QUEUE *)dbp->q_internal)->q_meta;
385
 
        /*
386
 
         * Get the meta page first, we don't want to write lock it while
387
 
         * trying to pin it.
388
 
         */
389
 
        if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
390
 
                return (ret);
391
 
        /* Write lock the meta page. */
392
 
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
393
 
                (void)mpf->put(mpf, meta, 0);
394
 
                return (ret);
395
 
        }
396
 
 
397
 
        /* Get the next record number. */
398
 
        recno = meta->cur_recno;
399
 
        meta->cur_recno++;
400
 
        if (meta->cur_recno == RECNO_OOB)
401
 
                meta->cur_recno++;
402
 
        if (meta->cur_recno == meta->first_recno) {
403
 
                meta->cur_recno--;
404
 
                if (meta->cur_recno == RECNO_OOB)
405
 
                        meta->cur_recno--;
406
 
                (void)__LPUT(dbc, lock);
407
 
                ret = EFBIG;
408
 
                goto err;
409
 
        }
410
 
 
411
 
        if (QAM_BEFORE_FIRST(meta, recno))
412
 
                meta->first_recno = recno;
413
 
 
414
 
        /* Lock the record and release meta page lock. */
415
 
        if ((ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
416
 
            recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
417
 
                (void)__LPUT(dbc, lock);
418
 
                goto err;
419
 
        }
420
 
 
421
 
        /*
422
 
         * The application may modify the data based on the selected record
423
 
         * number.
424
 
         */
425
 
        if (dbc->dbp->db_append_recno != NULL &&
426
 
            (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
427
 
                (void)__LPUT(dbc, lock);
428
 
                goto err;
429
 
        }
430
 
 
431
 
        cp->lock = lock;
432
 
        cp->lock_mode = DB_LOCK_WRITE;
433
 
 
434
 
        pg = QAM_RECNO_PAGE(dbp, recno);
435
 
 
436
 
        /* Fetch and write lock the data page. */
437
 
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
438
 
                goto err;
439
 
        if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
440
 
                /* We did not fetch it, we can release the lock. */
441
 
                (void)__LPUT(dbc, lock);
442
 
                goto err;
443
 
        }
444
 
 
445
 
        /* See if this is a new page. */
446
 
        if (page->pgno == 0) {
447
 
                page->pgno = pg;
448
 
                page->type = P_QAMDATA;
449
 
        }
450
 
 
451
 
        /* Put the item on the page and log it. */
452
 
        ret = __qam_pitem(dbc, page,
453
 
            QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
454
 
 
455
 
        /* Doing record locking, release the page lock */
456
 
        if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
457
 
                ret = t_ret;
458
 
 
459
 
        if ((t_ret
460
 
            = __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
461
 
                ret = t_ret;
462
 
 
463
 
        /* Return the record number to the user. */
464
 
        if (ret == 0)
465
 
                ret = __db_retcopy(dbp->dbenv, key,
466
 
                    &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
467
 
 
468
 
        /* Position the cursor on this record. */
469
 
        cp->recno = recno;
470
 
 
471
 
        /* See if we are leaving the extent. */
472
 
        qp = (QUEUE *) dbp->q_internal;
473
 
        if (qp->page_ext != 0 &&
474
 
            (recno % (qp->page_ext * qp->rec_page) == 0 ||
475
 
            recno == UINT32_T_MAX)) {
476
 
                if ((ret = __db_lget(dbc,
477
 
                    0, ((QUEUE *)dbp->q_internal)->q_meta,
478
 
                    DB_LOCK_WRITE, 0, &lock)) != 0)
479
 
                        goto err;
480
 
                if (!QAM_AFTER_CURRENT(meta, recno))
481
 
                        ret = __qam_fclose(dbp, pg);
482
 
                (void)__LPUT(dbc, lock);
483
 
        }
484
 
 
485
 
err:
486
 
        /* Release the meta page. */
487
 
        if ((t_ret = mpf->put(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
488
 
                ret = t_ret;
489
 
 
490
 
        return (ret);
491
 
}
492
 
 
493
 
/*
494
 
 * __qam_c_del --
495
 
 *      Qam cursor->am_del function
496
 
 */
497
 
static int
498
 
__qam_c_del(dbc)
499
 
        DBC *dbc;
500
 
{
501
 
        DB *dbp;
502
 
        DBT data;
503
 
        DB_LOCK lock;
504
 
        DB_MPOOLFILE *mpf;
505
 
        PAGE *pagep;
506
 
        QAMDATA *qp;
507
 
        QMETA *meta;
508
 
        QUEUE_CURSOR *cp;
509
 
        db_pgno_t pg;
510
 
        db_recno_t first;
511
 
        int exact, ret, t_ret;
512
 
 
513
 
        dbp = dbc->dbp;
514
 
        mpf = dbp->mpf;
515
 
        cp = (QUEUE_CURSOR *)dbc->internal;
516
 
 
517
 
        pg = ((QUEUE *)dbp->q_internal)->q_meta;
518
 
        /*
519
 
         * Get the meta page first, we don't want to write lock it while
520
 
         * trying to pin it.
521
 
         */
522
 
        if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
523
 
                return (ret);
524
 
        /* Write lock the meta page. */
525
 
        if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &lock)) != 0) {
526
 
                (void)mpf->put(mpf, meta, 0);
527
 
                return (ret);
528
 
        }
529
 
 
530
 
        if (QAM_NOT_VALID(meta, cp->recno))
531
 
                ret = DB_NOTFOUND;
532
 
 
533
 
        first = meta->first_recno;
534
 
 
535
 
        /* Don't hold the meta page long term. */
536
 
        if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
537
 
                ret = t_ret;
538
 
 
539
 
        if (ret != 0)
540
 
                goto err1;
541
 
 
542
 
        if ((ret = __db_lget(dbc,
543
 
            0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
544
 
                goto err1;
545
 
 
546
 
        cp->lock_mode = DB_LOCK_WRITE;
547
 
        /* Find the record ; delete only deletes exact matches. */
548
 
        if ((ret = __qam_position(dbc,
549
 
            &cp->recno, QAM_WRITE, &exact)) != 0) {
550
 
                cp->lock = lock;
551
 
                goto err1;
552
 
        }
553
 
        if (!exact) {
554
 
                ret = DB_NOTFOUND;
555
 
                goto err1;
556
 
        }
557
 
 
558
 
        pagep = cp->page;
559
 
        qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
560
 
 
561
 
        if (DBC_LOGGING(dbc)) {
562
 
                if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
563
 
                    ((QUEUE *)dbp->q_internal)->re_len == 0) {
564
 
                        if ((ret = __qam_del_log(dbp,
565
 
                            dbc->txn, &LSN(pagep), 0, &LSN(pagep),
566
 
                            pagep->pgno, cp->indx, cp->recno)) != 0)
567
 
                                goto err1;
568
 
                } else {
569
 
                        data.size = ((QUEUE *)dbp->q_internal)->re_len;
570
 
                        data.data = qp->data;
571
 
                        if ((ret = __qam_delext_log(dbp,
572
 
                            dbc->txn, &LSN(pagep), 0, &LSN(pagep),
573
 
                            pagep->pgno, cp->indx, cp->recno, &data)) != 0)
574
 
                                goto err1;
575
 
                }
576
 
        }
577
 
 
578
 
        F_CLR(qp, QAM_VALID);
579
 
 
580
 
        if (cp->recno == first) {
581
 
                pg = ((QUEUE *)dbp->q_internal)->q_meta;
582
 
                if ((ret =
583
 
                    __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
584
 
                        goto err1;
585
 
                ret = __qam_consume(dbc, meta, first);
586
 
                if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
587
 
                        ret = t_ret;
588
 
        }
589
 
 
590
 
err1:
591
 
        if ((t_ret = mpf->put(mpf, meta, 0)) != 0 && ret == 0)
592
 
                ret = t_ret;
593
 
        if (cp->page != NULL && (t_ret = __qam_fput(dbp, cp->pgno,
594
 
            cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
595
 
                ret = t_ret;
596
 
        cp->page = NULL;
597
 
 
598
 
        /* Doing record locking, release the page lock */
599
 
        if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
600
 
                ret = t_ret;
601
 
        cp->lock = lock;
602
 
 
603
 
        return (ret);
604
 
}
605
 
 
606
 
#ifdef  DEBUG_WOP
607
 
#define QDEBUG
608
 
#endif
609
 
 
610
 
/*
611
 
 * __qam_c_get --
612
 
 *      Queue cursor->c_get function.
613
 
 */
614
 
static int
615
 
__qam_c_get(dbc, key, data, flags, pgnop)
616
 
        DBC *dbc;
617
 
        DBT *key, *data;
618
 
        u_int32_t flags;
619
 
        db_pgno_t *pgnop;
620
 
{
621
 
        DB *dbp;
622
 
        DBC *dbcdup;
623
 
        DBT tmp;
624
 
        DB_ENV *dbenv;
625
 
        DB_LOCK lock, pglock, metalock;
626
 
        DB_MPOOLFILE *mpf;
627
 
        PAGE *pg;
628
 
        QAMDATA *qp;
629
 
        QMETA *meta;
630
 
        QUEUE *t;
631
 
        QUEUE_CURSOR *cp;
632
 
        db_lockmode_t lock_mode;
633
 
        db_pgno_t metapno;
634
 
        db_recno_t first;
635
 
        qam_position_mode mode;
636
 
        int exact, is_first, locked, ret, t_ret, wait, with_delete;
637
 
        int put_mode, meta_dirty, retrying;
638
 
 
639
 
        dbp = dbc->dbp;
640
 
        dbenv = dbp->dbenv;
641
 
        mpf = dbp->mpf;
642
 
        cp = (QUEUE_CURSOR *)dbc->internal;
643
 
 
644
 
        PANIC_CHECK(dbenv);
645
 
 
646
 
        wait = 0;
647
 
        with_delete = 0;
648
 
        retrying = 0;
649
 
        lock_mode = DB_LOCK_READ;
650
 
        put_mode = 0;
651
 
        t_ret = 0;
652
 
        *pgnop = 0;
653
 
        pg = NULL;
654
 
 
655
 
        mode = QAM_READ;
656
 
        if (F_ISSET(dbc, DBC_RMW)) {
657
 
                lock_mode = DB_LOCK_WRITE;
658
 
                mode = QAM_WRITE;
659
 
        }
660
 
 
661
 
        if (flags == DB_CONSUME_WAIT) {
662
 
                wait = 1;
663
 
                flags = DB_CONSUME;
664
 
        }
665
 
        if (flags == DB_CONSUME) {
666
 
                if ((ret = __db_check_txn(dbp, dbc->txn, dbc->locker, 0)) != 0)
667
 
                        return (ret);
668
 
 
669
 
                with_delete = 1;
670
 
                flags = DB_FIRST;
671
 
                lock_mode = DB_LOCK_WRITE;
672
 
                mode = QAM_CONSUME;
673
 
        }
674
 
 
675
 
        DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
676
 
            flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
677
 
 
678
 
        /* Make lint and friends happy. */
679
 
        meta_dirty = 0;
680
 
        locked = 0;
681
 
 
682
 
        is_first = 0;
683
 
 
684
 
        t = (QUEUE *)dbp->q_internal;
685
 
        metapno = t->q_meta;
686
 
 
687
 
        /*
688
 
         * Get the meta page first, we don't want to write lock it while
689
 
         * trying to pin it.  This is because someone my have it pinned
690
 
         * but not locked.
691
 
         */
692
 
        if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0)
693
 
                return (ret);
694
 
        if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
695
 
                goto err;
696
 
        locked = 1;
697
 
 
698
 
        first = 0;
699
 
 
700
 
        /* Release any previous lock if not in a transaction. */
701
 
        (void)__TLPUT(dbc, cp->lock);
702
 
 
703
 
retry:  /* Update the record number. */
704
 
        switch (flags) {
705
 
        case DB_CURRENT:
706
 
                break;
707
 
        case DB_NEXT_DUP:
708
 
                ret = DB_NOTFOUND;
709
 
                goto err;
710
 
                /* NOTREACHED */
711
 
        case DB_NEXT:
712
 
        case DB_NEXT_NODUP:
713
 
                if (cp->recno != RECNO_OOB) {
714
 
                        ++cp->recno;
715
 
                        /* Wrap around, skipping zero. */
716
 
                        if (cp->recno == RECNO_OOB)
717
 
                                cp->recno++;
718
 
                        break;
719
 
                }
720
 
                /* FALLTHROUGH */
721
 
        case DB_FIRST:
722
 
                flags = DB_NEXT;
723
 
                is_first = 1;
724
 
 
725
 
                /* get the first record number */
726
 
                cp->recno = first = meta->first_recno;
727
 
 
728
 
                break;
729
 
        case DB_PREV:
730
 
        case DB_PREV_NODUP:
731
 
                if (cp->recno != RECNO_OOB) {
732
 
                        if (QAM_BEFORE_FIRST(meta, cp->recno) ||
733
 
                            cp->recno == meta->first_recno) {
734
 
                                ret = DB_NOTFOUND;
735
 
                                goto err;
736
 
                        }
737
 
                        --cp->recno;
738
 
                        /* Wrap around, skipping zero. */
739
 
                        if (cp->recno == RECNO_OOB)
740
 
                                --cp->recno;
741
 
                        break;
742
 
                }
743
 
                /* FALLTHROUGH */
744
 
        case DB_LAST:
745
 
                if (meta->first_recno == meta->cur_recno) {
746
 
                        ret = DB_NOTFOUND;
747
 
                        goto err;
748
 
                }
749
 
                cp->recno = meta->cur_recno - 1;
750
 
                if (cp->recno == RECNO_OOB)
751
 
                        cp->recno--;
752
 
                break;
753
 
        case DB_SET:
754
 
        case DB_SET_RANGE:
755
 
        case DB_GET_BOTH:
756
 
        case DB_GET_BOTH_RANGE:
757
 
                if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
758
 
                        goto err;
759
 
                break;
760
 
        default:
761
 
                ret = __db_unknown_flag(dbenv, "__qam_c_get", flags);
762
 
                goto err;
763
 
        }
764
 
 
765
 
        /*
766
 
         * Check to see if we are out of data.  Current points to
767
 
         * the first free slot.
768
 
         */
769
 
        if (cp->recno == meta->cur_recno ||
770
 
            QAM_AFTER_CURRENT(meta, cp->recno)) {
771
 
                ret = DB_NOTFOUND;
772
 
                pg = NULL;
773
 
                if (wait) {
774
 
                        flags = DB_FIRST;
775
 
                        /*
776
 
                         * If first is not set, then we skipped a
777
 
                         * locked record, go back and find it.
778
 
                         * If we find a locked record again
779
 
                         * wait for it.
780
 
                         */
781
 
                        if (first == 0) {
782
 
                                retrying = 1;
783
 
                                goto retry;
784
 
                        }
785
 
                        if (CDB_LOCKING(dbenv)) {
786
 
                                if ((ret = dbenv->lock_get(
787
 
                                    dbenv, dbc->locker,
788
 
                                    DB_LOCK_SWITCH, &dbc->lock_dbt,
789
 
                                    DB_LOCK_WAIT, &dbc->mylock)) != 0)
790
 
                                        goto err;
791
 
                                if ((ret = dbenv->lock_get(
792
 
                                    dbenv, dbc->locker,
793
 
                                    DB_LOCK_UPGRADE, &dbc->lock_dbt,
794
 
                                    DB_LOCK_WRITE, &dbc->mylock)) != 0)
795
 
                                        goto err;
796
 
                                goto retry;
797
 
                        }
798
 
                        /*
799
 
                         * Wait for someone to update the meta page.
800
 
                         * This will probably mean there is something
801
 
                         * in the queue.  We then go back up and
802
 
                         * try again.
803
 
                         */
804
 
                        if (locked == 0) {
805
 
                                if ((ret = __db_lget( dbc,
806
 
                                    0, metapno, lock_mode, 0, &metalock)) != 0)
807
 
                                        goto err;
808
 
                                locked = 1;
809
 
                                if (cp->recno != RECNO_OOB &&
810
 
                                    !QAM_AFTER_CURRENT(meta, cp->recno))
811
 
                                        goto retry;
812
 
                        }
813
 
                        if ((ret = __db_lget(dbc, 0, metapno,
814
 
                            DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
815
 
                                goto err;
816
 
                        if ((ret = dbenv->lock_get(dbenv, dbc->locker,
817
 
                            DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
818
 
                            &metalock)) != 0)
819
 
                                goto err;
820
 
                        locked = 1;
821
 
                        goto retry;
822
 
                }
823
 
 
824
 
                goto err;
825
 
        }
826
 
 
827
 
        /* Don't hold the meta page long term. */
828
 
        if (locked) {
829
 
                if ((ret = __LPUT(dbc, metalock)) != 0)
830
 
                        goto err;
831
 
                locked = 0;
832
 
        }
833
 
 
834
 
        /* Lock the record. */
835
 
        if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
836
 
            (with_delete && !retrying) ?
837
 
            DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
838
 
            &lock)) == DB_LOCK_NOTGRANTED && with_delete) {
839
 
#ifdef QDEBUG
840
 
                __db_logmsg(dbenv,
841
 
                    dbc->txn, "Queue S", 0, "%x %d %d %d",
842
 
                    dbc->locker, cp->recno, first, meta->first_recno);
843
 
#endif
844
 
                first = 0;
845
 
                if ((ret =
846
 
                    __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
847
 
                        goto err;
848
 
                locked = 1;
849
 
                goto retry;
850
 
        }
851
 
 
852
 
        if (ret != 0)
853
 
                goto err;
854
 
 
855
 
        /*
856
 
         * In the DB_FIRST or DB_LAST cases we must wait and then start over
857
 
         * since the first/last may have moved while we slept.
858
 
         * We release our locks and try again.
859
 
         */
860
 
        if ((!with_delete && is_first) || flags == DB_LAST) {
861
 
                if ((ret =
862
 
                    __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
863
 
                        goto err;
864
 
                if (cp->recno !=
865
 
                    (is_first ? meta->first_recno : (meta->cur_recno - 1))) {
866
 
                        __LPUT(dbc, lock);
867
 
                        if (is_first)
868
 
                                flags = DB_FIRST;
869
 
                        locked = 1;
870
 
                        goto retry;
871
 
                }
872
 
                /* Don't hold the meta page long term. */
873
 
                if ((ret = __LPUT(dbc, metalock)) != 0)
874
 
                        goto err;
875
 
        }
876
 
 
877
 
        /* Position the cursor on the record. */
878
 
        if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
879
 
                /* We cannot get the page, release the record lock. */
880
 
                (void)__LPUT(dbc, lock);
881
 
                goto err;
882
 
        }
883
 
 
884
 
        pg = cp->page;
885
 
        pglock = cp->lock;
886
 
        cp->lock = lock;
887
 
        cp->lock_mode = lock_mode;
888
 
 
889
 
        if (!exact) {
890
 
                if (flags == DB_NEXT || flags == DB_NEXT_NODUP ||
891
 
                    flags == DB_PREV || flags == DB_PREV_NODUP ||
892
 
                    flags == DB_LAST) {
893
 
                        /* Release locks and try again. */
894
 
                        if (pg != NULL)
895
 
                                (void)__qam_fput(dbp, cp->pgno, pg, 0);
896
 
                        cp->page = pg = NULL;
897
 
                        (void)__LPUT(dbc, pglock);
898
 
                        (void)__LPUT(dbc, cp->lock);
899
 
                        if (flags == DB_LAST)
900
 
                                flags = DB_PREV;
901
 
                        if (!with_delete)
902
 
                                is_first = 0;
903
 
                        retrying = 0;
904
 
                        goto retry;
905
 
                }
906
 
                /* this is for the SET and SET_RANGE cases */
907
 
                ret = DB_KEYEMPTY;
908
 
                goto err1;
909
 
        }
910
 
 
911
 
        /* Return the key if the user didn't give us one. */
912
 
        if (key != NULL) {
913
 
                if (flags != DB_GET_BOTH && flags != DB_GET_BOTH_RANGE &&
914
 
                    flags != DB_SET && flags != DB_SET_RANGE &&
915
 
                    (ret = __db_retcopy(dbp->dbenv,
916
 
                    key, &cp->recno, sizeof(cp->recno),
917
 
                    &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
918
 
                        goto err1;
919
 
                F_SET(key, DB_DBT_ISSET);
920
 
        }
921
 
 
922
 
        qp = QAM_GET_RECORD(dbp, pg, cp->indx);
923
 
 
924
 
        /* Return the data item. */
925
 
        if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
926
 
                /*
927
 
                 * Need to compare
928
 
                 */
929
 
                tmp.data = qp->data;
930
 
                tmp.size = t->re_len;
931
 
                if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
932
 
                        ret = DB_NOTFOUND;
933
 
                        goto err1;
934
 
                }
935
 
        }
936
 
        if (data != NULL &&
937
 
            !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
938
 
            (ret = __db_retcopy(dbp->dbenv, data,
939
 
            qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
940
 
                goto err1;
941
 
 
942
 
        if (data != NULL)
943
 
                F_SET(data, DB_DBT_ISSET);
944
 
 
945
 
        /* Finally, if we are doing DB_CONSUME mark the record. */
946
 
        if (with_delete) {
947
 
                /*
948
 
                 * Assert that we're not a secondary index.  Doing a DB_CONSUME
949
 
                 * on a secondary makes very little sense, since one can't
950
 
                 * DB_APPEND there;  attempting one should be forbidden by
951
 
                 * the interface.
952
 
                 */
953
 
                DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
954
 
 
955
 
                /*
956
 
                 * Check and see if we *have* any secondary indices.
957
 
                 * If we do, we're a primary, so call __db_c_del_primary
958
 
                 * to delete the references to the item we're about to
959
 
                 * delete.
960
 
                 *
961
 
                 * Note that we work on a duplicated cursor, since the
962
 
                 * __db_ret work has already been done, so it's not safe
963
 
                 * to perform any additional ops on this cursor.
964
 
                 */
965
 
                if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
966
 
                        if ((ret = __db_c_idup(dbc,
967
 
                            &dbcdup, DB_POSITIONI)) != 0)
968
 
                                goto err1;
969
 
 
970
 
                        if ((ret = __db_c_del_primary(dbcdup)) != 0) {
971
 
                                /*
972
 
                                 * The __db_c_del_primary return is more
973
 
                                 * interesting.
974
 
                                 */
975
 
                                (void)dbcdup->c_close(dbcdup);
976
 
                                goto err1;
977
 
                        }
978
 
 
979
 
                        if ((ret = dbcdup->c_close(dbcdup)) != 0)
980
 
                                goto err1;
981
 
                }
982
 
 
983
 
                if (DBC_LOGGING(dbc)) {
984
 
                        if (t->page_ext == 0 || t->re_len == 0) {
985
 
                                if ((ret = __qam_del_log(dbp, dbc->txn,
986
 
                                    &LSN(pg), 0, &LSN(pg),
987
 
                                    pg->pgno, cp->indx, cp->recno)) != 0)
988
 
                                        goto err1;
989
 
                        } else {
990
 
                                tmp.data = qp->data;
991
 
                                tmp.size = t->re_len;
992
 
                                if ((ret = __qam_delext_log(dbp,
993
 
                                   dbc->txn, &LSN(pg), 0, &LSN(pg),
994
 
                                   pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
995
 
                                        goto err1;
996
 
                        }
997
 
                }
998
 
 
999
 
                F_CLR(qp, QAM_VALID);
1000
 
                put_mode = DB_MPOOL_DIRTY;
1001
 
 
1002
 
                if ((ret = __LPUT(dbc, pglock)) != 0)
1003
 
                        goto err1;
1004
 
 
1005
 
                /*
1006
 
                 * Now we need to update the metapage
1007
 
                 * first pointer. If we have deleted
1008
 
                 * the record that is pointed to by
1009
 
                 * first_recno then we move it as far
1010
 
                 * forward as we can without blocking.
1011
 
                 * The metapage lock must be held for
1012
 
                 * the whole scan otherwise someone could
1013
 
                 * do a random insert behind where we are
1014
 
                 * looking.
1015
 
                 */
1016
 
 
1017
 
                if (locked == 0 && (ret = __db_lget(
1018
 
                    dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
1019
 
                        goto err1;
1020
 
                locked = 1;
1021
 
 
1022
 
#ifdef QDEBUG
1023
 
                __db_logmsg(dbenv,
1024
 
                    dbc->txn, "Queue D", 0, "%x %d %d %d",
1025
 
                    dbc->locker, cp->recno, first, meta->first_recno);
1026
 
#endif
1027
 
                /*
1028
 
                 * See if we deleted the "first" record.  If
1029
 
                 * first is zero then we skipped something,
1030
 
                 * see if first_recno has been move passed
1031
 
                 * that to the record that we deleted.
1032
 
                 */
1033
 
                if (first == 0)
1034
 
                        first = cp->recno;
1035
 
                if (first != meta->first_recno)
1036
 
                        goto done;
1037
 
 
1038
 
                if ((ret = __qam_consume(dbc, meta, first)) != 0)
1039
 
                        goto err1;
1040
 
        }
1041
 
 
1042
 
done:
1043
 
err1:   if (cp->page != NULL) {
1044
 
                t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
1045
 
 
1046
 
                if (!ret)
1047
 
                        ret = t_ret;
1048
 
                /* Doing record locking, release the page lock */
1049
 
                t_ret = __LPUT(dbc, pglock);
1050
 
                cp->page = NULL;
1051
 
        }
1052
 
 
1053
 
err:    if (!ret)
1054
 
                ret = t_ret;
1055
 
        if (meta) {
1056
 
 
1057
 
                /* release the meta page */
1058
 
                t_ret = mpf->put(mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
1059
 
 
1060
 
                if (!ret)
1061
 
                        ret = t_ret;
1062
 
 
1063
 
                /* Don't hold the meta page long term. */
1064
 
                if (locked)
1065
 
                        t_ret = __LPUT(dbc, metalock);
1066
 
        }
1067
 
        DB_ASSERT(!LOCK_ISSET(metalock));
1068
 
 
1069
 
        /*
1070
 
         * There is no need to keep the record locked if we are
1071
 
         * not in a transaction.
1072
 
         */
1073
 
        if (t_ret == 0)
1074
 
                t_ret = __TLPUT(dbc, cp->lock);
1075
 
 
1076
 
        return (ret ? ret : t_ret);
1077
 
}
1078
 
 
1079
 
/*
1080
 
 * __qam_consume -- try to reset the head of the queue.
1081
 
 *
1082
 
 */
1083
 
 
1084
 
static int
1085
 
__qam_consume(dbc, meta, first)
1086
 
        DBC *dbc;
1087
 
        QMETA *meta;
1088
 
        db_recno_t first;
1089
 
{
1090
 
        DB *dbp;
1091
 
        DB_LOCK lock, save_lock;
1092
 
        DB_MPOOLFILE *mpf;
1093
 
        QUEUE_CURSOR *cp;
1094
 
        db_indx_t save_indx;
1095
 
        db_pgno_t save_page;
1096
 
        db_recno_t current, save_recno;
1097
 
        u_int32_t rec_extent;
1098
 
        int exact, put_mode, ret, t_ret, wrapped;
1099
 
 
1100
 
        dbp = dbc->dbp;
1101
 
        mpf = dbp->mpf;
1102
 
        cp = (QUEUE_CURSOR *)dbc->internal;
1103
 
        put_mode = DB_MPOOL_DIRTY;
1104
 
        ret = t_ret = 0;
1105
 
 
1106
 
        save_page = cp->pgno;
1107
 
        save_indx = cp->indx;
1108
 
        save_recno = cp->recno;
1109
 
        save_lock = cp->lock;
1110
 
 
1111
 
        /*
1112
 
         * If we skipped some deleted records, we need to
1113
 
         * reposition on the first one.  Get a lock
1114
 
         * in case someone is trying to put it back.
1115
 
         */
1116
 
        if (first != cp->recno) {
1117
 
                ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1118
 
                    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1119
 
                if (ret == DB_LOCK_NOTGRANTED) {
1120
 
                        ret = 0;
1121
 
                        goto done;
1122
 
                }
1123
 
                if (ret != 0)
1124
 
                        goto done;
1125
 
                if ((ret =
1126
 
                    __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
1127
 
                        goto done;
1128
 
                cp->page = NULL;
1129
 
                put_mode = 0;
1130
 
                if ((ret = __qam_position(dbc,
1131
 
                    &first, QAM_READ, &exact)) != 0 || exact != 0) {
1132
 
                        (void)__LPUT(dbc, lock);
1133
 
                        goto done;
1134
 
                }
1135
 
                if ((ret =__LPUT(dbc, lock)) != 0)
1136
 
                        goto done;
1137
 
                if ((ret = __LPUT(dbc, cp->lock)) != 0)
1138
 
                        goto done;
1139
 
        }
1140
 
 
1141
 
        current = meta->cur_recno;
1142
 
        wrapped = 0;
1143
 
        if (first > current)
1144
 
                wrapped = 1;
1145
 
        rec_extent = meta->page_ext * meta->rec_page;
1146
 
 
1147
 
        /* Loop until we find a record or hit current */
1148
 
        for (;;) {
1149
 
                /*
1150
 
                 * Check to see if we are moving off the extent
1151
 
                 * and remove the extent.
1152
 
                 * If we are moving off a page we need to
1153
 
                 * get rid of the buffer.
1154
 
                 * Wait for the lagging readers to move off the
1155
 
                 * page.
1156
 
                 */
1157
 
                if (cp->page != NULL && rec_extent != 0 &&
1158
 
                    ((exact = (first % rec_extent == 0)) ||
1159
 
                    first % meta->rec_page == 0 ||
1160
 
                    first == UINT32_T_MAX)) {
1161
 
                        if (exact == 1 && (ret = __db_lget(dbc,
1162
 
                            0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
1163
 
                                break;
1164
 
 
1165
 
#ifdef QDEBUG
1166
 
                        __db_logmsg(dbp->dbenv,
1167
 
                            dbc->txn, "Queue R", 0, "%x %d %d %d",
1168
 
                            dbc->locker, cp->pgno, first, meta->first_recno);
1169
 
#endif
1170
 
                        put_mode |= DB_MPOOL_DISCARD;
1171
 
                        if ((ret = __qam_fput(dbp,
1172
 
                            cp->pgno, cp->page, put_mode)) != 0)
1173
 
                                break;
1174
 
                        cp->page = NULL;
1175
 
 
1176
 
                        if (exact == 1) {
1177
 
                                ret = __qam_fremove(dbp, cp->pgno);
1178
 
                                t_ret = __LPUT(dbc, cp->lock);
1179
 
                        }
1180
 
                        if (ret != 0)
1181
 
                                break;
1182
 
                        if (t_ret != 0) {
1183
 
                                ret = t_ret;
1184
 
                                break;
1185
 
                        }
1186
 
                } else if (cp->page != NULL && (ret =
1187
 
                    __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
1188
 
                        break;
1189
 
                cp->page = NULL;
1190
 
                first++;
1191
 
                if (first == RECNO_OOB) {
1192
 
                        wrapped = 0;
1193
 
                        first++;
1194
 
                }
1195
 
 
1196
 
                /*
1197
 
                 * LOOP EXIT when we come move to the current
1198
 
                 * pointer.
1199
 
                 */
1200
 
                if (!wrapped && first >= current)
1201
 
                        break;
1202
 
 
1203
 
                ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
1204
 
                    DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
1205
 
                if (ret == DB_LOCK_NOTGRANTED) {
1206
 
                        ret = 0;
1207
 
                        break;
1208
 
                }
1209
 
                if (ret != 0)
1210
 
                        break;
1211
 
 
1212
 
                if ((ret = __qam_position(dbc,
1213
 
                    &first, QAM_READ, &exact)) != 0) {
1214
 
                        (void)__LPUT(dbc, lock);
1215
 
                        break;
1216
 
                }
1217
 
                put_mode = 0;
1218
 
                if ((ret =__LPUT(dbc, lock)) != 0 ||
1219
 
                    (ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
1220
 
                        if ((t_ret = __qam_fput(dbp, cp->pgno,
1221
 
                            cp->page, put_mode)) != 0 && ret == 0)
1222
 
                                ret = t_ret;
1223
 
                        cp->page = NULL;
1224
 
                        break;
1225
 
                }
1226
 
        }
1227
 
 
1228
 
        cp->pgno = save_page;
1229
 
        cp->indx = save_indx;
1230
 
        cp->recno = save_recno;
1231
 
        cp->lock = save_lock;
1232
 
 
1233
 
        /*
1234
 
         * We have advanced as far as we can.
1235
 
         * Advance first_recno to this point.
1236
 
         */
1237
 
        if (ret == 0 && meta->first_recno != first) {
1238
 
#ifdef QDEBUG
1239
 
                __db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
1240
 
                    0, "%x %d %d %d", dbc->locker, cp->recno,
1241
 
                    first, meta->first_recno);
1242
 
#endif
1243
 
                if (DBC_LOGGING(dbc))
1244
 
                        if ((ret = __qam_incfirst_log(dbp,
1245
 
                            dbc->txn, &meta->dbmeta.lsn, 0,
1246
 
                            cp->recno, PGNO_BASE_MD)) != 0)
1247
 
                                goto done;
1248
 
                meta->first_recno = first;
1249
 
                (void)mpf->set(mpf, meta, DB_MPOOL_DIRTY);
1250
 
        }
1251
 
 
1252
 
done:
1253
 
        return (ret);
1254
 
}
1255
 
 
1256
 
static int
1257
 
__qam_bulk(dbc, data, flags)
1258
 
        DBC *dbc;
1259
 
        DBT *data;
1260
 
        u_int32_t flags;
1261
 
{
1262
 
        DB *dbp;
1263
 
        DB_LOCK metalock;
1264
 
        DB_MPOOLFILE *mpf;
1265
 
        PAGE *pg;
1266
 
        QMETA *meta;
1267
 
        QAMDATA *qp;
1268
 
        QUEUE_CURSOR *cp;
1269
 
        db_indx_t indx;
1270
 
        db_pgno_t metapno;
1271
 
        qam_position_mode mode;
1272
 
        int32_t  *endp, *offp;
1273
 
        u_int8_t *dbuf, *dp, *np;
1274
 
        int exact, recs, re_len, ret, t_ret, valid;
1275
 
        int is_key, need_pg, pagesize, size, space;
1276
 
 
1277
 
        dbp = dbc->dbp;
1278
 
        mpf = dbp->mpf;
1279
 
        cp = (QUEUE_CURSOR *)dbc->internal;
1280
 
 
1281
 
        mode = QAM_READ;
1282
 
        if (F_ISSET(dbc, DBC_RMW))
1283
 
                mode = QAM_WRITE;
1284
 
 
1285
 
        pagesize = dbp->pgsize;
1286
 
        re_len = ((QUEUE *)dbp->q_internal)->re_len;
1287
 
        recs = ((QUEUE *)dbp->q_internal)->rec_page;
1288
 
        metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1289
 
 
1290
 
        is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
1291
 
        size = 0;
1292
 
 
1293
 
        if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
1294
 
                return (ret);
1295
 
        if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
1296
 
                /* We did not fetch it, we can release the lock. */
1297
 
                (void)__LPUT(dbc, metalock);
1298
 
                return (ret);
1299
 
        }
1300
 
 
1301
 
        dbuf = data->data;
1302
 
        np = dp = dbuf;
1303
 
 
1304
 
        /* Keep track of space that is left.  There is an termination entry */
1305
 
        space = data->ulen;
1306
 
        space -= sizeof(*offp);
1307
 
 
1308
 
        /* Build the offset/size table form the end up. */
1309
 
        endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
1310
 
        endp--;
1311
 
        offp = endp;
1312
 
 
1313
 
next_pg:
1314
 
        if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
1315
 
                goto done;
1316
 
 
1317
 
        pg = cp->page;
1318
 
        indx = cp->indx;
1319
 
        need_pg = 1;
1320
 
 
1321
 
        do {
1322
 
                /*
1323
 
                 * If this page is a nonexistent page at the end of an
1324
 
                 * extent, pg may be NULL.  A NULL page has no valid records,
1325
 
                 * so just keep looping as though qp exists and isn't QAM_VALID;
1326
 
                 * calling QAM_GET_RECORD is unsafe.
1327
 
                 */
1328
 
                valid = 0;
1329
 
 
1330
 
                /* Wrap around, skipping zero. */
1331
 
                if (cp->recno == RECNO_OOB)
1332
 
                        cp->recno++;
1333
 
                if (pg != NULL) {
1334
 
                        qp = QAM_GET_RECORD(dbp, pg, indx);
1335
 
                        if (F_ISSET(qp, QAM_VALID)) {
1336
 
                                valid = 1;
1337
 
                                space -= (is_key ? 3 : 2) * sizeof(*offp);
1338
 
                                if (space < 0)
1339
 
                                        goto get_space;
1340
 
                                if (need_pg) {
1341
 
                                        dp = np;
1342
 
                                        size = pagesize - QPAGE_SZ(dbp);
1343
 
                                        if (space < size) {
1344
 
get_space:
1345
 
                                                if (offp == endp) {
1346
 
                                                        data->size =
1347
 
                                                            ALIGN(size +
1348
 
                                                            pagesize,
1349
 
                                                            sizeof(u_int32_t));
1350
 
                                                        ret = ENOMEM;
1351
 
                                                        break;
1352
 
                                                }
1353
 
                                                if (indx != 0)
1354
 
                                                        indx--;
1355
 
                                                cp->recno--;
1356
 
                                                break;
1357
 
                                        }
1358
 
                                        memcpy(dp,
1359
 
                                            (char *)pg + QPAGE_SZ(dbp), size);
1360
 
                                        need_pg = 0;
1361
 
                                        space -= size;
1362
 
                                        np += size;
1363
 
                                }
1364
 
                                if (is_key)
1365
 
                                        *offp-- = cp->recno;
1366
 
                                *offp-- = (int32_t)((u_int8_t*)qp -
1367
 
                                    (u_int8_t*)pg - QPAGE_SZ(dbp) +
1368
 
                                    dp - dbuf + SSZA(QAMDATA, data));
1369
 
                                *offp-- = re_len;
1370
 
                        }
1371
 
                }
1372
 
                if (!valid && is_key == 0) {
1373
 
                        *offp-- = 0;
1374
 
                        *offp-- = 0;
1375
 
                }
1376
 
                cp->recno++;
1377
 
        } while (++indx < recs && indx != RECNO_OOB
1378
 
            && cp->recno != meta->cur_recno
1379
 
            && !QAM_AFTER_CURRENT(meta, cp->recno));
1380
 
 
1381
 
        if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
1382
 
                ret = t_ret;
1383
 
 
1384
 
        if (cp->page != NULL) {
1385
 
                if ((t_ret =
1386
 
                    __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
1387
 
                        ret = t_ret;
1388
 
                cp->page = NULL;
1389
 
        }
1390
 
 
1391
 
        if (ret == 0
1392
 
            && (indx >= recs || indx == RECNO_OOB)
1393
 
            && cp->recno != meta->cur_recno
1394
 
            && !QAM_AFTER_CURRENT(meta, cp->recno))
1395
 
                goto next_pg;
1396
 
 
1397
 
        if (is_key == 1)
1398
 
                *offp = RECNO_OOB;
1399
 
        else
1400
 
                *offp = -1;
1401
 
 
1402
 
done:
1403
 
        /* release the meta page */
1404
 
        t_ret = mpf->put(mpf, meta, 0);
1405
 
 
1406
 
        if (!ret)
1407
 
                ret = t_ret;
1408
 
 
1409
 
        t_ret = __LPUT(dbc, metalock);
1410
 
 
1411
 
        return (ret);
1412
 
}
1413
 
 
1414
 
/*
1415
 
 * __qam_c_close --
1416
 
 *      Close down the cursor from a single use.
1417
 
 */
1418
 
static int
1419
 
__qam_c_close(dbc, root_pgno, rmroot)
1420
 
        DBC *dbc;
1421
 
        db_pgno_t root_pgno;
1422
 
        int *rmroot;
1423
 
{
1424
 
        QUEUE_CURSOR *cp;
1425
 
 
1426
 
        COMPQUIET(root_pgno, 0);
1427
 
        COMPQUIET(rmroot, NULL);
1428
 
 
1429
 
        cp = (QUEUE_CURSOR *)dbc->internal;
1430
 
 
1431
 
        /* Discard any locks not acquired inside of a transaction. */
1432
 
        (void)__TLPUT(dbc, cp->lock);
1433
 
        LOCK_INIT(cp->lock);
1434
 
 
1435
 
        cp->page = NULL;
1436
 
        cp->pgno = PGNO_INVALID;
1437
 
        cp->indx = 0;
1438
 
        cp->lock_mode = DB_LOCK_NG;
1439
 
        cp->recno = RECNO_OOB;
1440
 
        cp->flags = 0;
1441
 
 
1442
 
        return (0);
1443
 
}
1444
 
 
1445
 
/*
1446
 
 * __qam_c_dup --
1447
 
 *      Duplicate a queue cursor, such that the new one holds appropriate
1448
 
 *      locks for the position of the original.
1449
 
 *
1450
 
 * PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
1451
 
 */
1452
 
int
1453
 
__qam_c_dup(orig_dbc, new_dbc)
1454
 
        DBC *orig_dbc, *new_dbc;
1455
 
{
1456
 
        QUEUE_CURSOR *orig, *new;
1457
 
 
1458
 
        orig = (QUEUE_CURSOR *)orig_dbc->internal;
1459
 
        new = (QUEUE_CURSOR *)new_dbc->internal;
1460
 
 
1461
 
        new->recno = orig->recno;
1462
 
 
1463
 
        /* reget the long term lock if we are not in a xact */
1464
 
        if (orig_dbc->txn != NULL ||
1465
 
            !STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock))
1466
 
                return (0);
1467
 
 
1468
 
        return (__db_lget(new_dbc,
1469
 
            0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
1470
 
}
1471
 
 
1472
 
/*
1473
 
 * __qam_c_init
1474
 
 *
1475
 
 * PUBLIC: int __qam_c_init __P((DBC *));
1476
 
 */
1477
 
int
1478
 
__qam_c_init(dbc)
1479
 
        DBC *dbc;
1480
 
{
1481
 
        QUEUE_CURSOR *cp;
1482
 
        DB *dbp;
1483
 
        int ret;
1484
 
 
1485
 
        dbp = dbc->dbp;
1486
 
 
1487
 
        /* Allocate the internal structure. */
1488
 
        cp = (QUEUE_CURSOR *)dbc->internal;
1489
 
        if (cp == NULL) {
1490
 
                if ((ret =
1491
 
                    __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
1492
 
                        return (ret);
1493
 
                dbc->internal = (DBC_INTERNAL *)cp;
1494
 
        }
1495
 
 
1496
 
        /* Initialize methods. */
1497
 
        dbc->c_close = __db_c_close;
1498
 
        dbc->c_count = __db_c_count;
1499
 
        dbc->c_del = __db_c_del;
1500
 
        dbc->c_dup = __db_c_dup;
1501
 
        dbc->c_get = dbc->c_real_get = __db_c_get;
1502
 
        dbc->c_pget = __db_c_pget;
1503
 
        dbc->c_put = __db_c_put;
1504
 
        dbc->c_am_bulk = __qam_bulk;
1505
 
        dbc->c_am_close = __qam_c_close;
1506
 
        dbc->c_am_del = __qam_c_del;
1507
 
        dbc->c_am_destroy = __qam_c_destroy;
1508
 
        dbc->c_am_get = __qam_c_get;
1509
 
        dbc->c_am_put = __qam_c_put;
1510
 
        dbc->c_am_writelock = NULL;
1511
 
 
1512
 
        return (0);
1513
 
}
1514
 
 
1515
 
/*
1516
 
 * __qam_c_destroy --
1517
 
 *      Close a single cursor -- internal version.
1518
 
 */
1519
 
static int
1520
 
__qam_c_destroy(dbc)
1521
 
        DBC *dbc;
1522
 
{
1523
 
        /* Discard the structures. */
1524
 
        __os_free(dbc->dbp->dbenv, dbc->internal);
1525
 
 
1526
 
        return (0);
1527
 
}
1528
 
 
1529
 
/*
1530
 
 * __qam_getno --
1531
 
 *      Check the user's record number.
1532
 
 */
1533
 
static int
1534
 
__qam_getno(dbp, key, rep)
1535
 
        DB *dbp;
1536
 
        const DBT *key;
1537
 
        db_recno_t *rep;
1538
 
{
1539
 
        if ((*rep = *(db_recno_t *)key->data) == 0) {
1540
 
                __db_err(dbp->dbenv, "illegal record number of 0");
1541
 
                return (EINVAL);
1542
 
        }
1543
 
        return (0);
1544
 
}
1545
 
 
1546
 
/*
1547
 
 * __qam_truncate --
1548
 
 *      Truncate a queue database
1549
 
 *
1550
 
 * PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *));
1551
 
 */
1552
 
int
1553
 
__qam_truncate(dbp, txn, countp)
1554
 
        DB *dbp;
1555
 
        DB_TXN *txn;
1556
 
        u_int32_t *countp;
1557
 
{
1558
 
        DBC *dbc;
1559
 
        DB_LOCK metalock;
1560
 
        DB_MPOOLFILE *mpf;
1561
 
        QMETA *meta;
1562
 
        db_pgno_t metapno;
1563
 
        int count, ret, t_ret;
1564
 
 
1565
 
        mpf = dbp->mpf;
1566
 
 
1567
 
        /* Acquire a cursor. */
1568
 
        if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
1569
 
                return (ret);
1570
 
 
1571
 
        /* Walk the queue, counting rows. */
1572
 
        count = 0;
1573
 
        while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0)
1574
 
                count++;
1575
 
 
1576
 
        if (ret == DB_NOTFOUND)
1577
 
                ret = 0;
1578
 
 
1579
 
        /* Discard the cursor. */
1580
 
        if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1581
 
                ret = t_ret;
1582
 
 
1583
 
        if (ret != 0)
1584
 
                return (ret);
1585
 
 
1586
 
        /* update the meta page */
1587
 
        /* get the meta page */
1588
 
        metapno = ((QUEUE *)dbp->q_internal)->q_meta;
1589
 
        if ((ret =
1590
 
            __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
1591
 
                return (ret);
1592
 
 
1593
 
        if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
1594
 
                /* We did not fetch it, we can release the lock. */
1595
 
                (void)__LPUT(dbc, metalock);
1596
 
                return (ret);
1597
 
        }
1598
 
        if (DBC_LOGGING(dbc)) {
1599
 
                ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
1600
 
                    QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
1601
 
                    1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
1602
 
        }
1603
 
        if (ret == 0)
1604
 
                meta->first_recno = meta->cur_recno = 1;
1605
 
 
1606
 
        if ((t_ret =
1607
 
            mpf->put(mpf, meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
1608
 
                ret = t_ret;
1609
 
        if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
1610
 
                ret = t_ret;
1611
 
 
1612
 
        *countp = count;
1613
 
 
1614
 
        return (ret);
1615
 
}