~ubuntu-branches/ubuntu/hardy/postgresql-8.4/hardy-backports

« back to all changes in this revision

Viewing changes to src/backend/access/gin/ginfast.c

  • Committer: Bazaar Package Importer
  • Author(s): Ubuntu Archive Backport
  • Date: 2009-11-09 13:46:16 UTC
  • mfrom: (5.2.2 sid)
  • Revision ID: james.westby@ubuntu.com-20091109134616-ae14l9tr3g7p2t9y
Tags: 8.4.1-1~hardy1
Automated backport upload; no source changes.

Show diffs side-by-side

added

removed

Lines of Context:
11
11
 * Portions Copyright (c) 1994, Regents of the University of California
12
12
 *
13
13
 * IDENTIFICATION
14
 
 *                      $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.2 2009/03/24 22:06:03 tgl Exp $
 
14
 *                      $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.3 2009/06/11 14:48:53 momjian Exp $
15
15
 *
16
16
 *-------------------------------------------------------------------------
17
17
 */
33
33
 
34
34
typedef struct DatumArray
35
35
{
36
 
        Datum   *values;                        /* expansible array */
37
 
        int32    nvalues;                       /* current number of valid entries */
38
 
        int32    maxvalues;                     /* allocated size of array */
 
36
        Datum      *values;                     /* expansible array */
 
37
        int32           nvalues;                /* current number of valid entries */
 
38
        int32           maxvalues;              /* allocated size of array */
39
39
} DatumArray;
40
40
 
41
41
 
46
46
writeListPage(Relation index, Buffer buffer,
47
47
                          IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
48
48
{
49
 
        Page                    page = BufferGetPage(buffer);
50
 
        int                     i, freesize, size=0;
51
 
        OffsetNumber    l, off;
52
 
        char               *workspace;
53
 
        char               *ptr;
 
49
        Page            page = BufferGetPage(buffer);
 
50
        int                     i,
 
51
                                freesize,
 
52
                                size = 0;
 
53
        OffsetNumber l,
 
54
                                off;
 
55
        char       *workspace;
 
56
        char       *ptr;
54
57
 
55
58
        /* workspace could be a local array; we use palloc for alignment */
56
59
        workspace = palloc(BLCKSZ);
62
65
        off = FirstOffsetNumber;
63
66
        ptr = workspace;
64
67
 
65
 
        for(i=0; i<ntuples; i++)
 
68
        for (i = 0; i < ntuples; i++)
66
69
        {
67
 
                int             this_size = IndexTupleSize(tuples[i]);
 
70
                int                     this_size = IndexTupleSize(tuples[i]);
68
71
 
69
72
                memcpy(ptr, tuples[i], this_size);
70
73
                ptr += this_size;
71
74
                size += this_size;
72
75
 
73
 
                l = PageAddItem(page, (Item)tuples[i], this_size, off, false, false);
 
76
                l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
74
77
 
75
78
                if (l == InvalidOffsetNumber)
76
79
                        elog(ERROR, "failed to add item to index page in \"%s\"",
84
87
        GinPageGetOpaque(page)->rightlink = rightlink;
85
88
 
86
89
        /*
87
 
         * tail page may contain only the whole row(s) or final
88
 
         * part of row placed on previous pages
 
90
         * tail page may contain only the whole row(s) or final part of row placed
 
91
         * on previous pages
89
92
         */
90
 
        if ( rightlink == InvalidBlockNumber )
 
93
        if (rightlink == InvalidBlockNumber)
91
94
        {
92
95
                GinPageSetFullRow(page);
93
96
                GinPageGetOpaque(page)->maxoff = 1;
103
106
 
104
107
        if (!index->rd_istemp)
105
108
        {
106
 
                XLogRecData                             rdata[2];
107
 
                ginxlogInsertListPage   data;
108
 
                XLogRecPtr                      recptr;
 
109
                XLogRecData rdata[2];
 
110
                ginxlogInsertListPage data;
 
111
                XLogRecPtr      recptr;
109
112
 
110
113
                rdata[0].buffer = buffer;
111
114
                rdata[0].buffer_std = true;
112
 
                rdata[0].data = (char*)&data;
 
115
                rdata[0].data = (char *) &data;
113
116
                rdata[0].len = sizeof(ginxlogInsertListPage);
114
 
                rdata[0].next = rdata+1;
 
117
                rdata[0].next = rdata + 1;
115
118
 
116
119
                rdata[1].buffer = InvalidBuffer;
117
120
                rdata[1].data = workspace;
140
143
makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
141
144
                        GinMetaPageData *res)
142
145
{
143
 
        Buffer                  curBuffer = InvalidBuffer;
144
 
        Buffer                  prevBuffer = InvalidBuffer;
145
 
        int                     i, size = 0, tupsize;
146
 
        int                     startTuple = 0;
 
146
        Buffer          curBuffer = InvalidBuffer;
 
147
        Buffer          prevBuffer = InvalidBuffer;
 
148
        int                     i,
 
149
                                size = 0,
 
150
                                tupsize;
 
151
        int                     startTuple = 0;
147
152
 
148
153
        Assert(ntuples > 0);
149
154
 
150
155
        /*
151
156
         * Split tuples into pages
152
157
         */
153
 
        for(i=0;i<ntuples;i++)
 
158
        for (i = 0; i < ntuples; i++)
154
159
        {
155
 
                if ( curBuffer == InvalidBuffer )
 
160
                if (curBuffer == InvalidBuffer)
156
161
                {
157
162
                        curBuffer = GinNewBuffer(index);
158
163
 
159
 
                        if ( prevBuffer != InvalidBuffer )
 
164
                        if (prevBuffer != InvalidBuffer)
160
165
                        {
161
166
                                res->nPendingPages++;
162
167
                                writeListPage(index, prevBuffer,
163
 
                                                          tuples+startTuple, i-startTuple,
 
168
                                                          tuples + startTuple, i - startTuple,
164
169
                                                          BufferGetBlockNumber(curBuffer));
165
170
                        }
166
171
                        else
175
180
 
176
181
                tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
177
182
 
178
 
                if ( size + tupsize >= GinListPageSize )
 
183
                if (size + tupsize >= GinListPageSize)
179
184
                {
180
185
                        /* won't fit, force a new page and reprocess */
181
186
                        i--;
192
197
         */
193
198
        res->tail = BufferGetBlockNumber(curBuffer);
194
199
        res->tailFreeSize = writeListPage(index, curBuffer,
195
 
                                                                          tuples+startTuple, ntuples-startTuple,
 
200
                                                                   tuples + startTuple, ntuples - startTuple,
196
201
                                                                          InvalidBlockNumber);
197
202
        res->nPendingPages++;
198
203
        /* that was only one heap tuple */
207
212
ginHeapTupleFastInsert(Relation index, GinState *ginstate,
208
213
                                           GinTupleCollector *collector)
209
214
{
210
 
        Buffer                          metabuffer;
211
 
        Page                            metapage;
212
 
        GinMetaPageData    *metadata = NULL;
213
 
        XLogRecData                     rdata[2];
214
 
        Buffer                          buffer = InvalidBuffer;
215
 
        Page                            page = NULL;
216
 
        ginxlogUpdateMeta       data;
217
 
        bool                            separateList = false;
218
 
        bool                            needCleanup = false;
 
215
        Buffer          metabuffer;
 
216
        Page            metapage;
 
217
        GinMetaPageData *metadata = NULL;
 
218
        XLogRecData rdata[2];
 
219
        Buffer          buffer = InvalidBuffer;
 
220
        Page            page = NULL;
 
221
        ginxlogUpdateMeta data;
 
222
        bool            separateList = false;
 
223
        bool            needCleanup = false;
219
224
 
220
 
        if ( collector->ntuples == 0 )
 
225
        if (collector->ntuples == 0)
221
226
                return;
222
227
 
223
228
        data.node = index->rd_node;
232
237
        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
233
238
        metapage = BufferGetPage(metabuffer);
234
239
 
235
 
        if ( collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE )
 
240
        if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE)
236
241
        {
237
242
                /*
238
243
                 * Total size is greater than one page => make sublist
244
249
                LockBuffer(metabuffer, GIN_EXCLUSIVE);
245
250
                metadata = GinPageGetMeta(metapage);
246
251
 
247
 
                if ( metadata->head == InvalidBlockNumber ||
248
 
                        collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize )
 
252
                if (metadata->head == InvalidBlockNumber ||
 
253
                        collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
249
254
                {
250
255
                        /*
251
256
                         * Pending list is empty or total size is greater than freespace
258
263
                }
259
264
        }
260
265
 
261
 
        if ( separateList )
 
266
        if (separateList)
262
267
        {
263
 
                GinMetaPageData         sublist;
 
268
                GinMetaPageData sublist;
264
269
 
265
270
                /*
266
271
                 * We should make sublist separately and append it to the tail
267
272
                 */
268
 
                memset( &sublist, 0, sizeof(GinMetaPageData) );
 
273
                memset(&sublist, 0, sizeof(GinMetaPageData));
269
274
 
270
275
                makeSublist(index, collector->tuples, collector->ntuples, &sublist);
271
276
 
275
280
                LockBuffer(metabuffer, GIN_EXCLUSIVE);
276
281
                metadata = GinPageGetMeta(metapage);
277
282
 
278
 
                if ( metadata->head == InvalidBlockNumber )
 
283
                if (metadata->head == InvalidBlockNumber)
279
284
                {
280
285
                        /*
281
286
                         * Sublist becomes main list
282
287
                         */
283
288
                        START_CRIT_SECTION();
284
 
                        memcpy(metadata, &sublist, sizeof(GinMetaPageData) );
285
 
                        memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData) );
 
289
                        memcpy(metadata, &sublist, sizeof(GinMetaPageData));
 
290
                        memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData));
286
291
                }
287
292
                else
288
293
                {
305
310
                        metadata->nPendingPages += sublist.nPendingPages;
306
311
                        metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
307
312
 
308
 
                        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData) );
 
313
                        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
309
314
                        data.newRightlink = sublist.head;
310
315
 
311
316
                        MarkBufferDirty(buffer);
317
322
                 * Insert into tail page, metapage is already locked
318
323
                 */
319
324
 
320
 
                OffsetNumber    l, off;
321
 
                int                             i, tupsize;
322
 
                char                    *ptr;
 
325
                OffsetNumber l,
 
326
                                        off;
 
327
                int                     i,
 
328
                                        tupsize;
 
329
                char       *ptr;
323
330
 
324
331
                buffer = ReadBuffer(index, metadata->tail);
325
332
                LockBuffer(buffer, GIN_EXCLUSIVE);
326
333
                page = BufferGetPage(buffer);
327
334
                off = (PageIsEmpty(page)) ? FirstOffsetNumber :
328
 
                                OffsetNumberNext(PageGetMaxOffsetNumber(page));
 
335
                        OffsetNumberNext(PageGetMaxOffsetNumber(page));
329
336
 
330
337
                rdata[0].next = rdata + 1;
331
338
 
332
339
                rdata[1].buffer = buffer;
333
340
                rdata[1].buffer_std = true;
334
 
                ptr = rdata[1].data = (char *) palloc( collector->sumsize );
 
341
                ptr = rdata[1].data = (char *) palloc(collector->sumsize);
335
342
                rdata[1].len = collector->sumsize;
336
343
                rdata[1].next = NULL;
337
344
 
342
349
                /*
343
350
                 * Increase counter of heap tuples
344
351
                 */
345
 
                Assert( GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples );
 
352
                Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
346
353
                GinPageGetOpaque(page)->maxoff++;
347
354
                metadata->nPendingHeapTuples++;
348
355
 
349
 
                for(i=0; i<collector->ntuples; i++)
 
356
                for (i = 0; i < collector->ntuples; i++)
350
357
                {
351
358
                        tupsize = IndexTupleSize(collector->tuples[i]);
352
 
                        l = PageAddItem(page, (Item)collector->tuples[i], tupsize, off, false, false);
 
359
                        l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
353
360
 
354
361
                        if (l == InvalidOffsetNumber)
355
362
                                elog(ERROR, "failed to add item to index page in \"%s\"",
356
 
                                                 RelationGetRelationName(index));
 
363
                                         RelationGetRelationName(index));
357
364
 
358
365
                        memcpy(ptr, collector->tuples[i], tupsize);
359
 
                        ptr+=tupsize;
 
366
                        ptr += tupsize;
360
367
 
361
368
                        off++;
362
369
                }
363
370
 
364
371
                metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
365
 
                memcpy(&data.metadata, metadata, sizeof(GinMetaPageData) );
 
372
                memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
366
373
                MarkBufferDirty(buffer);
367
374
        }
368
375
 
369
376
        /*
370
 
         *  Make real write
 
377
         * Make real write
371
378
         */
372
379
 
373
380
        MarkBufferDirty(metabuffer);
374
 
        if ( !index->rd_istemp )
 
381
        if (!index->rd_istemp)
375
382
        {
376
 
                XLogRecPtr  recptr;
 
383
                XLogRecPtr      recptr;
377
384
 
378
385
                recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
379
386
                PageSetLSN(metapage, recptr);
380
387
                PageSetTLI(metapage, ThisTimeLineID);
381
388
 
382
 
                if ( buffer != InvalidBuffer )
 
389
                if (buffer != InvalidBuffer)
383
390
                {
384
391
                        PageSetLSN(page, recptr);
385
392
                        PageSetTLI(page, ThisTimeLineID);
390
397
                UnlockReleaseBuffer(buffer);
391
398
 
392
399
        /*
393
 
         * Force pending list cleanup when it becomes too long.
394
 
         * And, ginInsertCleanup could take significant amount of
395
 
         * time, so we prefer to call it when it can do all the work in a
396
 
         * single collection cycle. In non-vacuum mode, it shouldn't
397
 
         * require maintenance_work_mem, so fire it while pending list is
398
 
         * still small enough to fit into work_mem.
 
400
         * Force pending list cleanup when it becomes too long. And,
 
401
         * ginInsertCleanup could take significant amount of time, so we prefer to
 
402
         * call it when it can do all the work in a single collection cycle. In
 
403
         * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
 
404
         * while pending list is still small enough to fit into work_mem.
399
405
         *
400
406
         * ginInsertCleanup() should not be called inside our CRIT_SECTION.
401
407
         */
402
 
        if ( metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L )
 
408
        if (metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L)
403
409
                needCleanup = true;
404
410
 
405
411
        UnlockReleaseBuffer(metabuffer);
406
412
 
407
413
        END_CRIT_SECTION();
408
414
 
409
 
        if ( needCleanup )
 
415
        if (needCleanup)
410
416
                ginInsertCleanup(index, ginstate, false, NULL);
411
417
}
412
418
 
432
438
        /*
433
439
         * Allocate/reallocate memory for storing collected tuples
434
440
         */
435
 
        if ( collector->tuples == NULL )
 
441
        if (collector->tuples == NULL)
436
442
        {
437
443
                collector->lentuples = nentries * index->rd_att->natts;
438
 
                collector->tuples = (IndexTuple*)palloc(sizeof(IndexTuple) * collector->lentuples);
 
444
                collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
439
445
        }
440
446
 
441
 
        while ( collector->ntuples + nentries > collector->lentuples )
 
447
        while (collector->ntuples + nentries > collector->lentuples)
442
448
        {
443
449
                collector->lentuples *= 2;
444
 
                collector->tuples = (IndexTuple*)repalloc( collector->tuples,
445
 
                                                                                                        sizeof(IndexTuple) * collector->lentuples);
 
450
                collector->tuples = (IndexTuple *) repalloc(collector->tuples,
 
451
                                                                  sizeof(IndexTuple) * collector->lentuples);
446
452
        }
447
453
 
448
454
        /*
450
456
         */
451
457
        for (i = 0; i < nentries; i++)
452
458
        {
453
 
                int32 tupsize;
 
459
                int32           tupsize;
454
460
 
455
461
                collector->tuples[collector->ntuples + i] = GinFormTuple(ginstate, attnum, entries[i], NULL, 0);
456
462
                collector->tuples[collector->ntuples + i]->t_tid = *item;
457
463
                tupsize = IndexTupleSize(collector->tuples[collector->ntuples + i]);
458
464
 
459
 
                if ( tupsize > TOAST_INDEX_TARGET || tupsize >= GinMaxItemSize)
 
465
                if (tupsize > TOAST_INDEX_TARGET || tupsize >= GinMaxItemSize)
460
466
                        elog(ERROR, "huge tuple");
461
467
 
462
468
                collector->sumsize += tupsize;
480
486
shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
481
487
                  IndexBulkDeleteResult *stats)
482
488
{
483
 
        Page                                    metapage;
484
 
        GinMetaPageData            *metadata;
485
 
        BlockNumber                             blknoToDelete;
 
489
        Page            metapage;
 
490
        GinMetaPageData *metadata;
 
491
        BlockNumber blknoToDelete;
486
492
 
487
493
        metapage = BufferGetPage(metabuffer);
488
494
        metadata = GinPageGetMeta(metapage);
490
496
 
491
497
        do
492
498
        {
493
 
                Page                                    page;
494
 
                int                                             i;
495
 
                int64                                   nDeletedHeapTuples = 0;
496
 
                ginxlogDeleteListPages  data;
497
 
                XLogRecData                             rdata[1];
498
 
                Buffer                                  buffers[GIN_NDELETE_AT_ONCE];
 
499
                Page            page;
 
500
                int                     i;
 
501
                int64           nDeletedHeapTuples = 0;
 
502
                ginxlogDeleteListPages data;
 
503
                XLogRecData rdata[1];
 
504
                Buffer          buffers[GIN_NDELETE_AT_ONCE];
499
505
 
500
506
                data.node = index->rd_node;
501
507
 
507
513
                data.ndeleted = 0;
508
514
                while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
509
515
                {
510
 
                        data.toDelete[ data.ndeleted ] = blknoToDelete;
511
 
                        buffers[ data.ndeleted ] = ReadBuffer(index, blknoToDelete);
512
 
                        LockBuffer( buffers[ data.ndeleted ], GIN_EXCLUSIVE );
513
 
                        page = BufferGetPage( buffers[ data.ndeleted ] );
 
516
                        data.toDelete[data.ndeleted] = blknoToDelete;
 
517
                        buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
 
518
                        LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
 
519
                        page = BufferGetPage(buffers[data.ndeleted]);
514
520
 
515
521
                        data.ndeleted++;
516
522
 
517
 
                        if ( GinPageIsDeleted(page) )
 
523
                        if (GinPageIsDeleted(page))
518
524
                        {
519
525
                                /* concurrent cleanup process is detected */
520
 
                                for(i=0;i<data.ndeleted;i++)
521
 
                                        UnlockReleaseBuffer( buffers[i] );
 
526
                                for (i = 0; i < data.ndeleted; i++)
 
527
                                        UnlockReleaseBuffer(buffers[i]);
522
528
 
523
529
                                return true;
524
530
                        }
525
531
 
526
532
                        nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
527
 
                        blknoToDelete = GinPageGetOpaque( page )->rightlink;
 
533
                        blknoToDelete = GinPageGetOpaque(page)->rightlink;
528
534
                }
529
535
 
530
536
                if (stats)
534
540
 
535
541
                metadata->head = blknoToDelete;
536
542
 
537
 
                Assert( metadata->nPendingPages >= data.ndeleted );
 
543
                Assert(metadata->nPendingPages >= data.ndeleted);
538
544
                metadata->nPendingPages -= data.ndeleted;
539
 
                Assert( metadata->nPendingHeapTuples >= nDeletedHeapTuples );
 
545
                Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
540
546
                metadata->nPendingHeapTuples -= nDeletedHeapTuples;
541
547
 
542
 
                if ( blknoToDelete == InvalidBlockNumber )
 
548
                if (blknoToDelete == InvalidBlockNumber)
543
549
                {
544
550
                        metadata->tail = InvalidBlockNumber;
545
551
                        metadata->tailFreeSize = 0;
546
552
                        metadata->nPendingPages = 0;
547
553
                        metadata->nPendingHeapTuples = 0;
548
554
                }
549
 
                memcpy( &data.metadata, metadata, sizeof(GinMetaPageData));
550
 
 
551
 
                MarkBufferDirty( metabuffer );
552
 
 
553
 
                for(i=0; i<data.ndeleted; i++)
 
555
                memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
 
556
 
 
557
                MarkBufferDirty(metabuffer);
 
558
 
 
559
                for (i = 0; i < data.ndeleted; i++)
554
560
                {
555
 
                        page = BufferGetPage( buffers[ i ] );
556
 
                        GinPageGetOpaque( page )->flags = GIN_DELETED;
557
 
                        MarkBufferDirty( buffers[ i ] );
 
561
                        page = BufferGetPage(buffers[i]);
 
562
                        GinPageGetOpaque(page)->flags = GIN_DELETED;
 
563
                        MarkBufferDirty(buffers[i]);
558
564
                }
559
565
 
560
 
                if ( !index->rd_istemp )
 
566
                if (!index->rd_istemp)
561
567
                {
562
 
                        XLogRecPtr  recptr;
 
568
                        XLogRecPtr      recptr;
563
569
 
564
570
                        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
565
571
                        PageSetLSN(metapage, recptr);
566
572
                        PageSetTLI(metapage, ThisTimeLineID);
567
573
 
568
 
                        for(i=0; i<data.ndeleted; i++)
 
574
                        for (i = 0; i < data.ndeleted; i++)
569
575
                        {
570
 
                                page = BufferGetPage( buffers[ i ] );
 
576
                                page = BufferGetPage(buffers[i]);
571
577
                                PageSetLSN(page, recptr);
572
578
                                PageSetTLI(page, ThisTimeLineID);
573
579
                        }
574
580
                }
575
581
 
576
 
                for(i=0; i<data.ndeleted; i++)
577
 
                        UnlockReleaseBuffer( buffers[ i ] );
 
582
                for (i = 0; i < data.ndeleted; i++)
 
583
                        UnlockReleaseBuffer(buffers[i]);
578
584
 
579
585
                END_CRIT_SECTION();
580
 
        } while( blknoToDelete != newHead );
 
586
        } while (blknoToDelete != newHead);
581
587
 
582
588
        return false;
583
589
}
586
592
static void
587
593
addDatum(DatumArray *datums, Datum datum)
588
594
{
589
 
        if ( datums->nvalues >= datums->maxvalues)
 
595
        if (datums->nvalues >= datums->maxvalues)
590
596
        {
591
597
                datums->maxvalues *= 2;
592
 
                datums->values = (Datum*)repalloc(datums->values,
593
 
                                                                                  sizeof(Datum)*datums->maxvalues);
 
598
                datums->values = (Datum *) repalloc(datums->values,
 
599
                                                                                  sizeof(Datum) * datums->maxvalues);
594
600
        }
595
601
 
596
 
        datums->values[ datums->nvalues++ ] = datum;
 
602
        datums->values[datums->nvalues++] = datum;
597
603
}
598
604
 
599
605
/*
606
612
processPendingPage(BuildAccumulator *accum, DatumArray *da,
607
613
                                   Page page, OffsetNumber startoff)
608
614
{
609
 
        ItemPointerData heapptr;
610
 
        OffsetNumber    i,maxoff;
611
 
        OffsetNumber    attrnum, curattnum;
 
615
        ItemPointerData heapptr;
 
616
        OffsetNumber i,
 
617
                                maxoff;
 
618
        OffsetNumber attrnum,
 
619
                                curattnum;
612
620
 
613
621
        /* reset *da to empty */
614
622
        da->nvalues = 0;
615
623
 
616
624
        maxoff = PageGetMaxOffsetNumber(page);
617
 
        Assert( maxoff >= FirstOffsetNumber );
 
625
        Assert(maxoff >= FirstOffsetNumber);
618
626
        ItemPointerSetInvalid(&heapptr);
619
627
        attrnum = 0;
620
628
 
621
629
        for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
622
630
        {
623
 
                IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
 
631
                IndexTuple      itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
624
632
 
625
633
                curattnum = gintuple_get_attrnum(accum->ginstate, itup);
626
634
 
627
 
                if ( !ItemPointerIsValid(&heapptr) )
 
635
                if (!ItemPointerIsValid(&heapptr))
628
636
                {
629
637
                        heapptr = itup->t_tid;
630
638
                        attrnum = curattnum;
631
639
                }
632
 
                else if ( !(ItemPointerEquals(&heapptr, &itup->t_tid) &&
633
 
                                        curattnum == attrnum) )
 
640
                else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
 
641
                                   curattnum == attrnum))
634
642
                {
635
643
                        /*
636
644
                         * We can insert several datums per call, but only for one heap
652
660
 *
653
661
 * This can be called concurrently by multiple backends, so it must cope.
654
662
 * On first glance it looks completely not concurrent-safe and not crash-safe
655
 
 * either.  The reason it's okay is that multiple insertion of the same entry
 
663
 * either.      The reason it's okay is that multiple insertion of the same entry
656
664
 * is detected and treated as a no-op by gininsert.c.  If we crash after
657
665
 * posting entries to the main index and before removing them from the
658
666
 * pending list, it's okay because when we redo the posting later on, nothing
671
679
ginInsertCleanup(Relation index, GinState *ginstate,
672
680
                                 bool vac_delay, IndexBulkDeleteResult *stats)
673
681
{
674
 
        Buffer                          metabuffer, buffer;
675
 
        Page                            metapage, page;
676
 
        GinMetaPageData    *metadata;
677
 
        MemoryContext           opCtx, oldCtx;
678
 
        BuildAccumulator        accum;
679
 
        DatumArray                      datums;
680
 
        BlockNumber                     blkno;
 
682
        Buffer          metabuffer,
 
683
                                buffer;
 
684
        Page            metapage,
 
685
                                page;
 
686
        GinMetaPageData *metadata;
 
687
        MemoryContext opCtx,
 
688
                                oldCtx;
 
689
        BuildAccumulator accum;
 
690
        DatumArray      datums;
 
691
        BlockNumber blkno;
681
692
 
682
693
        metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
683
694
        LockBuffer(metabuffer, GIN_SHARE);
684
695
        metapage = BufferGetPage(metabuffer);
685
696
        metadata = GinPageGetMeta(metapage);
686
697
 
687
 
        if ( metadata->head == InvalidBlockNumber )
 
698
        if (metadata->head == InvalidBlockNumber)
688
699
        {
689
700
                /* Nothing to do */
690
701
                UnlockReleaseBuffer(metabuffer);
702
713
        LockBuffer(metabuffer, GIN_UNLOCK);
703
714
 
704
715
        /*
705
 
         * Initialize.  All temporary space will be in opCtx
 
716
         * Initialize.  All temporary space will be in opCtx
706
717
         */
707
718
        opCtx = AllocSetContextCreate(CurrentMemoryContext,
708
719
                                                                  "GIN insert cleanup temporary context",
712
723
 
713
724
        oldCtx = MemoryContextSwitchTo(opCtx);
714
725
 
715
 
        datums.maxvalues=128;
 
726
        datums.maxvalues = 128;
716
727
        datums.nvalues = 0;
717
 
        datums.values = (Datum*)palloc(sizeof(Datum)*datums.maxvalues);
 
728
        datums.values = (Datum *) palloc(sizeof(Datum) * datums.maxvalues);
718
729
 
719
730
        ginInitBA(&accum);
720
731
        accum.ginstate = ginstate;
721
732
 
722
733
        /*
723
 
         * At the top of this loop, we have pin and lock on the current page
724
 
         * of the pending list.  However, we'll release that before exiting
725
 
         * the loop.  Note we also have pin but not lock on the metapage.
 
734
         * At the top of this loop, we have pin and lock on the current page of
 
735
         * the pending list.  However, we'll release that before exiting the loop.
 
736
         * Note we also have pin but not lock on the metapage.
726
737
         */
727
 
        for(;;)
 
738
        for (;;)
728
739
        {
729
 
                if ( GinPageIsDeleted(page) )
 
740
                if (GinPageIsDeleted(page))
730
741
                {
731
742
                        /* another cleanup process is running concurrently */
732
 
                        UnlockReleaseBuffer( buffer );
 
743
                        UnlockReleaseBuffer(buffer);
733
744
                        break;
734
745
                }
735
746
 
742
753
                        vacuum_delay_point();
743
754
 
744
755
                /*
745
 
                 * Is it time to flush memory to disk?  Flush if we are at the end
746
 
                 * of the pending list, or if we have a full row and memory is
747
 
                 * getting full.
 
756
                 * Is it time to flush memory to disk?  Flush if we are at the end of
 
757
                 * the pending list, or if we have a full row and memory is getting
 
758
                 * full.
748
759
                 *
749
760
                 * XXX using up maintenance_work_mem here is probably unreasonably
750
761
                 * much, since vacuum might already be using that much.
754
765
                         (accum.allocatedMemory >= maintenance_work_mem * 1024L ||
755
766
                          accum.maxdepth > GIN_MAX_TREE_DEPTH)))
756
767
                {
757
 
                        ItemPointerData    *list;
758
 
                        uint32                  nlist;
759
 
                        Datum                   entry;
760
 
                        OffsetNumber            maxoff, attnum;
 
768
                        ItemPointerData *list;
 
769
                        uint32          nlist;
 
770
                        Datum           entry;
 
771
                        OffsetNumber maxoff,
 
772
                                                attnum;
761
773
 
762
774
                        /*
763
 
                         * Unlock current page to increase performance.
764
 
                         * Changes of page will be checked later by comparing
765
 
                         * maxoff after completion of memory flush.
 
775
                         * Unlock current page to increase performance. Changes of page
 
776
                         * will be checked later by comparing maxoff after completion of
 
777
                         * memory flush.
766
778
                         */
767
779
                        maxoff = PageGetMaxOffsetNumber(page);
768
780
                        LockBuffer(buffer, GIN_UNLOCK);
785
797
                        LockBuffer(metabuffer, GIN_EXCLUSIVE);
786
798
                        LockBuffer(buffer, GIN_SHARE);
787
799
 
788
 
                        if ( GinPageIsDeleted(page) )
 
800
                        if (GinPageIsDeleted(page))
789
801
                        {
790
802
                                /* another cleanup process is running concurrently */
791
803
                                UnlockReleaseBuffer(buffer);
795
807
 
796
808
                        /*
797
809
                         * While we left the page unlocked, more stuff might have gotten
798
 
                         * added to it.  If so, process those entries immediately.  There
 
810
                         * added to it.  If so, process those entries immediately.      There
799
811
                         * shouldn't be very many, so we don't worry about the fact that
800
812
                         * we're doing this with exclusive lock. Insertion algorithm
801
813
                         * guarantees that inserted row(s) will not continue on next page.
802
814
                         * NOTE: intentionally no vacuum_delay_point in this loop.
803
815
                         */
804
 
                        if ( PageGetMaxOffsetNumber(page) != maxoff )
 
816
                        if (PageGetMaxOffsetNumber(page) != maxoff)
805
817
                        {
806
818
                                ginInitBA(&accum);
807
 
                                processPendingPage(&accum, &datums, page, maxoff+1);
 
819
                                processPendingPage(&accum, &datums, page, maxoff + 1);
808
820
 
809
821
                                while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
810
822
                                        ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
814
826
                         * Remember next page - it will become the new list head
815
827
                         */
816
828
                        blkno = GinPageGetOpaque(page)->rightlink;
817
 
                        UnlockReleaseBuffer(buffer); /* shiftList will do exclusive locking */
 
829
                        UnlockReleaseBuffer(buffer);            /* shiftList will do exclusive
 
830
                                                                                                 * locking */
818
831
 
819
832
                        /*
820
833
                         * remove already-read pages from pending list, at this point all
821
834
                         * content of the read pages is in regular structure
822
835
                         */
823
 
                        if ( shiftList(index, metabuffer, blkno, stats) )
 
836
                        if (shiftList(index, metabuffer, blkno, stats))
824
837
                        {
825
838
                                /* another cleanup process is running concurrently */
826
839
                                LockBuffer(metabuffer, GIN_UNLOCK);
827
840
                                break;
828
841
                        }
829
842
 
830
 
                        Assert( blkno == metadata->head );
 
843
                        Assert(blkno == metadata->head);
831
844
                        LockBuffer(metabuffer, GIN_UNLOCK);
832
845
 
833
846
                        /*
834
847
                         * if we removed the whole pending list just exit
835
848
                         */
836
 
                        if ( blkno == InvalidBlockNumber )
 
849
                        if (blkno == InvalidBlockNumber)
837
850
                                break;
838
851
 
839
852
                        /*
842
855
                        MemoryContextReset(opCtx);
843
856
                        ginInitBA(&accum);
844
857
                        datums.nvalues = 0;
845
 
                        datums.values = (Datum*)palloc(sizeof(Datum)*datums.maxvalues);
 
858
                        datums.values = (Datum *) palloc(sizeof(Datum) * datums.maxvalues);
846
859
                }
847
860
                else
848
861
                {