~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/access/rtree/rtree.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * rtree.c
 
4
 *        interface routines for the postgres rtree indexed access method.
 
5
 *
 
6
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        $PostgreSQL: pgsql/src/backend/access/rtree/rtree.c,v 1.85.4.1 2005-01-24 02:47:52 tgl Exp $
 
12
 *
 
13
 *-------------------------------------------------------------------------
 
14
 */
 
15
 
 
16
#include "postgres.h"
 
17
 
 
18
#include "access/genam.h"
 
19
#include "access/heapam.h"
 
20
#include "access/rtree.h"
 
21
#include "access/xlogutils.h"
 
22
#include "catalog/index.h"
 
23
#include "commands/vacuum.h"
 
24
#include "executor/executor.h"
 
25
#include "miscadmin.h"
 
26
 
 
27
 
 
28
/*
 * XXX We assume that all datatypes indexable in rtrees are pass-by-reference.
 * To fix this, you'd need to improve the IndexTupleGetDatum() macro, and
 * do something with the various datum-pfreeing code.  However, it's not that
 * unreasonable an assumption in practice.
 *
 * IndexTupleGetDatum: fetch the (single) key of an index tuple as a Datum,
 * i.e. a pointer to the data area immediately following the tuple header.
 */
#define IndexTupleGetDatum(itup)  \
        PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData))
 
36
 
 
37
/*
 * Space-allocation macros.  Note we count the item's line pointer in its size.
 */

/*
 * Usable payload space on an rtree page: block size less the page header and
 * the MAXALIGN'd rtree special space.  The sizeof(ItemIdData) subtracted back
 * out of the header size compensates for the line-pointer slot that
 * sizeof(PageHeaderData) already includes — NOTE(review): presumably via
 * PageHeaderData's trailing pd_linp[] array; confirm against bufpage.h.
 */
#define RTPageAvailSpace  \
        (BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \
         - MAXALIGN(sizeof(RTreePageOpaqueData)))
/* total on-page footprint of a tuple: aligned tuple plus its line pointer */
#define IndexTupleTotalSize(itup)  \
        (MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData))
/* size of just the key data portion of an index tuple (header excluded) */
#define IndexTupleAttSize(itup)  \
        (IndexTupleSize(itup) - sizeof(IndexTupleData))
 
47
 
 
48
/*
 * Results of rtpicksplit(): how the tuples of an overflowing page (plus the
 * incoming tuple) are distributed between the two halves of a split, and the
 * union ("bounding box") key computed for each half.
 */
typedef struct SPLITVEC
{
        OffsetNumber *spl_left;         /* offsets of tuples assigned to left page */
        int                     spl_nleft;              /* number of entries in spl_left */
        Datum           spl_ldatum;             /* union key covering the left page */
        OffsetNumber *spl_right;        /* offsets of tuples assigned to right page */
        int                     spl_nright;             /* number of entries in spl_right */
        Datum           spl_rdatum;             /* union key covering the right page */
} SPLITVEC;
 
58
 
 
59
/* for sorting tuples by cost, for picking split */
typedef struct SPLITCOST
{
        OffsetNumber offset_number;             /* tuple's offset within the page */
        float           cost_differential;      /* difference in cost of putting this
                                                                         * tuple left vs. right */
        bool            choose_left;            /* true if the left page is cheaper */
} SPLITCOST;
 
66
 
 
67
/*
 * Cached function-call info for the opclass support procedures an rtree
 * operation needs; set up by initRtstate().
 */
typedef struct RTSTATE
{
        FmgrInfo        unionFn;                /* union function */
        FmgrInfo        sizeFn;                 /* size function */
        FmgrInfo        interFn;                /* intersection function */
} RTSTATE;
 
73
 
 
74
/* Working state for rtbuild and its callback */
typedef struct
{
        RTSTATE         rtState;                /* support-proc lookup state, reused for
                                                                 * every tuple inserted during the build */
        double          indtuples;              /* running count of tuples added to index */
} RTBuildState;
 
80
 
 
81
/* non-export function prototypes */

/* insertion and page-split machinery */
static void rtbuildCallback(Relation index,
                                HeapTuple htup,
                                Datum *attdata,
                                char *nulls,
                                bool tupleIsAlive,
                                void *state);
static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
                   RTSTATE *rtstate);
static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size,
                  RTSTATE *rtstate);
static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack,
                  IndexTuple itup, RTSTATE *rtstate);
static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
                        IndexTuple rtup, RTSTATE *rtstate);
static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
                        RTSTATE *rtstate);

/* page and state utilities */
static void RTInitBuffer(Buffer b, uint32 f);
static OffsetNumber choose(Relation r, Page p, IndexTuple it,
           RTSTATE *rtstate);
static int      nospace(Page p, IndexTuple it);
static void initRtstate(RTSTATE *rtstate, Relation index);
static int      qsort_comp_splitcost(const void *a, const void *b);
 
105
 
 
106
 
 
107
/*
 * rtbuild -- build a new rtree index.
 *
 * fmgr-called with (heap Relation, index Relation, IndexInfo *); returns
 * void.  Initializes the root page, then scans the heap once, inserting
 * each live tuple via rtbuildCallback/rtdoinsert.
 */
Datum
rtbuild(PG_FUNCTION_ARGS)
{
        Relation        heap = (Relation) PG_GETARG_POINTER(0);
        Relation        index = (Relation) PG_GETARG_POINTER(1);
        IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
        double          reltuples;
        RTBuildState buildstate;
        Buffer          buffer;

        /* no locking is needed */

        initRtstate(&buildstate.rtState, index);

        /*
         * We expect to be called exactly once for any index relation. If
         * that's not the case, big trouble's what we have.
         */
        if (RelationGetNumberOfBlocks(index) != 0)
                elog(ERROR, "index \"%s\" already contains data",
                         RelationGetRelationName(index));

        /* initialize the root page (starts out as an empty leaf) */
        buffer = ReadBuffer(index, P_NEW);
        RTInitBuffer(buffer, F_LEAF);
        WriteBuffer(buffer);

        /* build the index */
        buildstate.indtuples = 0;

        /* do the heap scan; rtbuildCallback does one insertion per tuple */
        reltuples = IndexBuildHeapScan(heap, index, indexInfo,
                                                                   rtbuildCallback, (void *) &buildstate);

        /* okay, all heap tuples are indexed */

        /*
         * Since we just counted the tuples in the heap, we update its stats
         * in pg_class to guarantee that the planner takes advantage of the
         * index we just created.  But, only update statistics during normal
         * index definitions, not for indices on system catalogs created
         * during bootstrap processing.  We must close the relations before
         * updating statistics to guarantee that the relcache entries are
         * flushed when we increment the command counter in UpdateStats(). But
         * we do not release any locks on the relations; those will be held
         * until end of transaction.
         */
        if (IsNormalProcessingMode())
        {
                Oid                     hrelid = RelationGetRelid(heap);
                Oid                     irelid = RelationGetRelid(index);

                heap_close(heap, NoLock);
                index_close(index);
                UpdateStats(hrelid, reltuples);
                UpdateStats(irelid, buildstate.indtuples);
        }

        PG_RETURN_VOID();
}
 
170
 
 
171
/*
 
172
 * Per-tuple callback from IndexBuildHeapScan
 
173
 */
 
174
static void
 
175
rtbuildCallback(Relation index,
 
176
                                HeapTuple htup,
 
177
                                Datum *attdata,
 
178
                                char *nulls,
 
179
                                bool tupleIsAlive,
 
180
                                void *state)
 
181
{
 
182
        RTBuildState *buildstate = (RTBuildState *) state;
 
183
        IndexTuple      itup;
 
184
        InsertIndexResult res;
 
185
 
 
186
        /* form an index tuple and point it at the heap tuple */
 
187
        itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
 
188
        itup->t_tid = htup->t_self;
 
189
 
 
190
        /* rtree indexes don't index nulls, see notes in rtinsert */
 
191
        if (IndexTupleHasNulls(itup))
 
192
        {
 
193
                pfree(itup);
 
194
                return;
 
195
        }
 
196
 
 
197
        /*
 
198
         * Since we already have the index relation locked, we call rtdoinsert
 
199
         * directly.  Normal access method calls dispatch through rtinsert,
 
200
         * which locks the relation for write.  This is the right thing to do
 
201
         * if you're inserting single tups, but not when you're initializing
 
202
         * the whole index at once.
 
203
         */
 
204
        res = rtdoinsert(index, itup, &buildstate->rtState);
 
205
 
 
206
        if (res)
 
207
                pfree(res);
 
208
 
 
209
        buildstate->indtuples += 1;
 
210
 
 
211
        pfree(itup);
 
212
}
 
213
 
 
214
/*
 *      rtinsert -- wrapper for rtree tuple insertion.
 *
 *        This is the public interface routine for tuple insertion in rtrees.
 *        It doesn't do any work itself; it forms the index tuple and passes
 *        the buck to rtdoinsert.  (No locking happens here -- per the note
 *        below, the caller is expected to already hold exclusive lock.)
 *
 *        fmgr-called with (Relation, Datum *values, char *nulls, ItemPointer
 *        heap ctid, ...); returns an InsertIndexResult pointer, or NULL if
 *        the tuple contained a NULL key and was not inserted.
 */
Datum
rtinsert(PG_FUNCTION_ARGS)
{
        Relation        r = (Relation) PG_GETARG_POINTER(0);
        Datum      *datum = (Datum *) PG_GETARG_POINTER(1);
        char       *nulls = (char *) PG_GETARG_POINTER(2);
        ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);

#ifdef NOT_USED
        Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
        bool            checkUnique = PG_GETARG_BOOL(5);
#endif
        InsertIndexResult res;
        IndexTuple      itup;
        RTSTATE         rtState;

        /* generate an index tuple */
        itup = index_formtuple(RelationGetDescr(r), datum, nulls);
        itup->t_tid = *ht_ctid;

        /*
         * Currently, rtrees do not support indexing NULLs; considerable
         * infrastructure work would have to be done to do anything reasonable
         * with a NULL.
         */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                PG_RETURN_POINTER(NULL);
        }

        initRtstate(&rtState, r);

        /*
         * Since rtree is not marked "amconcurrent" in pg_am, caller should
         * have acquired exclusive lock on index relation.      We need no locking
         * here.
         */

        res = rtdoinsert(r, itup, &rtState);

        PG_RETURN_POINTER(res);
}
 
263
 
 
264
/*
 * rtdoinsert -- workhorse for rtree tuple insertion.
 *
 * Descends from the root, letting choose() pick the subtree at each
 * internal page, while recording the path in an RTSTACK so that parent
 * bounding boxes can be enlarged (rttighten) or downlinks installed
 * (via rtdosplit) afterwards.  At the leaf, either adds the tuple in
 * place or splits the page.  Returns a palloc'd InsertIndexResult.
 */
static InsertIndexResult
rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
{
        Page            page;
        Buffer          buffer;
        BlockNumber blk;
        IndexTuple      which;
        OffsetNumber l;
        RTSTACK    *stack;
        InsertIndexResult res;
        RTreePageOpaque opaque;
        Datum           datum;

        blk = P_ROOT;
        buffer = InvalidBuffer;
        stack = NULL;

        /* walk down from the root until we reach a leaf page */
        do
        {
                /* let go of current buffer before getting next */
                if (buffer != InvalidBuffer)
                        ReleaseBuffer(buffer);

                /* get next buffer */
                buffer = ReadBuffer(r, blk);
                page = (Page) BufferGetPage(buffer);

                opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
                if (!(opaque->flags & F_LEAF))
                {
                        RTSTACK    *n;
                        ItemId          iid;

                        /* push this internal page and the chosen child slot */
                        n = (RTSTACK *) palloc(sizeof(RTSTACK));
                        n->rts_parent = stack;
                        n->rts_blk = blk;
                        n->rts_child = choose(r, page, itup, rtstate);
                        stack = n;

                        /* follow the chosen downlink to the child block */
                        iid = PageGetItemId(page, n->rts_child);
                        which = (IndexTuple) PageGetItem(page, iid);
                        blk = ItemPointerGetBlockNumber(&(which->t_tid));
                }
        } while (!(opaque->flags & F_LEAF));

        if (nospace(page, itup))
        {
                /* need to do a split */
                res = rtdosplit(r, buffer, stack, itup, rtstate);
                freestack(stack);
                WriteBuffer(buffer);    /* don't forget to release buffer! */
                return res;
        }

        /* add the item and write the buffer */
        if (PageIsEmpty(page))
        {
                l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
                                                FirstOffsetNumber,
                                                LP_USED);
        }
        else
        {
                l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
                                                OffsetNumberNext(PageGetMaxOffsetNumber(page)),
                                                LP_USED);
        }
        if (l == InvalidOffsetNumber)
                elog(ERROR, "failed to add index item to \"%s\"",
                         RelationGetRelationName(r));

        WriteBuffer(buffer);

        datum = IndexTupleGetDatum(itup);

        /* now expand the page boundary in the parent to include the new child */
        rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate);
        freestack(stack);

        /* build and return an InsertIndexResult for this insertion */
        res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
        ItemPointerSet(&(res->pointerData), blk, l);

        return res;
}
 
349
 
 
350
/*
 * rttighten -- enlarge parent bounding boxes to cover a newly added key.
 *
 * 'stk' is the path of internal pages recorded during the descent; 'datum'
 * is the new key that must be covered and 'att_size' its data size.  At
 * each level we union the existing parent key with the new datum; if the
 * union differs in size from the old key we overwrite the key in place and
 * recurse upward, otherwise we stop.
 *
 * Note on memory: the pfree at the bottom releases the union result that
 * this routine computed ('datum' is reassigned before the pfree), not the
 * caller-supplied datum.  If stk == NULL we return before computing
 * anything, and nothing is freed.
 */
static void
rttighten(Relation r,
                  RTSTACK *stk,
                  Datum datum,
                  int att_size,
                  RTSTATE *rtstate)
{
        Datum           oldud;
        Datum           tdatum;
        Page            p;
        float           old_size,
                                newd_size;
        Buffer          b;

        if (stk == NULL)
                return;

        b = ReadBuffer(r, stk->rts_blk);
        p = BufferGetPage(b);

        /* existing key for the child we descended through */
        oldud = IndexTupleGetDatum(PageGetItem(p,
                                                                          PageGetItemId(p, stk->rts_child)));

        /* sizeFn returns its result through the pointer argument */
        FunctionCall2(&rtstate->sizeFn, oldud,
                                  PointerGetDatum(&old_size));

        /* replace 'datum' with the union of old key and new key */
        datum = FunctionCall2(&rtstate->unionFn, oldud, datum);

        FunctionCall2(&rtstate->sizeFn, datum,
                                  PointerGetDatum(&newd_size));

        /*
         * If newd_size == 0 we have degenerate rectangles, so we don't know
         * if there was any change, so we have to assume there was.
         */
        if ((newd_size == 0) || (newd_size != old_size))
        {
                TupleDesc       td = RelationGetDescr(r);

                if (td->attrs[0]->attlen < 0)
                {
                        /*
                         * This is an internal page, so 'oldud' had better be a union
                         * (constant-length) key, too.  (See comment below.)
                         */
                        Assert(VARSIZE(DatumGetPointer(datum)) ==
                                   VARSIZE(DatumGetPointer(oldud)));
                        memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
                                        VARSIZE(DatumGetPointer(datum)));
                }
                else
                {
                        memmove(DatumGetPointer(oldud), DatumGetPointer(datum),
                                        att_size);
                }
                WriteBuffer(b);

                /*
                 * The user may be defining an index on variable-sized data (like
                 * polygons).  If so, we need to get a constant-sized datum for
                 * insertion on the internal page.      We do this by calling the
                 * union proc, which is required to return a rectangle.
                 */
                tdatum = FunctionCall2(&rtstate->unionFn, datum, datum);

                rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
                pfree(DatumGetPointer(tdatum));
        }
        else
                ReleaseBuffer(b);
        pfree(DatumGetPointer(datum));
}
 
422
 
 
423
/*
 *      rtdosplit -- split a page in the tree.
 *
 *        rtpicksplit does the interesting work of choosing the split.
 *        This routine just does the bit-pushing: distributes the existing
 *        tuples plus the incoming 'itup' onto a left and a right page,
 *        adjusts active scans, and installs downlinks in the parent via
 *        rtintinsert (which may recurse and split the parent too).
 *
 *        Returns a palloc'd InsertIndexResult locating the new tuple.
 */
static InsertIndexResult
rtdosplit(Relation r,
                  Buffer buffer,
                  RTSTACK *stack,
                  IndexTuple itup,
                  RTSTATE *rtstate)
{
        Page            p;
        Buffer          leftbuf,
                                rightbuf;
        Page            left,
                                right;
        ItemId          itemid;
        IndexTuple      item;
        IndexTuple      ltup,
                                rtup;
        OffsetNumber maxoff;
        OffsetNumber i;
        OffsetNumber leftoff,
                                rightoff;
        BlockNumber lbknum,
                                rbknum;
        BlockNumber bufblock;
        RTreePageOpaque opaque;
        int                     blank;
        InsertIndexResult res;
        char       *isnull;
        SPLITVEC        v;
        OffsetNumber *spl_left,
                           *spl_right;
        TupleDesc       tupDesc;
        int                     n;
        OffsetNumber newitemoff;

        p = (Page) BufferGetPage(buffer);
        opaque = (RTreePageOpaque) PageGetSpecialPointer(p);

        rtpicksplit(r, p, &v, itup, rtstate);

        /*
         * The root of the tree is the first block in the relation.  If we're
         * about to split the root, we need to do some hocus-pocus to enforce
         * this guarantee: allocate a brand-new left page and keep the root
         * block for the new internal level.  Otherwise the left half is
         * rebuilt on a temp copy of the page being split.
         */

        if (BufferGetBlockNumber(buffer) == P_ROOT)
        {
                leftbuf = ReadBuffer(r, P_NEW);
                RTInitBuffer(leftbuf, opaque->flags);
                lbknum = BufferGetBlockNumber(leftbuf);
                left = (Page) BufferGetPage(leftbuf);
        }
        else
        {
                leftbuf = buffer;
                IncrBufferRefCount(buffer);
                lbknum = BufferGetBlockNumber(buffer);
                left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
        }

        rightbuf = ReadBuffer(r, P_NEW);
        RTInitBuffer(rightbuf, opaque->flags);
        rbknum = BufferGetBlockNumber(rightbuf);
        right = (Page) BufferGetPage(rightbuf);

        spl_left = v.spl_left;
        spl_right = v.spl_right;
        leftoff = rightoff = FirstOffsetNumber;
        maxoff = PageGetMaxOffsetNumber(p);
        /* offset N+1 represents the incoming tuple in the split vectors */
        newitemoff = OffsetNumberNext(maxoff);

        /* build an InsertIndexResult for this insertion */
        res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));

        /*
         * spl_left contains a list of the offset numbers of the tuples that
         * will go to the left page.  For each offset number, get the tuple
         * item, then add the item to the left page.  Similarly for the right
         * side.
         */

        /* fill left node */
        for (n = 0; n < v.spl_nleft; n++)
        {
                i = *spl_left;
                if (i == newitemoff)
                        item = itup;
                else
                {
                        itemid = PageGetItemId(p, i);
                        item = (IndexTuple) PageGetItem(p, itemid);
                }

                if (PageAddItem(left, (Item) item, IndexTupleSize(item),
                                                leftoff, LP_USED) == InvalidOffsetNumber)
                        elog(ERROR, "failed to add index item to \"%s\"",
                                 RelationGetRelationName(r));
                leftoff = OffsetNumberNext(leftoff);

                /*
                 * NOTE(review): leftoff was already advanced above, so this
                 * records the offset one past the slot the new tuple landed
                 * in; looks suspicious, but may be harmless if callers never
                 * use res->pointerData -- confirm.
                 */
                if (i == newitemoff)
                        ItemPointerSet(&(res->pointerData), lbknum, leftoff);

                spl_left++;                             /* advance in left split vector */
        }

        /* fill right node */
        for (n = 0; n < v.spl_nright; n++)
        {
                i = *spl_right;
                if (i == newitemoff)
                        item = itup;
                else
                {
                        itemid = PageGetItemId(p, i);
                        item = (IndexTuple) PageGetItem(p, itemid);
                }

                if (PageAddItem(right, (Item) item, IndexTupleSize(item),
                                                rightoff, LP_USED) == InvalidOffsetNumber)
                        elog(ERROR, "failed to add index item to \"%s\"",
                                 RelationGetRelationName(r));
                rightoff = OffsetNumberNext(rightoff);

                /* NOTE(review): same post-increment offset as the left loop */
                if (i == newitemoff)
                        ItemPointerSet(&(res->pointerData), rbknum, rightoff);

                spl_right++;                    /* advance in right split vector */
        }

        /* Make sure we consumed all of the split vectors, and release 'em */
        Assert(*spl_left == InvalidOffsetNumber);
        Assert(*spl_right == InvalidOffsetNumber);
        pfree(v.spl_left);
        pfree(v.spl_right);

        /* non-root case built the left half on a temp page; copy it back */
        if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT)
                PageRestoreTempPage(left, p);
        WriteBuffer(leftbuf);
        WriteBuffer(rightbuf);

        /*
         * Okay, the page is split.  We have three things left to do:
         *
         * 1)  Adjust any active scans on this index to cope with changes we
         * introduced in its structure by splitting this page.
         *
         * 2)  "Tighten" the bounding box of the pointer to the left page in the
         * parent node in the tree, if any.  Since we moved a bunch of stuff
         * off the left page, we expect it to get smaller.      This happens in
         * the internal insertion routine.
         *
         * 3)  Insert a pointer to the right page in the parent.  This may cause
         * the parent to split.  If it does, we need to repeat steps one and
         * two for each split node in the tree.
         */

        /* adjust active scans */
        rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);

        /* build internal-page downlink tuples from the two union keys */
        tupDesc = r->rd_att;
        isnull = (char *) palloc(r->rd_rel->relnatts);
        for (blank = 0; blank < r->rd_rel->relnatts; blank++)
                isnull[blank] = ' ';

        ltup = (IndexTuple) index_formtuple(tupDesc,
                                                                                &(v.spl_ldatum), isnull);
        rtup = (IndexTuple) index_formtuple(tupDesc,
                                                                                &(v.spl_rdatum), isnull);
        pfree(isnull);
        pfree(DatumGetPointer(v.spl_ldatum));
        pfree(DatumGetPointer(v.spl_rdatum));

        /* set pointers to new child pages in the internal index tuples */
        ItemPointerSet(&(ltup->t_tid), lbknum, 1);
        ItemPointerSet(&(rtup->t_tid), rbknum, 1);

        rtintinsert(r, stack, ltup, rtup, rtstate);

        pfree(ltup);
        pfree(rtup);

        return res;
}
 
612
 
 
613
/*
 * rtintinsert -- install downlinks for the two halves of a just-split page.
 *
 * 'ltup' and 'rtup' are the internal-page tuples pointing to the left and
 * right halves.  The old downlink (stk->rts_child on page stk->rts_blk) is
 * overwritten with ltup; rtup is then added to the same page, splitting it
 * in turn if there is no room.  If there is no parent (stk == NULL), the
 * split page was the root and a new root is built instead.
 */
static void
rtintinsert(Relation r,
                        RTSTACK *stk,
                        IndexTuple ltup,
                        IndexTuple rtup,
                        RTSTATE *rtstate)
{
        IndexTuple      old;
        Buffer          b;
        Page            p;
        Datum           ldatum,
                                rdatum,
                                newdatum;
        InsertIndexResult res;

        if (stk == NULL)
        {
                rtnewroot(r, ltup, rtup);
                return;
        }

        b = ReadBuffer(r, stk->rts_blk);
        p = BufferGetPage(b);
        old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));

        /*
         * This is a hack.      Right now, we force rtree internal keys to be
         * constant size. To fix this, need delete the old key and add both
         * left and right for the two new pages.  The insertion of left may
         * force a split if the new left key is bigger than the old key.
         */

        if (IndexTupleSize(old) != IndexTupleSize(ltup))
                ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("variable-length rtree keys are not supported")));

        /* install pointer to left child */
        memmove(old, ltup, IndexTupleSize(ltup));

        if (nospace(p, rtup))
        {
                /* no room for the right downlink: tighten upward, then split */
                newdatum = IndexTupleGetDatum(ltup);
                rttighten(r, stk->rts_parent, newdatum,
                                  IndexTupleAttSize(ltup), rtstate);
                res = rtdosplit(r, b, stk->rts_parent, rtup, rtstate);
                WriteBuffer(b);                 /* don't forget to release buffer!  -
                                                                 * 01/31/94 */
                pfree(res);
        }
        else
        {
                /* room available: append the right downlink on this page */
                if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
                                                PageGetMaxOffsetNumber(p),
                                                LP_USED) == InvalidOffsetNumber)
                        elog(ERROR, "failed to add index item to \"%s\"",
                                 RelationGetRelationName(r));
                WriteBuffer(b);
                /* enlarge ancestors' boxes to cover the union of both halves */
                ldatum = IndexTupleGetDatum(ltup);
                rdatum = IndexTupleGetDatum(rtup);
                newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum);

                rttighten(r, stk->rts_parent, newdatum,
                                  IndexTupleAttSize(rtup), rtstate);

                pfree(DatumGetPointer(newdatum));
        }
}
 
681
 
 
682
static void
 
683
rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
 
684
{
 
685
        Buffer          b;
 
686
        Page            p;
 
687
 
 
688
        b = ReadBuffer(r, P_ROOT);
 
689
        RTInitBuffer(b, 0);
 
690
        p = BufferGetPage(b);
 
691
        if (PageAddItem(p, (Item) lt, IndexTupleSize(lt),
 
692
                                        FirstOffsetNumber,
 
693
                                        LP_USED) == InvalidOffsetNumber)
 
694
                elog(ERROR, "failed to add index item to \"%s\"",
 
695
                         RelationGetRelationName(r));
 
696
        if (PageAddItem(p, (Item) rt, IndexTupleSize(rt),
 
697
                                        OffsetNumberNext(FirstOffsetNumber),
 
698
                                        LP_USED) == InvalidOffsetNumber)
 
699
                elog(ERROR, "failed to add index item to \"%s\"",
 
700
                         RelationGetRelationName(r));
 
701
        WriteBuffer(b);
 
702
}
 
703
 
 
704
/*
 
705
 * Choose how to split an rtree page into two pages.
 
706
 *
 
707
 * We return two vectors of index item numbers, one for the items to be
 
708
 * put on the left page, one for the items to be put on the right page.
 
709
 * In addition, the item to be added (itup) is listed in the appropriate
 
710
 * vector.      It is represented by item number N+1 (N = # of items on page).
 
711
 *
 
712
 * Both vectors have a terminating sentinel value of InvalidOffsetNumber,
 
713
 * but the sentinel value is no longer used, because the SPLITVEC
 
714
 * vector also contains the length of each vector, and that information
 
715
 * is now used to iterate over them in rtdosplit(). --kbb, 21 Sept 2001
 
716
 *
 
717
 * The bounding-box datums for the two new pages are also returned in *v.
 
718
 *
 
719
 * This is the quadratic-cost split algorithm Guttman describes in
 
720
 * his paper.  The reason we chose it is that you can implement this
 
721
 * with less information about the data types on which you're operating.
 
722
 *
 
723
 * We must also deal with a consideration not found in Guttman's algorithm:
 
724
 * variable-length data.  In particular, the incoming item might be
 
725
 * large enough that not just any split will work.      In the worst case,
 
726
 * our "split" may have to be the new item on one page and all the existing
 
727
 * items on the other.  Short of that, we have to take care that we do not
 
728
 * make a split that leaves both pages too full for the new item.
 
729
 */
 
730
static void
rtpicksplit(Relation r,
			Page page,
			SPLITVEC *v,
			IndexTuple itup,
			RTSTATE *rtstate)
{
	OffsetNumber maxoff,
				newitemoff;
	OffsetNumber i,
				j;
	IndexTuple	item_1,
				item_2;
	Datum		datum_alpha,
				datum_beta;
	Datum		datum_l,
				datum_r;
	Datum		union_d,
				union_dl,
				union_dr;
	Datum		inter_d;
	bool		firsttime;
	float		size_alpha,
				size_beta,
				size_union,
				size_inter;
	float		size_waste,
				waste;
	float		size_l,
				size_r;
	int			nbytes;
	OffsetNumber seed_1 = 0,
				seed_2 = 0;
	OffsetNumber *left,
			   *right;
	Size		newitemsz,
				item_1_sz,
				item_2_sz,
				left_avail_space,
				right_avail_space;
	int			total_num_tuples,
				num_tuples_without_seeds,
				max_after_split;	/* in Guttman's lingo, (M - m) */
	float		diff;			/* diff between cost of putting tuple left
								 * or right */
	SPLITCOST  *cost_vector;
	int			n;

	/*
	 * First, make sure the new item is not so large that we can't
	 * possibly fit it on a page, even by itself.  (It's sufficient to
	 * make this test here, since any oversize tuple must lead to a page
	 * split attempt.)
	 */
	newitemsz = IndexTupleTotalSize(itup);
	if (newitemsz > RTPageAvailSpace)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("index row size %lu exceeds rtree maximum, %lu",
						(unsigned long) newitemsz,
						(unsigned long) RTPageAvailSpace)));

	maxoff = PageGetMaxOffsetNumber(page);
	newitemoff = OffsetNumberNext(maxoff);		/* phony index for new
												 * item */
	total_num_tuples = newitemoff;
	num_tuples_without_seeds = total_num_tuples - 2;
	max_after_split = total_num_tuples / 2;		/* works for m = M/2 */

	/* Make arrays big enough for worst case, including sentinel */
	nbytes = (maxoff + 2) * sizeof(OffsetNumber);
	v->spl_left = (OffsetNumber *) palloc(nbytes);
	v->spl_right = (OffsetNumber *) palloc(nbytes);

	firsttime = true;
	waste = 0.0;

	/*
	 * Pick the two "seed" tuples: examine each pair of existing tuples
	 * (the new tuple is deliberately excluded here) and remember the pair
	 * whose union wastes the most area.  Quadratic in the number of
	 * tuples, per Guttman's quadratic split algorithm.
	 */
	for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i))
	{
		item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		datum_alpha = IndexTupleGetDatum(item_1);
		item_1_sz = IndexTupleTotalSize(item_1);

		for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j))
		{
			item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
			datum_beta = IndexTupleGetDatum(item_2);
			item_2_sz = IndexTupleTotalSize(item_2);

			/*
			 * Ignore seed pairs that don't leave room for the new item on
			 * either split page.
			 */
			if (newitemsz + item_1_sz > RTPageAvailSpace &&
				newitemsz + item_2_sz > RTPageAvailSpace)
				continue;

			/* compute the wasted space by unioning these guys */
			union_d = FunctionCall2(&rtstate->unionFn,
									datum_alpha, datum_beta);
			FunctionCall2(&rtstate->sizeFn, union_d,
						  PointerGetDatum(&size_union));
			inter_d = FunctionCall2(&rtstate->interFn,
									datum_alpha, datum_beta);

			/*
			 * The interFn may return a NULL pointer (not an SQL null!) to
			 * indicate no intersection.  sizeFn must cope with this.
			 */
			FunctionCall2(&rtstate->sizeFn, inter_d,
						  PointerGetDatum(&size_inter));
			size_waste = size_union - size_inter;

			/* union/intersection results are palloc'd; release them now */
			if (DatumGetPointer(union_d) != NULL)
				pfree(DatumGetPointer(union_d));
			if (DatumGetPointer(inter_d) != NULL)
				pfree(DatumGetPointer(inter_d));

			/*
			 * are these a more promising split than what we've already
			 * seen?
			 */
			if (size_waste > waste || firsttime)
			{
				waste = size_waste;
				seed_1 = i;
				seed_2 = j;
				firsttime = false;
			}
		}
	}

	if (firsttime)
	{
		/*
		 * There is no possible split except to put the new item on its
		 * own page.  Since we still have to compute the union rectangles,
		 * we play dumb and run through the split algorithm anyway,
		 * setting seed_1 = first item on page and seed_2 = new item.
		 */
		seed_1 = FirstOffsetNumber;
		seed_2 = newitemoff;
	}

	/* Initialize the left page's running union datum, size, and space. */
	item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
	datum_alpha = IndexTupleGetDatum(item_1);
	datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha);
	FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l));
	left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1);

	/* Likewise for the right page; its seed may be the new item itself. */
	if (seed_2 == newitemoff)
	{
		item_2 = itup;
		/* Needn't leave room for new item in calculations below */
		newitemsz = 0;
	}
	else
		item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
	datum_beta = IndexTupleGetDatum(item_2);
	datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta);
	FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r));
	right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2);

	/*
	 * Now split up the regions between the two seeds.
	 *
	 * The cost_vector array will contain hints for determining where each
	 * tuple should go.  Each record in the array will contain a boolean,
	 * choose_left, that indicates which node the tuple prefers to be on,
	 * and the absolute difference in cost between putting the tuple in
	 * its favored node and in the other node.
	 *
	 * Later, we will sort the cost_vector in descending order by cost
	 * difference, and consider the tuples in that order for placement.
	 * That way, the tuples that *really* want to be in one node or the
	 * other get to choose first, and the tuples that don't really care
	 * choose last.
	 *
	 * First, build the cost_vector array.  The new index tuple will also be
	 * handled in this loop, and represented in the array, with
	 * i==newitemoff.
	 *
	 * In the case of variable size tuples it is possible that we only have
	 * the two seeds and no other tuples, in which case we don't do any of
	 * this cost_vector stuff.
	 */

	/* to keep compiler quiet */
	cost_vector = NULL;

	if (num_tuples_without_seeds > 0)
	{
		cost_vector =
			(SPLITCOST *) palloc(num_tuples_without_seeds * sizeof(SPLITCOST));
		n = 0;
		for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i))
		{
			/* Compute new union datums and sizes for both choices */

			if ((i == seed_1) || (i == seed_2))
				continue;
			else if (i == newitemoff)
				item_1 = itup;
			else
				item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));

			datum_alpha = IndexTupleGetDatum(item_1);
			union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
			union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
			FunctionCall2(&rtstate->sizeFn, union_dl,
						  PointerGetDatum(&size_alpha));
			FunctionCall2(&rtstate->sizeFn, union_dr,
						  PointerGetDatum(&size_beta));
			pfree(DatumGetPointer(union_dl));
			pfree(DatumGetPointer(union_dr));

			/*
			 * diff < 0 means adding the tuple enlarges the left union less
			 * than the right union, i.e. the tuple prefers the left page.
			 */
			diff = (size_alpha - size_l) - (size_beta - size_r);

			cost_vector[n].offset_number = i;
			cost_vector[n].cost_differential = fabs(diff);
			cost_vector[n].choose_left = (diff < 0);

			n++;
		}

		/*
		 * Sort the array.	The function qsort_comp_splitcost is set up
		 * "backwards", to provide descending order.
		 */
		qsort(cost_vector, num_tuples_without_seeds, sizeof(SPLITCOST),
			  &qsort_comp_splitcost);
	}

	/*
	 * Now make the final decisions about where each tuple will go, and
	 * build the vectors to return in the SPLITVEC record.
	 *
	 * The cost_vector array contains (descriptions of) all the tuples, in
	 * the order that we want to consider them, so we just iterate
	 * through it and place each tuple in left or right nodes, according
	 * to the criteria described below.
	 */

	left = v->spl_left;
	v->spl_nleft = 0;
	right = v->spl_right;
	v->spl_nright = 0;

	/*
	 * Place the seeds first. left avail space, left union, right avail
	 * space, and right union have already been adjusted for the seeds.
	 */

	*left++ = seed_1;
	v->spl_nleft++;

	*right++ = seed_2;
	v->spl_nright++;

	for (n = 0; n < num_tuples_without_seeds; n++)
	{
		bool		left_feasible,
					right_feasible,
					choose_left;

		/*
		 * We need to figure out which page needs the least enlargement in
		 * order to store the item.
		 */

		i = cost_vector[n].offset_number;

		/* Compute new union datums and sizes for both possible additions */
		if (i == newitemoff)
		{
			item_1 = itup;
			/* Needn't leave room for new item anymore */
			newitemsz = 0;
		}
		else
			item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
		item_1_sz = IndexTupleTotalSize(item_1);

		datum_alpha = IndexTupleGetDatum(item_1);
		union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha);
		union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha);
		FunctionCall2(&rtstate->sizeFn, union_dl,
					  PointerGetDatum(&size_alpha));
		FunctionCall2(&rtstate->sizeFn, union_dr,
					  PointerGetDatum(&size_beta));

		/*
		 * We prefer the page that shows smaller enlargement of its union
		 * area (Guttman's algorithm), but we must take care that at least
		 * one page will still have room for the new item after this one
		 * is added.
		 *
		 * (We know that all the old items together can fit on one page, so
		 * we need not worry about any other problem than failing to fit
		 * the new item.)
		 *
		 * Guttman's algorithm actually has two factors to consider (in
		 * order):	1. if one node has so many tuples already assigned to
		 * it that the other needs all the rest in order to satisfy the
		 * condition that neither node has fewer than m tuples, then that
		 * is decisive; 2. otherwise, choose the page that shows the
		 * smaller enlargement of its union area.
		 *
		 * I have chosen m = M/2, where M is the maximum number of tuples on
		 * a page.	(Actually, this is only strictly true for fixed size
		 * tuples.	For variable size tuples, there still might have to be
		 * only one tuple on a page, if it is really big.  But even with
		 * variable size tuples we still try to get m as close as possible
		 * to M/2.)
		 *
		 * The question of which page shows the smaller enlargement of its
		 * union area has already been answered, and the answer stored in
		 * the choose_left field of the SPLITCOST record.
		 */
		left_feasible = (left_avail_space >= item_1_sz &&
						 ((left_avail_space - item_1_sz) >= newitemsz ||
						  right_avail_space >= newitemsz));
		right_feasible = (right_avail_space >= item_1_sz &&
						  ((right_avail_space - item_1_sz) >= newitemsz ||
						   left_avail_space >= newitemsz));
		if (left_feasible && right_feasible)
		{
			/*
			 * Both feasible, use Guttman's algorithm. First check the m
			 * condition described above, and if that doesn't apply,
			 * choose the page with the smaller enlargement of its union
			 * area.
			 */
			if (v->spl_nleft > max_after_split)
				choose_left = false;
			else if (v->spl_nright > max_after_split)
				choose_left = true;
			else
				choose_left = cost_vector[n].choose_left;
		}
		else if (left_feasible)
			choose_left = true;
		else if (right_feasible)
			choose_left = false;
		else
		{
			elog(ERROR, "failed to find a workable rtree page split");
			choose_left = false;	/* keep compiler quiet */
		}

		/*
		 * Commit the tuple to the chosen side: adopt that side's new
		 * union datum/size, free the superseded datums, and charge the
		 * tuple's space against that page.
		 */
		if (choose_left)
		{
			pfree(DatumGetPointer(datum_l));
			pfree(DatumGetPointer(union_dr));
			datum_l = union_dl;
			size_l = size_alpha;
			left_avail_space -= item_1_sz;
			*left++ = i;
			v->spl_nleft++;
		}
		else
		{
			pfree(DatumGetPointer(datum_r));
			pfree(DatumGetPointer(union_dl));
			datum_r = union_dr;
			size_r = size_beta;
			right_avail_space -= item_1_sz;
			*right++ = i;
			v->spl_nright++;
		}
	}

	if (num_tuples_without_seeds > 0)
		pfree(cost_vector);

	*left = *right = InvalidOffsetNumber;		/* add ending sentinels */

	v->spl_ldatum = datum_l;
	v->spl_rdatum = datum_r;
}
 
1110
 
 
1111
static void
 
1112
RTInitBuffer(Buffer b, uint32 f)
 
1113
{
 
1114
        RTreePageOpaque opaque;
 
1115
        Page            page;
 
1116
        Size            pageSize;
 
1117
 
 
1118
        pageSize = BufferGetPageSize(b);
 
1119
 
 
1120
        page = BufferGetPage(b);
 
1121
 
 
1122
        PageInit(page, pageSize, sizeof(RTreePageOpaqueData));
 
1123
 
 
1124
        opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
 
1125
        opaque->flags = f;
 
1126
}
 
1127
 
 
1128
static OffsetNumber
 
1129
choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
 
1130
{
 
1131
        OffsetNumber maxoff;
 
1132
        OffsetNumber i;
 
1133
        Datum           ud,
 
1134
                                id;
 
1135
        Datum           datum;
 
1136
        float           usize,
 
1137
                                dsize;
 
1138
        OffsetNumber which;
 
1139
        float           which_grow;
 
1140
 
 
1141
        id = IndexTupleGetDatum(it);
 
1142
        maxoff = PageGetMaxOffsetNumber(p);
 
1143
        which_grow = -1.0;
 
1144
        which = -1;
 
1145
 
 
1146
        for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
 
1147
        {
 
1148
                datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i)));
 
1149
                FunctionCall2(&rtstate->sizeFn, datum,
 
1150
                                          PointerGetDatum(&dsize));
 
1151
                ud = FunctionCall2(&rtstate->unionFn, datum, id);
 
1152
                FunctionCall2(&rtstate->sizeFn, ud,
 
1153
                                          PointerGetDatum(&usize));
 
1154
                pfree(DatumGetPointer(ud));
 
1155
                if (which_grow < 0 || usize - dsize < which_grow)
 
1156
                {
 
1157
                        which = i;
 
1158
                        which_grow = usize - dsize;
 
1159
                        if (which_grow == 0)
 
1160
                                break;
 
1161
                }
 
1162
        }
 
1163
 
 
1164
        return which;
 
1165
}
 
1166
 
 
1167
static int
 
1168
nospace(Page p, IndexTuple it)
 
1169
{
 
1170
        return PageGetFreeSpace(p) < IndexTupleSize(it);
 
1171
}
 
1172
 
 
1173
void
 
1174
freestack(RTSTACK *s)
 
1175
{
 
1176
        RTSTACK    *p;
 
1177
 
 
1178
        while (s != NULL)
 
1179
        {
 
1180
                p = s->rts_parent;
 
1181
                pfree(s);
 
1182
                s = p;
 
1183
        }
 
1184
}
 
1185
 
 
1186
/*
 
1187
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 
1188
 * The set of target tuples is specified via a callback routine that tells
 
1189
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 
1190
 *
 
1191
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 
1192
 */
 
1193
Datum
rtbulkdelete(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
	void	   *callback_state = (void *) PG_GETARG_POINTER(2);
	IndexBulkDeleteResult *result;
	BlockNumber num_pages;
	double		tuples_removed;		/* count of entries deleted */
	double		num_index_tuples;	/* count of entries retained */
	IndexScanDesc iscan;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * Since rtree is not marked "amconcurrent" in pg_am, caller should
	 * have acquired exclusive lock on index relation.	We need no locking
	 * here.
	 */

	/*
	 * XXX generic implementation --- should be improved!
	 */

	/* walk through the entire index */
	iscan = index_beginscan(NULL, rel, SnapshotAny, 0, NULL);
	/* including killed tuples */
	iscan->ignore_killed_tuples = false;

	while (index_getnext_indexitem(iscan, ForwardScanDirection))
	{
		/* allow VACUUM to be throttled / interrupted between tuples */
		vacuum_delay_point();

		/* ask the callback whether this heap TID is being deleted */
		if (callback(&iscan->xs_ctup.t_self, callback_state))
		{
			/* copy the scan's current position before we delete it */
			ItemPointerData indextup = iscan->currentItemData;
			BlockNumber blkno;
			OffsetNumber offnum;
			Buffer		buf;
			Page		page;

			blkno = ItemPointerGetBlockNumber(&indextup);
			offnum = ItemPointerGetOffsetNumber(&indextup);

			/* adjust any scans that will be affected by this deletion */
			/* (namely, my own scan) */
			rtadjscans(rel, RTOP_DEL, blkno, offnum);

			/* delete the index tuple */
			buf = ReadBuffer(rel, blkno);
			page = BufferGetPage(buf);

			PageIndexTupleDelete(page, offnum);

			WriteBuffer(buf);

			tuples_removed += 1;
		}
		else
			num_index_tuples += 1;
	}

	index_endscan(iscan);

	/* return statistics */
	num_pages = RelationGetNumberOfBlocks(rel);

	result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	result->num_pages = num_pages;
	result->num_index_tuples = num_index_tuples;
	result->tuples_removed = tuples_removed;

	PG_RETURN_POINTER(result);
}
 
1268
 
 
1269
 
 
1270
static void
 
1271
initRtstate(RTSTATE *rtstate, Relation index)
 
1272
{
 
1273
        fmgr_info_copy(&rtstate->unionFn,
 
1274
                                   index_getprocinfo(index, 1, RT_UNION_PROC),
 
1275
                                   CurrentMemoryContext);
 
1276
        fmgr_info_copy(&rtstate->sizeFn,
 
1277
                                   index_getprocinfo(index, 1, RT_SIZE_PROC),
 
1278
                                   CurrentMemoryContext);
 
1279
        fmgr_info_copy(&rtstate->interFn,
 
1280
                                   index_getprocinfo(index, 1, RT_INTER_PROC),
 
1281
                                   CurrentMemoryContext);
 
1282
}
 
1283
 
 
1284
/* for sorting SPLITCOST records in descending order */
 
1285
static int
 
1286
qsort_comp_splitcost(const void *a, const void *b)
 
1287
{
 
1288
        float           diff =
 
1289
        ((SPLITCOST *) a)->cost_differential -
 
1290
        ((SPLITCOST *) b)->cost_differential;
 
1291
 
 
1292
        if (diff < 0)
 
1293
                return 1;
 
1294
        else if (diff > 0)
 
1295
                return -1;
 
1296
        else
 
1297
                return 0;
 
1298
}
 
1299
 
 
1300
#ifdef RTDEBUG

/*
 * _rtdump -- debugging aid: print every page of rtree "r" to stdout,
 * with each index tuple's size, heap TID, and key (rendered via
 * box_out).  Compiled only when RTDEBUG is defined.
 */
void
_rtdump(Relation r)
{
	BlockNumber nblocks = RelationGetNumberOfBlocks(r);
	BlockNumber curblk;

	for (curblk = 0; curblk < nblocks; curblk++)
	{
		Buffer		buffer = ReadBuffer(r, curblk);
		Page		dp = BufferGetPage(buffer);
		RTreePageOpaque opaque = (RTreePageOpaque) PageGetSpecialPointer(dp);
		OffsetNumber limit = PageGetMaxOffsetNumber(dp);
		OffsetNumber off;

		printf("Page %d maxoff %d <%s>\n", curblk, limit,
			   (opaque->flags & F_LEAF ? "LEAF" : "INTERNAL"));

		if (PageIsEmpty(dp))
		{
			ReleaseBuffer(buffer);
			continue;
		}

		for (off = FirstOffsetNumber;
			 off <= limit;
			 off = OffsetNumberNext(off))
		{
			IndexTuple	tuple = (IndexTuple) PageGetItem(dp, PageGetItemId(dp, off));
			BlockNumber heapblk = ItemPointerGetBlockNumber(&(tuple->t_tid));
			OffsetNumber heapoff = ItemPointerGetOffsetNumber(&(tuple->t_tid));
			Datum		keydatum = IndexTupleGetDatum(tuple);
			char	   *keystr = DatumGetCString(DirectFunctionCall1(box_out,
																 keydatum));

			printf("\t[%d] size %d heap <%d,%d> key:%s\n",
				   off, IndexTupleSize(tuple), heapblk, heapoff, keystr);
			pfree(keystr);
		}

		ReleaseBuffer(buffer);
	}
}
#endif   /* defined RTDEBUG */
 
1353
 
 
1354
/*
 * rtree_redo -- WAL redo entry point for the rtree access method.
 *
 * rtree writes no WAL records of its own, so recovery should never
 * dispatch here; panic unconditionally if it does.
 */
void
rtree_redo(XLogRecPtr lsn, XLogRecord *record)
{
	elog(PANIC, "rtree_redo: unimplemented");
}
 
1359
 
 
1360
/*
 * rtree_undo -- WAL undo entry point for the rtree access method.
 *
 * Like rtree_redo, this should never be reached, since rtree emits
 * no WAL records; panic unconditionally if it is.
 */
void
rtree_undo(XLogRecPtr lsn, XLogRecord *record)
{
	elog(PANIC, "rtree_undo: unimplemented");
}
 
1365
 
 
1366
/*
 * rtree_desc -- WAL record description entry point.
 *
 * Intentionally empty: there are no rtree WAL record types to
 * describe, so nothing is appended to "buf".
 */
void
rtree_desc(char *buf, uint8 xl_info, char *rec)
{
}