~ubuntu-branches/ubuntu/lucid/postgresql-8.4/lucid-proposed

« back to all changes in this revision

Viewing changes to src/backend/access/gist/gistsplit.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2009-03-20 12:00:13 UTC
  • Revision ID: james.westby@ubuntu.com-20090320120013-hogj7egc5mjncc5g
Tags: upstream-8.4~0cvs20090328
ImportĀ upstreamĀ versionĀ 8.4~0cvs20090328

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * gistsplit.c
 
4
 *        Split page algorithm
 
5
 *
 
6
 *
 
7
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 
8
 * Portions Copyright (c) 1994, Regents of the University of California
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        $PostgreSQL$
 
12
 *
 
13
 *-------------------------------------------------------------------------
 
14
 */
 
15
#include "postgres.h"
 
16
 
 
17
#include "access/gist_private.h"
 
18
#include "utils/rel.h"
 
19
 
 
20
typedef struct
 
21
{
 
22
        Datum      *attr;
 
23
        int                     len;
 
24
        OffsetNumber *entries;
 
25
        bool       *isnull;
 
26
        bool       *equiv;
 
27
} GistSplitUnion;
 
28
 
 
29
 
 
30
/*
 
31
 * Forms unions of subkeys after page split, but
 
32
 * uses only tuples aren't in groups of equalent tuples
 
33
 */
 
34
static void
 
35
gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
 
36
                                   GistSplitUnion *gsvp, int startkey)
 
37
{
 
38
        IndexTuple *cleanedItVec;
 
39
        int                     i,
 
40
                                cleanedLen = 0;
 
41
 
 
42
        cleanedItVec = (IndexTuple *) palloc(sizeof(IndexTuple) * gsvp->len);
 
43
 
 
44
        for (i = 0; i < gsvp->len; i++)
 
45
        {
 
46
                if (gsvp->equiv && gsvp->equiv[gsvp->entries[i]])
 
47
                        continue;
 
48
 
 
49
                cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
 
50
        }
 
51
 
 
52
        gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey,
 
53
                                           gsvp->attr, gsvp->isnull);
 
54
 
 
55
        pfree(cleanedItVec);
 
56
}
 
57
 
 
58
/*
 
59
 * unions subkeys for after user picksplit over attno-1 column
 
60
 */
 
61
static void
 
62
gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl, int attno)
 
63
{
 
64
        GistSplitUnion gsvp;
 
65
 
 
66
        gsvp.equiv = spl->spl_equiv;
 
67
 
 
68
        gsvp.attr = spl->spl_lattr;
 
69
        gsvp.len = spl->splitVector.spl_nleft;
 
70
        gsvp.entries = spl->splitVector.spl_left;
 
71
        gsvp.isnull = spl->spl_lisnull;
 
72
 
 
73
        gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
 
74
 
 
75
        gsvp.attr = spl->spl_rattr;
 
76
        gsvp.len = spl->splitVector.spl_nright;
 
77
        gsvp.entries = spl->splitVector.spl_right;
 
78
        gsvp.isnull = spl->spl_risnull;
 
79
 
 
80
        gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
 
81
}
 
82
 
 
83
/*
 
84
 * find group in vector with equivalent value
 
85
 */
 
86
static int
 
87
gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
 
88
{
 
89
        int                     i;
 
90
        GISTENTRY       entry;
 
91
        int                     len = 0;
 
92
 
 
93
        /*
 
94
         * attno key is always not null (see gistSplitByKey), so we may not check
 
95
         * for nulls
 
96
         */
 
97
        gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
 
98
        for (i = 0; i < spl->splitVector.spl_nleft; i++)
 
99
        {
 
100
                float           penalty = gistpenalty(giststate, attno, &entry, false,
 
101
                                                           &valvec[spl->splitVector.spl_left[i]], false);
 
102
 
 
103
                if (penalty == 0.0)
 
104
                {
 
105
                        spl->spl_equiv[spl->splitVector.spl_left[i]] = true;
 
106
                        len++;
 
107
                }
 
108
        }
 
109
 
 
110
        gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
 
111
        for (i = 0; i < spl->splitVector.spl_nright; i++)
 
112
        {
 
113
                float           penalty = gistpenalty(giststate, attno, &entry, false,
 
114
                                                          &valvec[spl->splitVector.spl_right[i]], false);
 
115
 
 
116
                if (penalty == 0.0)
 
117
                {
 
118
                        spl->spl_equiv[spl->splitVector.spl_right[i]] = true;
 
119
                        len++;
 
120
                }
 
121
        }
 
122
 
 
123
        return len;
 
124
}
 
125
 
 
126
static void
 
127
cleanupOffsets(OffsetNumber *a, int *len, bool *equiv, int *LenEquiv)
 
128
{
 
129
        int                     curlen,
 
130
                                i;
 
131
        OffsetNumber *curwpos;
 
132
 
 
133
        curlen = *len;
 
134
        curwpos = a;
 
135
        for (i = 0; i < *len; i++)
 
136
        {
 
137
                if (equiv[a[i]] == FALSE)
 
138
                {
 
139
                        *curwpos = a[i];
 
140
                        curwpos++;
 
141
                }
 
142
                else
 
143
                {
 
144
                        /* corner case: we shouldn't make void array */
 
145
                        if (curlen == 1)
 
146
                        {
 
147
                                equiv[a[i]] = FALSE;    /* mark item as non-equivalent */
 
148
                                i--;                    /* redo the same */
 
149
                                *LenEquiv -= 1;
 
150
                                continue;
 
151
                        }
 
152
                        else
 
153
                                curlen--;
 
154
                }
 
155
        }
 
156
 
 
157
        *len = curlen;
 
158
}
 
159
 
 
160
static void
 
161
placeOne(Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno)
 
162
{
 
163
        GISTENTRY       identry[INDEX_MAX_KEYS];
 
164
        bool            isnull[INDEX_MAX_KEYS];
 
165
        bool            toLeft = true;
 
166
 
 
167
        gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);
 
168
 
 
169
        for (; attno < giststate->tupdesc->natts; attno++)
 
170
        {
 
171
                float           lpenalty,
 
172
                                        rpenalty;
 
173
                GISTENTRY       entry;
 
174
 
 
175
                gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0, FALSE);
 
176
                lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry + attno, isnull[attno]);
 
177
                gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0, FALSE);
 
178
                rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry + attno, isnull[attno]);
 
179
 
 
180
                if (lpenalty != rpenalty)
 
181
                {
 
182
                        if (lpenalty > rpenalty)
 
183
                                toLeft = false;
 
184
                        break;
 
185
                }
 
186
        }
 
187
 
 
188
        if (toLeft)
 
189
                v->splitVector.spl_left[v->splitVector.spl_nleft++] = off;
 
190
        else
 
191
                v->splitVector.spl_right[v->splitVector.spl_nright++] = off;
 
192
}
 
193
 
 
194
#define SWAPVAR( s, d, t ) \
 
195
do {    \
 
196
        (t) = (s); \
 
197
        (s) = (d); \
 
198
        (d) = (t); \
 
199
} while(0)
 
200
 
 
201
/*
 
202
 * adjust left and right unions according to splits by previous
 
203
 * split by firsts columns. This function is called only in case
 
204
 * when pickSplit doesn't support subspplit.
 
205
 */
 
206
 
 
207
static void
 
208
supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR)
 
209
{
 
210
        bool            leaveOnLeft = true,
 
211
                                tmpBool;
 
212
        GISTENTRY       entryL,
 
213
                                entryR,
 
214
                                entrySL,
 
215
                                entrySR;
 
216
 
 
217
        gistentryinit(entryL, oldL, r, NULL, 0, FALSE);
 
218
        gistentryinit(entryR, oldR, r, NULL, 0, FALSE);
 
219
        gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
 
220
        gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
 
221
 
 
222
        if (sv->spl_ldatum_exists && sv->spl_rdatum_exists)
 
223
        {
 
224
                float           penalty1,
 
225
                                        penalty2;
 
226
 
 
227
                penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
 
228
                        gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
 
229
                penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
 
230
                        gistpenalty(giststate, attno, &entryR, false, &entrySL, false);
 
231
 
 
232
                if (penalty1 > penalty2)
 
233
                        leaveOnLeft = false;
 
234
 
 
235
        }
 
236
        else
 
237
        {
 
238
                GISTENTRY  *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
 
239
                float           penalty1,
 
240
                                        penalty2;
 
241
 
 
242
                /*
 
243
                 * there is only one previously defined union, so we just choose swap
 
244
                 * or not by lowest penalty
 
245
                 */
 
246
 
 
247
                penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
 
248
                penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);
 
249
 
 
250
                if (penalty1 < penalty2)
 
251
                        leaveOnLeft = (sv->spl_ldatum_exists) ? true : false;
 
252
                else
 
253
                        leaveOnLeft = (sv->spl_rdatum_exists) ? true : false;
 
254
        }
 
255
 
 
256
        if (leaveOnLeft == false)
 
257
        {
 
258
                /*
 
259
                 * swap left and right
 
260
                 */
 
261
                OffsetNumber *off,
 
262
                                        noff;
 
263
                Datum           datum;
 
264
 
 
265
                SWAPVAR(sv->spl_left, sv->spl_right, off);
 
266
                SWAPVAR(sv->spl_nleft, sv->spl_nright, noff);
 
267
                SWAPVAR(sv->spl_ldatum, sv->spl_rdatum, datum);
 
268
                gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
 
269
                gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
 
270
        }
 
271
 
 
272
        if (sv->spl_ldatum_exists)
 
273
                gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
 
274
                                                 &sv->spl_ldatum, &tmpBool);
 
275
 
 
276
        if (sv->spl_rdatum_exists)
 
277
                gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
 
278
                                                 &sv->spl_rdatum, &tmpBool);
 
279
 
 
280
        sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
 
281
}
 
282
 
 
283
/*
 
284
 * Calls user picksplit method for attno columns to split vector to
 
285
 * two vectors. May use attno+n columns data to
 
286
 * get better split.
 
287
 * Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same,
 
288
 * so caller may find better split
 
289
 * Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved
 
290
 */
 
291
static bool
 
292
gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
 
293
                                  IndexTuple *itup, int len, GISTSTATE *giststate)
 
294
{
 
295
        GIST_SPLITVEC *sv = &v->splitVector;
 
296
 
 
297
        /*
 
298
         * now let the user-defined picksplit function set up the split vector; in
 
299
         * entryvec have no null value!!
 
300
         */
 
301
 
 
302
        sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
 
303
        sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
 
304
        sv->spl_ldatum = v->spl_lattr[attno];
 
305
        sv->spl_rdatum = v->spl_rattr[attno];
 
306
 
 
307
        FunctionCall2(&giststate->picksplitFn[attno],
 
308
                                  PointerGetDatum(entryvec),
 
309
                                  PointerGetDatum(sv));
 
310
 
 
311
        /* compatibility with old code */
 
312
        if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
 
313
                sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
 
314
        if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
 
315
                sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
 
316
 
 
317
        if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
 
318
        {
 
319
                elog(LOG, "PickSplit method of %d columns of index '%s' doesn't support secondary split",
 
320
                         attno + 1, RelationGetRelationName(r));
 
321
 
 
322
                supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
 
323
        }
 
324
 
 
325
        v->spl_lattr[attno] = sv->spl_ldatum;
 
326
        v->spl_rattr[attno] = sv->spl_rdatum;
 
327
        v->spl_lisnull[attno] = false;
 
328
        v->spl_risnull[attno] = false;
 
329
 
 
330
        /*
 
331
         * if index is multikey, then we must to try get smaller bounding box for
 
332
         * subkey(s)
 
333
         */
 
334
        v->spl_equiv = NULL;
 
335
 
 
336
        if (giststate->tupdesc->natts > 1 && attno + 1 != giststate->tupdesc->natts)
 
337
        {
 
338
                if (gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum))
 
339
                {
 
340
                        /*
 
341
                         * Left and right key's unions are equial, so we can get better
 
342
                         * split by following columns. Note, unions for attno columns are
 
343
                         * already done.
 
344
                         */
 
345
 
 
346
                        return true;
 
347
                }
 
348
                else
 
349
                {
 
350
                        int                     LenEquiv;
 
351
 
 
352
                        v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n + 1));
 
353
 
 
354
                        LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);
 
355
 
 
356
                        /*
 
357
                         * if possible, we should distribute equivalent tuples
 
358
                         */
 
359
                        if (LenEquiv == 0)
 
360
                        {
 
361
                                gistunionsubkey(giststate, itup, v, attno + 1);
 
362
                        }
 
363
                        else
 
364
                        {
 
365
                                cleanupOffsets(sv->spl_left, &sv->spl_nleft, v->spl_equiv, &LenEquiv);
 
366
                                cleanupOffsets(sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv);
 
367
 
 
368
                                gistunionsubkey(giststate, itup, v, attno + 1);
 
369
                                if (LenEquiv == 1)
 
370
                                {
 
371
                                        /*
 
372
                                         * In case with one tuple we just choose left-right by
 
373
                                         * penalty. It's simplify user-defined pickSplit
 
374
                                         */
 
375
                                        OffsetNumber toMove = InvalidOffsetNumber;
 
376
 
 
377
                                        for (toMove = FirstOffsetNumber; toMove < entryvec->n; toMove++)
 
378
                                                if (v->spl_equiv[toMove])
 
379
                                                        break;
 
380
                                        Assert(toMove < entryvec->n);
 
381
 
 
382
                                        placeOne(r, giststate, v, itup[toMove - 1], toMove, attno + 1);
 
383
 
 
384
                                        /*
 
385
                                         * redo gistunionsubkey(): it will not degradate
 
386
                                         * performance, because it's very rarely
 
387
                                         */
 
388
                                        v->spl_equiv = NULL;
 
389
                                        gistunionsubkey(giststate, itup, v, attno + 1);
 
390
 
 
391
                                        return false;
 
392
                                }
 
393
                                else if (LenEquiv > 1)
 
394
                                        return true;
 
395
                        }
 
396
                }
 
397
        }
 
398
 
 
399
        return false;
 
400
}
 
401
 
 
402
/*
 
403
 * simple split page
 
404
 */
 
405
static void
 
406
gistSplitHalf(GIST_SPLITVEC *v, int len)
 
407
{
 
408
        int                     i;
 
409
 
 
410
        v->spl_nright = v->spl_nleft = 0;
 
411
        v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
 
412
        v->spl_right = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
 
413
        for (i = 1; i <= len; i++)
 
414
                if (i < len / 2)
 
415
                        v->spl_right[v->spl_nright++] = i;
 
416
                else
 
417
                        v->spl_left[v->spl_nleft++] = i;
 
418
}
 
419
 
 
420
/*
 
421
 * if it was invalid tuple then we need special processing.
 
422
 * We move all invalid tuples on right page.
 
423
 *
 
424
 * if there is no place on left page, gistSplit will be called one more
 
425
 * time for left page.
 
426
 *
 
427
 * Normally, we never exec this code, but after crash replay it's possible
 
428
 * to get 'invalid' tuples (probability is low enough)
 
429
 */
 
430
static void
 
431
gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
 
432
{
 
433
        int                     i;
 
434
        static OffsetNumber offInvTuples[MaxOffsetNumber];
 
435
        int                     nOffInvTuples = 0;
 
436
 
 
437
        for (i = 1; i <= len; i++)
 
438
                if (GistTupleIsInvalid(itup[i - 1]))
 
439
                        offInvTuples[nOffInvTuples++] = i;
 
440
 
 
441
        if (nOffInvTuples == len)
 
442
        {
 
443
                /* corner case, all tuples are invalid */
 
444
                v->spl_rightvalid = v->spl_leftvalid = false;
 
445
                gistSplitHalf(&v->splitVector, len);
 
446
        }
 
447
        else
 
448
        {
 
449
                GistSplitUnion gsvp;
 
450
 
 
451
                v->splitVector.spl_right = offInvTuples;
 
452
                v->splitVector.spl_nright = nOffInvTuples;
 
453
                v->spl_rightvalid = false;
 
454
 
 
455
                v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
 
456
                v->splitVector.spl_nleft = 0;
 
457
                for (i = 1; i <= len; i++)
 
458
                        if (!GistTupleIsInvalid(itup[i - 1]))
 
459
                                v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
 
460
                v->spl_leftvalid = true;
 
461
 
 
462
                gsvp.equiv = NULL;
 
463
                gsvp.attr = v->spl_lattr;
 
464
                gsvp.len = v->splitVector.spl_nleft;
 
465
                gsvp.entries = v->splitVector.spl_left;
 
466
                gsvp.isnull = v->spl_lisnull;
 
467
 
 
468
                gistunionsubkeyvec(giststate, itup, &gsvp, 0);
 
469
        }
 
470
}
 
471
 
 
472
/*
 
473
 * trys to split page by attno key, in a case of null
 
474
 * values move its to separate page.
 
475
 */
 
476
void
 
477
gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
 
478
                           GistSplitVector *v, GistEntryVector *entryvec, int attno)
 
479
{
 
480
        int                     i;
 
481
        static OffsetNumber offNullTuples[MaxOffsetNumber];
 
482
        int                     nOffNullTuples = 0;
 
483
 
 
484
        for (i = 1; i <= len; i++)
 
485
        {
 
486
                Datum           datum;
 
487
                bool            IsNull;
 
488
 
 
489
                if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
 
490
                {
 
491
                        gistSplitByInvalid(giststate, v, itup, len);
 
492
                        return;
 
493
                }
 
494
 
 
495
                datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
 
496
                gistdentryinit(giststate, attno, &(entryvec->vector[i]),
 
497
                                           datum, r, page, i,
 
498
                                           FALSE, IsNull);
 
499
                if (IsNull)
 
500
                        offNullTuples[nOffNullTuples++] = i;
 
501
        }
 
502
 
 
503
        v->spl_leftvalid = v->spl_rightvalid = true;
 
504
 
 
505
        if (nOffNullTuples == len)
 
506
        {
 
507
                /*
 
508
                 * Corner case: All keys in attno column are null, we should try to
 
509
                 * split by keys in next column. It all keys in all columns are NULL
 
510
                 * just split page half by half
 
511
                 */
 
512
                v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
 
513
 
 
514
                if (attno + 1 == r->rd_att->natts)
 
515
                        gistSplitHalf(&v->splitVector, len);
 
516
                else
 
517
                        gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
 
518
        }
 
519
        else if (nOffNullTuples > 0)
 
520
        {
 
521
                int                     j = 0;
 
522
 
 
523
                /*
 
524
                 * We don't want to mix NULLs and not-NULLs keys on one page, so move
 
525
                 * nulls to right page
 
526
                 */
 
527
                v->splitVector.spl_right = offNullTuples;
 
528
                v->splitVector.spl_nright = nOffNullTuples;
 
529
                v->spl_risnull[attno] = TRUE;
 
530
 
 
531
                v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
 
532
                v->splitVector.spl_nleft = 0;
 
533
                for (i = 1; i <= len; i++)
 
534
                        if (j < v->splitVector.spl_nright && offNullTuples[j] == i)
 
535
                                j++;
 
536
                        else
 
537
                                v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
 
538
 
 
539
                v->spl_equiv = NULL;
 
540
                gistunionsubkey(giststate, itup, v, attno);
 
541
        }
 
542
        else
 
543
        {
 
544
                /*
 
545
                 * all keys are not-null
 
546
                 */
 
547
                entryvec->n = len + 1;
 
548
 
 
549
                if (gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno + 1 != r->rd_att->natts)
 
550
                {
 
551
                        /*
 
552
                         * Splitting on attno column is not optimized: there is a tuples
 
553
                         * which can be freely left or right page, we will try to split
 
554
                         * page by following columns
 
555
                         */
 
556
                        if (v->spl_equiv == NULL)
 
557
                        {
 
558
                                /*
 
559
                                 * simple case: left and right keys for attno column are
 
560
                                 * equial
 
561
                                 */
 
562
                                gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
 
563
                        }
 
564
                        else
 
565
                        {
 
566
                                /* we should clean up vector from already distributed tuples */
 
567
                                IndexTuple *newitup = (IndexTuple *) palloc((len + 1) * sizeof(IndexTuple));
 
568
                                OffsetNumber *map = (OffsetNumber *) palloc((len + 1) * sizeof(IndexTuple));
 
569
                                int                     newlen = 0;
 
570
                                GIST_SPLITVEC backupSplit = v->splitVector;
 
571
 
 
572
                                for (i = 0; i < len; i++)
 
573
                                        if (v->spl_equiv[i + 1])
 
574
                                        {
 
575
                                                map[newlen] = i + 1;
 
576
                                                newitup[newlen++] = itup[i];
 
577
                                        }
 
578
 
 
579
                                Assert(newlen > 0);
 
580
 
 
581
                                backupSplit.spl_left = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
 
582
                                memcpy(backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber) * v->splitVector.spl_nleft);
 
583
                                backupSplit.spl_right = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
 
584
                                memcpy(backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber) * v->splitVector.spl_nright);
 
585
 
 
586
                                gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno + 1);
 
587
 
 
588
                                /* merge result of subsplit */
 
589
                                for (i = 0; i < v->splitVector.spl_nleft; i++)
 
590
                                        backupSplit.spl_left[backupSplit.spl_nleft++] = map[v->splitVector.spl_left[i] - 1];
 
591
                                for (i = 0; i < v->splitVector.spl_nright; i++)
 
592
                                        backupSplit.spl_right[backupSplit.spl_nright++] = map[v->splitVector.spl_right[i] - 1];
 
593
 
 
594
                                v->splitVector = backupSplit;
 
595
                                /* reunion left and right datums */
 
596
                                gistunionsubkey(giststate, itup, v, attno);
 
597
                        }
 
598
                }
 
599
        }
 
600
}