~ubuntu-branches/ubuntu/maverick/evolution-data-server/maverick-proposed

« back to all changes in this revision

Viewing changes to libdb/rep/rep_record.c

  • Committer: Bazaar Package Importer
  • Author(s): Didier Roche
  • Date: 2010-05-17 17:02:06 UTC
  • mfrom: (1.1.79 upstream) (1.6.12 experimental)
  • Revision ID: james.westby@ubuntu.com-20100517170206-4ufr52vwrhh26yh0
Tags: 2.30.1-1ubuntu1
* Merge from debian experimental. Remaining change:
  (LP: #42199, #229669, #173703, #360344, #508494)
  + debian/control:
    - add Vcs-Bzr tag
    - don't use libgnome
    - Use Breaks instead of Conflicts against evolution 2.25 and earlier.
  + debian/evolution-data-server.install,
    debian/patches/45_libcamel_providers_version.patch:
    - use the upstream versioning, not a Debian-specific one 
  + debian/libedata-book1.2-dev.install, debian/libebackend-1.2-dev.install,
    debian/libcamel1.2-dev.install, debian/libedataserverui1.2-dev.install:
    - install html documentation
  + debian/rules:
    - don't build documentation it's shipped with the tarball

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*-
2
 
 * See the file LICENSE for redistribution information.
3
 
 *
4
 
 * Copyright (c) 2001-2002
5
 
 *      Sleepycat Software.  All rights reserved.
6
 
 */
7
 
 
8
 
#include "db_config.h"
9
 
 
10
 
#ifndef lint
11
 
static const char revid[] = "$Id$";
12
 
#endif /* not lint */
13
 
 
14
 
#ifndef NO_SYSTEM_INCLUDES
15
 
#include <stdlib.h>
16
 
#include <string.h>
17
 
#endif
18
 
 
19
 
#include "db_int.h"
20
 
#include "dbinc/db_page.h"
21
 
#include "dbinc/db_am.h"
22
 
#include "dbinc/log.h"
23
 
#include "dbinc/rep.h"
24
 
#include "dbinc/txn.h"
25
 
 
26
 
static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *));
27
 
static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
28
 
static int __rep_lsn_cmp __P((const void *, const void *));
29
 
static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *));
30
 
 
31
 
#define IS_SIMPLE(R)    ((R) != DB___txn_regop && \
32
 
    (R) != DB___txn_ckp && (R) != DB___dbreg_register)
33
 
 
34
 
/*
35
 
 * __rep_process_message --
36
 
 *
37
 
 * This routine takes an incoming message and processes it.
38
 
 *
39
 
 * control: contains the control fields from the record
40
 
 * rec: contains the actual record
41
 
 * eidp: contains the machine id of the sender of the message;
42
 
 *      in the case of a DB_NEWMASTER message, returns the eid
43
 
 *      of the new master.
44
 
 *
45
 
 * PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *));
46
 
 */
47
 
int
48
 
__rep_process_message(dbenv, control, rec, eidp)
49
 
        DB_ENV *dbenv;
50
 
        DBT *control, *rec;
51
 
        int *eidp;
52
 
{
53
 
        DB_LOG *dblp;
54
 
        DB_LOGC *logc;
55
 
        DB_LSN init_lsn, lsn, newfilelsn, oldfilelsn;
56
 
        DB_REP *db_rep;
57
 
        DBT *d, data_dbt, lsndbt, mylog;
58
 
        LOG *lp;
59
 
        REP *rep;
60
 
        REP_CONTROL *rp;
61
 
        REP_VOTE_INFO *vi;
62
 
        u_int32_t bytes, gen, gbytes, type, unused;
63
 
        int check_limit, cmp, done, do_req, i;
64
 
        int master, old, recovering, ret, t_ret, *tally;
65
 
 
66
 
        PANIC_CHECK(dbenv);
67
 
        ENV_REQUIRES_CONFIG(dbenv, dbenv->tx_handle, "rep_stat", DB_INIT_TXN);
68
 
 
69
 
        /* Control argument must be non-Null. */
70
 
        if (control == NULL || control->size == 0) {
71
 
                __db_err(dbenv,
72
 
        "DB_ENV->rep_process_message: control argument must be specified");
73
 
                return (EINVAL);
74
 
        }
75
 
 
76
 
        ret = 0;
77
 
        db_rep = dbenv->rep_handle;
78
 
        rep = db_rep->region;
79
 
        dblp = dbenv->lg_handle;
80
 
        lp = dblp->reginfo.primary;
81
 
 
82
 
        MUTEX_LOCK(dbenv, db_rep->mutexp);
83
 
        gen = rep->gen;
84
 
        recovering = F_ISSET(rep, REP_F_RECOVER);
85
 
 
86
 
        rep->stat.st_msgs_processed++;
87
 
        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
88
 
 
89
 
        rp = (REP_CONTROL *)control->data;
90
 
 
91
 
#if 0
92
 
        __rep_print_message(dbenv, *eidp, rp, "rep_process_message");
93
 
#endif
94
 
 
95
 
        /* Complain if we see an improper version number. */
96
 
        if (rp->rep_version != DB_REPVERSION) {
97
 
                __db_err(dbenv,
98
 
                    "unexpected replication message version %d, expected %d",
99
 
                    rp->rep_version, DB_REPVERSION);
100
 
                return (EINVAL);
101
 
        }
102
 
        if (rp->log_version != DB_LOGVERSION) {
103
 
                __db_err(dbenv,
104
 
                    "unexpected log record version %d, expected %d",
105
 
                    rp->log_version, DB_LOGVERSION);
106
 
                return (EINVAL);
107
 
        }
108
 
 
109
 
        /*
110
 
         * Check for generation number matching.  Ignore any old messages
111
 
         * except requests that are indicative of a new client that needs
112
 
         * to get in sync.
113
 
         */
114
 
        if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
115
 
            rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ) {
116
 
                /*
117
 
                 * We don't hold the rep mutex, and could miscount if we race.
118
 
                 */
119
 
                rep->stat.st_msgs_badgen++;
120
 
                return (0);
121
 
        }
122
 
        if (rp->gen > gen && rp->rectype != REP_ALIVE &&
123
 
            rp->rectype != REP_NEWMASTER)
124
 
                return (__rep_send_message(dbenv,
125
 
                    DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0));
126
 
 
127
 
        /*
128
 
         * We need to check if we're in recovery and if we are
129
 
         * then we need to ignore any messages except VERIFY, VOTE,
130
 
         * ELECT (the master might fail while we are recovering), and
131
 
         * ALIVE_REQ.
132
 
         */
133
 
        if (recovering)
134
 
                switch(rp->rectype) {
135
 
                        case REP_ALIVE:
136
 
                        case REP_ALIVE_REQ:
137
 
                        case REP_ELECT:
138
 
                        case REP_NEWCLIENT:
139
 
                        case REP_NEWMASTER:
140
 
                        case REP_NEWSITE:
141
 
                        case REP_VERIFY:
142
 
                                R_LOCK(dbenv, &dblp->reginfo);
143
 
                                cmp = log_compare(&lp->verify_lsn, &rp->lsn);
144
 
                                R_UNLOCK(dbenv, &dblp->reginfo);
145
 
                                if (cmp != 0)
146
 
                                        goto skip;
147
 
                                /* FALLTHROUGH */
148
 
                        case REP_VOTE1:
149
 
                        case REP_VOTE2:
150
 
                                break;
151
 
                        default:
152
 
skip:                           /*
153
 
                                 * We don't hold the rep mutex, and could
154
 
                                 * miscount if we race.
155
 
                                 */
156
 
                                rep->stat.st_msgs_recover++;
157
 
 
158
 
                                /* Check for need to retransmit. */
159
 
                                R_LOCK(dbenv, &dblp->reginfo);
160
 
                                do_req = *eidp == rep->master_id &&
161
 
                                    ++lp->rcvd_recs >= lp->wait_recs;
162
 
                                if (do_req) {
163
 
                                        lp->wait_recs *= 2;
164
 
                                        if (lp->wait_recs + rep->max_gap)
165
 
                                                lp->wait_recs = rep->max_gap;
166
 
                                        lp->rcvd_recs = 0;
167
 
                                        lsn = lp->verify_lsn;
168
 
                                }
169
 
                                R_UNLOCK(dbenv, &dblp->reginfo);
170
 
                                if (do_req)
171
 
                                        ret = __rep_send_message(dbenv, *eidp,
172
 
                                            REP_VERIFY_REQ, &lsn, NULL, 0);
173
 
 
174
 
                                return (ret);
175
 
                }
176
 
 
177
 
        switch(rp->rectype) {
178
 
        case REP_ALIVE:
179
 
                ANYSITE(dbenv);
180
 
                if (rp->gen > gen && rp->flags)
181
 
                        return (__rep_new_master(dbenv, rp, *eidp));
182
 
                break;
183
 
        case REP_ALIVE_REQ:
184
 
                ANYSITE(dbenv);
185
 
                dblp = dbenv->lg_handle;
186
 
                R_LOCK(dbenv, &dblp->reginfo);
187
 
                lsn = ((LOG *)dblp->reginfo.primary)->lsn;
188
 
                R_UNLOCK(dbenv, &dblp->reginfo);
189
 
                return (__rep_send_message(dbenv,
190
 
                    *eidp, REP_ALIVE, &lsn, NULL,
191
 
                    F_ISSET(dbenv, DB_ENV_REP_MASTER) ? 1 : 0));
192
 
        case REP_ALL_REQ:
193
 
                MASTER_ONLY(dbenv);
194
 
                gbytes  = bytes = 0;
195
 
                MUTEX_LOCK(dbenv, db_rep->mutexp);
196
 
                gbytes = rep->gbytes;
197
 
                bytes = rep->bytes;
198
 
                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
199
 
                check_limit = gbytes != 0 || bytes != 0;
200
 
                if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
201
 
                        return (ret);
202
 
                memset(&data_dbt, 0, sizeof(data_dbt));
203
 
                oldfilelsn = lsn = rp->lsn;
204
 
                type = REP_LOG;
205
 
                for (ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
206
 
                    ret == 0 && type == REP_LOG;
207
 
                    ret = logc->get(logc, &lsn, &data_dbt, DB_NEXT)) {
208
 
                        /*
209
 
                         * lsn.offset will only be 0 if this is the
210
 
                         * beginning of the log;  DB_SET, but not DB_NEXT,
211
 
                         * can set the log cursor to [n][0].
212
 
                         */
213
 
                        if (lsn.offset == 0)
214
 
                                ret = __rep_send_message(dbenv, *eidp,
215
 
                                    REP_NEWFILE, &lsn, NULL, 0);
216
 
                        else {
217
 
                                /*
218
 
                                 * DB_NEXT will never run into offsets
219
 
                                 * of 0;  thus, when a log file changes,
220
 
                                 * we'll have a real log record with
221
 
                                 * some lsn [n][m], and we'll also want to send
222
 
                                 * a NEWFILE message with lsn [n][0].
223
 
                                 * So that the client can detect gaps,
224
 
                                 * send in the rec parameter the
225
 
                                 * last LSN in the old file.
226
 
                                 */
227
 
                                if (lsn.file != oldfilelsn.file) {
228
 
                                        newfilelsn.file = lsn.file;
229
 
                                        newfilelsn.offset = 0;
230
 
 
231
 
                                        memset(&lsndbt, 0, sizeof(DBT));
232
 
                                        lsndbt.size = sizeof(DB_LSN);
233
 
                                        lsndbt.data = &oldfilelsn;
234
 
 
235
 
                                        if ((ret = __rep_send_message(dbenv,
236
 
                                            *eidp, REP_NEWFILE, &newfilelsn,
237
 
                                            &lsndbt, 0)) != 0)
238
 
                                                break;
239
 
                                }
240
 
                                if (check_limit) {
241
 
                                        /*
242
 
                                         * data_dbt.size is only the size of
243
 
                                         * the log record;  it doesn't count
244
 
                                         * the size of the control structure.
245
 
                                         * Factor that in as well so we're
246
 
                                         * not off by a lot if our log
247
 
                                         * records are small.
248
 
                                         */
249
 
                                        while (bytes < data_dbt.size +
250
 
                                            sizeof(REP_CONTROL)) {
251
 
                                                if (gbytes > 0) {
252
 
                                                        bytes += GIGABYTE;
253
 
                                                        --gbytes;
254
 
                                                        continue;
255
 
                                                }
256
 
                                                /*
257
 
                                                 * We don't hold the rep mutex,
258
 
                                                 * and may miscount.
259
 
                                                 */
260
 
                                                rep->stat.st_nthrottles++;
261
 
                                                type = REP_LOG_MORE;
262
 
                                                goto send;
263
 
                                        }
264
 
                                        bytes -= (data_dbt.size +
265
 
                                            sizeof(REP_CONTROL));
266
 
                                }
267
 
send:                           ret = __rep_send_message(dbenv, *eidp,
268
 
                                    type, &lsn, &data_dbt, 0);
269
 
                        }
270
 
 
271
 
                        /*
272
 
                         * In case we're about to change files and need it
273
 
                         * for a NEWFILE message, save the current LSN.
274
 
                         */
275
 
                        oldfilelsn = lsn;
276
 
                }
277
 
 
278
 
                if (ret == DB_NOTFOUND)
279
 
                        ret = 0;
280
 
                if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
281
 
                        ret = t_ret;
282
 
                return (ret);
283
 
        case REP_ELECT:
284
 
                if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
285
 
                        R_LOCK(dbenv, &dblp->reginfo);
286
 
                        lsn = lp->lsn;
287
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
288
 
                        MUTEX_LOCK(dbenv, db_rep->mutexp);
289
 
                        rep->gen++;
290
 
                        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
291
 
                        return (__rep_send_message(dbenv,
292
 
                            *eidp, REP_NEWMASTER, &lsn, NULL, 0));
293
 
                }
294
 
                MUTEX_LOCK(dbenv, db_rep->mutexp);
295
 
                ret = IN_ELECTION(rep) ? 0 : DB_REP_HOLDELECTION;
296
 
                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
297
 
                return (ret);
298
 
#ifdef NOTYET
299
 
        case REP_FILE: /* TODO */
300
 
                CLIENT_ONLY(dbenv);
301
 
                break;
302
 
        case REP_FILE_REQ:
303
 
                MASTER_ONLY(dbenv);
304
 
                return (__rep_send_file(dbenv, rec, *eidp));
305
 
                break;
306
 
#endif
307
 
        case REP_LOG:
308
 
        case REP_LOG_MORE:
309
 
                CLIENT_ONLY(dbenv);
310
 
                if ((ret = __rep_apply(dbenv, rp, rec)) != 0)
311
 
                        return (ret);
312
 
                if (rp->rectype == REP_LOG_MORE) {
313
 
                        MUTEX_LOCK(dbenv, db_rep->db_mutexp);
314
 
                        master = rep->master_id;
315
 
                        MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
316
 
                        R_LOCK(dbenv, &dblp->reginfo);
317
 
                        lsn = lp->lsn;
318
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
319
 
                        ret = __rep_send_message(dbenv, master,
320
 
                            REP_ALL_REQ, &lsn, NULL, 0);
321
 
                }
322
 
                return (ret);
323
 
        case REP_LOG_REQ:
324
 
                MASTER_ONLY(dbenv);
325
 
                if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
326
 
                        return (ret);
327
 
                memset(&data_dbt, 0, sizeof(data_dbt));
328
 
                lsn = rp->lsn;
329
 
 
330
 
                /*
331
 
                 * There are three different cases here.
332
 
                 * 1. We asked for a particular LSN and got it.
333
 
                 * 2. We asked for an LSN of X,0 which is invalid and got the
334
 
                 *      first log record in a particular file.
335
 
                 * 3. We asked for an LSN and it's not found because it is
336
 
                 *      beyond the end of a log file and we need a NEWFILE msg.
337
 
                 */
338
 
                ret = logc->get(logc, &rp->lsn, &data_dbt, DB_SET);
339
 
                cmp = log_compare(&lsn, &rp->lsn);
340
 
 
341
 
                if (ret == 0 && cmp == 0) /* Case 1 */
342
 
                        ret = __rep_send_message(dbenv, *eidp,
343
 
                                    REP_LOG, &rp->lsn, &data_dbt, 0);
344
 
                else if (ret == DB_NOTFOUND ||
345
 
                    (ret == 0 && cmp < 0 && rp->lsn.offset == 0))
346
 
                        /* Cases 2 and 3: Send a NEWFILE message. */
347
 
                        ret = __rep_send_message(dbenv, *eidp,
348
 
                            REP_NEWFILE, &lsn, NULL, 0);
349
 
 
350
 
                if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
351
 
                        ret = t_ret;
352
 
                return (ret);
353
 
        case REP_NEWSITE:
354
 
                /* We don't hold the rep mutex, and may miscount. */
355
 
                rep->stat.st_newsites++;
356
 
 
357
 
                /* This is a rebroadcast; simply tell the application. */
358
 
                if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
359
 
                        dblp = dbenv->lg_handle;
360
 
                        lp = dblp->reginfo.primary;
361
 
                        R_LOCK(dbenv, &dblp->reginfo);
362
 
                        lsn = lp->lsn;
363
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
364
 
                        (void)__rep_send_message(dbenv,
365
 
                            *eidp, REP_NEWMASTER, &lsn, NULL, 0);
366
 
                }
367
 
                return (DB_REP_NEWSITE);
368
 
        case REP_NEWCLIENT:
369
 
                /*
370
 
                 * This message was received and should have resulted in the
371
 
                 * application entering the machine ID in its machine table.
372
 
                 * We respond to this with an ALIVE to send relevant information
373
 
                 * to the new client.  But first, broadcast the new client's
374
 
                 * record to all the clients.
375
 
                 */
376
 
                if ((ret = __rep_send_message(dbenv,
377
 
                    DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0)) != 0)
378
 
                        return (ret);
379
 
 
380
 
                if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
381
 
                        return (0);
382
 
 
383
 
                 /* FALLTHROUGH */
384
 
        case REP_MASTER_REQ:
385
 
                ANYSITE(dbenv);
386
 
                if (F_ISSET(dbenv, DB_ENV_REP_CLIENT))
387
 
                        return (0);
388
 
                dblp = dbenv->lg_handle;
389
 
                lp = dblp->reginfo.primary;
390
 
                R_LOCK(dbenv, &dblp->reginfo);
391
 
                lsn = lp->lsn;
392
 
                R_UNLOCK(dbenv, &dblp->reginfo);
393
 
                return (__rep_send_message(dbenv,
394
 
                    *eidp, REP_NEWMASTER, &lsn, NULL, 0));
395
 
        case REP_NEWFILE:
396
 
                CLIENT_ONLY(dbenv);
397
 
                return (__rep_apply(dbenv, rp, rec));
398
 
        case REP_NEWMASTER:
399
 
                ANYSITE(dbenv);
400
 
                if (F_ISSET(dbenv, DB_ENV_REP_MASTER) &&
401
 
                    *eidp != dbenv->rep_eid) {
402
 
                        /* We don't hold the rep mutex, and may miscount. */
403
 
                        rep->stat.st_dupmasters++;
404
 
                        return (DB_REP_DUPMASTER);
405
 
                }
406
 
                return (__rep_new_master(dbenv, rp, *eidp));
407
 
        case REP_PAGE: /* TODO */
408
 
                CLIENT_ONLY(dbenv);
409
 
                break;
410
 
        case REP_PAGE_REQ: /* TODO */
411
 
                MASTER_ONLY(dbenv);
412
 
                break;
413
 
        case REP_PLIST: /* TODO */
414
 
                CLIENT_ONLY(dbenv);
415
 
                break;
416
 
        case REP_PLIST_REQ: /* TODO */
417
 
                MASTER_ONLY(dbenv);
418
 
                break;
419
 
        case REP_VERIFY:
420
 
                CLIENT_ONLY(dbenv);
421
 
                DB_ASSERT((F_ISSET(rep, REP_F_RECOVER) &&
422
 
                    !IS_ZERO_LSN(lp->verify_lsn)) ||
423
 
                    (!F_ISSET(rep, REP_F_RECOVER) &&
424
 
                    IS_ZERO_LSN(lp->verify_lsn)));
425
 
                if (IS_ZERO_LSN(lp->verify_lsn))
426
 
                        return (0);
427
 
 
428
 
                if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
429
 
                        return (ret);
430
 
                memset(&mylog, 0, sizeof(mylog));
431
 
                if ((ret = logc->get(logc, &rp->lsn, &mylog, DB_SET)) != 0)
432
 
                        goto rep_verify_err;
433
 
                if (mylog.size == rec->size &&
434
 
                    memcmp(mylog.data, rec->data, rec->size) == 0) {
435
 
                        /*
436
 
                         * If we're a logs-only client, we can simply truncate
437
 
                         * the log to the point where it last agreed with the
438
 
                         * master's;  otherwise, recover to that point.
439
 
                         */
440
 
                        R_LOCK(dbenv, &dblp->reginfo);
441
 
                        ZERO_LSN(lp->verify_lsn);
442
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
443
 
                        if (F_ISSET(dbenv, DB_ENV_REP_LOGSONLY)) {
444
 
                                INIT_LSN(init_lsn);
445
 
                                if ((ret = dbenv->log_flush(dbenv,
446
 
                                    &rp->lsn)) != 0 ||
447
 
                                    (ret = __log_vtruncate(dbenv,
448
 
                                    &rp->lsn, &init_lsn)) != 0)
449
 
                                        goto rep_verify_err;
450
 
                        } else if ((ret = __db_apprec(dbenv, &rp->lsn, 0)) != 0)
451
 
                                goto rep_verify_err;
452
 
 
453
 
                        /*
454
 
                         * The log has been truncated (either by __db_apprec or
455
 
                         * directly).  We want to make sure we're waiting for
456
 
                         * the LSN at the new end-of-log, not some later point.
457
 
                         */
458
 
                        R_LOCK(dbenv, &dblp->reginfo);
459
 
                        lp->ready_lsn = lp->lsn;
460
 
                        ZERO_LSN(lp->waiting_lsn);
461
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
462
 
 
463
 
                        /*
464
 
                         * Discard any log records we have queued;  we're
465
 
                         * about to re-request them, and can't trust the
466
 
                         * ones in the queue.
467
 
                         */
468
 
                        MUTEX_LOCK(dbenv, db_rep->db_mutexp);
469
 
                        if ((ret = db_rep->rep_db->truncate(db_rep->rep_db,
470
 
                            NULL, &unused, 0)) != 0) {
471
 
                                MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
472
 
                                goto rep_verify_err;
473
 
                        }
474
 
                        rep->stat.st_log_queued = 0;
475
 
                        MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
476
 
 
477
 
                        MUTEX_LOCK(dbenv, db_rep->mutexp);
478
 
                        F_CLR(rep, REP_F_RECOVER);
479
 
 
480
 
                        /*
481
 
                         * If the master_id is invalid, this means that since
482
 
                         * the last record was sent, somebody declared an
483
 
                         * election and we may not have a master to request
484
 
                         * things of.
485
 
                         *
486
 
                         * This is not an error;  when we find a new master,
487
 
                         * we'll re-negotiate where the end of the log is and
488
 
                         * try to bring ourselves up to date again anyway.
489
 
                         */
490
 
                        if ((master = rep->master_id) == DB_EID_INVALID) {
491
 
                                DB_ASSERT(IN_ELECTION(rep));
492
 
                                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
493
 
                                ret = 0;
494
 
                        } else {
495
 
                                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
496
 
                                ret = __rep_send_message(dbenv, master,
497
 
                                    REP_ALL_REQ, &rp->lsn, NULL, 0);
498
 
                        }
499
 
                } else if ((ret =
500
 
                    logc->get(logc, &lsn, &mylog, DB_PREV)) == 0) {
501
 
                        R_LOCK(dbenv, &dblp->reginfo);
502
 
                        lp->verify_lsn = lsn;
503
 
                        lp->rcvd_recs = 0;
504
 
                        lp->wait_recs = rep->request_gap;
505
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
506
 
                        ret = __rep_send_message(dbenv,
507
 
                            *eidp, REP_VERIFY_REQ, &lsn, NULL, 0);
508
 
                }
509
 
 
510
 
rep_verify_err: if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
511
 
                        ret = t_ret;
512
 
                return (ret);
513
 
        case REP_VERIFY_FAIL:
514
 
                rep->stat.st_outdated++;
515
 
                return (DB_REP_OUTDATED);
516
 
        case REP_VERIFY_REQ:
517
 
                MASTER_ONLY(dbenv);
518
 
                type = REP_VERIFY;
519
 
                if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
520
 
                        return (ret);
521
 
                d = &data_dbt;
522
 
                memset(d, 0, sizeof(data_dbt));
523
 
                F_SET(logc, DB_LOG_SILENT_ERR);
524
 
                ret = logc->get(logc, &rp->lsn, d, DB_SET);
525
 
                /*
526
 
                 * If the LSN was invalid, then we might get a not
527
 
                 * found, we might get an EIO, we could get anything.
528
 
                 * If we get a DB_NOTFOUND, then there is a chance that
529
 
                 * the LSN comes before the first file present in which
530
 
                 * case we need to return a fail so that the client can return
531
 
                 * a DB_OUTDATED.
532
 
                 */
533
 
                if (ret == DB_NOTFOUND &&
534
 
                    __log_is_outdated(dbenv, rp->lsn.file, &old) == 0 &&
535
 
                    old != 0)
536
 
                        type = REP_VERIFY_FAIL;
537
 
 
538
 
                if (ret != 0)
539
 
                        d = NULL;
540
 
 
541
 
                ret = __rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0);
542
 
                if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
543
 
                        ret = t_ret;
544
 
                return (ret);
545
 
        case REP_VOTE1:
546
 
                if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
547
 
#ifdef DIAGNOSTIC
548
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
549
 
                                __db_err(dbenv, "Master received vote");
550
 
#endif
551
 
                        R_LOCK(dbenv, &dblp->reginfo);
552
 
                        lsn = lp->lsn;
553
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
554
 
                        return (__rep_send_message(dbenv,
555
 
                            *eidp, REP_NEWMASTER, &lsn, NULL, 0));
556
 
                }
557
 
 
558
 
                vi = (REP_VOTE_INFO *)rec->data;
559
 
                MUTEX_LOCK(dbenv, db_rep->mutexp);
560
 
 
561
 
                /*
562
 
                 * If you get a vote and you're not in an election, simply
563
 
                 * return an indicator to hold an election which will trigger
564
 
                 * this site to send its vote again.
565
 
                 */
566
 
                if (!IN_ELECTION(rep)) {
567
 
#ifdef DIAGNOSTIC
568
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
569
 
                                __db_err(dbenv,
570
 
                                    "Not in election, but received vote1");
571
 
#endif
572
 
                        ret = DB_REP_HOLDELECTION;
573
 
                        goto unlock;
574
 
                }
575
 
 
576
 
                if (F_ISSET(rep, REP_F_EPHASE2))
577
 
                        goto unlock;
578
 
 
579
 
                /* Check if this site knows about more sites than we do. */
580
 
                if (vi->nsites > rep->nsites)
581
 
                        rep->nsites = vi->nsites;
582
 
 
583
 
                /* Check if we've heard from this site already. */
584
 
                tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
585
 
                for (i = 0; i < rep->sites; i++) {
586
 
                        if (tally[i] == *eidp)
587
 
                                /* Duplicate vote. */
588
 
                                goto unlock;
589
 
                }
590
 
 
591
 
                /*
592
 
                 * We are keeping vote, let's see if that changes our count of
593
 
                 * the number of sites.
594
 
                 */
595
 
                if (rep->sites + 1 > rep->nsites)
596
 
                        rep->nsites = rep->sites + 1;
597
 
                if (rep->nsites > rep->asites &&
598
 
                    (ret = __rep_grow_sites(dbenv, rep->nsites)) != 0)
599
 
                                goto unlock;
600
 
 
601
 
                tally[rep->sites] = *eidp;
602
 
                rep->sites++;
603
 
 
604
 
                /*
605
 
                 * Change winners if the incoming record has a higher
606
 
                 * priority, or an equal priority but a larger LSN, or
607
 
                 * an equal priority and LSN but higher "tiebreaker" value.
608
 
                 */
609
 
#ifdef DIAGNOSTIC
610
 
                if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
611
 
                        __db_err(dbenv,
612
 
                            "%s(eid)%d (pri)%d (gen)%d (sites)%d [%d,%d]",
613
 
                            "Existing vote: ",
614
 
                            rep->winner, rep->w_priority, rep->w_gen,
615
 
                            rep->sites, rep->w_lsn.file, rep->w_lsn.offset);
616
 
                        __db_err(dbenv,
617
 
                            "Incoming vote: (eid)%d (pri)%d (gen)%d [%d,%d]",
618
 
                            *eidp, vi->priority, rp->gen, rp->lsn.file,
619
 
                            rp->lsn.offset);
620
 
                }
621
 
#endif
622
 
                cmp = log_compare(&rp->lsn, &rep->w_lsn);
623
 
                if (vi->priority > rep->w_priority ||
624
 
                    (vi->priority != 0 && vi->priority == rep->w_priority &&
625
 
                    (cmp > 0 ||
626
 
                    (cmp == 0 && vi->tiebreaker > rep->w_tiebreaker)))) {
627
 
#ifdef DIAGNOSTIC
628
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
629
 
                                __db_err(dbenv, "Accepting new vote");
630
 
#endif
631
 
                        rep->winner = *eidp;
632
 
                        rep->w_priority = vi->priority;
633
 
                        rep->w_lsn = rp->lsn;
634
 
                        rep->w_gen = rp->gen;
635
 
                }
636
 
                master = rep->winner;
637
 
                lsn = rep->w_lsn;
638
 
                done = rep->sites == rep->nsites && rep->w_priority != 0;
639
 
                if (done) {
640
 
#ifdef DIAGNOSTIC
641
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION)) {
642
 
                                __db_err(dbenv, "Phase1 election done");
643
 
                                __db_err(dbenv, "Voting for %d%s",
644
 
                                    master, master == rep->eid ? "(self)" : "");
645
 
                        }
646
 
#endif
647
 
                        F_CLR(rep, REP_F_EPHASE1);
648
 
                        F_SET(rep, REP_F_EPHASE2);
649
 
                }
650
 
 
651
 
                if (done && master == rep->eid) {
652
 
                        rep->votes++;
653
 
                        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
654
 
                        return (0);
655
 
                }
656
 
                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
657
 
 
658
 
                /* Vote for someone else. */
659
 
                if (done)
660
 
                        return (__rep_send_message(dbenv,
661
 
                            master, REP_VOTE2, NULL, NULL, 0));
662
 
 
663
 
                /* Election is still going on. */
664
 
                break;
665
 
        case REP_VOTE2:
666
 
#ifdef DIAGNOSTIC
667
 
                if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
668
 
                        __db_err(dbenv, "We received a vote%s",
669
 
                            F_ISSET(dbenv, DB_ENV_REP_MASTER) ?
670
 
                            " (master)" : "");
671
 
#endif
672
 
                if (F_ISSET(dbenv, DB_ENV_REP_MASTER)) {
673
 
                        R_LOCK(dbenv, &dblp->reginfo);
674
 
                        lsn = lp->lsn;
675
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
676
 
                        rep->stat.st_elections_won++;
677
 
                        return (__rep_send_message(dbenv,
678
 
                            *eidp, REP_NEWMASTER, &lsn, NULL, 0));
679
 
                }
680
 
 
681
 
                MUTEX_LOCK(dbenv, db_rep->mutexp);
682
 
 
683
 
                /* If we have priority 0, we should never get a vote. */
684
 
                DB_ASSERT(rep->priority != 0);
685
 
 
686
 
                if (!IN_ELECTION(rep)) {
687
 
#ifdef DIAGNOSTIC
688
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
689
 
                                __db_err(dbenv, "Not in election, got vote");
690
 
#endif
691
 
                        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
692
 
                        return (DB_REP_HOLDELECTION);
693
 
                }
694
 
                /* avoid counting duplicates. */
695
 
                rep->votes++;
696
 
                done = rep->votes > rep->nsites / 2;
697
 
                if (done) {
698
 
                        rep->master_id = rep->eid;
699
 
                        rep->gen = rep->w_gen + 1;
700
 
                        ELECTION_DONE(rep);
701
 
                        F_CLR(rep, REP_F_UPGRADE);
702
 
                        F_SET(rep, REP_F_MASTER);
703
 
                        *eidp = rep->master_id;
704
 
#ifdef DIAGNOSTIC
705
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
706
 
                                __db_err(dbenv,
707
 
                        "Got enough votes to win; election done; winner is %d",
708
 
                                    rep->master_id);
709
 
#endif
710
 
                }
711
 
                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
712
 
                if (done) {
713
 
                        R_LOCK(dbenv, &dblp->reginfo);
714
 
                        lsn = lp->lsn;
715
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
716
 
 
717
 
                        /* Declare me the winner. */
718
 
#ifdef DIAGNOSTIC
719
 
                        if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION))
720
 
                                __db_err(dbenv, "I won, sending NEWMASTER");
721
 
#endif
722
 
                        rep->stat.st_elections_won++;
723
 
                        if ((ret = __rep_send_message(dbenv, DB_EID_BROADCAST,
724
 
                            REP_NEWMASTER, &lsn, NULL, 0)) != 0)
725
 
                                break;
726
 
                        return (DB_REP_NEWMASTER);
727
 
                }
728
 
                break;
729
 
        default:
730
 
                __db_err(dbenv,
731
 
        "DB_ENV->rep_process_message: unknown replication message: type %lu",
732
 
                   (u_long)rp->rectype);
733
 
                return (EINVAL);
734
 
        }
735
 
 
736
 
        return (0);
737
 
 
738
 
unlock: MUTEX_UNLOCK(dbenv, db_rep->mutexp);
739
 
        return (ret);
740
 
}
741
 
 
742
 
/*
743
 
 * __rep_apply --
744
 
 *
745
 
 * Handle incoming log records on a client, applying when possible and
746
 
 * entering into the bookkeeping table otherwise.  This is the guts of
747
 
 * the routine that handles the state machine that describes how we
748
 
 * process and manage incoming log records.
749
 
 */
750
 
static int
751
 
__rep_apply(dbenv, rp, rec)
752
 
        DB_ENV *dbenv;
753
 
        REP_CONTROL *rp;
754
 
        DBT *rec;
755
 
{
756
 
        __dbreg_register_args dbreg_args;
757
 
        __txn_ckp_args ckp_args;
758
 
        DB_REP *db_rep;
759
 
        DBT control_dbt, key_dbt, lsn_dbt, nextrec_dbt, rec_dbt;
760
 
        DB *dbp;
761
 
        DBC *dbc;
762
 
        DB_LOG *dblp;
763
 
        DB_LSN ckp_lsn, lsn, newfile_lsn, next_lsn, waiting_lsn;
764
 
        LOG *lp;
765
 
        REP *rep;
766
 
        REP_CONTROL lsn_rc;
767
 
        u_int32_t rectype, txnid;
768
 
        int cmp, do_req, eid, have_mutex, ret, t_ret;
769
 
 
770
 
        db_rep = dbenv->rep_handle;
771
 
        rep = db_rep->region;
772
 
        dbp = db_rep->rep_db;
773
 
        dbc = NULL;
774
 
        have_mutex = ret = 0;
775
 
        memset(&control_dbt, 0, sizeof(control_dbt));
776
 
        memset(&rec_dbt, 0, sizeof(rec_dbt));
777
 
 
778
 
        /*
779
 
         * If this is a log record and it's the next one in line, simply
780
 
         * write it to the log.  If it's a "normal" log record, i.e., not
781
 
         * a COMMIT or CHECKPOINT or something that needs immediate processing,
782
 
         * just return.  If it's a COMMIT, CHECKPOINT or LOG_REGISTER (i.e.,
783
 
         * not SIMPLE), handle it now.  If it's a NEWFILE record, then we
784
 
         * have to be prepared to deal with a logfile change.
785
 
         */
786
 
        dblp = dbenv->lg_handle;
787
 
        R_LOCK(dbenv, &dblp->reginfo);
788
 
        lp = dblp->reginfo.primary;
789
 
        cmp = log_compare(&rp->lsn, &lp->ready_lsn);
790
 
 
791
 
        /*
792
 
         * This is written to assume that you don't end up with a lot of
793
 
         * records after a hole.  That is, it optimizes for the case where
794
 
         * there is only a record or two after a hole.  If you have a lot
795
 
         * of records after a hole, what you'd really want to do is write
796
 
         * all of them and then process all the commits, checkpoints, etc.
797
 
         * together.  That is more complicated processing that we can add
798
 
         * later if necessary.
799
 
         *
800
 
         * That said, I really don't want to do db operations holding the
801
 
         * log mutex, so the synchronization here is tricky.
802
 
         */
803
 
        if (cmp == 0) {
804
 
                /* We got the log record that we are expecting. */
805
 
                if (rp->rectype == REP_NEWFILE) {
806
 
newfile:                ret = __rep_newfile(dbenv, rp, rec, &lp->ready_lsn);
807
 
 
808
 
                        /* Make this evaluate to a simple rectype. */
809
 
                        rectype = 0;
810
 
                } else {
811
 
                        DB_ASSERT(log_compare(&rp->lsn, &lp->lsn) == 0);
812
 
                        ret = __log_rep_put(dbenv, &rp->lsn, rec);
813
 
                        lp->ready_lsn = lp->lsn;
814
 
                        memcpy(&rectype, rec->data, sizeof(rectype));
815
 
                        if (ret == 0)
816
 
                                /*
817
 
                                 * We may miscount if we race, since we
818
 
                                 * don't currently hold the rep mutex.
819
 
                                 */
820
 
                                rep->stat.st_log_records++;
821
 
                }
822
 
                while (ret == 0 && IS_SIMPLE(rectype) &&
823
 
                    log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
824
 
                        /*
825
 
                         * We just filled in a gap in the log record stream.
826
 
                         * Write subsequent records to the log.
827
 
                         */
828
 
gap_check:              lp->wait_recs = 0;
829
 
                        lp->rcvd_recs = 0;
830
 
                        R_UNLOCK(dbenv, &dblp->reginfo);
831
 
                        if (have_mutex == 0) {
832
 
                                MUTEX_LOCK(dbenv, db_rep->db_mutexp);
833
 
                                have_mutex = 1;
834
 
                        }
835
 
                        if (dbc == NULL &&
836
 
                            (ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
837
 
                                goto err;
838
 
 
839
 
                        /* The DBTs need to persist through another call. */
840
 
                        F_SET(&control_dbt, DB_DBT_REALLOC);
841
 
                        F_SET(&rec_dbt, DB_DBT_REALLOC);
842
 
                        if ((ret = dbc->c_get(dbc,
843
 
                            &control_dbt, &rec_dbt, DB_RMW | DB_FIRST)) != 0)
844
 
                                goto err;
845
 
 
846
 
                        rp = (REP_CONTROL *)control_dbt.data;
847
 
                        rec = &rec_dbt;
848
 
                        memcpy(&rectype, rec->data, sizeof(rectype));
849
 
                        R_LOCK(dbenv, &dblp->reginfo);
850
 
                        /*
851
 
                         * We need to check again, because it's possible that
852
 
                         * some other thread of control changed the waiting_lsn
853
 
                         * or removed that record from the database.
854
 
                         */
855
 
                        if (log_compare(&lp->ready_lsn, &rp->lsn) == 0) {
856
 
                                if (rp->rectype != REP_NEWFILE) {
857
 
                                        DB_ASSERT(log_compare
858
 
                                            (&rp->lsn, &lp->lsn) == 0);
859
 
                                        ret = __log_rep_put(dbenv,
860
 
                                            &rp->lsn, rec);
861
 
                                        lp->ready_lsn = lp->lsn;
862
 
 
863
 
                                        /*
864
 
                                         * We may miscount if we race, since we
865
 
                                         * don't currently hold the rep mutex.
866
 
                                         */
867
 
                                        if (ret == 0)
868
 
                                                rep->stat.st_log_records++;
869
 
                                } else {
870
 
                                        ret = __rep_newfile(dbenv,
871
 
                                            rp, rec, &lp->ready_lsn);
872
 
                                        rectype = 0;
873
 
                                }
874
 
                                waiting_lsn = lp->waiting_lsn;
875
 
                                R_UNLOCK(dbenv, &dblp->reginfo);
876
 
                                if ((ret = dbc->c_del(dbc, 0)) != 0)
877
 
                                        goto err;
878
 
 
879
 
                                /*
880
 
                                 * We may miscount, as we don't hold the rep
881
 
                                 * mutex.
882
 
                                 */
883
 
                                --rep->stat.st_log_queued;
884
 
 
885
 
                                /*
886
 
                                 * Update waiting_lsn.  We need to move it
887
 
                                 * forward to the LSN of the next record
888
 
                                 * in the queue.
889
 
                                 */
890
 
                                memset(&lsn_dbt, 0, sizeof(lsn_dbt));
891
 
                                F_SET(&lsn_dbt, DB_DBT_USERMEM);
892
 
                                lsn_dbt.data = &lsn_rc;
893
 
                                lsn_dbt.ulen = sizeof(lsn_rc);
894
 
                                memset(&lsn_rc, 0, sizeof(lsn_rc));
895
 
 
896
 
                                /*
897
 
                                 * If the next item in the database is a log
898
 
                                 * record--the common case--we're not
899
 
                                 * interested in its contents, just in its LSN.
900
 
                                 * If it's a newfile message, though, the
901
 
                                 * data field may be the LSN of the last
902
 
                                 * record in the old file, and we need to use
903
 
                                 * that to determine whether or not there's
904
 
                                 * a gap.
905
 
                                 *
906
 
                                 * Optimize both these cases by doing a partial
907
 
                                 * get of the data item.  If it's a newfile
908
 
                                 * record, we'll get the whole LSN, and if
909
 
                                 * it's not, we won't waste time allocating.
910
 
                                 */
911
 
                                memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
912
 
                                F_SET(&nextrec_dbt,
913
 
                                    DB_DBT_USERMEM | DB_DBT_PARTIAL);
914
 
                                nextrec_dbt.ulen =
915
 
                                    nextrec_dbt.dlen = sizeof(newfile_lsn);
916
 
                                ZERO_LSN(newfile_lsn);
917
 
                                nextrec_dbt.data = &newfile_lsn;
918
 
 
919
 
                                ret = dbc->c_get(dbc,
920
 
                                    &lsn_dbt, &nextrec_dbt, DB_NEXT);
921
 
                                if (ret != DB_NOTFOUND && ret != 0)
922
 
                                        goto err;
923
 
 
924
 
                                R_LOCK(dbenv, &dblp->reginfo);
925
 
                                if (ret == DB_NOTFOUND) {
926
 
                                        /*
927
 
                                         * Do a quick double-check to make
928
 
                                         * sure waiting_lsn hasn't changed.
929
 
                                         * It's possible that between the
930
 
                                         * DB_NOTFOUND return and the R_LOCK,
931
 
                                         * some record was added to the
932
 
                                         * database, and we don't want to lose
933
 
                                         * sight of the fact that it's there.
934
 
                                         */
935
 
                                        if (log_compare(&waiting_lsn,
936
 
                                            &lp->waiting_lsn) == 0)
937
 
                                                ZERO_LSN(
938
 
                                                    lp->waiting_lsn);
939
 
 
940
 
                                        /*
941
 
                                         * Whether or not the current record is
942
 
                                         * simple, there's no next one, and
943
 
                                         * therefore we haven't got anything
944
 
                                         * else to do right now.  Break out.
945
 
                                         */
946
 
                                        break;
947
 
                                }
948
 
 
949
 
                                DB_ASSERT(lsn_dbt.size == sizeof(lsn_rc));
950
 
 
951
 
                                /*
952
 
                                 * NEWFILE records have somewhat convoluted
953
 
                                 * semantics, so there are five cases
954
 
                                 * pertaining to what the newly-gotten record
955
 
                                 * is and what we want to do about it.
956
 
                                 *
957
 
                                 * 1) This isn't a NEWFILE record.  Advance
958
 
                                 *    waiting_lsn and proceed.
959
 
                                 *
960
 
                                 * 2) NEWFILE, no LSN stored as the datum,
961
 
                                 *    lsn_rc.lsn == ready_lsn.  The NEWFILE
962
 
                                 *    record is next, so set waiting_lsn =
963
 
                                 *    ready_lsn.
964
 
                                 *
965
 
                                 * 3) NEWFILE, no LSN stored as the datum, but
966
 
                                 *    lsn_rc.lsn > ready_lsn.  There's still a
967
 
                                 *    gap; set waiting_lsn = lsn_rc.lsn.
968
 
                                 *
969
 
                                 * 4) NEWFILE, newfile_lsn in datum, and it's <
970
 
                                 *    ready_lsn. (If the datum is non-empty,
971
 
                                 *    it's the LSN of the last record in a log
972
 
                                 *    file, not the end of the log, and
973
 
                                 *    lsn_rc.lsn is the LSN of the start of
974
 
                                 *    the new file--we didn't have the end of
975
 
                                 *    the old log handy when we sent the
976
 
                                 *    record.)  No gap--we're ready to
977
 
                                 *    proceed.  Set both waiting and ready_lsn
978
 
                                 *    to lsn_rc.lsn.
979
 
                                 *
980
 
                                 * 5) NEWFILE, newfile_lsn in datum, and it's >=
981
 
                                 *    ready_lsn.  We're still missing at
982
 
                                 *    least one record;  set waiting_lsn,
983
 
                                 *    but not ready_lsn, to lsn_rc.lsn.
984
 
                                 */
985
 
                                if (lsn_rc.rectype == REP_NEWFILE &&
986
 
                                    nextrec_dbt.size > 0 && log_compare(
987
 
                                    &newfile_lsn, &lp->ready_lsn) < 0)
988
 
                                        /* Case 4. */
989
 
                                        lp->ready_lsn =
990
 
                                            lp->waiting_lsn = lsn_rc.lsn;
991
 
                                else {
992
 
                                        /* Cases 1, 2, 3, and 5. */
993
 
                                        DB_ASSERT(log_compare(&lsn_rc.lsn,
994
 
                                            &lp->ready_lsn) >= 0);
995
 
                                        lp->waiting_lsn = lsn_rc.lsn;
996
 
                                }
997
 
 
998
 
                                /*
999
 
                                 * If the current rectype is simple, we're
1000
 
                                 * done with it, and we should check and see
1001
 
                                 * whether the next record queued is the next
1002
 
                                 * one we're ready for.  This is just the loop
1003
 
                                 * condition, so we continue.
1004
 
                                 *
1005
 
                                 * Otherwise, we need to break out of this loop
1006
 
                                 * and process this record first.
1007
 
                                 */
1008
 
                                if (!IS_SIMPLE(rectype))
1009
 
                                        break;
1010
 
                        }
1011
 
                }
1012
 
 
1013
 
                /*
1014
 
                 * Check if we're at a gap in the table and if so, whether we
1015
 
                 * need to ask for any records.
1016
 
                 */
1017
 
                do_req = 0;
1018
 
                if (!IS_ZERO_LSN(lp->waiting_lsn) &&
1019
 
                    log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
1020
 
                        next_lsn = lp->ready_lsn;
1021
 
                        do_req = ++lp->rcvd_recs >= lp->wait_recs;
1022
 
                        if (do_req) {
1023
 
                                lp->wait_recs = rep->request_gap;
1024
 
                                lp->rcvd_recs = 0;
1025
 
                        }
1026
 
                }
1027
 
 
1028
 
                R_UNLOCK(dbenv, &dblp->reginfo);
1029
 
                if (dbc != NULL) {
1030
 
                        if ((ret = dbc->c_close(dbc)) != 0)
1031
 
                                goto err;
1032
 
                        MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
1033
 
                        have_mutex = 0;
1034
 
                }
1035
 
                dbc = NULL;
1036
 
 
1037
 
                if (do_req) {
1038
 
                        MUTEX_LOCK(dbenv, db_rep->mutexp);
1039
 
                        eid = db_rep->region->master_id;
1040
 
                        MUTEX_UNLOCK(dbenv, db_rep->mutexp);
1041
 
                        if (eid != DB_EID_INVALID) {
1042
 
                                rep->stat.st_log_requested++;
1043
 
                                if ((ret = __rep_send_message(dbenv,
1044
 
                                    eid, REP_LOG_REQ, &next_lsn, NULL, 0)) != 0)
1045
 
                                        goto err;
1046
 
                        }
1047
 
                }
1048
 
        } else if (cmp > 0) {
1049
 
                /*
1050
 
                 * The LSN is higher than the one we were waiting for.
1051
 
                 * If it is a NEWFILE message, this may not mean that
1052
 
                 * there's a gap;  in some cases, NEWFILE messages contain
1053
 
                 * the LSN of the beginning of the new file instead
1054
 
                 * of the end of the old.
1055
 
                 *
1056
 
                 * In these cases, the rec DBT will contain the last LSN
1057
 
                 * of the old file, so we can tell whether there's a gap.
1058
 
                 */
1059
 
                if (rp->rectype == REP_NEWFILE &&
1060
 
                    rp->lsn.file == lp->ready_lsn.file + 1 &&
1061
 
                    rp->lsn.offset == 0) {
1062
 
                        DB_ASSERT(rec != NULL && rec->data != NULL &&
1063
 
                            rec->size == sizeof(DB_LSN));
1064
 
                        memcpy(&lsn, rec->data, sizeof(DB_LSN));
1065
 
                        if (log_compare(&lp->ready_lsn, &lsn) > 0)
1066
 
                                /*
1067
 
                                 * The last LSN in the old file is smaller
1068
 
                                 * than the one we're expecting, so there's
1069
 
                                 * no gap--the one we're expecting just
1070
 
                                 * doesn't exist.
1071
 
                                 */
1072
 
                                goto newfile;
1073
 
                }
1074
 
 
1075
 
                /*
1076
 
                 * This record isn't in sequence; add it to the table and
1077
 
                 * update waiting_lsn if necessary.
1078
 
                 */
1079
 
                memset(&key_dbt, 0, sizeof(key_dbt));
1080
 
                key_dbt.data = rp;
1081
 
                key_dbt.size = sizeof(*rp);
1082
 
                next_lsn = lp->lsn;
1083
 
                do_req = 0;
1084
 
                if (lp->wait_recs == 0) {
1085
 
                        /*
1086
 
                         * This is a new gap. Initialize the number of
1087
 
                         * records that we should wait before requesting
1088
 
                         * that it be resent.  We grab the limits out of
1089
 
                         * the rep without the mutex.
1090
 
                         */
1091
 
                        lp->wait_recs = rep->request_gap;
1092
 
                        lp->rcvd_recs = 0;
1093
 
                }
1094
 
 
1095
 
                if (++lp->rcvd_recs >= lp->wait_recs) {
1096
 
                        /*
1097
 
                         * If we've waited long enough, request the record
1098
 
                         * and double the wait interval.
1099
 
                         */
1100
 
                        do_req = 1;
1101
 
                        lp->wait_recs <<= 1;
1102
 
                        lp->rcvd_recs = 0;
1103
 
                        if (lp->wait_recs > rep->max_gap)
1104
 
                                lp->wait_recs = rep->max_gap;
1105
 
                }
1106
 
                R_UNLOCK(dbenv, &dblp->reginfo);
1107
 
 
1108
 
                MUTEX_LOCK(dbenv, db_rep->db_mutexp);
1109
 
                ret = dbp->put(dbp, NULL, &key_dbt, rec, 0);
1110
 
                rep->stat.st_log_queued++;
1111
 
                rep->stat.st_log_queued_total++;
1112
 
                if (rep->stat.st_log_queued_max < rep->stat.st_log_queued)
1113
 
                        rep->stat.st_log_queued_max = rep->stat.st_log_queued;
1114
 
                MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
1115
 
 
1116
 
                if (ret != 0)
1117
 
                        return (ret);
1118
 
 
1119
 
                R_LOCK(dbenv, &dblp->reginfo);
1120
 
                if (IS_ZERO_LSN(lp->waiting_lsn) ||
1121
 
                    log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
1122
 
                        lp->waiting_lsn = rp->lsn;
1123
 
                R_UNLOCK(dbenv, &dblp->reginfo);
1124
 
 
1125
 
                if (do_req) {
1126
 
                        /* Request the LSN we are still waiting for. */
1127
 
                        MUTEX_LOCK(dbenv, db_rep->mutexp);
1128
 
 
1129
 
                        /* May as well do this after we grab the mutex. */
1130
 
                        eid = db_rep->region->master_id;
1131
 
 
1132
 
                        /*
1133
 
                         * If the master_id is invalid, this means that since
1134
 
                         * the last record was sent, somebody declared an
1135
 
                         * election and we may not have a master to request
1136
 
                         * things of.
1137
 
                         *
1138
 
                         * This is not an error;  when we find a new master,
1139
 
                         * we'll re-negotiate where the end of the log is and
1140
 
                         * try to to bring ourselves up to date again anyway.
1141
 
                         */
1142
 
                        if (eid != DB_EID_INVALID) {
1143
 
                                rep->stat.st_log_requested++;
1144
 
                                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
1145
 
                                ret = __rep_send_message(dbenv,
1146
 
                                    eid, REP_LOG_REQ, &next_lsn, NULL, 0);
1147
 
                        } else
1148
 
                                MUTEX_UNLOCK(dbenv, db_rep->mutexp);
1149
 
                }
1150
 
                return (ret);
1151
 
        } else {
1152
 
                R_UNLOCK(dbenv, &dblp->reginfo);
1153
 
 
1154
 
                /*
1155
 
                 * We may miscount if we race, since we
1156
 
                 * don't currently hold the rep mutex.
1157
 
                 */
1158
 
                rep->stat.st_log_duplicated++;
1159
 
        }
1160
 
        if (ret != 0 || cmp < 0 || (cmp == 0 && IS_SIMPLE(rectype)))
1161
 
                goto done;
1162
 
 
1163
 
        /*
1164
 
         * If we got here, then we've got a log record in rp and rec that
1165
 
         * we need to process.
1166
 
         */
1167
 
        switch(rectype) {
1168
 
        case DB___dbreg_register:
1169
 
                /*
1170
 
                 * DB opens occur in the context of a transaction, so we can
1171
 
                 * simply handle them when we process the transaction.  Closes,
1172
 
                 * however, are not transaction-protected, so we have to
1173
 
                 * handle them here.
1174
 
                 *
1175
 
                 * Note that it should be unsafe for the master to do a close
1176
 
                 * of a file that was opened in an active transaction, so we
1177
 
                 * should be guaranteed to get the ordering right.
1178
 
                 */
1179
 
                memcpy(&txnid, (u_int8_t *)rec->data +
1180
 
                    ((u_int8_t *)&dbreg_args.txnid - (u_int8_t *)&dbreg_args),
1181
 
                    sizeof(u_int32_t));
1182
 
                if (txnid == TXN_INVALID &&
1183
 
                    !F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
1184
 
                        ret = __db_dispatch(dbenv, dbenv->recover_dtab,
1185
 
                            dbenv->recover_dtab_size, rec, &rp->lsn,
1186
 
                            DB_TXN_APPLY, NULL);
1187
 
                break;
1188
 
        case DB___txn_ckp:
1189
 
                /* Sync the memory pool. */
1190
 
                memcpy(&ckp_lsn, (u_int8_t *)rec->data +
1191
 
                    ((u_int8_t *)&ckp_args.ckp_lsn - (u_int8_t *)&ckp_args),
1192
 
                    sizeof(DB_LSN));
1193
 
                if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
1194
 
                        ret = dbenv->memp_sync(dbenv, &ckp_lsn);
1195
 
                else
1196
 
                        /*
1197
 
                         * We ought to make sure the logs on a logs-only
1198
 
                         * replica get flushed now and again.
1199
 
                         */
1200
 
                        ret = dbenv->log_flush(dbenv, &ckp_lsn);
1201
 
                /* Update the last_ckp in the txn region. */
1202
 
                if (ret == 0)
1203
 
                        __txn_updateckp(dbenv, &rp->lsn);
1204
 
                break;
1205
 
        case DB___txn_regop:
1206
 
                if (!F_ISSET(dbenv, DB_ENV_REP_LOGSONLY))
1207
 
                        do {
1208
 
                                /*
1209
 
                                 * If an application is doing app-specific
1210
 
                                 * recovery and acquires locks while applying
1211
 
                                 * a transaction, it can deadlock.  Any other
1212
 
                                 * locks held by this thread should have been
1213
 
                                 * discarded in the __rep_process_txn error
1214
 
                                 * path, so if we simply retry, we should
1215
 
                                 * eventually succeed.
1216
 
                                 */
1217
 
                                ret = __rep_process_txn(dbenv, rec);
1218
 
                        } while (ret == DB_LOCK_DEADLOCK);
1219
 
                break;
1220
 
        default:
1221
 
                goto err;
1222
 
        }
1223
 
 
1224
 
        /* Check if we need to go back into the table. */
1225
 
        if (ret == 0) {
1226
 
                R_LOCK(dbenv, &dblp->reginfo);
1227
 
                if (log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
1228
 
                        goto gap_check;
1229
 
                R_UNLOCK(dbenv, &dblp->reginfo);
1230
 
        }
1231
 
 
1232
 
done:
1233
 
err:    if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
1234
 
                ret = t_ret;
1235
 
        if (have_mutex)
1236
 
                MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
1237
 
 
1238
 
        if (control_dbt.data != NULL)
1239
 
                __os_ufree(dbenv, control_dbt.data);
1240
 
        if (rec_dbt.data != NULL)
1241
 
                __os_ufree(dbenv, rec_dbt.data);
1242
 
 
1243
 
        return (ret);
1244
 
}
1245
 
 
1246
 
/*
1247
 
 * __rep_process_txn --
1248
 
 *
1249
 
 * This is the routine that actually gets a transaction ready for
1250
 
 * processing.
1251
 
 *
1252
 
 * PUBLIC: int __rep_process_txn __P((DB_ENV *, DBT *));
1253
 
 */
1254
 
int
1255
 
__rep_process_txn(dbenv, rec)
1256
 
        DB_ENV *dbenv;
1257
 
        DBT *rec;
1258
 
{
1259
 
        DBT data_dbt;
1260
 
        DB_LOCKREQ req, *lvp;
1261
 
        DB_LOGC *logc;
1262
 
        DB_LSN prev_lsn, *lsnp;
1263
 
        DB_REP *db_rep;
1264
 
        LSN_COLLECTION lc;
1265
 
        REP *rep;
1266
 
        __txn_regop_args *txn_args;
1267
 
        __txn_xa_regop_args *prep_args;
1268
 
        u_int32_t lockid, op, rectype;
1269
 
        int i, ret, t_ret;
1270
 
        int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
1271
 
        size_t dtabsize;
1272
 
        void *txninfo;
1273
 
 
1274
 
        db_rep = dbenv->rep_handle;
1275
 
        rep = db_rep->region;
1276
 
 
1277
 
        logc = NULL;
1278
 
        txninfo = NULL;
1279
 
        memset(&data_dbt, 0, sizeof(data_dbt));
1280
 
        if (F_ISSET(dbenv, DB_ENV_THREAD))
1281
 
                F_SET(&data_dbt, DB_DBT_REALLOC);
1282
 
 
1283
 
        /*
1284
 
         * There are two phases:  First, we have to traverse
1285
 
         * backwards through the log records gathering the list
1286
 
         * of all LSNs in the transaction.  Once we have this information,
1287
 
         * we can loop through, acquire the locks we need for each record,
1288
 
         * and then apply it.
1289
 
         */
1290
 
        dtab = NULL;
1291
 
 
1292
 
        /*
1293
 
         * We may be passed a prepare (if we're restoring a prepare
1294
 
         * on upgrade) instead of a commit (the common case).
1295
 
         * Check which and behave appropriately.
1296
 
         */
1297
 
        memcpy(&rectype, rec->data, sizeof(rectype));
1298
 
        memset(&lc, 0, sizeof(lc));
1299
 
        if (rectype == DB___txn_regop) {
1300
 
                /*
1301
 
                 * We're the end of a transaction.  Make sure this is
1302
 
                 * really a commit and not an abort!
1303
 
                 */
1304
 
                if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0)
1305
 
                        return (ret);
1306
 
                op = txn_args->opcode;
1307
 
                prev_lsn = txn_args->prev_lsn;
1308
 
                __os_free(dbenv, txn_args);
1309
 
                if (op != TXN_COMMIT)
1310
 
                        return (0);
1311
 
        } else {
1312
 
                /* We're a prepare. */
1313
 
                DB_ASSERT(rectype == DB___txn_xa_regop);
1314
 
 
1315
 
                if ((ret =
1316
 
                    __txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0)
1317
 
                        return (ret);
1318
 
                prev_lsn = prep_args->prev_lsn;
1319
 
                __os_free(dbenv, prep_args);
1320
 
        }
1321
 
 
1322
 
        /* Phase 1.  Get a list of the LSNs in this transaction, and sort it. */
1323
 
        if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0)
1324
 
                return (ret);
1325
 
        qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
1326
 
 
1327
 
        if ((ret = dbenv->lock_id(dbenv, &lockid)) != 0)
1328
 
                goto err;
1329
 
 
1330
 
        /* Initialize the getpgno dispatch table. */
1331
 
        if ((ret = __rep_lockpgno_init(dbenv, &dtab, &dtabsize)) != 0)
1332
 
                goto err;
1333
 
 
1334
 
        /*
1335
 
         * The set of records for a transaction may include dbreg_register
1336
 
         * records.  Create a txnlist so that they can keep track of file
1337
 
         * state between records.
1338
 
         */
1339
 
        if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0)
1340
 
                goto err;
1341
 
 
1342
 
        /* Phase 2: Apply updates. */
1343
 
        if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
1344
 
                goto err;
1345
 
        for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
1346
 
                if ((ret = __rep_lockpages(dbenv,
1347
 
                    dtab, dtabsize, lsnp, NULL, NULL, lockid)) != 0)
1348
 
                        goto err;
1349
 
                if ((ret = logc->get(logc, lsnp, &data_dbt, DB_SET)) != 0)
1350
 
                        goto err;
1351
 
                if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab,
1352
 
                    dbenv->recover_dtab_size, &data_dbt, lsnp,
1353
 
                    DB_TXN_APPLY, txninfo)) != 0)
1354
 
                        goto err;
1355
 
        }
1356
 
 
1357
 
err:    memset(&req, 0, sizeof(req));
1358
 
        req.op = DB_LOCK_PUT_ALL;
1359
 
        if ((t_ret = dbenv->lock_vec(dbenv, lockid,
1360
 
            DB_LOCK_FREE_LOCKER, &req, 1, &lvp)) != 0 && ret == 0)
1361
 
                ret = t_ret;
1362
 
 
1363
 
        if (lc.nalloc != 0)
1364
 
                __os_free(dbenv, lc.array);
1365
 
 
1366
 
        if ((t_ret =
1367
 
            dbenv->lock_id_free(dbenv, lockid)) != 0 && ret == 0)
1368
 
                ret = t_ret;
1369
 
 
1370
 
        if (logc != NULL && (t_ret = logc->close(logc, 0)) != 0 && ret == 0)
1371
 
                ret = t_ret;
1372
 
 
1373
 
        if (txninfo != NULL)
1374
 
                __db_txnlist_end(dbenv, txninfo);
1375
 
 
1376
 
        if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
1377
 
                __os_ufree(dbenv, data_dbt.data);
1378
 
 
1379
 
        if (dtab != NULL)
1380
 
                __os_free(dbenv, dtab);
1381
 
 
1382
 
        if (ret == 0)
1383
 
                /*
1384
 
                 * We don't hold the rep mutex, and could miscount if we race.
1385
 
                 */
1386
 
                rep->stat.st_txns_applied++;
1387
 
 
1388
 
        return (ret);
1389
 
}
1390
 
 
1391
 
/*
1392
 
 * __rep_collect_txn
1393
 
 *      Recursive function that will let us visit every entry in a transaction
1394
 
 *      chain including all child transactions so that we can then apply
1395
 
 *      the entire transaction family at once.
1396
 
 */
1397
 
static int
1398
 
__rep_collect_txn(dbenv, lsnp, lc)
1399
 
        DB_ENV *dbenv;
1400
 
        DB_LSN *lsnp;
1401
 
        LSN_COLLECTION *lc;
1402
 
{
1403
 
        __txn_child_args *argp;
1404
 
        DB_LOGC *logc;
1405
 
        DB_LSN c_lsn;
1406
 
        DBT data;
1407
 
        u_int32_t rectype;
1408
 
        int nalloc, ret, t_ret;
1409
 
 
1410
 
        memset(&data, 0, sizeof(data));
1411
 
        F_SET(&data, DB_DBT_REALLOC);
1412
 
 
1413
 
        if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
1414
 
                return (ret);
1415
 
 
1416
 
        while (!IS_ZERO_LSN(*lsnp) &&
1417
 
            (ret = logc->get(logc, lsnp, &data, DB_SET)) == 0) {
1418
 
                memcpy(&rectype, data.data, sizeof(rectype));
1419
 
                if (rectype == DB___txn_child) {
1420
 
                        if ((ret = __txn_child_read(dbenv,
1421
 
                            data.data, &argp)) != 0)
1422
 
                                goto err;
1423
 
                        c_lsn = argp->c_lsn;
1424
 
                        *lsnp = argp->prev_lsn;
1425
 
                        __os_free(dbenv, argp);
1426
 
                        ret = __rep_collect_txn(dbenv, &c_lsn, lc);
1427
 
                } else {
1428
 
                        if (lc->nalloc < lc->nlsns + 1) {
1429
 
                                nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
1430
 
                                if ((ret = __os_realloc(dbenv,
1431
 
                                    nalloc * sizeof(DB_LSN), &lc->array)) != 0)
1432
 
                                        goto err;
1433
 
                                lc->nalloc = nalloc;
1434
 
                        }
1435
 
                        lc->array[lc->nlsns++] = *lsnp;
1436
 
 
1437
 
                        /*
1438
 
                         * Explicitly copy the previous lsn.  The record
1439
 
                         * starts with a u_int32_t record type, a u_int32_t
1440
 
                         * txn id, and then the DB_LSN (prev_lsn) that we
1441
 
                         * want.  We copy explicitly because we have no idea
1442
 
                         * what kind of record this is.
1443
 
                         */
1444
 
                        memcpy(lsnp, (u_int8_t *)data.data +
1445
 
                            sizeof(u_int32_t) + sizeof(u_int32_t),
1446
 
                            sizeof(DB_LSN));
1447
 
                }
1448
 
 
1449
 
                if (ret != 0)
1450
 
                        goto err;
1451
 
        }
1452
 
 
1453
 
err:    if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
1454
 
                ret = t_ret;
1455
 
        if (data.data != NULL)
1456
 
                __os_ufree(dbenv, data.data);
1457
 
        return (ret);
1458
 
}
1459
 
 
1460
 
/*
1461
 
 * __rep_lsn_cmp --
1462
 
 *      qsort-type-compatible wrapper for log_compare.
1463
 
 */
1464
 
static int
1465
 
__rep_lsn_cmp(lsn1, lsn2)
1466
 
        const void *lsn1, *lsn2;
1467
 
{
1468
 
 
1469
 
        return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));
1470
 
}
1471
 
 
1472
 
/*
1473
 
 * __rep_newfile --
1474
 
 *      NEWFILE messages can contain either the last LSN of the old file
1475
 
 * or the first LSN of the new one, depending on which we have available
1476
 
 * when the message is sent.  When applying a NEWFILE message, make sure
1477
 
 * we haven't already swapped files, as it's possible (given the right sequence
1478
 
 * of out-of-order messages) to wind up with a NEWFILE message of each
1479
 
 * variety, and __rep_apply won't detect the two as duplicates of each other.
1480
 
 */
1481
 
static int
1482
 
__rep_newfile(dbenv, rc, msgdbt, lsnp)
1483
 
        DB_ENV *dbenv;
1484
 
        REP_CONTROL *rc;
1485
 
        DBT *msgdbt;
1486
 
        DB_LSN *lsnp;
1487
 
{
1488
 
        DB_LOG *dblp;
1489
 
        LOG *lp;
1490
 
        u_int32_t newfile;
1491
 
 
1492
 
        dblp = dbenv->lg_handle;
1493
 
        lp = dblp->reginfo.primary;
1494
 
 
1495
 
        /*
1496
 
         * A NEWFILE message containing the old file's LSN will be
1497
 
         * accompanied by a NULL rec DBT;  one containing the new one's LSN
1498
 
         * will need to supply the last record in the old file by
1499
 
         * sending it in the rec DBT.
1500
 
         */
1501
 
        if (msgdbt == NULL || msgdbt->size == 0)
1502
 
                newfile = rc->lsn.file + 1;
1503
 
        else
1504
 
                newfile = rc->lsn.file;
1505
 
 
1506
 
        if (newfile > lp->lsn.file)
1507
 
                return (__log_newfile(dblp, lsnp));
1508
 
        else {
1509
 
                /* We've already applied this NEWFILE.  Just ignore it. */
1510
 
                *lsnp = lp->lsn;
1511
 
                return (0);
1512
 
        }
1513
 
}