/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *	  Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.119 2004-12-31 21:59:22 pgsql Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "miscadmin.h"
25
/* context data for _bt_checksplitloc */
26
Size newitemsz; /* size of new item to be inserted */
27
bool is_leaf; /* T if splitting a leaf page */
28
bool is_rightmost; /* T if splitting a rightmost page */
30
bool have_split; /* found a valid split? */
32
/* these fields valid only if have_split is true */
33
bool newitemonleft; /* new item on left or right of best split */
34
OffsetNumber firstright; /* best split point */
35
int best_delta; /* best size delta so far */
39
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
41
static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
42
Relation heapRel, Buffer buf,
43
ScanKey itup_scankey);
44
static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
46
int keysz, ScanKey scankey,
48
OffsetNumber afteritem,
49
bool split_only_page);
50
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
51
OffsetNumber newitemoff, Size newitemsz,
52
BTItem newitem, bool newitemonleft,
53
OffsetNumber *itup_off, BlockNumber *itup_blkno);
54
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
55
OffsetNumber newitemoff,
58
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
59
int leftfree, int rightfree,
60
bool newitemonleft, Size firstrightitemsz);
61
static void _bt_pgaddtup(Relation rel, Page page,
62
Size itemsize, BTItem btitem,
63
OffsetNumber itup_off, const char *where);
64
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
65
int keysz, ScanKey scankey);
69
* _bt_doinsert() -- Handle insertion of a single btitem in the tree.
71
* This routine is called by the public interface routines, btbuild
72
* and btinsert. By here, btitem is filled in, including the TID.
75
_bt_doinsert(Relation rel, BTItem btitem,
76
bool index_is_unique, Relation heapRel)
78
IndexTuple itup = &(btitem->bti_itup);
79
int natts = rel->rd_rel->relnatts;
83
InsertIndexResult res;
85
/* we need a scan key to do our search, so build one */
86
itup_scankey = _bt_mkscankey(rel, itup);
89
/* find the first page containing this key */
90
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
92
/* trade in our read lock for a write lock */
93
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
94
LockBuffer(buf, BT_WRITE);
97
* If the page was split between the time that we surrendered our read
98
* lock and acquired our write lock, then this page may no longer be
99
* the right place for the key we want to insert. In this case, we
100
* need to move right in the tree. See Lehman and Yao for an
101
* excruciatingly precise description.
103
buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
106
* If we're not allowing duplicates, make sure the key isn't already
109
* NOTE: obviously, _bt_check_unique can only detect keys that are
110
* already in the index; so it cannot defend against concurrent
111
* insertions of the same key. We protect against that by means of
112
* holding a write lock on the target page. Any other would-be
113
* inserter of the same key must acquire a write lock on the same
114
* target page, so only one would-be inserter can be making the check
115
* at one time. Furthermore, once we are past the check we hold write
116
* locks continuously until we have performed our insertion, so no
117
* later inserter can fail to see our insertion. (This requires some
118
* care in _bt_insertonpg.)
120
* If we must wait for another xact, we release the lock while waiting,
121
* and then must start over completely.
127
xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
129
if (TransactionIdIsValid(xwait))
131
/* Have to wait for the other guy ... */
132
_bt_relbuf(rel, buf);
133
XactLockTableWait(xwait);
135
_bt_freestack(stack);
140
/* do the insertion */
141
res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem,
145
_bt_freestack(stack);
146
_bt_freeskey(itup_scankey);
152
* _bt_check_unique() -- Check for violation of unique index constraint
154
* Returns InvalidTransactionId if there is no conflict, else an xact ID
155
* we must wait for to see if it commits a conflicting tuple. If an actual
156
* conflict is detected, no return --- just ereport().
159
_bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
160
Buffer buf, ScanKey itup_scankey)
162
TupleDesc itupdesc = RelationGetDescr(rel);
163
int natts = rel->rd_rel->relnatts;
168
Buffer nbuf = InvalidBuffer;
170
page = BufferGetPage(buf);
171
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
172
maxoff = PageGetMaxOffsetNumber(page);
175
* Find first item >= proposed new item. Note we could also get a
176
* pointer to end-of-page here.
178
offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
181
* Scan over all equal tuples, looking for live conflicts.
192
* make sure the offset points to an actual item before trying to
195
if (offset <= maxoff)
197
curitemid = PageGetItemId(page, offset);
200
* We can skip items that are marked killed.
202
* Formerly, we applied _bt_isequal() before checking the kill
203
* flag, so as to fall out of the item loop as soon as
204
* possible. However, in the presence of heavy update activity
205
* an index may contain many killed items with the same key;
206
* running _bt_isequal() on each killed item gets expensive.
207
* Furthermore it is likely that the non-killed version of
208
* each key appears first, so that we didn't actually get to
209
* exit any sooner anyway. So now we just advance over killed
210
* items as quickly as we can. We only apply _bt_isequal()
211
* when we get to a non-killed item or the end of the page.
213
if (!ItemIdDeleted(curitemid))
216
* _bt_compare returns 0 for (1,NULL) and (1,NULL) -
217
* this's how we handling NULLs - and so we must not use
218
* _bt_compare in real comparison, but only for
219
* ordering/finding items on pages. - vadim 03/24/97
221
if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
222
break; /* we're past all the equal tuples */
224
/* okay, we gotta fetch the heap tuple ... */
225
cbti = (BTItem) PageGetItem(page, curitemid);
226
htup.t_self = cbti->bti_itup.t_tid;
227
if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
230
/* it is a duplicate */
231
TransactionId xwait =
232
(TransactionIdIsValid(SnapshotDirty->xmin)) ?
233
SnapshotDirty->xmin : SnapshotDirty->xmax;
235
ReleaseBuffer(hbuffer);
238
* If this tuple is being updated by other transaction
239
* then we have to wait for its commit/abort.
241
if (TransactionIdIsValid(xwait))
243
if (nbuf != InvalidBuffer)
244
_bt_relbuf(rel, nbuf);
245
/* Tell _bt_doinsert to wait... */
250
* Otherwise we have a definite conflict.
253
(errcode(ERRCODE_UNIQUE_VIOLATION),
254
errmsg("duplicate key violates unique constraint \"%s\"",
255
RelationGetRelationName(rel))));
257
else if (htup.t_data != NULL)
260
* Hmm, if we can't see the tuple, maybe it can be
261
* marked killed. This logic should match
262
* index_getnext and btgettuple.
264
LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
265
if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
266
hbuffer) == HEAPTUPLE_DEAD)
268
curitemid->lp_flags |= LP_DELETE;
269
SetBufferCommitInfoNeedsSave(buf);
271
LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
273
ReleaseBuffer(hbuffer);
278
* Advance to next tuple to continue checking.
281
offset = OffsetNumberNext(offset);
284
/* If scankey == hikey we gotta check the next page too */
285
if (P_RIGHTMOST(opaque))
287
if (!_bt_isequal(itupdesc, page, P_HIKEY,
288
natts, itup_scankey))
290
/* Advance to next non-dead page --- there must be one */
293
nblkno = opaque->btpo_next;
294
nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
295
page = BufferGetPage(nbuf);
296
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
297
if (!P_IGNORE(opaque))
299
if (P_RIGHTMOST(opaque))
300
elog(ERROR, "fell off the end of \"%s\"",
301
RelationGetRelationName(rel));
303
maxoff = PageGetMaxOffsetNumber(page);
304
offset = P_FIRSTDATAKEY(opaque);
308
if (nbuf != InvalidBuffer)
309
_bt_relbuf(rel, nbuf);
311
return InvalidTransactionId;
315
* _bt_insertonpg() -- Insert a tuple on a particular page in the index.
317
* This recursive procedure does the following things:
319
* + finds the right place to insert the tuple.
320
* + if necessary, splits the target page (making sure that the
321
* split is equitable as far as post-insert free space goes).
322
* + inserts the tuple.
323
* + if the page was split, pops the parent stack, and finds the
324
* right place to insert the new child pointer (by walking
325
* right using information stored in the parent stack).
326
* + invokes itself with the appropriate tuple for the right
327
* child page on the parent.
328
* + updates the metapage if a true root or fast root is split.
330
* On entry, we must have the right buffer on which to do the
331
* insertion, and the buffer must be pinned and locked. On return,
332
* we will have dropped both the pin and the write lock on the buffer.
334
* If 'afteritem' is >0 then the new tuple must be inserted after the
335
* existing item of that number, noplace else. If 'afteritem' is 0
336
* then the procedure finds the exact spot to insert it by searching.
337
* (keysz and scankey parameters are used ONLY if afteritem == 0.)
339
* NOTE: if the new key is equal to one or more existing keys, we can
340
* legitimately place it anywhere in the series of equal keys --- in fact,
341
* if the new key is equal to the page's "high key" we can place it on
342
* the next page. If it is equal to the high key, and there's not room
343
* to insert the new tuple on the current page without splitting, then
344
* we can move right hoping to find more free space and avoid a split.
345
* (We should not move right indefinitely, however, since that leads to
346
* O(N^2) insertion behavior in the presence of many equal keys.)
347
* Once we have chosen the page to put the key on, we'll insert it before
348
* any existing equal keys because of the way _bt_binsrch() works.
350
* The locking interactions in this code are critical. You should
351
* grok Lehman and Yao's paper before making any changes. In addition,
352
* you need to understand how we disambiguate duplicate keys in this
353
* implementation, in order to be able to find our location using
354
* L&Y "move right" operations. Since we may insert duplicate user
355
* keys, and since these dups may propagate up the tree, we use the
356
* 'afteritem' parameter to position ourselves correctly for the
357
* insertion on internal pages.
360
static InsertIndexResult
361
_bt_insertonpg(Relation rel,
367
OffsetNumber afteritem,
368
bool split_only_page)
370
InsertIndexResult res;
372
BTPageOpaque lpageop;
373
OffsetNumber itup_off;
374
BlockNumber itup_blkno;
375
OffsetNumber newitemoff;
376
OffsetNumber firstright = InvalidOffsetNumber;
379
page = BufferGetPage(buf);
380
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
382
itemsz = IndexTupleDSize(btitem->bti_itup)
383
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
385
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but
386
* we need to be consistent */
389
* Check whether the item can fit on a btree page at all. (Eventually,
390
* we ought to try to apply TOAST methods if not.) We actually need to
391
* be able to fit three items on every page, so restrict any one item
392
* to 1/3 the per-page available space. Note that at this point,
393
* itemsz doesn't include the ItemId.
395
if (itemsz > BTMaxItemSize(page))
397
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
398
errmsg("index row size %lu exceeds btree maximum, %lu",
399
(unsigned long) itemsz,
400
(unsigned long) BTMaxItemSize(page))));
403
* Determine exactly where new item will go.
406
newitemoff = afteritem + 1;
410
* If we will need to split the page to put the item here,
411
* check whether we can put the tuple somewhere to the right,
412
* instead. Keep scanning right until we
413
* (a) find a page with enough free space,
414
* (b) reach the last page where the tuple can legally go, or
415
* (c) get tired of searching.
416
* (c) is not flippant; it is important because if there are many
417
* pages' worth of equal keys, it's better to split one of the early
418
* pages than to scan all the way to the end of the run of equal keys
419
* on every insert. We implement "get tired" as a random choice,
420
* since stopping after scanning a fixed number of pages wouldn't work
421
* well (we'd never reach the right-hand side of previously split
422
* pages). Currently the probability of moving right is set at 0.99,
423
* which may seem too high to change the behavior much, but it does an
424
* excellent job of preventing O(N^2) behavior with many equal keys.
427
bool movedright = false;
429
while (PageGetFreeSpace(page) < itemsz &&
430
!P_RIGHTMOST(lpageop) &&
431
_bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
432
random() > (MAX_RANDOM_VALUE / 100))
435
* step right to next non-dead page
437
* must write-lock that page before releasing write lock on
438
* current page; else someone else's _bt_check_unique scan
439
* could fail to see our insertion. write locks on
440
* intermediate dead pages won't do because we don't know when
441
* they will get de-linked from the tree.
443
Buffer rbuf = InvalidBuffer;
447
BlockNumber rblkno = lpageop->btpo_next;
449
rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
450
page = BufferGetPage(rbuf);
451
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
452
if (!P_IGNORE(lpageop))
454
if (P_RIGHTMOST(lpageop))
455
elog(ERROR, "fell off the end of \"%s\"",
456
RelationGetRelationName(rel));
458
_bt_relbuf(rel, buf);
464
* Now we are on the right page, so find the insert position. If
465
* we moved right at all, we know we should insert at the start of
466
* the page, else must find the position by searching.
469
newitemoff = P_FIRSTDATAKEY(lpageop);
471
newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
475
* Do we need to split the page to fit the item on it?
477
* Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
478
* so this comparison is correct even though we appear to be
479
* accounting only for the item and not for its line pointer.
481
if (PageGetFreeSpace(page) < itemsz)
483
bool is_root = P_ISROOT(lpageop);
484
bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
488
/* Choose the split point */
489
firstright = _bt_findsplitloc(rel, page,
493
/* split the buffer into left and right halves */
494
rbuf = _bt_split(rel, buf, firstright,
495
newitemoff, itemsz, btitem, newitemonleft,
496
&itup_off, &itup_blkno);
501
* + our target page has been split;
502
* + the original tuple has been inserted;
503
* + we have write locks on both the old (left half)
504
* and new (right half) buffers, after the split; and
505
* + we know the key we want to insert into the parent
506
* (it's the "high key" on the left child page).
508
* We're ready to do the parent insertion. We need to hold onto the
509
* locks for the child pages until we locate the parent, but we can
510
* release them before doing the actual insertion (see Lehman and Yao
511
* for the reasoning).
514
_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
518
Buffer metabuf = InvalidBuffer;
520
BTMetaPageData *metad = NULL;
522
itup_off = newitemoff;
523
itup_blkno = BufferGetBlockNumber(buf);
526
* If we are doing this insert because we split a page that was
527
* the only one on its tree level, but was not the root, it may
528
* have been the "fast root". We need to ensure that the fast
529
* root link points at or above the current page. We can safely
530
* acquire a lock on the metapage here --- see comments for
535
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
536
metapg = BufferGetPage(metabuf);
537
metad = BTPageGetMeta(metapg);
539
if (metad->btm_fastlevel >= lpageop->btpo.level)
541
/* no update wanted */
542
_bt_relbuf(rel, metabuf);
543
metabuf = InvalidBuffer;
547
/* Do the update. No ereport(ERROR) until changes are logged */
548
START_CRIT_SECTION();
550
_bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
552
if (BufferIsValid(metabuf))
554
metad->btm_fastroot = itup_blkno;
555
metad->btm_fastlevel = lpageop->btpo.level;
561
xl_btree_insert xlrec;
562
xl_btree_metadata xlmeta;
565
XLogRecData rdata[3];
566
XLogRecData *nextrdata;
567
BTItemData truncitem;
569
xlrec.target.node = rel->rd_node;
570
ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
572
rdata[0].buffer = InvalidBuffer;
573
rdata[0].data = (char *) &xlrec;
574
rdata[0].len = SizeOfBtreeInsert;
575
rdata[0].next = nextrdata = &(rdata[1]);
577
if (BufferIsValid(metabuf))
579
xlmeta.root = metad->btm_root;
580
xlmeta.level = metad->btm_level;
581
xlmeta.fastroot = metad->btm_fastroot;
582
xlmeta.fastlevel = metad->btm_fastlevel;
584
nextrdata->buffer = InvalidBuffer;
585
nextrdata->data = (char *) &xlmeta;
586
nextrdata->len = sizeof(xl_btree_metadata);
587
nextrdata->next = nextrdata + 1;
589
xlinfo = XLOG_BTREE_INSERT_META;
591
else if (P_ISLEAF(lpageop))
592
xlinfo = XLOG_BTREE_INSERT_LEAF;
594
xlinfo = XLOG_BTREE_INSERT_UPPER;
596
/* Read comments in _bt_pgaddtup */
597
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
600
truncitem.bti_itup.t_info = sizeof(BTItemData);
601
nextrdata->data = (char *) &truncitem;
602
nextrdata->len = sizeof(BTItemData);
606
nextrdata->data = (char *) btitem;
607
nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
608
(sizeof(BTItemData) - sizeof(IndexTupleData));
610
nextrdata->buffer = buf;
611
nextrdata->next = NULL;
613
recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
615
if (BufferIsValid(metabuf))
617
PageSetLSN(metapg, recptr);
618
PageSetTLI(metapg, ThisTimeLineID);
621
PageSetLSN(page, recptr);
622
PageSetTLI(page, ThisTimeLineID);
627
/* Write out the updated page and release pin/lock */
628
if (BufferIsValid(metabuf))
629
_bt_wrtbuf(rel, metabuf);
631
_bt_wrtbuf(rel, buf);
634
/* by here, the new tuple is inserted at itup_blkno/itup_off */
635
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
636
ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
642
* _bt_split() -- split a page in the btree.
644
* On entry, buf is the page to split, and is write-locked and pinned.
645
* firstright is the item index of the first item to be moved to the
646
* new right page. newitemoff etc. tell us about the new item that
647
* must be inserted along with the data from the old page.
649
* Returns the new right sibling of buf, pinned and write-locked.
650
* The pin and lock on buf are maintained. *itup_off and *itup_blkno
651
* are set to the exact location where newitem was inserted.
654
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
655
OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
657
OffsetNumber *itup_off, BlockNumber *itup_blkno)
663
BTPageOpaque ropaque,
666
Buffer sbuf = InvalidBuffer;
668
BTPageOpaque sopaque = NULL;
672
OffsetNumber leftoff,
677
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
678
origpage = BufferGetPage(buf);
679
leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
680
rightpage = BufferGetPage(rbuf);
682
_bt_pageinit(leftpage, BufferGetPageSize(buf));
683
_bt_pageinit(rightpage, BufferGetPageSize(rbuf));
685
/* init btree private data */
686
oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
687
lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
688
ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
690
/* if we're splitting this page, it won't be the root when we're done */
691
lopaque->btpo_flags = oopaque->btpo_flags;
692
lopaque->btpo_flags &= ~BTP_ROOT;
693
ropaque->btpo_flags = lopaque->btpo_flags;
694
lopaque->btpo_prev = oopaque->btpo_prev;
695
lopaque->btpo_next = BufferGetBlockNumber(rbuf);
696
ropaque->btpo_prev = BufferGetBlockNumber(buf);
697
ropaque->btpo_next = oopaque->btpo_next;
698
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
701
* If the page we're splitting is not the rightmost page at its level
702
* in the tree, then the first entry on the page is the high key for
703
* the page. We need to copy that to the right half. Otherwise
704
* (meaning the rightmost page case), all the items on the right half
709
if (!P_RIGHTMOST(oopaque))
711
itemid = PageGetItemId(origpage, P_HIKEY);
712
itemsz = ItemIdGetLength(itemid);
713
item = (BTItem) PageGetItem(origpage, itemid);
714
if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
715
LP_USED) == InvalidOffsetNumber)
716
elog(PANIC, "failed to add hikey to the right sibling");
717
rightoff = OffsetNumberNext(rightoff);
721
* The "high key" for the new left page will be the first key that's
722
* going to go into the new right page. This might be either the
723
* existing data item at position firstright, or the incoming tuple.
726
if (!newitemonleft && newitemoff == firstright)
728
/* incoming tuple will become first on right page */
734
/* existing item at firstright will become first on right page */
735
itemid = PageGetItemId(origpage, firstright);
736
itemsz = ItemIdGetLength(itemid);
737
item = (BTItem) PageGetItem(origpage, itemid);
739
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
740
LP_USED) == InvalidOffsetNumber)
741
elog(PANIC, "failed to add hikey to the left sibling");
742
leftoff = OffsetNumberNext(leftoff);
745
* Now transfer all the data items to the appropriate page
747
maxoff = PageGetMaxOffsetNumber(origpage);
749
for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
751
itemid = PageGetItemId(origpage, i);
752
itemsz = ItemIdGetLength(itemid);
753
item = (BTItem) PageGetItem(origpage, itemid);
755
/* does new item belong before this one? */
760
_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
763
*itup_blkno = BufferGetBlockNumber(buf);
764
leftoff = OffsetNumberNext(leftoff);
768
_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
770
*itup_off = rightoff;
771
*itup_blkno = BufferGetBlockNumber(rbuf);
772
rightoff = OffsetNumberNext(rightoff);
776
/* decide which page to put it on */
779
_bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
781
leftoff = OffsetNumberNext(leftoff);
785
_bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
787
rightoff = OffsetNumberNext(rightoff);
791
/* cope with possibility that newitem goes at the end */
796
_bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
799
*itup_blkno = BufferGetBlockNumber(buf);
800
leftoff = OffsetNumberNext(leftoff);
804
_bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
806
*itup_off = rightoff;
807
*itup_blkno = BufferGetBlockNumber(rbuf);
808
rightoff = OffsetNumberNext(rightoff);
813
* We have to grab the right sibling (if any) and fix the prev pointer
814
* there. We are guaranteed that this is deadlock-free since no other
815
* writer will be holding a lock on that page and trying to move left,
816
* and all readers release locks on a page before trying to fetch its
820
if (!P_RIGHTMOST(ropaque))
822
sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
823
spage = BufferGetPage(sbuf);
824
sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
825
if (sopaque->btpo_prev != ropaque->btpo_prev)
826
elog(PANIC, "right sibling's left-link doesn't match");
830
* Right sibling is locked, new siblings are prepared, but original
831
* page is not updated yet. Log changes before continuing.
833
* NO EREPORT(ERROR) till right sibling is updated.
835
START_CRIT_SECTION();
837
if (!P_RIGHTMOST(ropaque))
838
sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
843
xl_btree_split xlrec;
846
XLogRecData rdata[4];
848
xlrec.target.node = rel->rd_node;
849
ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
851
xlrec.otherblk = BufferGetBlockNumber(rbuf);
853
xlrec.otherblk = BufferGetBlockNumber(buf);
854
xlrec.leftblk = lopaque->btpo_prev;
855
xlrec.rightblk = ropaque->btpo_next;
856
xlrec.level = lopaque->btpo.level;
859
* Direct access to page is not good but faster - we should
860
* implement some new func in page API. Note we only store the
861
* tuples themselves, knowing that the item pointers are in the
862
* same order and can be reconstructed by scanning the tuples.
864
xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
865
((PageHeader) leftpage)->pd_upper;
867
rdata[0].buffer = InvalidBuffer;
868
rdata[0].data = (char *) &xlrec;
869
rdata[0].len = SizeOfBtreeSplit;
870
rdata[0].next = &(rdata[1]);
872
rdata[1].buffer = InvalidBuffer;
873
rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
874
rdata[1].len = xlrec.leftlen;
875
rdata[1].next = &(rdata[2]);
877
rdata[2].buffer = InvalidBuffer;
878
rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
879
rdata[2].len = ((PageHeader) rightpage)->pd_special -
880
((PageHeader) rightpage)->pd_upper;
881
rdata[2].next = NULL;
883
if (!P_RIGHTMOST(ropaque))
885
rdata[2].next = &(rdata[3]);
886
rdata[3].buffer = sbuf;
887
rdata[3].data = NULL;
889
rdata[3].next = NULL;
892
if (P_ISROOT(oopaque))
893
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
895
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
897
recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
899
PageSetLSN(leftpage, recptr);
900
PageSetTLI(leftpage, ThisTimeLineID);
901
PageSetLSN(rightpage, recptr);
902
PageSetTLI(rightpage, ThisTimeLineID);
903
if (!P_RIGHTMOST(ropaque))
905
PageSetLSN(spage, recptr);
906
PageSetTLI(spage, ThisTimeLineID);
911
* By here, the original data page has been split into two new halves,
912
* and these are correct. The algorithm requires that the left page
913
* never move during a split, so we copy the new left page back on top
914
* of the original. Note that this is not a waste of time, since we
915
* also require (in the page management code) that the center of a
916
* page always be clean, and the most efficient way to guarantee this
917
* is just to compact the data by reinserting it into a new left page.
920
PageRestoreTempPage(leftpage, origpage);
924
/* write and release the old right sibling */
925
if (!P_RIGHTMOST(ropaque))
926
_bt_wrtbuf(rel, sbuf);
933
* _bt_findsplitloc() -- find an appropriate place to split a page.
935
* The idea here is to equalize the free space that will be on each split
936
* page, *after accounting for the inserted tuple*. (If we fail to account
937
* for it, we might find ourselves with too little room on the page that
938
* it needs to go into!)
940
* If the page is the rightmost page on its level, we instead try to arrange
941
* for twice as much free space on the right as on the left. In this way,
942
* when we are inserting successively increasing keys (consider sequences,
943
* timestamps, etc) we will end up with a tree whose pages are about 67% full,
944
* instead of the 50% full result that we'd get without this special case.
945
* (We could bias it even further to make the initially-loaded tree more full.
946
* But since the steady-state load for a btree is about 70%, we'd likely just
947
* be making more page-splitting work for ourselves later on, when we start
948
* seeing updates to existing tuples.)
950
* We are passed the intended insert position of the new tuple, expressed as
951
* the offsetnumber of the tuple it must go in front of. (This could be
952
* maxoff+1 if the tuple is to go at the end.)
954
* We return the index of the first existing tuple that should go on the
955
* righthand page, plus a boolean indicating whether the new tuple goes on
956
* the left or right page. The bool is necessary to disambiguate the case
957
* where firstright == newitemoff.
960
_bt_findsplitloc(Relation rel,
962
OffsetNumber newitemoff,
977
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
979
/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
980
newitemsz += sizeof(ItemIdData);
981
state.newitemsz = newitemsz;
982
state.is_leaf = P_ISLEAF(opaque);
983
state.is_rightmost = P_RIGHTMOST(opaque);
984
state.have_split = false;
986
/* Total free space available on a btree page, after fixed overhead */
987
leftspace = rightspace =
988
PageGetPageSize(page) - SizeOfPageHeaderData -
989
MAXALIGN(sizeof(BTPageOpaqueData));
992
* Finding the best possible split would require checking all the
993
* possible split points, because of the high-key and left-key special
994
* cases. That's probably more work than it's worth; instead, stop as
995
* soon as we find a "good-enough" split, where good-enough is defined
996
* as an imbalance in free space of no more than pagesize/16
997
* (arbitrary...) This should let us stop near the middle on most
998
* pages, instead of plowing to the end.
1000
goodenough = leftspace / 16;
1002
/* The right page will have the same high key as the old page */
1003
if (!P_RIGHTMOST(opaque))
1005
itemid = PageGetItemId(page, P_HIKEY);
1006
rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
1007
sizeof(ItemIdData));
1010
/* Count up total space in data items without actually scanning 'em */
1011
dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
1014
* Scan through the data items and calculate space usage for a split
1015
* at each possible position.
1017
dataitemstoleft = 0;
1018
maxoff = PageGetMaxOffsetNumber(page);
1020
for (offnum = P_FIRSTDATAKEY(opaque);
1022
offnum = OffsetNumberNext(offnum))
1028
itemid = PageGetItemId(page, offnum);
1029
itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
1032
* We have to allow for the current item becoming the high key of
1033
* the left page; therefore it counts against left space as well
1036
leftfree = leftspace - dataitemstoleft - (int) itemsz;
1037
rightfree = rightspace - (dataitemtotal - dataitemstoleft);
1040
* Will the new item go to left or right of split?
1042
if (offnum > newitemoff)
1043
_bt_checksplitloc(&state, offnum, leftfree, rightfree,
1045
else if (offnum < newitemoff)
1046
_bt_checksplitloc(&state, offnum, leftfree, rightfree,
1050
/* need to try it both ways! */
1051
_bt_checksplitloc(&state, offnum, leftfree, rightfree,
1053
/* here we are contemplating newitem as first on right */
1054
_bt_checksplitloc(&state, offnum, leftfree, rightfree,
1058
/* Abort scan once we find a good-enough choice */
1059
if (state.have_split && state.best_delta <= goodenough)
1062
dataitemstoleft += itemsz;
1066
* I believe it is not possible to fail to find a feasible split, but
1069
if (!state.have_split)
1070
elog(ERROR, "could not find a feasible split point for \"%s\"",
1071
RelationGetRelationName(rel));
1073
*newitemonleft = state.newitemonleft;
1074
return state.firstright;
1078
* Subroutine to analyze a particular possible split choice (ie, firstright
1079
* and newitemonleft settings), and record the best split so far in *state.
1082
_bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
1083
int leftfree, int rightfree,
1084
bool newitemonleft, Size firstrightitemsz)
1087
* Account for the new item on whichever side it is to be put.
1090
leftfree -= (int) state->newitemsz;
1092
rightfree -= (int) state->newitemsz;
1095
* If we are not on the leaf level, we will be able to discard the key
1096
* data from the first item that winds up on the right page.
1098
if (!state->is_leaf)
1099
rightfree += (int) firstrightitemsz -
1100
(int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
1103
* If feasible split point, remember best delta.
1105
if (leftfree >= 0 && rightfree >= 0)
1109
if (state->is_rightmost)
1112
* On a rightmost page, try to equalize right free space with
1113
* twice the left free space. See comments for
1116
delta = (2 * leftfree) - rightfree;
1120
/* Otherwise, aim for equal free space on both sides */
1121
delta = leftfree - rightfree;
1126
if (!state->have_split || delta < state->best_delta)
1128
state->have_split = true;
1129
state->newitemonleft = newitemonleft;
1130
state->firstright = firstright;
1131
state->best_delta = delta;
1137
* _bt_insert_parent() -- Insert downlink into parent after a page split.
1139
* On entry, buf and rbuf are the left and right split pages, which we
1140
* still hold write locks on per the L&Y algorithm. We release the
1141
* write locks once we have write lock on the parent page. (Any sooner,
1142
* and it'd be possible for some other process to try to split or delete
1143
* one of these pages, and get confused because it cannot find the downlink.)
1145
* stack - stack showing how we got here. May be NULL in cases that don't
1146
* have to be efficient (concurrent ROOT split, WAL recovery)
1147
* is_root - we split the true root
1148
* is_only - we split a page alone on its level (might have been fast root)
1150
* This is exported so it can be called by nbtxlog.c.
1153
_bt_insert_parent(Relation rel,
1161
* Here we have to do something Lehman and Yao don't talk about: deal
1162
* with a root split and construction of a new root. If our stack is
1163
* empty then we have just split a node on what had been the root
1164
* level when we descended the tree. If it was still the root then we
1165
* perform a new-root construction. If it *wasn't* the root anymore,
1166
* search to find the next higher level that someone constructed
1167
* meanwhile, and find the right place to insert as for the normal
1170
* If we have to search for the parent level, we do so by re-descending
1171
* from the root. This is not super-efficient, but it's rare enough
1172
* not to matter. (This path is also taken when called from WAL
1173
* recovery --- we have no stack in that case.)
1179
Assert(stack == NULL);
1181
/* create a new root node and update the metapage */
1182
rootbuf = _bt_newroot(rel, buf, rbuf);
1183
/* release the split buffers */
1184
_bt_wrtbuf(rel, rootbuf);
1185
_bt_wrtbuf(rel, rbuf);
1186
_bt_wrtbuf(rel, buf);
1190
BlockNumber bknum = BufferGetBlockNumber(buf);
1191
BlockNumber rbknum = BufferGetBlockNumber(rbuf);
1192
Page page = BufferGetPage(buf);
1193
InsertIndexResult newres;
1195
BTStackData fakestack;
1201
BTPageOpaque lpageop;
1204
elog(DEBUG2, "concurrent ROOT page split");
1205
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
1206
/* Find the leftmost page at the next level up */
1207
pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
1208
/* Set up a phony stack entry pointing there */
1210
stack->bts_blkno = BufferGetBlockNumber(pbuf);
1211
stack->bts_offset = InvalidOffsetNumber;
1212
/* bts_btitem will be initialized below */
1213
stack->bts_parent = NULL;
1214
_bt_relbuf(rel, pbuf);
1217
/* get high key from left page == lowest key on new right page */
1218
ritem = (BTItem) PageGetItem(page,
1219
PageGetItemId(page, P_HIKEY));
1221
/* form an index tuple that points at the new right page */
1222
new_item = _bt_formitem(&(ritem->bti_itup));
1223
ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
1226
* Find the parent buffer and get the parent page.
1228
* Oops - if we were moved right then we need to change stack item!
1229
* We want to find parent pointing to where we are, right ? -
1232
ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
1235
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
1237
/* Now we can write and unlock the children */
1238
_bt_wrtbuf(rel, rbuf);
1239
_bt_wrtbuf(rel, buf);
1241
/* Check for error only after writing children */
1242
if (pbuf == InvalidBuffer)
1243
elog(ERROR, "failed to re-find parent key in \"%s\"",
1244
RelationGetRelationName(rel));
1246
/* Recursively update the parent */
1247
newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
1248
0, NULL, new_item, stack->bts_offset,
1258
* _bt_getstackbuf() -- Walk back up the tree one step, and find the item
1259
* we last looked at in the parent.
1261
* This is possible because we save the downlink from the parent item,
1262
* which is enough to uniquely identify it. Insertions into the parent
1263
* level could cause the item to move right; deletions could cause it
1264
* to move left, but not left of the page we previously found it in.
1266
* Adjusts bts_blkno & bts_offset if changed.
1268
* Returns InvalidBuffer if item not found (should not happen).
1271
_bt_getstackbuf(Relation rel, BTStack stack, int access)
1276
blkno = stack->bts_blkno;
1277
start = stack->bts_offset;
1283
BTPageOpaque opaque;
1285
buf = _bt_getbuf(rel, blkno, access);
1286
page = BufferGetPage(buf);
1287
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1289
if (!P_IGNORE(opaque))
1291
OffsetNumber offnum,
1297
minoff = P_FIRSTDATAKEY(opaque);
1298
maxoff = PageGetMaxOffsetNumber(page);
1301
* start = InvalidOffsetNumber means "search the whole page".
1302
* We need this test anyway due to possibility that page has a
1303
* high key now when it didn't before.
1309
* Need this check too, to guard against possibility that page
1310
* split since we visited it originally.
1313
start = OffsetNumberNext(maxoff);
1316
* These loops will check every item on the page --- but in an
1317
* order that's attuned to the probability of where it
1318
* actually is. Scan to the right first, then to the left.
1320
for (offnum = start;
1322
offnum = OffsetNumberNext(offnum))
1324
itemid = PageGetItemId(page, offnum);
1325
item = (BTItem) PageGetItem(page, itemid);
1326
if (BTItemSame(item, &stack->bts_btitem))
1328
/* Return accurate pointer to where link is now */
1329
stack->bts_blkno = blkno;
1330
stack->bts_offset = offnum;
1335
for (offnum = OffsetNumberPrev(start);
1337
offnum = OffsetNumberPrev(offnum))
1339
itemid = PageGetItemId(page, offnum);
1340
item = (BTItem) PageGetItem(page, itemid);
1341
if (BTItemSame(item, &stack->bts_btitem))
1343
/* Return accurate pointer to where link is now */
1344
stack->bts_blkno = blkno;
1345
stack->bts_offset = offnum;
1352
* The item we're looking for moved right at least one page.
1354
if (P_RIGHTMOST(opaque))
1356
_bt_relbuf(rel, buf);
1357
return InvalidBuffer;
1359
blkno = opaque->btpo_next;
1360
start = InvalidOffsetNumber;
1361
_bt_relbuf(rel, buf);
1366
* _bt_newroot() -- Create a new root page for the index.
1368
* We've just split the old root page and need to create a new one.
1369
* In order to do this, we add a new root page to the file, then lock
1370
* the metadata page and update it. This is guaranteed to be deadlock-
1371
* free, because all readers release their locks on the metadata page
1372
* before trying to lock the root, and all writers lock the root before
1373
* trying to lock the metadata page. We have a write lock on the old
1374
* root page, so we have not introduced any cycles into the waits-for
1377
* On entry, lbuf (the old root) and rbuf (its new peer) are write-
1378
* locked. On exit, a new root page exists with entries for the
1379
* two new children, metapage is updated and unlocked/unpinned.
1380
* The new root buffer is returned to caller which has to unlock/unpin
1381
* lbuf, rbuf & rootbuf.
1384
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1392
BlockNumber rootblknum;
1393
BTPageOpaque rootopaque;
1400
BTMetaPageData *metad;
1402
lbkno = BufferGetBlockNumber(lbuf);
1403
rbkno = BufferGetBlockNumber(rbuf);
1404
lpage = BufferGetPage(lbuf);
1405
rpage = BufferGetPage(rbuf);
1407
/* get a new root page */
1408
rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
1409
rootpage = BufferGetPage(rootbuf);
1410
rootblknum = BufferGetBlockNumber(rootbuf);
1412
/* acquire lock on the metapage */
1413
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
1414
metapg = BufferGetPage(metabuf);
1415
metad = BTPageGetMeta(metapg);
1417
/* NO EREPORT(ERROR) from here till newroot op is logged */
1418
START_CRIT_SECTION();
1420
/* set btree special data */
1421
rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
1422
rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
1423
rootopaque->btpo_flags = BTP_ROOT;
1424
rootopaque->btpo.level =
1425
((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
1427
/* update metapage data */
1428
metad->btm_root = rootblknum;
1429
metad->btm_level = rootopaque->btpo.level;
1430
metad->btm_fastroot = rootblknum;
1431
metad->btm_fastlevel = rootopaque->btpo.level;
1434
* Create downlink item for left page (old root). Since this will be
1435
* the first item in a non-leaf page, it implicitly has minus-infinity
1436
* key value, so we need not store any actual key in it.
1438
itemsz = sizeof(BTItemData);
1439
new_item = (BTItem) palloc(itemsz);
1440
new_item->bti_itup.t_info = itemsz;
1441
ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
1444
* Insert the left page pointer into the new root page. The root page
1445
* is the rightmost page on its level so there is no "high key" in it;
1446
* the two items will go into positions P_HIKEY and P_FIRSTKEY.
1448
if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
1449
elog(PANIC, "failed to add leftkey to new root page");
1453
* Create downlink item for right page. The key for it is obtained
1454
* from the "high key" position in the left page.
1456
itemid = PageGetItemId(lpage, P_HIKEY);
1457
itemsz = ItemIdGetLength(itemid);
1458
item = (BTItem) PageGetItem(lpage, itemid);
1459
new_item = _bt_formitem(&(item->bti_itup));
1460
ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
1463
* insert the right page pointer into the new root page.
1465
if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
1466
elog(PANIC, "failed to add rightkey to new root page");
1470
if (!rel->rd_istemp)
1472
xl_btree_newroot xlrec;
1474
XLogRecData rdata[2];
1476
xlrec.node = rel->rd_node;
1477
xlrec.rootblk = rootblknum;
1478
xlrec.level = metad->btm_level;
1480
rdata[0].buffer = InvalidBuffer;
1481
rdata[0].data = (char *) &xlrec;
1482
rdata[0].len = SizeOfBtreeNewroot;
1483
rdata[0].next = &(rdata[1]);
1486
* Direct access to page is not good but faster - we should
1487
* implement some new func in page API.
1489
rdata[1].buffer = InvalidBuffer;
1490
rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
1491
rdata[1].len = ((PageHeader) rootpage)->pd_special -
1492
((PageHeader) rootpage)->pd_upper;
1493
rdata[1].next = NULL;
1495
recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
1497
PageSetLSN(rootpage, recptr);
1498
PageSetTLI(rootpage, ThisTimeLineID);
1499
PageSetLSN(metapg, recptr);
1500
PageSetTLI(metapg, ThisTimeLineID);
1501
PageSetLSN(lpage, recptr);
1502
PageSetTLI(lpage, ThisTimeLineID);
1503
PageSetLSN(rpage, recptr);
1504
PageSetTLI(rpage, ThisTimeLineID);
1509
/* write and let go of metapage buffer */
1510
_bt_wrtbuf(rel, metabuf);
1516
* _bt_pgaddtup() -- add a tuple to a particular page in the index.
1518
* This routine adds the tuple to the page as requested. It does
1519
* not affect pin/lock status, but you'd better have a write lock
1520
* and pin on the target buffer! Don't forget to write and release
1521
* the buffer afterwards, either.
1523
* The main difference between this routine and a bare PageAddItem call
1524
* is that this code knows that the leftmost data item on a non-leaf
1525
* btree page doesn't need to have a key. Therefore, it strips such
1526
* items down to just the item header. CAUTION: this works ONLY if
1527
* we insert the items in order, so that the given itup_off does
1528
* represent the final position of the item!
1531
_bt_pgaddtup(Relation rel,
1535
OffsetNumber itup_off,
1538
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1539
BTItemData truncitem;
1541
if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
1543
memcpy(&truncitem, btitem, sizeof(BTItemData));
1544
truncitem.bti_itup.t_info = sizeof(BTItemData);
1545
btitem = &truncitem;
1546
itemsize = sizeof(BTItemData);
1549
if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
1550
LP_USED) == InvalidOffsetNumber)
1551
elog(PANIC, "failed to add item to the %s for \"%s\"",
1552
where, RelationGetRelationName(rel));
1556
* _bt_isequal - used in _bt_doinsert in check for duplicates.
1558
* This is very similar to _bt_compare, except for NULL handling.
1559
* Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
1562
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
1563
int keysz, ScanKey scankey)
1569
/* Better be comparing to a leaf item */
1570
Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
1572
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
1573
itup = &(btitem->bti_itup);
1575
for (i = 1; i <= keysz; i++)
1582
attno = scankey->sk_attno;
1584
datum = index_getattr(itup, attno, itupdesc, &isNull);
1586
/* NULLs are never equal to anything */
1587
if (isNull || (scankey->sk_flags & SK_ISNULL))
1590
result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
1592
scankey->sk_argument));
1600
/* if we get here, the keys are equal */