1
/*-------------------------------------------------------------------------
4
* WAL replay logic for GiST.
7
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
8
* Portions Copyright (c) 1994, Regents of the University of California
11
* src/backend/access/gist/gistxlog.c
12
*-------------------------------------------------------------------------
16
#include "access/gist_private.h"
17
#include "access/xlogutils.h"
18
#include "miscadmin.h"
19
#include "storage/bufmgr.h"
20
#include "utils/memutils.h"
21
#include "utils/rel.h"
31
gistxlogPageSplit *data;
35
static MemoryContext opCtx; /* working memory for operations */
38
/*
 * gistRedoClearFollowRight -- redo helper for a left child page.
 *
 * NOTE(review): this listing is a lossy extract; the bare integers are
 * leftover line numbers and the lines between them (return type, braces,
 * local declarations) are not visible, so brace nesting is not shown.
 *
 * node      - relation file the page belongs to
 * lsn       - LSN of the WAL record being replayed
 * leftblkno - block number of the page to fix up
 *
 * Visible behaviour: the block is looked up with XLogReadBuffer(); if a
 * valid buffer comes back and the XLByteLT() test below does not hold, the
 * page's NSN is set to 'lsn', the follow-right flag is cleared, the page is
 * stamped with 'lsn' and ThisTimeLineID and the buffer is marked dirty.
 * The buffer is unlocked and released at the end.
 */
* Replay the clearing of F_FOLLOW_RIGHT flag.
41
gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
42
BlockNumber leftblkno)
46
buffer = XLogReadBuffer(node, leftblkno, false);
47
if (BufferIsValid(buffer))
49
Page page = (Page) BufferGetPage(buffer);
52
* Note that we still update the page even if page LSN is equal to the
53
* LSN of this record, because the updated NSN is not included in the
56
/* presumably "lsn >= page LSN": an equal LSN still gets the update, per the note above */
if (!XLByteLT(lsn, PageGetLSN(page)))
58
GistPageGetOpaque(page)->nsn = lsn;
59
GistClearFollowRight(page);
61
PageSetLSN(page, lsn);
62
PageSetTLI(page, ThisTimeLineID);
63
MarkBufferDirty(buffer);
65
UnlockReleaseBuffer(buffer);
70
/*
 * gistRedoPageUpdateRecord -- replay a page-update record.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and several lines (braces, declarations of buffer/page/data/i/l, the
 * statements guarded by some of the ifs, any else keywords) are not
 * visible.  The description below covers only what is shown.
 *
 * lsn    - LSN of the record being replayed
 * record - the WAL record; its payload starts with a gistxlogPageUpdate
 *
 * Visible steps:
 *   1. If the record names a left child, gistRedoClearFollowRight() is
 *      called for it.
 *   2. XLR_BKP_BLOCK_1 is tested (the guarded statement is not shown).
 *   3. The target block is read; when lsn <= page LSN per XLByteLE() the
 *      buffer is released (presumably the change is already on the page).
 *   4. 'ntodelete' offsets are read from the payload and those tuples are
 *      deleted; a leaf page is then marked via GistMarkTuplesDeleted().
 *   5. Remaining payload bytes are treated as index tuples and added with
 *      PageAddItem() at consecutive offsets after the current last item.
 *   6. A non-leaf root page left with no items is turned into a leaf.
 *   7. rightlink is reset, the page is stamped with lsn/ThisTimeLineID,
 *      and the buffer is dirtied and released.
 */
* redo any page update (except page split)
73
gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
75
char *begin = XLogRecGetData(record);
76
gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
81
if (BlockNumberIsValid(xldata->leftchild))
82
gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
84
/* nothing more to do if page was backed up (and no info to do it with) */
85
if (record->xl_info & XLR_BKP_BLOCK_1)
88
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
89
if (!BufferIsValid(buffer))
91
page = (Page) BufferGetPage(buffer);
93
if (XLByteLE(lsn, PageGetLSN(page)))
95
UnlockReleaseBuffer(buffer);
99
/* variable-length payload begins right after the fixed-size header */
data = begin + sizeof(gistxlogPageUpdate);
101
/* Delete old tuples */
102
if (xldata->ntodelete > 0)
105
/* the payload starts with an array of ntodelete offset numbers */
OffsetNumber *todelete = (OffsetNumber *) data;
107
data += sizeof(OffsetNumber) * xldata->ntodelete;
109
for (i = 0; i < xldata->ntodelete; i++)
110
PageIndexTupleDelete(page, todelete[i]);
111
if (GistPageIsLeaf(page))
112
GistMarkTuplesDeleted(page);
116
/* anything left in the record (up to xl_len) is index tuples to add */
if (data - begin < record->xl_len)
118
/* first insert position: FirstOffsetNumber on an empty page, else one past the last item */
OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
119
OffsetNumberNext(PageGetMaxOffsetNumber(page));
121
while (data - begin < record->xl_len)
123
IndexTuple itup = (IndexTuple) data;
124
Size sz = IndexTupleSize(itup);
129
l = PageAddItem(page, (Item) itup, sz, off, false, false);
130
if (l == InvalidOffsetNumber)
131
elog(ERROR, "failed to add item to GiST index page, size %d bytes",
139
* special case: leafpage, nothing to insert, nothing to delete, then
142
if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
143
GistClearTuplesDeleted(page);
146
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
149
* all links on non-leaf root page was deleted by vacuum full, so root
150
* page becomes a leaf
152
GistPageSetLeaf(page);
154
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
155
PageSetLSN(page, lsn);
156
PageSetTLI(page, ThisTimeLineID);
157
MarkBufferDirty(buffer);
158
UnlockReleaseBuffer(buffer);
162
/*
 * gistRedoPageDeleteRecord -- replay a page-delete record.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and the lines between them (return type, braces, declarations, the
 * statements guarded by the two ifs) are not visible.
 *
 * lsn    - LSN of the record being replayed
 * record - the WAL record; its payload is a gistxlogPageDelete
 *
 * Visible behaviour: tests XLR_BKP_BLOCK_1, reads the block named in the
 * record, flags the page with GistPageSetDeleted(), stamps it with
 * lsn/ThisTimeLineID, and dirties and releases the buffer.  Unlike the
 * page-update replay, no page-LSN comparison appears in the visible lines.
 */
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
164
gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
168
/* nothing else to do if page was backed up (and no info to do it with) */
169
if (record->xl_info & XLR_BKP_BLOCK_1)
172
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
173
if (!BufferIsValid(buffer))
176
page = (Page) BufferGetPage(buffer);
177
GistPageSetDeleted(page);
179
PageSetLSN(page, lsn);
180
PageSetTLI(page, ThisTimeLineID);
181
MarkBufferDirty(buffer);
182
UnlockReleaseBuffer(buffer);
186
/*
 * decodePageSplitRecord -- index the contents of a page-split record.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and the lines between them (return type, braces, declarations of
 * ptr/i/j, the initialisation and increment of j) are not visible.
 *
 * decoded - output: 'data' is pointed at the gistxlogPageSplit header and
 *           'page' at a palloc'd array with one NewPage per split page
 * record  - the WAL record to decode
 *
 * Layout walked by the visible code: the fixed header, then for each of
 * 'npage' pages a gistxlogPage header followed by 'num' index tuples.  The
 * header and tuple pointers stored in 'decoded' point into the record's own
 * data (no tuple is copied); only the pointer arrays are palloc'd.  The
 * Asserts check that the cursor is still inside the record (xl_len).
 */
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
188
char *begin = XLogRecGetData(record),
193
decoded->data = (gistxlogPageSplit *) begin;
194
decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
196
/* per-page data follows the fixed-size split header */
ptr = begin + sizeof(gistxlogPageSplit);
197
for (i = 0; i < decoded->data->npage; i++)
199
Assert(ptr - begin < record->xl_len);
200
decoded->page[i].header = (gistxlogPage *) ptr;
201
ptr += sizeof(gistxlogPage);
203
decoded->page[i].itup = (IndexTuple *)
204
palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
206
while (j < decoded->page[i].header->num)
208
Assert(ptr - begin < record->xl_len);
209
decoded->page[i].itup[j] = (IndexTuple) ptr;
210
/* tuples are variable-length: step by each tuple's own size */
ptr += IndexTupleSize((IndexTuple) ptr);
217
/*
 * gistRedoPageSplitRecord -- replay a page-split record.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and several lines (braces, else keywords, declarations of
 * buffer/page/i/flags, the statements that set 'flags' and 'isrootsplit')
 * are not visible.  The description below covers only what is shown.
 *
 * lsn    - LSN of the record being replayed
 * record - the WAL record; its payload starts with a gistxlogPageSplit
 *
 * Visible steps: clear follow-right on the left child if the record names
 * one; decode the record with decodePageSplitRecord(); then for every page
 * in the record: read the block with the init flag set, re-initialise it
 * with GISTInitBuffer(), fill it with the page's tuples, set rightlink, NSN
 * and the follow-right flag, stamp it with lsn/ThisTimeLineID, and dirty
 * and release the buffer.
 */
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
219
gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
220
PageSplitRecord xlrec;
224
bool isrootsplit = false;
226
if (BlockNumberIsValid(xldata->leftchild))
227
gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
228
decodePageSplitRecord(&xlrec, record);
230
/* loop around all pages */
231
for (i = 0; i < xlrec.data->npage; i++)
233
NewPage *newpage = xlrec.page + i;
236
/* root page in the record: presumably where isrootsplit gets set (line not shown) */
if (newpage->header->blkno == GIST_ROOT_BLKNO)
242
/* last argument 'true': the page is about to be re-initialised below */
buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
243
Assert(BufferIsValid(buffer));
244
page = (Page) BufferGetPage(buffer);
246
/* ok, clear buffer */
247
/* presumably selects the 'flags' value passed to GISTInitBuffer() (assignment not shown) */
if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
251
GISTInitBuffer(buffer, flags);
254
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
256
/* the root gets no right sibling and no follow-right flag */
if (newpage->header->blkno == GIST_ROOT_BLKNO)
258
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
259
GistPageGetOpaque(page)->nsn = xldata->orignsn;
260
GistClearFollowRight(page);
264
/* chain to the next page in the record; the other assignment uses the original right link */
if (i < xlrec.data->npage - 1)
265
GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
267
GistPageGetOpaque(page)->rightlink = xldata->origrlink;
268
GistPageGetOpaque(page)->nsn = xldata->orignsn;
269
/* follow-right is set on every page but the last, except in a root split */
if (i < xlrec.data->npage - 1 && !isrootsplit)
270
GistMarkFollowRight(page);
272
GistClearFollowRight(page);
275
PageSetLSN(page, lsn);
276
PageSetTLI(page, ThisTimeLineID);
277
MarkBufferDirty(buffer);
278
UnlockReleaseBuffer(buffer);
283
/*
 * gistRedoCreateIndex -- replay an index-creation record.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and the lines between them (return type, braces, declarations of
 * buffer/page) are not visible.
 *
 * lsn    - LSN of the record being replayed
 * record - the WAL record; its payload is read as a RelFileNode
 *
 * Visible behaviour: reads GIST_ROOT_BLKNO of that relation with the init
 * flag set, initialises it as a leaf page (F_LEAF), stamps it with
 * lsn/ThisTimeLineID, and dirties and releases the buffer.
 */
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
285
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
289
buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
290
Assert(BufferIsValid(buffer));
291
page = (Page) BufferGetPage(buffer);
293
GISTInitBuffer(buffer, F_LEAF);
295
PageSetLSN(page, lsn);
296
PageSetTLI(page, ThisTimeLineID);
298
MarkBufferDirty(buffer);
299
UnlockReleaseBuffer(buffer);
303
/*
 * gist_redo -- redo entry point for GiST WAL records.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and the lines between them (return type, braces, the switch statement
 * itself, break/default lines) are not visible.
 *
 * lsn    - LSN of the record being replayed
 * record - the WAL record
 *
 * Visible behaviour: the op code is xl_info with the XLR_INFO_MASK bits
 * removed.  RestoreBkpBlocks() is called first, then the per-record work
 * runs with opCtx as the current memory context: one handler per
 * XLOG_GIST_* op code, and a PANIC for an unknown code.  Afterwards the
 * previous context is restored and opCtx is reset, which is presumably what
 * frees the palloc'd scratch data of the handlers.
 */
gist_redo(XLogRecPtr lsn, XLogRecord *record)
305
uint8 info = record->xl_info & ~XLR_INFO_MASK;
306
MemoryContext oldCxt;
309
* GIST indexes do not require any conflict processing. NB: If we ever
310
* implement a similar optimization we have in b-tree, and remove killed
311
* tuples outside VACUUM, we'll need to handle that here.
313
RestoreBkpBlocks(lsn, record, false);
315
oldCxt = MemoryContextSwitchTo(opCtx);
318
case XLOG_GIST_PAGE_UPDATE:
319
gistRedoPageUpdateRecord(lsn, record);
321
case XLOG_GIST_PAGE_DELETE:
322
gistRedoPageDeleteRecord(lsn, record);
324
case XLOG_GIST_PAGE_SPLIT:
325
gistRedoPageSplitRecord(lsn, record);
327
case XLOG_GIST_CREATE_INDEX:
328
gistRedoCreateIndex(lsn, record);
331
elog(PANIC, "gist_redo: unknown op code %u", info);
334
MemoryContextSwitchTo(oldCxt);
335
MemoryContextReset(opCtx);
339
/*
 * out_target -- append "rel <spcNode>/<dbNode>/<relNode>" for 'node' to
 * 'buf'.  (Lossy listing: the bare integers are leftover line numbers;
 * return type and braces are not visible.)
 */
out_target(StringInfo buf, RelFileNode node)
341
appendStringInfo(buf, "rel %u/%u/%u",
342
node.spcNode, node.dbNode, node.relNode);
346
/*
 * out_gistxlogPageUpdate -- describe a page-update record: appends the
 * relation via out_target() followed by the target block number.  (Lossy
 * listing: the bare integers are leftover line numbers; return type and
 * braces are not visible.)
 */
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
348
out_target(buf, xlrec->node);
349
appendStringInfo(buf, "; block number %u", xlrec->blkno);
353
/*
 * out_gistxlogPageDelete -- describe a page-delete record as
 * "page_delete: rel .../.../...; blkno ...".  (Lossy listing: the bare
 * integers are leftover line numbers; return type, braces and the final
 * argument line of the call are not visible.)  Note that the relation is
 * formatted inline here rather than through out_target().
 */
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
355
appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
356
xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
361
/*
 * out_gistxlogPageSplit -- describe a page-split record: "page_split: ",
 * the relation via out_target(), then the original block number and the
 * number of pages it was split into.  (Lossy listing: the bare integers are
 * leftover line numbers; return type and braces are not visible.)
 */
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
363
appendStringInfo(buf, "page_split: ");
364
out_target(buf, xlrec->node);
365
appendStringInfo(buf, "; block number %u splits to %d pages",
366
xlrec->origblkno, xlrec->npage);
370
/*
 * gist_desc -- append a human-readable description of a GiST WAL record.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and the lines between them (return type, braces, the switch statement,
 * break/default lines) are not visible.
 *
 * buf     - output string buffer
 * xl_info - info byte of the record; the op code is xl_info with the
 *           XLR_INFO_MASK bits removed
 * rec     - record payload, cast to the struct matching the op code
 *
 * Each known op code goes to its out_* helper (create-index is formatted
 * inline); an unknown code produces "unknown gist op code <n>".
 */
gist_desc(StringInfo buf, uint8 xl_info, char *rec)
372
uint8 info = xl_info & ~XLR_INFO_MASK;
376
case XLOG_GIST_PAGE_UPDATE:
377
appendStringInfo(buf, "page_update: ");
378
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
380
case XLOG_GIST_PAGE_DELETE:
381
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
383
case XLOG_GIST_PAGE_SPLIT:
384
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
386
case XLOG_GIST_CREATE_INDEX:
387
appendStringInfo(buf, "create_index: rel %u/%u/%u",
388
((RelFileNode *) rec)->spcNode,
389
((RelFileNode *) rec)->dbNode,
390
((RelFileNode *) rec)->relNode);
393
appendStringInfo(buf, "unknown gist op code %u", info);
399
/*
 * gist_xlog_startup -- create the working memory context (opCtx) via
 * createTempGistContext().  (Lossy listing: the bare integers are leftover
 * line numbers; return type and braces are not visible.)
 */
gist_xlog_startup(void)
401
opCtx = createTempGistContext();
405
/*
 * gist_xlog_cleanup -- delete the working memory context (opCtx).  (Lossy
 * listing: the bare integers are leftover line numbers; return type and
 * braces are not visible.)
 */
gist_xlog_cleanup(void)
407
MemoryContextDelete(opCtx);
411
/*
 * gistXLogSplit -- build and insert the WAL record for a page split.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and several lines (return type, the last parameter line, braces,
 * declarations of rdata/npage/cur/recptr, the page-counting loop body, the
 * cur increments, the left-hand side of the leftchild assignment, the
 * return) are not visible.
 *
 * node         - relation file node (presumably stored in xlrec; the
 *                assignment is not visible)
 * blkno        - block number of the page that was split
 * page_is_leaf - whether the original page was a leaf
 * dist         - linked list describing each resulting page
 * origrlink    - right link of the original page
 * orignsn      - NSN of the original page
 * leftchildbuf - referenced below; its declaration is on a parameter line
 *                that is not shown
 *
 * The rdata array is sized npage * 2 + 2, which matches the entries built
 * below: one for the fixed header, one optional entry for leftchildbuf, and
 * two per page (its gistxlogPage header and its tuple list).
 */
* Write WAL record of a page split.
414
gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
415
SplitedPageLayout *dist,
416
BlockNumber origrlink, GistNSN orignsn,
420
gistxlogPageSplit xlrec;
421
SplitedPageLayout *ptr;
426
/* walks the page list; presumably counts pages into npage (loop body not shown) */
for (ptr = dist; ptr; ptr = ptr->next)
429
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
432
xlrec.origblkno = blkno;
433
xlrec.origrlink = origrlink;
434
xlrec.orignsn = orignsn;
435
xlrec.origleaf = page_is_leaf;
436
xlrec.npage = (uint16) npage;
438
BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
440
/* first chain entry: the fixed-size header, not tied to any buffer */
rdata[0].data = (char *) &xlrec;
441
rdata[0].len = sizeof(gistxlogPageSplit);
442
rdata[0].buffer = InvalidBuffer;
447
* Include a full page image of the child buf. (only necessary if a
448
* checkpoint happened since the child page was split)
450
if (BufferIsValid(leftchildbuf))
452
rdata[cur - 1].next = &(rdata[cur]);
453
/* no payload bytes: this entry only references the child buffer */
rdata[cur].data = NULL;
455
rdata[cur].buffer = leftchildbuf;
456
rdata[cur].buffer_std = true;
460
for (ptr = dist; ptr; ptr = ptr->next)
462
/* per page, entry 1 of 2: the gistxlogPage header */
rdata[cur - 1].next = &(rdata[cur]);
463
rdata[cur].buffer = InvalidBuffer;
464
rdata[cur].data = (char *) &(ptr->block);
465
rdata[cur].len = sizeof(gistxlogPage);
468
/* per page, entry 2 of 2: the page's tuple data */
rdata[cur - 1].next = &(rdata[cur]);
469
rdata[cur].buffer = InvalidBuffer;
470
rdata[cur].data = (char *) (ptr->list);
471
rdata[cur].len = ptr->lenlist;
474
/* terminate the chain at the last entry filled in */
rdata[cur - 1].next = NULL;
476
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
483
/*
 * gistXLogUpdate -- build and insert the WAL record for a page update.
 *
 * NOTE(review): lossy listing; the bare integers are leftover line numbers
 * and several lines (return type, the last parameter line, braces,
 * declarations of rdata/cur/i/recptr, the cur increments, some
 * len/node/next assignments, the left-hand side of the leftchild
 * assignment, the return) are not visible.
 *
 * node         - relation file node (presumably stored in xlrec; the
 *                assignment is not visible)
 * buffer       - buffer holding the updated page
 * todelete     - offsets of the tuples removed from the page
 * ntodelete    - number of entries in 'todelete'
 * itup         - tuples added to the page
 * ituplen      - number of entries in 'itup'
 * leftchildbuf - referenced below; its declaration is on a parameter line
 *                that is not shown
 *
 * The rdata array is sized 4 + ituplen, which matches the entries built
 * below: rdata[0] (buffer reference with no data), rdata[1] (fixed header),
 * rdata[2] (todelete array), one entry per tuple, and one optional entry
 * for leftchildbuf.
 */
* Write XLOG record describing a page update. The update can include any
484
* number of deletions and/or insertions of tuples on a single index page.
486
* If this update inserts a downlink for a split page, also record that
487
* the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
489
* Note that both the todelete array and the tuples are marked as belonging
490
* to the target buffer; they need not be stored in XLOG if XLogInsert decides
491
* to log the whole buffer contents instead. Also, we take care that there's
492
* at least one rdata item referencing the buffer, even when ntodelete and
493
* ituplen are both zero; this ensures that XLogInsert knows about the buffer.
496
gistXLogUpdate(RelFileNode node, Buffer buffer,
497
OffsetNumber *todelete, int ntodelete,
498
IndexTuple *itup, int ituplen,
502
gistxlogPageUpdate *xlrec;
507
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
508
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
511
xlrec->blkno = BufferGetBlockNumber(buffer);
512
xlrec->ntodelete = ntodelete;
514
BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
516
/* data-less entry: the unconditional reference to 'buffer' described above */
rdata[0].buffer = buffer;
517
rdata[0].buffer_std = true;
518
rdata[0].data = NULL;
520
rdata[0].next = &(rdata[1]);
522
/* fixed-size header, not tied to any buffer */
rdata[1].data = (char *) xlrec;
523
rdata[1].len = sizeof(gistxlogPageUpdate);
524
rdata[1].buffer = InvalidBuffer;
525
rdata[1].next = &(rdata[2]);
527
/* offsets to delete, marked as belonging to the target buffer */
rdata[2].data = (char *) todelete;
528
rdata[2].len = sizeof(OffsetNumber) * ntodelete;
529
rdata[2].buffer = buffer;
530
rdata[2].buffer_std = true;
535
/* one chain entry per new tuple, also marked as belonging to the target buffer */
for (i = 0; i < ituplen; i++)
537
rdata[cur - 1].next = &(rdata[cur]);
538
rdata[cur].data = (char *) (itup[i]);
539
rdata[cur].len = IndexTupleSize(itup[i]);
540
rdata[cur].buffer = buffer;
541
rdata[cur].buffer_std = true;
546
* Include a full page image of the child buf. (only necessary if a
547
* checkpoint happened since the child page was split)
549
if (BufferIsValid(leftchildbuf))
551
rdata[cur - 1].next = &(rdata[cur]);
552
/* no payload bytes: this entry only references the child buffer */
rdata[cur].data = NULL;
554
rdata[cur].buffer = leftchildbuf;
555
rdata[cur].buffer_std = true;
558
/* terminate the chain at the last entry filled in */
rdata[cur - 1].next = NULL;
560
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);