~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/access/nbtree/nbtinsert.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * nbtinsert.c
 
4
 *        Item insertion in Lehman and Yao btrees for Postgres.
 
5
 *
 
6
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.119 2004-12-31 21:59:22 pgsql Exp $
 
12
 *
 
13
 *-------------------------------------------------------------------------
 
14
 */
 
15
 
 
16
#include "postgres.h"
 
17
 
 
18
#include "access/heapam.h"
 
19
#include "access/nbtree.h"
 
20
#include "miscadmin.h"
 
21
 
 
22
 
 
23
typedef struct
 
24
{
 
25
        /* context data for _bt_checksplitloc */
 
26
        Size            newitemsz;              /* size of new item to be inserted */
 
27
        bool            is_leaf;                /* T if splitting a leaf page */
 
28
        bool            is_rightmost;   /* T if splitting a rightmost page */
 
29
 
 
30
        bool            have_split;             /* found a valid split? */
 
31
 
 
32
        /* these fields valid only if have_split is true */
 
33
        bool            newitemonleft;  /* new item on left or right of best split */
 
34
        OffsetNumber firstright;        /* best split point */
 
35
        int                     best_delta;             /* best size delta so far */
 
36
} FindSplitData;
 
37
 
 
38
 
 
39
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 
40
 
 
41
static TransactionId _bt_check_unique(Relation rel, BTItem btitem,
 
42
                                 Relation heapRel, Buffer buf,
 
43
                                 ScanKey itup_scankey);
 
44
static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
 
45
                           BTStack stack,
 
46
                           int keysz, ScanKey scankey,
 
47
                           BTItem btitem,
 
48
                           OffsetNumber afteritem,
 
49
                           bool split_only_page);
 
50
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
 
51
                  OffsetNumber newitemoff, Size newitemsz,
 
52
                  BTItem newitem, bool newitemonleft,
 
53
                  OffsetNumber *itup_off, BlockNumber *itup_blkno);
 
54
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
 
55
                                 OffsetNumber newitemoff,
 
56
                                 Size newitemsz,
 
57
                                 bool *newitemonleft);
 
58
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
 
59
                                  int leftfree, int rightfree,
 
60
                                  bool newitemonleft, Size firstrightitemsz);
 
61
static void _bt_pgaddtup(Relation rel, Page page,
 
62
                         Size itemsize, BTItem btitem,
 
63
                         OffsetNumber itup_off, const char *where);
 
64
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
 
65
                        int keysz, ScanKey scankey);
 
66
 
 
67
 
 
68
/*
 
69
 *      _bt_doinsert() -- Handle insertion of a single btitem in the tree.
 
70
 *
 
71
 *              This routine is called by the public interface routines, btbuild
 
72
 *              and btinsert.  By here, btitem is filled in, including the TID.
 
73
 */
 
74
InsertIndexResult
 
75
_bt_doinsert(Relation rel, BTItem btitem,
 
76
                         bool index_is_unique, Relation heapRel)
 
77
{
 
78
        IndexTuple      itup = &(btitem->bti_itup);
 
79
        int                     natts = rel->rd_rel->relnatts;
 
80
        ScanKey         itup_scankey;
 
81
        BTStack         stack;
 
82
        Buffer          buf;
 
83
        InsertIndexResult res;
 
84
 
 
85
        /* we need a scan key to do our search, so build one */
 
86
        itup_scankey = _bt_mkscankey(rel, itup);
 
87
 
 
88
top:
 
89
        /* find the first page containing this key */
 
90
        stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
 
91
 
 
92
        /* trade in our read lock for a write lock */
 
93
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
94
        LockBuffer(buf, BT_WRITE);
 
95
 
 
96
        /*
 
97
         * If the page was split between the time that we surrendered our read
 
98
         * lock and acquired our write lock, then this page may no longer be
 
99
         * the right place for the key we want to insert.  In this case, we
 
100
         * need to move right in the tree.      See Lehman and Yao for an
 
101
         * excruciatingly precise description.
 
102
         */
 
103
        buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
 
104
 
 
105
        /*
 
106
         * If we're not allowing duplicates, make sure the key isn't already
 
107
         * in the index.
 
108
         *
 
109
         * NOTE: obviously, _bt_check_unique can only detect keys that are
 
110
         * already in the index; so it cannot defend against concurrent
 
111
         * insertions of the same key.  We protect against that by means of
 
112
         * holding a write lock on the target page.  Any other would-be
 
113
         * inserter of the same key must acquire a write lock on the same
 
114
         * target page, so only one would-be inserter can be making the check
 
115
         * at one time.  Furthermore, once we are past the check we hold write
 
116
         * locks continuously until we have performed our insertion, so no
 
117
         * later inserter can fail to see our insertion.  (This requires some
 
118
         * care in _bt_insertonpg.)
 
119
         *
 
120
         * If we must wait for another xact, we release the lock while waiting,
 
121
         * and then must start over completely.
 
122
         */
 
123
        if (index_is_unique)
 
124
        {
 
125
                TransactionId xwait;
 
126
 
 
127
                xwait = _bt_check_unique(rel, btitem, heapRel, buf, itup_scankey);
 
128
 
 
129
                if (TransactionIdIsValid(xwait))
 
130
                {
 
131
                        /* Have to wait for the other guy ... */
 
132
                        _bt_relbuf(rel, buf);
 
133
                        XactLockTableWait(xwait);
 
134
                        /* start over... */
 
135
                        _bt_freestack(stack);
 
136
                        goto top;
 
137
                }
 
138
        }
 
139
 
 
140
        /* do the insertion */
 
141
        res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey, btitem,
 
142
                                                 0, false);
 
143
 
 
144
        /* be tidy */
 
145
        _bt_freestack(stack);
 
146
        _bt_freeskey(itup_scankey);
 
147
 
 
148
        return res;
 
149
}
 
150
 
 
151
/*
 
152
 *      _bt_check_unique() -- Check for violation of unique index constraint
 
153
 *
 
154
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 
155
 * we must wait for to see if it commits a conflicting tuple.   If an actual
 
156
 * conflict is detected, no return --- just ereport().
 
157
 */
 
158
static TransactionId
 
159
_bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
 
160
                                 Buffer buf, ScanKey itup_scankey)
 
161
{
 
162
        TupleDesc       itupdesc = RelationGetDescr(rel);
 
163
        int                     natts = rel->rd_rel->relnatts;
 
164
        OffsetNumber offset,
 
165
                                maxoff;
 
166
        Page            page;
 
167
        BTPageOpaque opaque;
 
168
        Buffer          nbuf = InvalidBuffer;
 
169
 
 
170
        page = BufferGetPage(buf);
 
171
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
172
        maxoff = PageGetMaxOffsetNumber(page);
 
173
 
 
174
        /*
 
175
         * Find first item >= proposed new item.  Note we could also get a
 
176
         * pointer to end-of-page here.
 
177
         */
 
178
        offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
 
179
 
 
180
        /*
 
181
         * Scan over all equal tuples, looking for live conflicts.
 
182
         */
 
183
        for (;;)
 
184
        {
 
185
                HeapTupleData htup;
 
186
                Buffer          hbuffer;
 
187
                ItemId          curitemid;
 
188
                BTItem          cbti;
 
189
                BlockNumber nblkno;
 
190
 
 
191
                /*
 
192
                 * make sure the offset points to an actual item before trying to
 
193
                 * examine it...
 
194
                 */
 
195
                if (offset <= maxoff)
 
196
                {
 
197
                        curitemid = PageGetItemId(page, offset);
 
198
 
 
199
                        /*
 
200
                         * We can skip items that are marked killed.
 
201
                         *
 
202
                         * Formerly, we applied _bt_isequal() before checking the kill
 
203
                         * flag, so as to fall out of the item loop as soon as
 
204
                         * possible. However, in the presence of heavy update activity
 
205
                         * an index may contain many killed items with the same key;
 
206
                         * running _bt_isequal() on each killed item gets expensive.
 
207
                         * Furthermore it is likely that the non-killed version of
 
208
                         * each key appears first, so that we didn't actually get to
 
209
                         * exit any sooner anyway. So now we just advance over killed
 
210
                         * items as quickly as we can. We only apply _bt_isequal()
 
211
                         * when we get to a non-killed item or the end of the page.
 
212
                         */
 
213
                        if (!ItemIdDeleted(curitemid))
 
214
                        {
 
215
                                /*
 
216
                                 * _bt_compare returns 0 for (1,NULL) and (1,NULL) -
 
217
                                 * this's how we handling NULLs - and so we must not use
 
218
                                 * _bt_compare in real comparison, but only for
 
219
                                 * ordering/finding items on pages. - vadim 03/24/97
 
220
                                 */
 
221
                                if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
 
222
                                        break;          /* we're past all the equal tuples */
 
223
 
 
224
                                /* okay, we gotta fetch the heap tuple ... */
 
225
                                cbti = (BTItem) PageGetItem(page, curitemid);
 
226
                                htup.t_self = cbti->bti_itup.t_tid;
 
227
                                if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
 
228
                                                           true, NULL))
 
229
                                {
 
230
                                        /* it is a duplicate */
 
231
                                        TransactionId xwait =
 
232
                                        (TransactionIdIsValid(SnapshotDirty->xmin)) ?
 
233
                                        SnapshotDirty->xmin : SnapshotDirty->xmax;
 
234
 
 
235
                                        ReleaseBuffer(hbuffer);
 
236
 
 
237
                                        /*
 
238
                                         * If this tuple is being updated by other transaction
 
239
                                         * then we have to wait for its commit/abort.
 
240
                                         */
 
241
                                        if (TransactionIdIsValid(xwait))
 
242
                                        {
 
243
                                                if (nbuf != InvalidBuffer)
 
244
                                                        _bt_relbuf(rel, nbuf);
 
245
                                                /* Tell _bt_doinsert to wait... */
 
246
                                                return xwait;
 
247
                                        }
 
248
 
 
249
                                        /*
 
250
                                         * Otherwise we have a definite conflict.
 
251
                                         */
 
252
                                        ereport(ERROR,
 
253
                                                        (errcode(ERRCODE_UNIQUE_VIOLATION),
 
254
                                                         errmsg("duplicate key violates unique constraint \"%s\"",
 
255
                                                                        RelationGetRelationName(rel))));
 
256
                                }
 
257
                                else if (htup.t_data != NULL)
 
258
                                {
 
259
                                        /*
 
260
                                         * Hmm, if we can't see the tuple, maybe it can be
 
261
                                         * marked killed.  This logic should match
 
262
                                         * index_getnext and btgettuple.
 
263
                                         */
 
264
                                        LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
 
265
                                        if (HeapTupleSatisfiesVacuum(htup.t_data, RecentGlobalXmin,
 
266
                                                                                                 hbuffer) == HEAPTUPLE_DEAD)
 
267
                                        {
 
268
                                                curitemid->lp_flags |= LP_DELETE;
 
269
                                                SetBufferCommitInfoNeedsSave(buf);
 
270
                                        }
 
271
                                        LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
 
272
                                }
 
273
                                ReleaseBuffer(hbuffer);
 
274
                        }
 
275
                }
 
276
 
 
277
                /*
 
278
                 * Advance to next tuple to continue checking.
 
279
                 */
 
280
                if (offset < maxoff)
 
281
                        offset = OffsetNumberNext(offset);
 
282
                else
 
283
                {
 
284
                        /* If scankey == hikey we gotta check the next page too */
 
285
                        if (P_RIGHTMOST(opaque))
 
286
                                break;
 
287
                        if (!_bt_isequal(itupdesc, page, P_HIKEY,
 
288
                                                         natts, itup_scankey))
 
289
                                break;
 
290
                        /* Advance to next non-dead page --- there must be one */
 
291
                        for (;;)
 
292
                        {
 
293
                                nblkno = opaque->btpo_next;
 
294
                                nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
 
295
                                page = BufferGetPage(nbuf);
 
296
                                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
297
                                if (!P_IGNORE(opaque))
 
298
                                        break;
 
299
                                if (P_RIGHTMOST(opaque))
 
300
                                        elog(ERROR, "fell off the end of \"%s\"",
 
301
                                                 RelationGetRelationName(rel));
 
302
                        }
 
303
                        maxoff = PageGetMaxOffsetNumber(page);
 
304
                        offset = P_FIRSTDATAKEY(opaque);
 
305
                }
 
306
        }
 
307
 
 
308
        if (nbuf != InvalidBuffer)
 
309
                _bt_relbuf(rel, nbuf);
 
310
 
 
311
        return InvalidTransactionId;
 
312
}
 
313
 
 
314
/*----------
 
315
 *      _bt_insertonpg() -- Insert a tuple on a particular page in the index.
 
316
 *
 
317
 *              This recursive procedure does the following things:
 
318
 *
 
319
 *                      +  finds the right place to insert the tuple.
 
320
 *                      +  if necessary, splits the target page (making sure that the
 
321
 *                         split is equitable as far as post-insert free space goes).
 
322
 *                      +  inserts the tuple.
 
323
 *                      +  if the page was split, pops the parent stack, and finds the
 
324
 *                         right place to insert the new child pointer (by walking
 
325
 *                         right using information stored in the parent stack).
 
326
 *                      +  invokes itself with the appropriate tuple for the right
 
327
 *                         child page on the parent.
 
328
 *                      +  updates the metapage if a true root or fast root is split.
 
329
 *
 
330
 *              On entry, we must have the right buffer on which to do the
 
331
 *              insertion, and the buffer must be pinned and locked.  On return,
 
332
 *              we will have dropped both the pin and the write lock on the buffer.
 
333
 *
 
334
 *              If 'afteritem' is >0 then the new tuple must be inserted after the
 
335
 *              existing item of that number, noplace else.  If 'afteritem' is 0
 
336
 *              then the procedure finds the exact spot to insert it by searching.
 
337
 *              (keysz and scankey parameters are used ONLY if afteritem == 0.)
 
338
 *
 
339
 *              NOTE: if the new key is equal to one or more existing keys, we can
 
340
 *              legitimately place it anywhere in the series of equal keys --- in fact,
 
341
 *              if the new key is equal to the page's "high key" we can place it on
 
342
 *              the next page.  If it is equal to the high key, and there's not room
 
343
 *              to insert the new tuple on the current page without splitting, then
 
344
 *              we can move right hoping to find more free space and avoid a split.
 
345
 *              (We should not move right indefinitely, however, since that leads to
 
346
 *              O(N^2) insertion behavior in the presence of many equal keys.)
 
347
 *              Once we have chosen the page to put the key on, we'll insert it before
 
348
 *              any existing equal keys because of the way _bt_binsrch() works.
 
349
 *
 
350
 *              The locking interactions in this code are critical.  You should
 
351
 *              grok Lehman and Yao's paper before making any changes.  In addition,
 
352
 *              you need to understand how we disambiguate duplicate keys in this
 
353
 *              implementation, in order to be able to find our location using
 
354
 *              L&Y "move right" operations.  Since we may insert duplicate user
 
355
 *              keys, and since these dups may propagate up the tree, we use the
 
356
 *              'afteritem' parameter to position ourselves correctly for the
 
357
 *              insertion on internal pages.
 
358
 *----------
 
359
 */
 
360
static InsertIndexResult
 
361
_bt_insertonpg(Relation rel,
 
362
                           Buffer buf,
 
363
                           BTStack stack,
 
364
                           int keysz,
 
365
                           ScanKey scankey,
 
366
                           BTItem btitem,
 
367
                           OffsetNumber afteritem,
 
368
                           bool split_only_page)
 
369
{
 
370
        InsertIndexResult res;
 
371
        Page            page;
 
372
        BTPageOpaque lpageop;
 
373
        OffsetNumber itup_off;
 
374
        BlockNumber itup_blkno;
 
375
        OffsetNumber newitemoff;
 
376
        OffsetNumber firstright = InvalidOffsetNumber;
 
377
        Size            itemsz;
 
378
 
 
379
        page = BufferGetPage(buf);
 
380
        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
381
 
 
382
        itemsz = IndexTupleDSize(btitem->bti_itup)
 
383
                + (sizeof(BTItemData) - sizeof(IndexTupleData));
 
384
 
 
385
        itemsz = MAXALIGN(itemsz);      /* be safe, PageAddItem will do this but
 
386
                                                                 * we need to be consistent */
 
387
 
 
388
        /*
 
389
         * Check whether the item can fit on a btree page at all. (Eventually,
 
390
         * we ought to try to apply TOAST methods if not.) We actually need to
 
391
         * be able to fit three items on every page, so restrict any one item
 
392
         * to 1/3 the per-page available space. Note that at this point,
 
393
         * itemsz doesn't include the ItemId.
 
394
         */
 
395
        if (itemsz > BTMaxItemSize(page))
 
396
                ereport(ERROR,
 
397
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 
398
                                 errmsg("index row size %lu exceeds btree maximum, %lu",
 
399
                                                (unsigned long) itemsz,
 
400
                                                (unsigned long) BTMaxItemSize(page))));
 
401
 
 
402
        /*
 
403
         * Determine exactly where new item will go.
 
404
         */
 
405
        if (afteritem > 0)
 
406
                newitemoff = afteritem + 1;
 
407
        else
 
408
        {
 
409
                /*----------
 
410
                 * If we will need to split the page to put the item here,
 
411
                 * check whether we can put the tuple somewhere to the right,
 
412
                 * instead.  Keep scanning right until we
 
413
                 *              (a) find a page with enough free space,
 
414
                 *              (b) reach the last page where the tuple can legally go, or
 
415
                 *              (c) get tired of searching.
 
416
                 * (c) is not flippant; it is important because if there are many
 
417
                 * pages' worth of equal keys, it's better to split one of the early
 
418
                 * pages than to scan all the way to the end of the run of equal keys
 
419
                 * on every insert.  We implement "get tired" as a random choice,
 
420
                 * since stopping after scanning a fixed number of pages wouldn't work
 
421
                 * well (we'd never reach the right-hand side of previously split
 
422
                 * pages).      Currently the probability of moving right is set at 0.99,
 
423
                 * which may seem too high to change the behavior much, but it does an
 
424
                 * excellent job of preventing O(N^2) behavior with many equal keys.
 
425
                 *----------
 
426
                 */
 
427
                bool            movedright = false;
 
428
 
 
429
                while (PageGetFreeSpace(page) < itemsz &&
 
430
                           !P_RIGHTMOST(lpageop) &&
 
431
                           _bt_compare(rel, keysz, scankey, page, P_HIKEY) == 0 &&
 
432
                           random() > (MAX_RANDOM_VALUE / 100))
 
433
                {
 
434
                        /*
 
435
                         * step right to next non-dead page
 
436
                         *
 
437
                         * must write-lock that page before releasing write lock on
 
438
                         * current page; else someone else's _bt_check_unique scan
 
439
                         * could fail to see our insertion.  write locks on
 
440
                         * intermediate dead pages won't do because we don't know when
 
441
                         * they will get de-linked from the tree.
 
442
                         */
 
443
                        Buffer          rbuf = InvalidBuffer;
 
444
 
 
445
                        for (;;)
 
446
                        {
 
447
                                BlockNumber rblkno = lpageop->btpo_next;
 
448
 
 
449
                                rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
 
450
                                page = BufferGetPage(rbuf);
 
451
                                lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
452
                                if (!P_IGNORE(lpageop))
 
453
                                        break;
 
454
                                if (P_RIGHTMOST(lpageop))
 
455
                                        elog(ERROR, "fell off the end of \"%s\"",
 
456
                                                 RelationGetRelationName(rel));
 
457
                        }
 
458
                        _bt_relbuf(rel, buf);
 
459
                        buf = rbuf;
 
460
                        movedright = true;
 
461
                }
 
462
 
 
463
                /*
 
464
                 * Now we are on the right page, so find the insert position. If
 
465
                 * we moved right at all, we know we should insert at the start of
 
466
                 * the page, else must find the position by searching.
 
467
                 */
 
468
                if (movedright)
 
469
                        newitemoff = P_FIRSTDATAKEY(lpageop);
 
470
                else
 
471
                        newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
 
472
        }
 
473
 
 
474
        /*
 
475
         * Do we need to split the page to fit the item on it?
 
476
         *
 
477
         * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
 
478
         * so this comparison is correct even though we appear to be
 
479
         * accounting only for the item and not for its line pointer.
 
480
         */
 
481
        if (PageGetFreeSpace(page) < itemsz)
 
482
        {
 
483
                bool            is_root = P_ISROOT(lpageop);
 
484
                bool            is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
 
485
                bool            newitemonleft;
 
486
                Buffer          rbuf;
 
487
 
 
488
                /* Choose the split point */
 
489
                firstright = _bt_findsplitloc(rel, page,
 
490
                                                                          newitemoff, itemsz,
 
491
                                                                          &newitemonleft);
 
492
 
 
493
                /* split the buffer into left and right halves */
 
494
                rbuf = _bt_split(rel, buf, firstright,
 
495
                                                 newitemoff, itemsz, btitem, newitemonleft,
 
496
                                                 &itup_off, &itup_blkno);
 
497
 
 
498
                /*----------
 
499
                 * By here,
 
500
                 *
 
501
                 *              +  our target page has been split;
 
502
                 *              +  the original tuple has been inserted;
 
503
                 *              +  we have write locks on both the old (left half)
 
504
                 *                 and new (right half) buffers, after the split; and
 
505
                 *              +  we know the key we want to insert into the parent
 
506
                 *                 (it's the "high key" on the left child page).
 
507
                 *
 
508
                 * We're ready to do the parent insertion.  We need to hold onto the
 
509
                 * locks for the child pages until we locate the parent, but we can
 
510
                 * release them before doing the actual insertion (see Lehman and Yao
 
511
                 * for the reasoning).
 
512
                 *----------
 
513
                 */
 
514
                _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
 
515
        }
 
516
        else
 
517
        {
 
518
                Buffer          metabuf = InvalidBuffer;
 
519
                Page            metapg = NULL;
 
520
                BTMetaPageData *metad = NULL;
 
521
 
 
522
                itup_off = newitemoff;
 
523
                itup_blkno = BufferGetBlockNumber(buf);
 
524
 
 
525
                /*
 
526
                 * If we are doing this insert because we split a page that was
 
527
                 * the only one on its tree level, but was not the root, it may
 
528
                 * have been the "fast root".  We need to ensure that the fast
 
529
                 * root link points at or above the current page.  We can safely
 
530
                 * acquire a lock on the metapage here --- see comments for
 
531
                 * _bt_newroot().
 
532
                 */
 
533
                if (split_only_page)
 
534
                {
 
535
                        metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
 
536
                        metapg = BufferGetPage(metabuf);
 
537
                        metad = BTPageGetMeta(metapg);
 
538
 
 
539
                        if (metad->btm_fastlevel >= lpageop->btpo.level)
 
540
                        {
 
541
                                /* no update wanted */
 
542
                                _bt_relbuf(rel, metabuf);
 
543
                                metabuf = InvalidBuffer;
 
544
                        }
 
545
                }
 
546
 
 
547
                /* Do the update.  No ereport(ERROR) until changes are logged */
 
548
                START_CRIT_SECTION();
 
549
 
 
550
                _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page");
 
551
 
 
552
                if (BufferIsValid(metabuf))
 
553
                {
 
554
                        metad->btm_fastroot = itup_blkno;
 
555
                        metad->btm_fastlevel = lpageop->btpo.level;
 
556
                }
 
557
 
 
558
                /* XLOG stuff */
 
559
                if (!rel->rd_istemp)
 
560
                {
 
561
                        xl_btree_insert xlrec;
 
562
                        xl_btree_metadata xlmeta;
 
563
                        uint8           xlinfo;
 
564
                        XLogRecPtr      recptr;
 
565
                        XLogRecData rdata[3];
 
566
                        XLogRecData *nextrdata;
 
567
                        BTItemData      truncitem;
 
568
 
 
569
                        xlrec.target.node = rel->rd_node;
 
570
                        ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);
 
571
 
 
572
                        rdata[0].buffer = InvalidBuffer;
 
573
                        rdata[0].data = (char *) &xlrec;
 
574
                        rdata[0].len = SizeOfBtreeInsert;
 
575
                        rdata[0].next = nextrdata = &(rdata[1]);
 
576
 
 
577
                        if (BufferIsValid(metabuf))
 
578
                        {
 
579
                                xlmeta.root = metad->btm_root;
 
580
                                xlmeta.level = metad->btm_level;
 
581
                                xlmeta.fastroot = metad->btm_fastroot;
 
582
                                xlmeta.fastlevel = metad->btm_fastlevel;
 
583
 
 
584
                                nextrdata->buffer = InvalidBuffer;
 
585
                                nextrdata->data = (char *) &xlmeta;
 
586
                                nextrdata->len = sizeof(xl_btree_metadata);
 
587
                                nextrdata->next = nextrdata + 1;
 
588
                                nextrdata++;
 
589
                                xlinfo = XLOG_BTREE_INSERT_META;
 
590
                        }
 
591
                        else if (P_ISLEAF(lpageop))
 
592
                                xlinfo = XLOG_BTREE_INSERT_LEAF;
 
593
                        else
 
594
                                xlinfo = XLOG_BTREE_INSERT_UPPER;
 
595
 
 
596
                        /* Read comments in _bt_pgaddtup */
 
597
                        if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
 
598
                        {
 
599
                                truncitem = *btitem;
 
600
                                truncitem.bti_itup.t_info = sizeof(BTItemData);
 
601
                                nextrdata->data = (char *) &truncitem;
 
602
                                nextrdata->len = sizeof(BTItemData);
 
603
                        }
 
604
                        else
 
605
                        {
 
606
                                nextrdata->data = (char *) btitem;
 
607
                                nextrdata->len = IndexTupleDSize(btitem->bti_itup) +
 
608
                                        (sizeof(BTItemData) - sizeof(IndexTupleData));
 
609
                        }
 
610
                        nextrdata->buffer = buf;
 
611
                        nextrdata->next = NULL;
 
612
 
 
613
                        recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
 
614
 
 
615
                        if (BufferIsValid(metabuf))
 
616
                        {
 
617
                                PageSetLSN(metapg, recptr);
 
618
                                PageSetTLI(metapg, ThisTimeLineID);
 
619
                        }
 
620
 
 
621
                        PageSetLSN(page, recptr);
 
622
                        PageSetTLI(page, ThisTimeLineID);
 
623
                }
 
624
 
 
625
                END_CRIT_SECTION();
 
626
 
 
627
                /* Write out the updated page and release pin/lock */
 
628
                if (BufferIsValid(metabuf))
 
629
                        _bt_wrtbuf(rel, metabuf);
 
630
 
 
631
                _bt_wrtbuf(rel, buf);
 
632
        }
 
633
 
 
634
        /* by here, the new tuple is inserted at itup_blkno/itup_off */
 
635
        res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
 
636
        ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
 
637
 
 
638
        return res;
 
639
}
 
640
 
 
641
/*
 
642
 *      _bt_split() -- split a page in the btree.
 
643
 *
 
644
 *              On entry, buf is the page to split, and is write-locked and pinned.
 
645
 *              firstright is the item index of the first item to be moved to the
 
646
 *              new right page.  newitemoff etc. tell us about the new item that
 
647
 *              must be inserted along with the data from the old page.
 
648
 *
 
649
 *              Returns the new right sibling of buf, pinned and write-locked.
 
650
 *              The pin and lock on buf are maintained.  *itup_off and *itup_blkno
 
651
 *              are set to the exact location where newitem was inserted.
 
652
 */
 
653
static Buffer
 
654
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
 
655
                  OffsetNumber newitemoff, Size newitemsz, BTItem newitem,
 
656
                  bool newitemonleft,
 
657
                  OffsetNumber *itup_off, BlockNumber *itup_blkno)
 
658
{
 
659
        Buffer          rbuf;
 
660
        Page            origpage;
 
661
        Page            leftpage,
 
662
                                rightpage;
 
663
        BTPageOpaque ropaque,
 
664
                                lopaque,
 
665
                                oopaque;
 
666
        Buffer          sbuf = InvalidBuffer;
 
667
        Page            spage = NULL;
 
668
        BTPageOpaque sopaque = NULL;
 
669
        Size            itemsz;
 
670
        ItemId          itemid;
 
671
        BTItem          item;
 
672
        OffsetNumber leftoff,
 
673
                                rightoff;
 
674
        OffsetNumber maxoff;
 
675
        OffsetNumber i;
 
676
 
 
677
        rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
 
678
        origpage = BufferGetPage(buf);
 
679
        leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
 
680
        rightpage = BufferGetPage(rbuf);
 
681
 
 
682
        _bt_pageinit(leftpage, BufferGetPageSize(buf));
 
683
        _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
 
684
 
 
685
        /* init btree private data */
 
686
        oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
 
687
        lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
 
688
        ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
 
689
 
 
690
        /* if we're splitting this page, it won't be the root when we're done */
 
691
        lopaque->btpo_flags = oopaque->btpo_flags;
 
692
        lopaque->btpo_flags &= ~BTP_ROOT;
 
693
        ropaque->btpo_flags = lopaque->btpo_flags;
 
694
        lopaque->btpo_prev = oopaque->btpo_prev;
 
695
        lopaque->btpo_next = BufferGetBlockNumber(rbuf);
 
696
        ropaque->btpo_prev = BufferGetBlockNumber(buf);
 
697
        ropaque->btpo_next = oopaque->btpo_next;
 
698
        lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
 
699
 
 
700
        /*
 
701
         * If the page we're splitting is not the rightmost page at its level
 
702
         * in the tree, then the first entry on the page is the high key for
 
703
         * the page.  We need to copy that to the right half.  Otherwise
 
704
         * (meaning the rightmost page case), all the items on the right half
 
705
         * will be user data.
 
706
         */
 
707
        rightoff = P_HIKEY;
 
708
 
 
709
        if (!P_RIGHTMOST(oopaque))
 
710
        {
 
711
                itemid = PageGetItemId(origpage, P_HIKEY);
 
712
                itemsz = ItemIdGetLength(itemid);
 
713
                item = (BTItem) PageGetItem(origpage, itemid);
 
714
                if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
 
715
                                                LP_USED) == InvalidOffsetNumber)
 
716
                        elog(PANIC, "failed to add hikey to the right sibling");
 
717
                rightoff = OffsetNumberNext(rightoff);
 
718
        }
 
719
 
 
720
        /*
 
721
         * The "high key" for the new left page will be the first key that's
 
722
         * going to go into the new right page.  This might be either the
 
723
         * existing data item at position firstright, or the incoming tuple.
 
724
         */
 
725
        leftoff = P_HIKEY;
 
726
        if (!newitemonleft && newitemoff == firstright)
 
727
        {
 
728
                /* incoming tuple will become first on right page */
 
729
                itemsz = newitemsz;
 
730
                item = newitem;
 
731
        }
 
732
        else
 
733
        {
 
734
                /* existing item at firstright will become first on right page */
 
735
                itemid = PageGetItemId(origpage, firstright);
 
736
                itemsz = ItemIdGetLength(itemid);
 
737
                item = (BTItem) PageGetItem(origpage, itemid);
 
738
        }
 
739
        if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
 
740
                                        LP_USED) == InvalidOffsetNumber)
 
741
                elog(PANIC, "failed to add hikey to the left sibling");
 
742
        leftoff = OffsetNumberNext(leftoff);
 
743
 
 
744
        /*
 
745
         * Now transfer all the data items to the appropriate page
 
746
         */
 
747
        maxoff = PageGetMaxOffsetNumber(origpage);
 
748
 
 
749
        for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
 
750
        {
 
751
                itemid = PageGetItemId(origpage, i);
 
752
                itemsz = ItemIdGetLength(itemid);
 
753
                item = (BTItem) PageGetItem(origpage, itemid);
 
754
 
 
755
                /* does new item belong before this one? */
 
756
                if (i == newitemoff)
 
757
                {
 
758
                        if (newitemonleft)
 
759
                        {
 
760
                                _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
 
761
                                                         "left sibling");
 
762
                                *itup_off = leftoff;
 
763
                                *itup_blkno = BufferGetBlockNumber(buf);
 
764
                                leftoff = OffsetNumberNext(leftoff);
 
765
                        }
 
766
                        else
 
767
                        {
 
768
                                _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
 
769
                                                         "right sibling");
 
770
                                *itup_off = rightoff;
 
771
                                *itup_blkno = BufferGetBlockNumber(rbuf);
 
772
                                rightoff = OffsetNumberNext(rightoff);
 
773
                        }
 
774
                }
 
775
 
 
776
                /* decide which page to put it on */
 
777
                if (i < firstright)
 
778
                {
 
779
                        _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff,
 
780
                                                 "left sibling");
 
781
                        leftoff = OffsetNumberNext(leftoff);
 
782
                }
 
783
                else
 
784
                {
 
785
                        _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff,
 
786
                                                 "right sibling");
 
787
                        rightoff = OffsetNumberNext(rightoff);
 
788
                }
 
789
        }
 
790
 
 
791
        /* cope with possibility that newitem goes at the end */
 
792
        if (i <= newitemoff)
 
793
        {
 
794
                if (newitemonleft)
 
795
                {
 
796
                        _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff,
 
797
                                                 "left sibling");
 
798
                        *itup_off = leftoff;
 
799
                        *itup_blkno = BufferGetBlockNumber(buf);
 
800
                        leftoff = OffsetNumberNext(leftoff);
 
801
                }
 
802
                else
 
803
                {
 
804
                        _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff,
 
805
                                                 "right sibling");
 
806
                        *itup_off = rightoff;
 
807
                        *itup_blkno = BufferGetBlockNumber(rbuf);
 
808
                        rightoff = OffsetNumberNext(rightoff);
 
809
                }
 
810
        }
 
811
 
 
812
        /*
 
813
         * We have to grab the right sibling (if any) and fix the prev pointer
 
814
         * there. We are guaranteed that this is deadlock-free since no other
 
815
         * writer will be holding a lock on that page and trying to move left,
 
816
         * and all readers release locks on a page before trying to fetch its
 
817
         * neighbors.
 
818
         */
 
819
 
 
820
        if (!P_RIGHTMOST(ropaque))
 
821
        {
 
822
                sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
 
823
                spage = BufferGetPage(sbuf);
 
824
                sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
 
825
                if (sopaque->btpo_prev != ropaque->btpo_prev)
 
826
                        elog(PANIC, "right sibling's left-link doesn't match");
 
827
        }
 
828
 
 
829
        /*
 
830
         * Right sibling is locked, new siblings are prepared, but original
 
831
         * page is not updated yet. Log changes before continuing.
 
832
         *
 
833
         * NO EREPORT(ERROR) till right sibling is updated.
 
834
         */
 
835
        START_CRIT_SECTION();
 
836
 
 
837
        if (!P_RIGHTMOST(ropaque))
 
838
                sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
 
839
 
 
840
        /* XLOG stuff */
 
841
        if (!rel->rd_istemp)
 
842
        {
 
843
                xl_btree_split xlrec;
 
844
                uint8           xlinfo;
 
845
                XLogRecPtr      recptr;
 
846
                XLogRecData rdata[4];
 
847
 
 
848
                xlrec.target.node = rel->rd_node;
 
849
                ItemPointerSet(&(xlrec.target.tid), *itup_blkno, *itup_off);
 
850
                if (newitemonleft)
 
851
                        xlrec.otherblk = BufferGetBlockNumber(rbuf);
 
852
                else
 
853
                        xlrec.otherblk = BufferGetBlockNumber(buf);
 
854
                xlrec.leftblk = lopaque->btpo_prev;
 
855
                xlrec.rightblk = ropaque->btpo_next;
 
856
                xlrec.level = lopaque->btpo.level;
 
857
 
 
858
                /*
 
859
                 * Direct access to page is not good but faster - we should
 
860
                 * implement some new func in page API.  Note we only store the
 
861
                 * tuples themselves, knowing that the item pointers are in the
 
862
                 * same order and can be reconstructed by scanning the tuples.
 
863
                 */
 
864
                xlrec.leftlen = ((PageHeader) leftpage)->pd_special -
 
865
                        ((PageHeader) leftpage)->pd_upper;
 
866
 
 
867
                rdata[0].buffer = InvalidBuffer;
 
868
                rdata[0].data = (char *) &xlrec;
 
869
                rdata[0].len = SizeOfBtreeSplit;
 
870
                rdata[0].next = &(rdata[1]);
 
871
 
 
872
                rdata[1].buffer = InvalidBuffer;
 
873
                rdata[1].data = (char *) leftpage + ((PageHeader) leftpage)->pd_upper;
 
874
                rdata[1].len = xlrec.leftlen;
 
875
                rdata[1].next = &(rdata[2]);
 
876
 
 
877
                rdata[2].buffer = InvalidBuffer;
 
878
                rdata[2].data = (char *) rightpage + ((PageHeader) rightpage)->pd_upper;
 
879
                rdata[2].len = ((PageHeader) rightpage)->pd_special -
 
880
                        ((PageHeader) rightpage)->pd_upper;
 
881
                rdata[2].next = NULL;
 
882
 
 
883
                if (!P_RIGHTMOST(ropaque))
 
884
                {
 
885
                        rdata[2].next = &(rdata[3]);
 
886
                        rdata[3].buffer = sbuf;
 
887
                        rdata[3].data = NULL;
 
888
                        rdata[3].len = 0;
 
889
                        rdata[3].next = NULL;
 
890
                }
 
891
 
 
892
                if (P_ISROOT(oopaque))
 
893
                        xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
 
894
                else
 
895
                        xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
 
896
 
 
897
                recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
 
898
 
 
899
                PageSetLSN(leftpage, recptr);
 
900
                PageSetTLI(leftpage, ThisTimeLineID);
 
901
                PageSetLSN(rightpage, recptr);
 
902
                PageSetTLI(rightpage, ThisTimeLineID);
 
903
                if (!P_RIGHTMOST(ropaque))
 
904
                {
 
905
                        PageSetLSN(spage, recptr);
 
906
                        PageSetTLI(spage, ThisTimeLineID);
 
907
                }
 
908
        }
 
909
 
 
910
        /*
 
911
         * By here, the original data page has been split into two new halves,
 
912
         * and these are correct.  The algorithm requires that the left page
 
913
         * never move during a split, so we copy the new left page back on top
 
914
         * of the original.  Note that this is not a waste of time, since we
 
915
         * also require (in the page management code) that the center of a
 
916
         * page always be clean, and the most efficient way to guarantee this
 
917
         * is just to compact the data by reinserting it into a new left page.
 
918
         */
 
919
 
 
920
        PageRestoreTempPage(leftpage, origpage);
 
921
 
 
922
        END_CRIT_SECTION();
 
923
 
 
924
        /* write and release the old right sibling */
 
925
        if (!P_RIGHTMOST(ropaque))
 
926
                _bt_wrtbuf(rel, sbuf);
 
927
 
 
928
        /* split's done */
 
929
        return rbuf;
 
930
}
 
931
 
 
932
/*
 
933
 *      _bt_findsplitloc() -- find an appropriate place to split a page.
 
934
 *
 
935
 * The idea here is to equalize the free space that will be on each split
 
936
 * page, *after accounting for the inserted tuple*.  (If we fail to account
 
937
 * for it, we might find ourselves with too little room on the page that
 
938
 * it needs to go into!)
 
939
 *
 
940
 * If the page is the rightmost page on its level, we instead try to arrange
 
941
 * for twice as much free space on the right as on the left.  In this way,
 
942
 * when we are inserting successively increasing keys (consider sequences,
 
943
 * timestamps, etc) we will end up with a tree whose pages are about 67% full,
 
944
 * instead of the 50% full result that we'd get without this special case.
 
945
 * (We could bias it even further to make the initially-loaded tree more full.
 
946
 * But since the steady-state load for a btree is about 70%, we'd likely just
 
947
 * be making more page-splitting work for ourselves later on, when we start
 
948
 * seeing updates to existing tuples.)
 
949
 *
 
950
 * We are passed the intended insert position of the new tuple, expressed as
 
951
 * the offsetnumber of the tuple it must go in front of.  (This could be
 
952
 * maxoff+1 if the tuple is to go at the end.)
 
953
 *
 
954
 * We return the index of the first existing tuple that should go on the
 
955
 * righthand page, plus a boolean indicating whether the new tuple goes on
 
956
 * the left or right page.      The bool is necessary to disambiguate the case
 
957
 * where firstright == newitemoff.
 
958
 */
 
959
static OffsetNumber
 
960
_bt_findsplitloc(Relation rel,
 
961
                                 Page page,
 
962
                                 OffsetNumber newitemoff,
 
963
                                 Size newitemsz,
 
964
                                 bool *newitemonleft)
 
965
{
 
966
        BTPageOpaque opaque;
 
967
        OffsetNumber offnum;
 
968
        OffsetNumber maxoff;
 
969
        ItemId          itemid;
 
970
        FindSplitData state;
 
971
        int                     leftspace,
 
972
                                rightspace,
 
973
                                goodenough,
 
974
                                dataitemtotal,
 
975
                                dataitemstoleft;
 
976
 
 
977
        opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
978
 
 
979
        /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
 
980
        newitemsz += sizeof(ItemIdData);
 
981
        state.newitemsz = newitemsz;
 
982
        state.is_leaf = P_ISLEAF(opaque);
 
983
        state.is_rightmost = P_RIGHTMOST(opaque);
 
984
        state.have_split = false;
 
985
 
 
986
        /* Total free space available on a btree page, after fixed overhead */
 
987
        leftspace = rightspace =
 
988
                PageGetPageSize(page) - SizeOfPageHeaderData -
 
989
                MAXALIGN(sizeof(BTPageOpaqueData));
 
990
 
 
991
        /*
 
992
         * Finding the best possible split would require checking all the
 
993
         * possible split points, because of the high-key and left-key special
 
994
         * cases. That's probably more work than it's worth; instead, stop as
 
995
         * soon as we find a "good-enough" split, where good-enough is defined
 
996
         * as an imbalance in free space of no more than pagesize/16
 
997
         * (arbitrary...) This should let us stop near the middle on most
 
998
         * pages, instead of plowing to the end.
 
999
         */
 
1000
        goodenough = leftspace / 16;
 
1001
 
 
1002
        /* The right page will have the same high key as the old page */
 
1003
        if (!P_RIGHTMOST(opaque))
 
1004
        {
 
1005
                itemid = PageGetItemId(page, P_HIKEY);
 
1006
                rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
 
1007
                                                         sizeof(ItemIdData));
 
1008
        }
 
1009
 
 
1010
        /* Count up total space in data items without actually scanning 'em */
 
1011
        dataitemtotal = rightspace - (int) PageGetFreeSpace(page);
 
1012
 
 
1013
        /*
 
1014
         * Scan through the data items and calculate space usage for a split
 
1015
         * at each possible position.
 
1016
         */
 
1017
        dataitemstoleft = 0;
 
1018
        maxoff = PageGetMaxOffsetNumber(page);
 
1019
 
 
1020
        for (offnum = P_FIRSTDATAKEY(opaque);
 
1021
                 offnum <= maxoff;
 
1022
                 offnum = OffsetNumberNext(offnum))
 
1023
        {
 
1024
                Size            itemsz;
 
1025
                int                     leftfree,
 
1026
                                        rightfree;
 
1027
 
 
1028
                itemid = PageGetItemId(page, offnum);
 
1029
                itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
 
1030
 
 
1031
                /*
 
1032
                 * We have to allow for the current item becoming the high key of
 
1033
                 * the left page; therefore it counts against left space as well
 
1034
                 * as right space.
 
1035
                 */
 
1036
                leftfree = leftspace - dataitemstoleft - (int) itemsz;
 
1037
                rightfree = rightspace - (dataitemtotal - dataitemstoleft);
 
1038
 
 
1039
                /*
 
1040
                 * Will the new item go to left or right of split?
 
1041
                 */
 
1042
                if (offnum > newitemoff)
 
1043
                        _bt_checksplitloc(&state, offnum, leftfree, rightfree,
 
1044
                                                          true, itemsz);
 
1045
                else if (offnum < newitemoff)
 
1046
                        _bt_checksplitloc(&state, offnum, leftfree, rightfree,
 
1047
                                                          false, itemsz);
 
1048
                else
 
1049
                {
 
1050
                        /* need to try it both ways! */
 
1051
                        _bt_checksplitloc(&state, offnum, leftfree, rightfree,
 
1052
                                                          true, itemsz);
 
1053
                        /* here we are contemplating newitem as first on right */
 
1054
                        _bt_checksplitloc(&state, offnum, leftfree, rightfree,
 
1055
                                                          false, newitemsz);
 
1056
                }
 
1057
 
 
1058
                /* Abort scan once we find a good-enough choice */
 
1059
                if (state.have_split && state.best_delta <= goodenough)
 
1060
                        break;
 
1061
 
 
1062
                dataitemstoleft += itemsz;
 
1063
        }
 
1064
 
 
1065
        /*
 
1066
         * I believe it is not possible to fail to find a feasible split, but
 
1067
         * just in case ...
 
1068
         */
 
1069
        if (!state.have_split)
 
1070
                elog(ERROR, "could not find a feasible split point for \"%s\"",
 
1071
                         RelationGetRelationName(rel));
 
1072
 
 
1073
        *newitemonleft = state.newitemonleft;
 
1074
        return state.firstright;
 
1075
}
 
1076
 
 
1077
/*
 
1078
 * Subroutine to analyze a particular possible split choice (ie, firstright
 
1079
 * and newitemonleft settings), and record the best split so far in *state.
 
1080
 */
 
1081
static void
 
1082
_bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
 
1083
                                  int leftfree, int rightfree,
 
1084
                                  bool newitemonleft, Size firstrightitemsz)
 
1085
{
 
1086
        /*
 
1087
         * Account for the new item on whichever side it is to be put.
 
1088
         */
 
1089
        if (newitemonleft)
 
1090
                leftfree -= (int) state->newitemsz;
 
1091
        else
 
1092
                rightfree -= (int) state->newitemsz;
 
1093
 
 
1094
        /*
 
1095
         * If we are not on the leaf level, we will be able to discard the key
 
1096
         * data from the first item that winds up on the right page.
 
1097
         */
 
1098
        if (!state->is_leaf)
 
1099
                rightfree += (int) firstrightitemsz -
 
1100
                        (int) (MAXALIGN(sizeof(BTItemData)) + sizeof(ItemIdData));
 
1101
 
 
1102
        /*
 
1103
         * If feasible split point, remember best delta.
 
1104
         */
 
1105
        if (leftfree >= 0 && rightfree >= 0)
 
1106
        {
 
1107
                int                     delta;
 
1108
 
 
1109
                if (state->is_rightmost)
 
1110
                {
 
1111
                        /*
 
1112
                         * On a rightmost page, try to equalize right free space with
 
1113
                         * twice the left free space.  See comments for
 
1114
                         * _bt_findsplitloc.
 
1115
                         */
 
1116
                        delta = (2 * leftfree) - rightfree;
 
1117
                }
 
1118
                else
 
1119
                {
 
1120
                        /* Otherwise, aim for equal free space on both sides */
 
1121
                        delta = leftfree - rightfree;
 
1122
                }
 
1123
 
 
1124
                if (delta < 0)
 
1125
                        delta = -delta;
 
1126
                if (!state->have_split || delta < state->best_delta)
 
1127
                {
 
1128
                        state->have_split = true;
 
1129
                        state->newitemonleft = newitemonleft;
 
1130
                        state->firstright = firstright;
 
1131
                        state->best_delta = delta;
 
1132
                }
 
1133
        }
 
1134
}
 
1135
 
 
1136
/*
 
1137
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 
1138
 *
 
1139
 * On entry, buf and rbuf are the left and right split pages, which we
 
1140
 * still hold write locks on per the L&Y algorithm.  We release the
 
1141
 * write locks once we have write lock on the parent page.      (Any sooner,
 
1142
 * and it'd be possible for some other process to try to split or delete
 
1143
 * one of these pages, and get confused because it cannot find the downlink.)
 
1144
 *
 
1145
 * stack - stack showing how we got here.  May be NULL in cases that don't
 
1146
 *                      have to be efficient (concurrent ROOT split, WAL recovery)
 
1147
 * is_root - we split the true root
 
1148
 * is_only - we split a page alone on its level (might have been fast root)
 
1149
 *
 
1150
 * This is exported so it can be called by nbtxlog.c.
 
1151
 */
 
1152
void
 
1153
_bt_insert_parent(Relation rel,
 
1154
                                  Buffer buf,
 
1155
                                  Buffer rbuf,
 
1156
                                  BTStack stack,
 
1157
                                  bool is_root,
 
1158
                                  bool is_only)
 
1159
{
 
1160
        /*
 
1161
         * Here we have to do something Lehman and Yao don't talk about: deal
 
1162
         * with a root split and construction of a new root.  If our stack is
 
1163
         * empty then we have just split a node on what had been the root
 
1164
         * level when we descended the tree.  If it was still the root then we
 
1165
         * perform a new-root construction.  If it *wasn't* the root anymore,
 
1166
         * search to find the next higher level that someone constructed
 
1167
         * meanwhile, and find the right place to insert as for the normal
 
1168
         * case.
 
1169
         *
 
1170
         * If we have to search for the parent level, we do so by re-descending
 
1171
         * from the root.  This is not super-efficient, but it's rare enough
 
1172
         * not to matter.  (This path is also taken when called from WAL
 
1173
         * recovery --- we have no stack in that case.)
 
1174
         */
 
1175
        if (is_root)
 
1176
        {
 
1177
                Buffer          rootbuf;
 
1178
 
 
1179
                Assert(stack == NULL);
 
1180
                Assert(is_only);
 
1181
                /* create a new root node and update the metapage */
 
1182
                rootbuf = _bt_newroot(rel, buf, rbuf);
 
1183
                /* release the split buffers */
 
1184
                _bt_wrtbuf(rel, rootbuf);
 
1185
                _bt_wrtbuf(rel, rbuf);
 
1186
                _bt_wrtbuf(rel, buf);
 
1187
        }
 
1188
        else
 
1189
        {
 
1190
                BlockNumber bknum = BufferGetBlockNumber(buf);
 
1191
                BlockNumber rbknum = BufferGetBlockNumber(rbuf);
 
1192
                Page            page = BufferGetPage(buf);
 
1193
                InsertIndexResult newres;
 
1194
                BTItem          new_item;
 
1195
                BTStackData fakestack;
 
1196
                BTItem          ritem;
 
1197
                Buffer          pbuf;
 
1198
 
 
1199
                if (stack == NULL)
 
1200
                {
 
1201
                        BTPageOpaque lpageop;
 
1202
 
 
1203
                        if (!InRecovery)
 
1204
                                elog(DEBUG2, "concurrent ROOT page split");
 
1205
                        lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
1206
                        /* Find the leftmost page at the next level up */
 
1207
                        pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
 
1208
                        /* Set up a phony stack entry pointing there */
 
1209
                        stack = &fakestack;
 
1210
                        stack->bts_blkno = BufferGetBlockNumber(pbuf);
 
1211
                        stack->bts_offset = InvalidOffsetNumber;
 
1212
                        /* bts_btitem will be initialized below */
 
1213
                        stack->bts_parent = NULL;
 
1214
                        _bt_relbuf(rel, pbuf);
 
1215
                }
 
1216
 
 
1217
                /* get high key from left page == lowest key on new right page */
 
1218
                ritem = (BTItem) PageGetItem(page,
 
1219
                                                                         PageGetItemId(page, P_HIKEY));
 
1220
 
 
1221
                /* form an index tuple that points at the new right page */
 
1222
                new_item = _bt_formitem(&(ritem->bti_itup));
 
1223
                ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
 
1224
 
 
1225
                /*
 
1226
                 * Find the parent buffer and get the parent page.
 
1227
                 *
 
1228
                 * Oops - if we were moved right then we need to change stack item!
 
1229
                 * We want to find parent pointing to where we are, right ?    -
 
1230
                 * vadim 05/27/97
 
1231
                 */
 
1232
                ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
 
1233
                                           bknum, P_HIKEY);
 
1234
 
 
1235
                pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
 
1236
 
 
1237
                /* Now we can write and unlock the children */
 
1238
                _bt_wrtbuf(rel, rbuf);
 
1239
                _bt_wrtbuf(rel, buf);
 
1240
 
 
1241
                /* Check for error only after writing children */
 
1242
                if (pbuf == InvalidBuffer)
 
1243
                        elog(ERROR, "failed to re-find parent key in \"%s\"",
 
1244
                                 RelationGetRelationName(rel));
 
1245
 
 
1246
                /* Recursively update the parent */
 
1247
                newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
 
1248
                                                                0, NULL, new_item, stack->bts_offset,
 
1249
                                                                is_only);
 
1250
 
 
1251
                /* be tidy */
 
1252
                pfree(newres);
 
1253
                pfree(new_item);
 
1254
        }
 
1255
}
 
1256
 
 
1257
/*
 
1258
 *      _bt_getstackbuf() -- Walk back up the tree one step, and find the item
 
1259
 *                                               we last looked at in the parent.
 
1260
 *
 
1261
 *              This is possible because we save the downlink from the parent item,
 
1262
 *              which is enough to uniquely identify it.  Insertions into the parent
 
1263
 *              level could cause the item to move right; deletions could cause it
 
1264
 *              to move left, but not left of the page we previously found it in.
 
1265
 *
 
1266
 *              Adjusts bts_blkno & bts_offset if changed.
 
1267
 *
 
1268
 *              Returns InvalidBuffer if item not found (should not happen).
 
1269
 */
 
1270
Buffer
 
1271
_bt_getstackbuf(Relation rel, BTStack stack, int access)
 
1272
{
 
1273
        BlockNumber blkno;
 
1274
        OffsetNumber start;
 
1275
 
 
1276
        blkno = stack->bts_blkno;
 
1277
        start = stack->bts_offset;
 
1278
 
 
1279
        for (;;)
 
1280
        {
 
1281
                Buffer          buf;
 
1282
                Page            page;
 
1283
                BTPageOpaque opaque;
 
1284
 
 
1285
                buf = _bt_getbuf(rel, blkno, access);
 
1286
                page = BufferGetPage(buf);
 
1287
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
1288
 
 
1289
                if (!P_IGNORE(opaque))
 
1290
                {
 
1291
                        OffsetNumber offnum,
 
1292
                                                minoff,
 
1293
                                                maxoff;
 
1294
                        ItemId          itemid;
 
1295
                        BTItem          item;
 
1296
 
 
1297
                        minoff = P_FIRSTDATAKEY(opaque);
 
1298
                        maxoff = PageGetMaxOffsetNumber(page);
 
1299
 
 
1300
                        /*
 
1301
                         * start = InvalidOffsetNumber means "search the whole page".
 
1302
                         * We need this test anyway due to possibility that page has a
 
1303
                         * high key now when it didn't before.
 
1304
                         */
 
1305
                        if (start < minoff)
 
1306
                                start = minoff;
 
1307
 
 
1308
                        /*
 
1309
                         * Need this check too, to guard against possibility that page
 
1310
                         * split since we visited it originally.
 
1311
                         */
 
1312
                        if (start > maxoff)
 
1313
                                start = OffsetNumberNext(maxoff);
 
1314
 
 
1315
                        /*
 
1316
                         * These loops will check every item on the page --- but in an
 
1317
                         * order that's attuned to the probability of where it
 
1318
                         * actually is.  Scan to the right first, then to the left.
 
1319
                         */
 
1320
                        for (offnum = start;
 
1321
                                 offnum <= maxoff;
 
1322
                                 offnum = OffsetNumberNext(offnum))
 
1323
                        {
 
1324
                                itemid = PageGetItemId(page, offnum);
 
1325
                                item = (BTItem) PageGetItem(page, itemid);
 
1326
                                if (BTItemSame(item, &stack->bts_btitem))
 
1327
                                {
 
1328
                                        /* Return accurate pointer to where link is now */
 
1329
                                        stack->bts_blkno = blkno;
 
1330
                                        stack->bts_offset = offnum;
 
1331
                                        return buf;
 
1332
                                }
 
1333
                        }
 
1334
 
 
1335
                        for (offnum = OffsetNumberPrev(start);
 
1336
                                 offnum >= minoff;
 
1337
                                 offnum = OffsetNumberPrev(offnum))
 
1338
                        {
 
1339
                                itemid = PageGetItemId(page, offnum);
 
1340
                                item = (BTItem) PageGetItem(page, itemid);
 
1341
                                if (BTItemSame(item, &stack->bts_btitem))
 
1342
                                {
 
1343
                                        /* Return accurate pointer to where link is now */
 
1344
                                        stack->bts_blkno = blkno;
 
1345
                                        stack->bts_offset = offnum;
 
1346
                                        return buf;
 
1347
                                }
 
1348
                        }
 
1349
                }
 
1350
 
 
1351
                /*
 
1352
                 * The item we're looking for moved right at least one page.
 
1353
                 */
 
1354
                if (P_RIGHTMOST(opaque))
 
1355
                {
 
1356
                        _bt_relbuf(rel, buf);
 
1357
                        return InvalidBuffer;
 
1358
                }
 
1359
                blkno = opaque->btpo_next;
 
1360
                start = InvalidOffsetNumber;
 
1361
                _bt_relbuf(rel, buf);
 
1362
        }
 
1363
}
 
1364
 
 
1365
/*
 
1366
 *      _bt_newroot() -- Create a new root page for the index.
 
1367
 *
 
1368
 *              We've just split the old root page and need to create a new one.
 
1369
 *              In order to do this, we add a new root page to the file, then lock
 
1370
 *              the metadata page and update it.  This is guaranteed to be deadlock-
 
1371
 *              free, because all readers release their locks on the metadata page
 
1372
 *              before trying to lock the root, and all writers lock the root before
 
1373
 *              trying to lock the metadata page.  We have a write lock on the old
 
1374
 *              root page, so we have not introduced any cycles into the waits-for
 
1375
 *              graph.
 
1376
 *
 
1377
 *              On entry, lbuf (the old root) and rbuf (its new peer) are write-
 
1378
 *              locked. On exit, a new root page exists with entries for the
 
1379
 *              two new children, metapage is updated and unlocked/unpinned.
 
1380
 *              The new root buffer is returned to caller which has to unlock/unpin
 
1381
 *              lbuf, rbuf & rootbuf.
 
1382
 */
 
1383
static Buffer
 
1384
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 
1385
{
 
1386
        Buffer          rootbuf;
 
1387
        Page            lpage,
 
1388
                                rpage,
 
1389
                                rootpage;
 
1390
        BlockNumber lbkno,
 
1391
                                rbkno;
 
1392
        BlockNumber rootblknum;
 
1393
        BTPageOpaque rootopaque;
 
1394
        ItemId          itemid;
 
1395
        BTItem          item;
 
1396
        Size            itemsz;
 
1397
        BTItem          new_item;
 
1398
        Buffer          metabuf;
 
1399
        Page            metapg;
 
1400
        BTMetaPageData *metad;
 
1401
 
 
1402
        lbkno = BufferGetBlockNumber(lbuf);
 
1403
        rbkno = BufferGetBlockNumber(rbuf);
 
1404
        lpage = BufferGetPage(lbuf);
 
1405
        rpage = BufferGetPage(rbuf);
 
1406
 
 
1407
        /* get a new root page */
 
1408
        rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
 
1409
        rootpage = BufferGetPage(rootbuf);
 
1410
        rootblknum = BufferGetBlockNumber(rootbuf);
 
1411
 
 
1412
        /* acquire lock on the metapage */
 
1413
        metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
 
1414
        metapg = BufferGetPage(metabuf);
 
1415
        metad = BTPageGetMeta(metapg);
 
1416
 
 
1417
        /* NO EREPORT(ERROR) from here till newroot op is logged */
 
1418
        START_CRIT_SECTION();
 
1419
 
 
1420
        /* set btree special data */
 
1421
        rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
 
1422
        rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
 
1423
        rootopaque->btpo_flags = BTP_ROOT;
 
1424
        rootopaque->btpo.level =
 
1425
                ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
 
1426
 
 
1427
        /* update metapage data */
 
1428
        metad->btm_root = rootblknum;
 
1429
        metad->btm_level = rootopaque->btpo.level;
 
1430
        metad->btm_fastroot = rootblknum;
 
1431
        metad->btm_fastlevel = rootopaque->btpo.level;
 
1432
 
 
1433
        /*
 
1434
         * Create downlink item for left page (old root).  Since this will be
 
1435
         * the first item in a non-leaf page, it implicitly has minus-infinity
 
1436
         * key value, so we need not store any actual key in it.
 
1437
         */
 
1438
        itemsz = sizeof(BTItemData);
 
1439
        new_item = (BTItem) palloc(itemsz);
 
1440
        new_item->bti_itup.t_info = itemsz;
 
1441
        ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
 
1442
 
 
1443
        /*
 
1444
         * Insert the left page pointer into the new root page.  The root page
 
1445
         * is the rightmost page on its level so there is no "high key" in it;
 
1446
         * the two items will go into positions P_HIKEY and P_FIRSTKEY.
 
1447
         */
 
1448
        if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
 
1449
                elog(PANIC, "failed to add leftkey to new root page");
 
1450
        pfree(new_item);
 
1451
 
 
1452
        /*
 
1453
         * Create downlink item for right page.  The key for it is obtained
 
1454
         * from the "high key" position in the left page.
 
1455
         */
 
1456
        itemid = PageGetItemId(lpage, P_HIKEY);
 
1457
        itemsz = ItemIdGetLength(itemid);
 
1458
        item = (BTItem) PageGetItem(lpage, itemid);
 
1459
        new_item = _bt_formitem(&(item->bti_itup));
 
1460
        ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
 
1461
 
 
1462
        /*
 
1463
         * insert the right page pointer into the new root page.
 
1464
         */
 
1465
        if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
 
1466
                elog(PANIC, "failed to add rightkey to new root page");
 
1467
        pfree(new_item);
 
1468
 
 
1469
        /* XLOG stuff */
 
1470
        if (!rel->rd_istemp)
 
1471
        {
 
1472
                xl_btree_newroot xlrec;
 
1473
                XLogRecPtr      recptr;
 
1474
                XLogRecData rdata[2];
 
1475
 
 
1476
                xlrec.node = rel->rd_node;
 
1477
                xlrec.rootblk = rootblknum;
 
1478
                xlrec.level = metad->btm_level;
 
1479
 
 
1480
                rdata[0].buffer = InvalidBuffer;
 
1481
                rdata[0].data = (char *) &xlrec;
 
1482
                rdata[0].len = SizeOfBtreeNewroot;
 
1483
                rdata[0].next = &(rdata[1]);
 
1484
 
 
1485
                /*
 
1486
                 * Direct access to page is not good but faster - we should
 
1487
                 * implement some new func in page API.
 
1488
                 */
 
1489
                rdata[1].buffer = InvalidBuffer;
 
1490
                rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
 
1491
                rdata[1].len = ((PageHeader) rootpage)->pd_special -
 
1492
                        ((PageHeader) rootpage)->pd_upper;
 
1493
                rdata[1].next = NULL;
 
1494
 
 
1495
                recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
 
1496
 
 
1497
                PageSetLSN(rootpage, recptr);
 
1498
                PageSetTLI(rootpage, ThisTimeLineID);
 
1499
                PageSetLSN(metapg, recptr);
 
1500
                PageSetTLI(metapg, ThisTimeLineID);
 
1501
                PageSetLSN(lpage, recptr);
 
1502
                PageSetTLI(lpage, ThisTimeLineID);
 
1503
                PageSetLSN(rpage, recptr);
 
1504
                PageSetTLI(rpage, ThisTimeLineID);
 
1505
        }
 
1506
 
 
1507
        END_CRIT_SECTION();
 
1508
 
 
1509
        /* write and let go of metapage buffer */
 
1510
        _bt_wrtbuf(rel, metabuf);
 
1511
 
 
1512
        return (rootbuf);
 
1513
}
 
1514
 
 
1515
/*
 
1516
 *      _bt_pgaddtup() -- add a tuple to a particular page in the index.
 
1517
 *
 
1518
 *              This routine adds the tuple to the page as requested.  It does
 
1519
 *              not affect pin/lock status, but you'd better have a write lock
 
1520
 *              and pin on the target buffer!  Don't forget to write and release
 
1521
 *              the buffer afterwards, either.
 
1522
 *
 
1523
 *              The main difference between this routine and a bare PageAddItem call
 
1524
 *              is that this code knows that the leftmost data item on a non-leaf
 
1525
 *              btree page doesn't need to have a key.  Therefore, it strips such
 
1526
 *              items down to just the item header.  CAUTION: this works ONLY if
 
1527
 *              we insert the items in order, so that the given itup_off does
 
1528
 *              represent the final position of the item!
 
1529
 */
 
1530
static void
 
1531
_bt_pgaddtup(Relation rel,
 
1532
                         Page page,
 
1533
                         Size itemsize,
 
1534
                         BTItem btitem,
 
1535
                         OffsetNumber itup_off,
 
1536
                         const char *where)
 
1537
{
 
1538
        BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
1539
        BTItemData      truncitem;
 
1540
 
 
1541
        if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
 
1542
        {
 
1543
                memcpy(&truncitem, btitem, sizeof(BTItemData));
 
1544
                truncitem.bti_itup.t_info = sizeof(BTItemData);
 
1545
                btitem = &truncitem;
 
1546
                itemsize = sizeof(BTItemData);
 
1547
        }
 
1548
 
 
1549
        if (PageAddItem(page, (Item) btitem, itemsize, itup_off,
 
1550
                                        LP_USED) == InvalidOffsetNumber)
 
1551
                elog(PANIC, "failed to add item to the %s for \"%s\"",
 
1552
                         where, RelationGetRelationName(rel));
 
1553
}
 
1554
 
 
1555
/*
 
1556
 * _bt_isequal - used in _bt_doinsert in check for duplicates.
 
1557
 *
 
1558
 * This is very similar to _bt_compare, except for NULL handling.
 
1559
 * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
 
1560
 */
 
1561
static bool
 
1562
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
 
1563
                        int keysz, ScanKey scankey)
 
1564
{
 
1565
        BTItem          btitem;
 
1566
        IndexTuple      itup;
 
1567
        int                     i;
 
1568
 
 
1569
        /* Better be comparing to a leaf item */
 
1570
        Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
 
1571
 
 
1572
        btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
 
1573
        itup = &(btitem->bti_itup);
 
1574
 
 
1575
        for (i = 1; i <= keysz; i++)
 
1576
        {
 
1577
                AttrNumber      attno;
 
1578
                Datum           datum;
 
1579
                bool            isNull;
 
1580
                int32           result;
 
1581
 
 
1582
                attno = scankey->sk_attno;
 
1583
                Assert(attno == i);
 
1584
                datum = index_getattr(itup, attno, itupdesc, &isNull);
 
1585
 
 
1586
                /* NULLs are never equal to anything */
 
1587
                if (isNull || (scankey->sk_flags & SK_ISNULL))
 
1588
                        return false;
 
1589
 
 
1590
                result = DatumGetInt32(FunctionCall2(&scankey->sk_func,
 
1591
                                                                                         datum,
 
1592
                                                                                         scankey->sk_argument));
 
1593
 
 
1594
                if (result != 0)
 
1595
                        return false;
 
1596
 
 
1597
                scankey++;
 
1598
        }
 
1599
 
 
1600
        /* if we get here, the keys are equal */
 
1601
        return true;
 
1602
}