2
* See the file LICENSE for redistribution information.
4
* Copyright (c) 2001-2002
5
* Sleepycat Software. All rights reserved.
11
static const char revid[] = "$Id$";
14
#ifndef NO_SYSTEM_INCLUDES
20
#include "dbinc/db_page.h"
21
#include "dbinc/btree.h"
22
#include "dbinc/fop.h"
23
#include "dbinc/hash.h"
24
#include "dbinc/log.h"
25
#include "dbinc/qam.h"
26
#include "dbinc/rep.h"
27
#include "dbinc/txn.h"
31
* Miscellaneous replication-related utility functions, including
32
* those called by other subsystems.
34
static int __rep_cmp_bylsn __P((const void *, const void *));
35
static int __rep_cmp_bypage __P((const void *, const void *));
38
static void __rep_print_logmsg __P((DB_ENV *, const DBT *, DB_LSN *));
42
* __rep_check_alloc --
43
* Make sure the array of TXN_REC entries is of at least size n.
44
* (This function is called by the __*_getpgnos() functions in
47
* PUBLIC: int __rep_check_alloc __P((DB_ENV *, TXN_RECS *, int));
50
__rep_check_alloc(dbenv, r, n)
57
while (r->nalloc < r->npages + n) {
58
nalloc = r->nalloc == 0 ? 20 : r->nalloc * 2;
60
if ((ret = __os_realloc(dbenv, nalloc * sizeof(LSN_PAGE),
71
* __rep_send_message --
72
* This is a wrapper for sending a message. It takes care of constructing
73
* the REP_CONTROL structure and calling the user's specified send function.
75
* PUBLIC: int __rep_send_message __P((DB_ENV *, int,
76
* PUBLIC: u_int32_t, DB_LSN *, const DBT *, u_int32_t));
79
__rep_send_message(dbenv, eid, rtype, lsnp, dbtp, flags)
94
db_rep = dbenv->rep_handle;
97
/* Set up control structure. */
98
memset(&cntrl, 0, sizeof(cntrl));
103
cntrl.rectype = rtype;
105
cntrl.rep_version = DB_REPVERSION;
106
cntrl.log_version = DB_LOGVERSION;
107
MUTEX_LOCK(dbenv, db_rep->mutexp);
108
cntrl.gen = rep->gen;
109
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
111
memset(&cdbt, 0, sizeof(cdbt));
113
cdbt.size = sizeof(cntrl);
115
/* Don't assume the send function will be tolerant of NULL records. */
117
memset(&scrap_dbt, 0, sizeof(DBT));
121
send_flags = (LF_ISSET(DB_PERMANENT) ? DB_REP_PERMANENT : 0);
124
__rep_print_message(dbenv, eid, &cntrl, "rep_send_message");
126
#ifdef REP_DIAGNOSTIC
127
if (rtype == REP_LOG)
128
__rep_print_logmsg(dbenv, dbtp, lsnp);
130
ret = db_rep->rep_send(dbenv, &cdbt, dbtp, eid, send_flags);
133
* We don't hold the rep lock, so this could miscount if we race.
134
* I don't think it's worth grabbing the mutex for that bit of
138
rep->stat.st_msgs_sent++;
140
rep->stat.st_msgs_send_failures++;
145
#ifdef REP_DIAGNOSTIC
148
* __rep_print_logmsg --
149
* This is a debugging routine for printing out log records that
150
* we are about to transmit to a client.
154
__rep_print_logmsg(dbenv, logdbt, lsnp)
159
/* Static structures to hold the printing functions. */
160
static int (**ptab)__P((DB_ENV *,
161
DBT *, DB_LSN *, db_recops, void *)) = NULL;
165
/* Initialize the table. */
166
(void)__bam_init_print(dbenv, &ptab, &ptabsize);
167
(void)__crdel_init_print(dbenv, &ptab, &ptabsize);
168
(void)__db_init_print(dbenv, &ptab, &ptabsize);
169
(void)__dbreg_init_print(dbenv, &ptab, &ptabsize);
170
(void)__fop_init_print(dbenv, &ptab, &ptabsize);
171
(void)__qam_init_print(dbenv, &ptab, &ptabsize);
172
(void)__ham_init_print(dbenv, &ptab, &ptabsize);
173
(void)__txn_init_print(dbenv, &ptab, &ptabsize);
176
(void)__db_dispatch(dbenv,
177
ptab, ptabsize, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
182
* __rep_new_master --
183
* Called after a master election to sync back up with a new master.
184
* It's possible that we already know of this new master in which case
185
* we don't need to do anything.
187
* This is written assuming that this message came from the master; we
188
* need to enforce that in __rep_process_record, but right now, we have
189
* no way to identify the master.
191
* PUBLIC: int __rep_new_master __P((DB_ENV *, REP_CONTROL *, int));
194
__rep_new_master(dbenv, cntrl, eid)
201
DB_LSN last_lsn, lsn;
206
int change, ret, t_ret;
208
db_rep = dbenv->rep_handle;
209
rep = db_rep->region;
210
MUTEX_LOCK(dbenv, db_rep->mutexp);
212
change = rep->gen != cntrl->gen || rep->master_id != eid;
214
rep->gen = cntrl->gen;
215
rep->master_id = eid;
216
F_SET(rep, REP_F_RECOVER);
217
rep->stat.st_master_changes++;
219
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
225
* If the master changed, we need to start the process of
226
* figuring out what our last valid log record is. However,
227
* if both the master and we agree that the max LSN is 0,0,
228
* then there is no recovery to be done. If we are at 0 and
229
* the master is not, then we just need to request all the log
230
* records from the master.
232
dblp = dbenv->lg_handle;
233
lp = dblp->reginfo.primary;
234
R_LOCK(dbenv, &dblp->reginfo);
235
last_lsn = lsn = lp->lsn;
236
if (last_lsn.offset > sizeof(LOGP))
237
last_lsn.offset -= lp->len;
238
R_UNLOCK(dbenv, &dblp->reginfo);
239
if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
240
empty: MUTEX_LOCK(dbenv, db_rep->mutexp);
241
F_CLR(rep, REP_F_RECOVER);
242
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
244
if (IS_INIT_LSN(cntrl->lsn))
247
ret = __rep_send_message(dbenv, rep->master_id,
248
REP_ALL_REQ, &lsn, NULL, 0);
251
ret = DB_REP_NEWMASTER;
253
} else if (last_lsn.offset <= sizeof(LOGP)) {
255
* We have just changed log files and need to set lastlsn
256
* to the last record in the previous log files.
258
if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
260
memset(&dbt, 0, sizeof(dbt));
261
ret = logc->get(logc, &last_lsn, &dbt, DB_LAST);
262
if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
264
if (ret == DB_NOTFOUND)
270
R_LOCK(dbenv, &dblp->reginfo);
271
lp->verify_lsn = last_lsn;
272
R_UNLOCK(dbenv, &dblp->reginfo);
273
if ((ret = __rep_send_message(dbenv,
274
eid, REP_VERIFY_REQ, &last_lsn, NULL, 0)) != 0)
277
return (DB_REP_NEWMASTER);
281
* __rep_lockpgno_init
282
* Create a dispatch table for acquiring locks on each log record.
284
* PUBLIC: int __rep_lockpgno_init __P((DB_ENV *,
285
* PUBLIC: int (***)(DB_ENV *, DBT *, DB_LSN *, db_recops, void *),
286
* PUBLIC: size_t *));
289
__rep_lockpgno_init(dbenv, dtabp, dtabsizep)
291
int (***dtabp)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
296
/* Initialize dispatch table. */
299
if ((ret = __bam_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
300
(ret = __crdel_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
301
(ret = __db_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
302
(ret = __dbreg_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
303
(ret = __fop_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
304
(ret = __qam_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
305
(ret = __ham_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0 ||
306
(ret = __txn_init_getpgnos(dbenv, dtabp, dtabsizep)) != 0)
313
* __rep_unlockpages --
314
* Unlock the pages locked in __rep_lockpages.
316
* PUBLIC: int __rep_unlockpages __P((DB_ENV *, u_int32_t));
319
__rep_unlockpages(dbenv, lid)
323
DB_LOCKREQ req, *lvp;
325
req.op = DB_LOCK_PUT_ALL;
326
return (dbenv->lock_vec(dbenv, lid, 0, &req, 1, &lvp));
331
* Called to gather and lock pages in preparation for both
332
* single transaction apply as well as client synchronization
333
* with a new master. A non-NULL key_lsn means that we're locking
334
* in order to apply a single log record during client recovery
335
* to the joint LSN. A non-NULL max_lsn means that we are applying
336
* a transaction whose commit is at max_lsn.
338
* PUBLIC: int __rep_lockpages __P((DB_ENV *,
339
* PUBLIC: int (**)(DB_ENV *, DBT *, DB_LSN *, db_recops, void *),
340
* PUBLIC: size_t, DB_LSN *, DB_LSN *, TXN_RECS *, u_int32_t));
343
__rep_lockpages(dbenv, dtab, dtabsize, key_lsn, max_lsn, recs, lid)
345
int (**dtab)__P((DB_ENV *, DBT *, DB_LSN *, db_recops, void *));
347
DB_LSN *key_lsn, *max_lsn;
359
int i, ret, t_ret, unique;
363
* There are two phases: First, we have to traverse backwards through
364
* the log records gathering the list of all the pages accessed. Once
365
* we have this information we can acquire all the locks we need.
369
memset(&locks, 0, sizeof(locks));
372
t = recs != NULL ? recs : &tmp;
373
t->npages = t->nalloc = 0;
377
* We've got to be in one mode or the other; else life will either
378
* be excessively boring or overly exciting.
380
DB_ASSERT(key_lsn != NULL || max_lsn != NULL);
381
DB_ASSERT(key_lsn == NULL || max_lsn == NULL);
384
* Phase 1: Fill in the pgno array.
386
memset(&data_dbt, 0, sizeof(data_dbt));
387
if (F_ISSET(dbenv, DB_ENV_THREAD))
388
F_SET(&data_dbt, DB_DBT_REALLOC);
390
/* Single transaction apply. */
391
if (max_lsn != NULL) {
392
DB_ASSERT(0); /* XXX */
395
if ((ret = __rep_apply_thread(dbenv, dtab, dtabsize,
396
&data_dbt, &tmp_lsn, t)) != 0)
402
if (key_lsn != NULL) {
403
if ((ret = dbenv->log_cursor(dbenv, &logc, 0)) != 0)
405
ret = logc->get(logc, key_lsn, &data_dbt, DB_SET);
407
/* Save lsn values, since dispatch functions can change them. */
409
ret = __db_dispatch(dbenv,
410
dtab, dtabsize, &data_dbt, &tmp_lsn, DB_TXN_GETPGNOS, t);
412
if ((t_ret = logc->close(logc, 0)) != 0 && ret == 0)
416
* If ret == DB_DELETED, this record refers to a temporary
417
* file and there's nothing to apply.
419
if (ret == DB_DELETED) {
429
/* Phase 2: Write lock all the pages. */
431
/* Sort the entries in the array by page number. */
432
qsort(t->array, t->npages, sizeof(LSN_PAGE), __rep_cmp_bypage);
434
/* Count the number of unique pages. */
435
cur_fid = DB_LOGFILEID_INVALID;
436
cur_pgno = PGNO_INVALID;
438
for (i = 0; i < t->npages; i++) {
439
if (F_ISSET(&t->array[i], LSN_PAGE_NOLOCK))
441
if (t->array[i].pgdesc.pgno != cur_pgno ||
442
t->array[i].fid != cur_fid) {
443
cur_pgno = t->array[i].pgdesc.pgno;
444
cur_fid = t->array[i].fid;
452
/* Handle single lock case specially, else allocate space for locks. */
454
memset(&lo, 0, sizeof(lo));
455
lo.data = &t->array[0].pgdesc;
456
lo.size = sizeof(t->array[0].pgdesc);
457
ret = dbenv->lock_get(dbenv, lid, 0, &lo, DB_LOCK_WRITE, &l);
461
/* Multi-lock case. */
463
if ((ret = __os_calloc(dbenv,
464
unique, sizeof(DB_LOCKREQ), &locks.reqs)) != 0)
466
if ((ret = __os_calloc(dbenv, unique, sizeof(DBT), &locks.objs)) != 0)
470
cur_fid = DB_LOGFILEID_INVALID;
471
cur_pgno = PGNO_INVALID;
472
for (i = 0; i < t->npages; i++) {
473
if (F_ISSET(&t->array[i], LSN_PAGE_NOLOCK))
475
if (t->array[i].pgdesc.pgno != cur_pgno ||
476
t->array[i].fid != cur_fid) {
477
cur_pgno = t->array[i].pgdesc.pgno;
478
cur_fid = t->array[i].fid;
479
locks.reqs[unique].op = DB_LOCK_GET;
480
locks.reqs[unique].mode = DB_LOCK_WRITE;
481
locks.reqs[unique].obj = &locks.objs[unique];
482
locks.objs[unique].data = &t->array[i].pgdesc;
483
locks.objs[unique].size = sizeof(t->array[i].pgdesc);
488
/* Finally, get the locks. */
490
dbenv->lock_vec(dbenv, lid, 0, locks.reqs, unique, &lvp)) != 0) {
492
* If we were unsuccessful, unlock any locks we acquired before
493
* the error and return the original error value.
495
(void)__rep_unlockpages(dbenv, lid);
499
out: if (locks.objs != NULL)
500
__os_free(dbenv, locks.objs);
501
if (locks.reqs != NULL)
502
__os_free(dbenv, locks.reqs);
505
* Before we return, sort by LSN so that we apply records in the
508
qsort(t->array, t->npages, sizeof(LSN_PAGE), __rep_cmp_bylsn);
510
out2: if ((ret != 0 || recs == NULL) && t->nalloc != 0) {
511
__os_free(dbenv, t->array);
513
t->npages = t->nalloc = 0;
516
if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
517
__os_ufree(dbenv, data_dbt.data);
523
* __rep_cmp_bypage and __rep_cmp_bylsn --
524
* Sort functions for qsort. "bypage" sorts first by file ID, then by page
525
* number, then by LSN. "bylsn" sorts first by LSN, then by file ID, then by page number.
528
__rep_cmp_bypage(a, b)
536
if (ap->fid < bp->fid)
539
if (ap->fid > bp->fid)
542
if (ap->pgdesc.pgno < bp->pgdesc.pgno)
545
if (ap->pgdesc.pgno > bp->pgdesc.pgno)
548
if (ap->lsn.file < bp->lsn.file)
551
if (ap->lsn.file > bp->lsn.file)
554
if (ap->lsn.offset < bp->lsn.offset)
557
if (ap->lsn.offset > bp->lsn.offset)
564
__rep_cmp_bylsn(a, b)
572
if (ap->lsn.file < bp->lsn.file)
575
if (ap->lsn.file > bp->lsn.file)
578
if (ap->lsn.offset < bp->lsn.offset)
581
if (ap->lsn.offset > bp->lsn.offset)
584
if (ap->fid < bp->fid)
587
if (ap->fid > bp->fid)
590
if (ap->pgdesc.pgno < bp->pgdesc.pgno)
593
if (ap->pgdesc.pgno > bp->pgdesc.pgno)
601
* Used by other subsystems to figure out if this is a replication
604
* PUBLIC: int __rep_is_client __P((DB_ENV *));
607
__rep_is_client(dbenv)
614
if ((db_rep = dbenv->rep_handle) == NULL)
616
rep = db_rep->region;
618
MUTEX_LOCK(dbenv, db_rep->mutexp);
619
ret = F_ISSET(rep, REP_F_UPGRADE | REP_F_LOGSONLY);
620
MUTEX_UNLOCK(dbenv, db_rep->mutexp);
626
* Send this site's vote for the election.
628
* PUBLIC: int __rep_send_vote __P((DB_ENV *, DB_LSN *, int, int, int));
631
__rep_send_vote(dbenv, lsnp, nsites, pri, tiebreaker)
634
int nsites, pri, tiebreaker;
639
memset(&vi, 0, sizeof(vi));
643
vi.tiebreaker = tiebreaker;
645
memset(&vote_dbt, 0, sizeof(vote_dbt));
647
vote_dbt.size = sizeof(vi);
649
return (__rep_send_message(dbenv,
650
DB_EID_BROADCAST, REP_VOTE1, lsnp, &vote_dbt, 0));
654
* __rep_grow_sites --
655
* Called to allocate more space in the election tally information.
656
* Called with the rep mutex held. We need to acquire the region mutex, so
657
* we need to make sure that we *never* acquire those mutexes in the
660
* PUBLIC: int __rep_grow_sites __P((DB_ENV *dbenv, int nsites));
663
__rep_grow_sites(dbenv, nsites)
670
int nalloc, ret, *tally;
672
rep = ((DB_REP *)dbenv->rep_handle)->region;
675
* Allocate either twice the current allocation or nsites,
679
nalloc = 2 * rep->asites;
683
infop = dbenv->reginfo;
684
renv = infop->primary;
685
MUTEX_LOCK(dbenv, &renv->mutex);
686
/*
 * Request room for the whole tally array: nalloc ints. Writing
 * sizeof(nalloc * sizeof(int)) here would yield only the size of a
 * size_t, so the array (whose capacity is recorded in rep->asites
 * below) would be under-allocated and tally writes could overrun it.
 */
if ((ret = __db_shalloc(infop->addr,
687
nalloc * sizeof(int), sizeof(int), &tally)) == 0) {
688
if (rep->tally_off != INVALID_ROFF)
689
__db_shalloc_free(infop->addr,
690
R_ADDR(infop, rep->tally_off));
691
rep->asites = nalloc;
692
rep->nsites = nsites;
693
rep->tally_off = R_OFFSET(infop, tally);
695
MUTEX_UNLOCK(dbenv, &renv->mutex);
700
static int __rep_send_file __P((DB_ENV *, DBT *, u_int32_t));
703
* Send an entire file, one block at a time.
* Each page is sent to eid as a REP_FILE message while holding a read lock
* on it; the message for the final page is sent with (pgno == last_pgno)
* as its flags argument. NOTE(review): the declaration/initialisation of
* last_pgno and the page-0 handling are not visible in this excerpt.
706
__rep_send_file(dbenv, rec, eid)
717
db_pgno_t last_pgno, pgno;
726
if ((ret = db_create(&dbp, dbenv, 0)) != 0)
729
if ((ret = dbp->open(dbp, rec->data, NULL, DB_UNKNOWN, 0, 0)) != 0)
732
if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
735
* Force last_pgno to some value that will let us read the meta-data
736
* page in the following loop.
738
memset(&rec_dbt, 0, sizeof(rec_dbt));
740
for (pgno = 0; pgno <= last_pgno; pgno++) {
741
if ((ret = __db_lget(dbc, 0, pgno, DB_LOCK_READ, 0, &lk)) != 0)
744
if ((ret = mpf->get(mpf, &pgno, 0, &pagep)) != 0)
748
last_pgno = ((DBMETA *)pagep)->last_pgno;
750
rec_dbt.data = pagep;
751
rec_dbt.size = dbp->pgsize;
752
if ((ret = __rep_send_message(dbenv, eid,
753
REP_FILE, NULL, &rec_dbt, pgno == last_pgno)) != 0)
755
ret = mpf->put(mpf, pagep, 0);
759
ret = __LPUT(dbc, lk);
765
err: if (LOCK_ISSET(lk) && (t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
767
if (dbc != NULL && (t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
769
if (pagep != NULL && (t_ret = mpf->put(mpf, pagep, 0)) != 0 && ret == 0)
771
if (dbp != NULL && (t_ret = dbp->close(dbp, 0)) != 0 && ret == 0)
779
* PUBLIC: void __rep_print_message __P((DB_ENV *, int, REP_CONTROL *, char *));
782
__rep_print_message(dbenv, eid, rp, str)
789
switch (rp->rectype) {
847
case REP_VERIFY_FAIL:
848
type = "verify_fail";
863
printf("%s %s: gen = %d eid %d, type %s, LSN [%u][%u]\n",
864
dbenv->db_home, str, rp->gen, eid, type, rp->lsn.file,