~ubuntu-branches/ubuntu/hardy/postgresql-8.4/hardy-backports

Viewing changes to src/backend/access/hash/hash.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2009-03-20 12:00:13 UTC
  • Revision ID: james.westby@ubuntu.com-20090320120013-hogj7egc5mjncc5g
Tags: upstream-8.4~0cvs20090328
Import upstream version 8.4~0cvs20090328

 
/*-------------------------------------------------------------------------
 *
 * hash.c
 *        Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $PostgreSQL$
 *
 * NOTES
 *        This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/cost.h"
#include "optimizer/plancat.h"
#include "storage/bufmgr.h"


/* Working state for hashbuild and its callback */
typedef struct
{
        HSpool     *spool;                      /* NULL if not using spooling */
        double          indtuples;              /* # tuples accepted into index */
} HashBuildState;

static void hashbuildCallback(Relation index,
                                  HeapTuple htup,
                                  Datum *values,
                                  bool *isnull,
                                  bool tupleIsAlive,
                                  void *state);


/*
 *      hashbuild() -- build a new hash index.
 */
Datum
hashbuild(PG_FUNCTION_ARGS)
{
        Relation        heap = (Relation) PG_GETARG_POINTER(0);
        Relation        index = (Relation) PG_GETARG_POINTER(1);
        IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
        IndexBuildResult *result;
        BlockNumber     relpages;
        double          reltuples;
        uint32          num_buckets;
        HashBuildState buildstate;

        /*
         * We expect to be called exactly once for any index relation. If that's
         * not the case, big trouble's what we have.
         */
        if (RelationGetNumberOfBlocks(index) != 0)
                elog(ERROR, "index \"%s\" already contains data",
                         RelationGetRelationName(index));

        /* Estimate the number of rows currently present in the table */
        estimate_rel_size(heap, NULL, &relpages, &reltuples);

        /* Initialize the hash index metadata page and initial buckets */
        num_buckets = _hash_metapinit(index, reltuples);

        /*
         * If we just insert the tuples into the index in scan order, then
         * (assuming their hash codes are pretty random) there will be no locality
         * of access to the index, and if the index is bigger than available RAM
         * then we'll thrash horribly.  To prevent that scenario, we can sort the
         * tuples by (expected) bucket number.  However, such a sort is useless
         * overhead when the index does fit in RAM.  We choose to sort if the
         * initial index size exceeds NBuffers.
         *
         * NOTE: this test will need adjustment if a bucket is ever different
         * from one page.
         */
        if (num_buckets >= (uint32) NBuffers)
                buildstate.spool = _h_spoolinit(index, num_buckets);
        else
                buildstate.spool = NULL;

        /* prepare to build the index */
        buildstate.indtuples = 0;

        /* do the heap scan */
        reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
                                                                   hashbuildCallback, (void *) &buildstate);

        if (buildstate.spool)
        {
                /* sort the tuples and insert them into the index */
                _h_indexbuild(buildstate.spool);
                _h_spooldestroy(buildstate.spool);
        }

        /*
         * Return statistics
         */
        result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

        result->heap_tuples = reltuples;
        result->index_tuples = buildstate.indtuples;

        PG_RETURN_POINTER(result);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                                  HeapTuple htup,
                                  Datum *values,
                                  bool *isnull,
                                  bool tupleIsAlive,
                                  void *state)
{
        HashBuildState *buildstate = (HashBuildState *) state;
        IndexTuple      itup;

        /* form an index tuple and point it at the heap tuple */
        itup = _hash_form_tuple(index, values, isnull);
        itup->t_tid = htup->t_self;

        /* Hash indexes don't index nulls, see notes in hashinsert */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                return;
        }

        /* Either spool the tuple for sorting, or just put it into the index */
        if (buildstate->spool)
                _h_spool(itup, buildstate->spool);
        else
                _hash_doinsert(index, itup);

        buildstate->indtuples += 1;

        pfree(itup);
}

/*
 *      hashinsert() -- insert an index tuple into a hash table.
 *
 *      Hash on the heap tuple's key, form an index tuple with hash code.
 *      Find the appropriate location for the new tuple, and put it there.
 */
Datum
hashinsert(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        Datum      *values = (Datum *) PG_GETARG_POINTER(1);
        bool       *isnull = (bool *) PG_GETARG_POINTER(2);
        ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);

#ifdef NOT_USED
        Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
        bool            checkUnique = PG_GETARG_BOOL(5);
#endif
        IndexTuple      itup;

        /* generate an index tuple */
        itup = _hash_form_tuple(rel, values, isnull);
        itup->t_tid = *ht_ctid;

        /*
         * If the single index key is null, we don't insert it into the index.
         * Hash tables support scans on '='. Relational algebra says that A = B
         * returns null if either A or B is null.  This means that no
         * qualification used in an index scan could ever return true on a null
         * attribute.  It also means that indices can't be used by ISNULL or
         * NOTNULL scans, but that's an artifact of the strategy map architecture
         * chosen in 1986, not of the way nulls are handled here.
         */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                PG_RETURN_BOOL(false);
        }

        _hash_doinsert(rel, itup);

        pfree(itup);

        PG_RETURN_BOOL(true);
}


/*
 *      hashgettuple() -- Get the next tuple in the scan.
 */
Datum
hashgettuple(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
        Page            page;
        OffsetNumber offnum;
        bool            res;

        /* Hash indexes are always lossy since we store only the hash code */
        scan->xs_recheck = true;

        /*
         * We hold pin but not lock on current buffer while outside the hash AM.
         * Reacquire the read lock here.
         */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

        /*
         * If we've already initialized this scan, we can just advance it in the
         * appropriate direction.  If we haven't done so yet, we call a routine to
         * get the first item in the scan.
         */
        if (ItemPointerIsValid(&(so->hashso_curpos)))
        {
                /*
                 * Check to see if we should kill the previously-fetched tuple.
                 */
                if (scan->kill_prior_tuple)
                {
                        /*
                         * Yes, so mark it by setting the LP_DEAD state in the item flags.
                         */
                        offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
                        page = BufferGetPage(so->hashso_curbuf);
                        ItemIdMarkDead(PageGetItemId(page, offnum));

                        /*
                         * Since this can be redone later if needed, it's treated the same
                         * as a commit-hint-bit status update for heap tuples: we mark the
                         * buffer dirty but don't make a WAL log entry.
                         */
                        SetBufferCommitInfoNeedsSave(so->hashso_curbuf);
                }

                /*
                 * Now continue the scan.
                 */
                res = _hash_next(scan, dir);
        }
        else
                res = _hash_first(scan, dir);

        /*
         * Skip killed tuples if asked to.
         */
        if (scan->ignore_killed_tuples)
        {
                while (res)
                {
                        offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
                        page = BufferGetPage(so->hashso_curbuf);
                        if (!ItemIdIsDead(PageGetItemId(page, offnum)))
                                break;
                        res = _hash_next(scan, dir);
                }
        }

        /* Release read lock on current buffer, but keep it pinned */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

        PG_RETURN_BOOL(res);
}


/*
 *      hashgetbitmap() -- get all tuples at once
 */
Datum
hashgetbitmap(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        TIDBitmap  *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        bool            res;
        int64           ntids = 0;

        res = _hash_first(scan, ForwardScanDirection);

        while (res)
        {
                bool            add_tuple;

                CHECK_FOR_INTERRUPTS();

                /*
                 * Skip killed tuples if asked to.
                 */
                if (scan->ignore_killed_tuples)
                {
                        Page            page;
                        OffsetNumber offnum;

                        offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
                        page = BufferGetPage(so->hashso_curbuf);
                        add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
                }
                else
                        add_tuple = true;

                /* Save tuple ID, and continue scanning */
                if (add_tuple)
                {
                        /* Note we mark the tuple ID as requiring recheck */
                        tbm_add_tuples(tbm, &scan->xs_ctup.t_self, 1, true);
                        ntids++;
                }

                res = _hash_next(scan, ForwardScanDirection);
        }

        PG_RETURN_INT64(ntids);
}


/*
 *      hashbeginscan() -- start a scan on a hash index
 */
Datum
hashbeginscan(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        int                     keysz = PG_GETARG_INT32(1);
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(2);
        IndexScanDesc scan;
        HashScanOpaque so;

        scan = RelationGetIndexScan(rel, keysz, scankey);
        so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
        so->hashso_bucket_valid = false;
        so->hashso_bucket_blkno = 0;
        so->hashso_curbuf = InvalidBuffer;
        /* set position invalid (this will cause _hash_first call) */
        ItemPointerSetInvalid(&(so->hashso_curpos));

        scan->opaque = so;

        /* register scan in case we change pages it's using */
        _hash_regscan(scan);

        PG_RETURN_POINTER(scan);
}

/*
 *      hashrescan() -- rescan an index relation
 */
Datum
hashrescan(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* if we are called from beginscan, so is still NULL */
        if (so)
        {
                /* release any pin we still hold */
                if (BufferIsValid(so->hashso_curbuf))
                        _hash_dropbuf(rel, so->hashso_curbuf);
                so->hashso_curbuf = InvalidBuffer;

                /* release lock on bucket, too */
                if (so->hashso_bucket_blkno)
                        _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
                so->hashso_bucket_blkno = 0;

                /* set position invalid (this will cause _hash_first call) */
                ItemPointerSetInvalid(&(so->hashso_curpos));
        }

        /* Update scan key, if a new one is given */
        if (scankey && scan->numberOfKeys > 0)
        {
                memmove(scan->keyData,
                                scankey,
                                scan->numberOfKeys * sizeof(ScanKeyData));
                if (so)
                        so->hashso_bucket_valid = false;
        }

        PG_RETURN_VOID();
}

/*
 *      hashendscan() -- close down a scan
 */
Datum
hashendscan(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* don't need scan registered anymore */
        _hash_dropscan(scan);

        /* release any pin we still hold */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_dropbuf(rel, so->hashso_curbuf);
        so->hashso_curbuf = InvalidBuffer;

        /* release lock on bucket, too */
        if (so->hashso_bucket_blkno)
                _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
        so->hashso_bucket_blkno = 0;

        pfree(so);
        scan->opaque = NULL;

        PG_RETURN_VOID();
}

/*
 *      hashmarkpos() -- save current scan position
 */
Datum
hashmarkpos(PG_FUNCTION_ARGS)
{
        elog(ERROR, "hash does not support mark/restore");
        PG_RETURN_VOID();
}

/*
 *      hashrestrpos() -- restore scan to last saved position
 */
Datum
hashrestrpos(PG_FUNCTION_ARGS)
{
        elog(ERROR, "hash does not support mark/restore");
        PG_RETURN_VOID();
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashbulkdelete(PG_FUNCTION_ARGS)
{
        IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
        IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
        IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
        void       *callback_state = (void *) PG_GETARG_POINTER(3);
        Relation        rel = info->index;
        double          tuples_removed;
        double          num_index_tuples;
        double          orig_ntuples;
        Bucket          orig_maxbucket;
        Bucket          cur_maxbucket;
        Bucket          cur_bucket;
        Buffer          metabuf;
        HashMetaPage metap;
        HashMetaPageData local_metapage;

        tuples_removed = 0;
        num_index_tuples = 0;

        /*
         * Read the metapage to fetch original bucket and tuple counts.  Also, we
         * keep a copy of the last-seen metapage so that we can use its
         * hashm_spares[] values to compute bucket page addresses.  This is a bit
         * hokey but perfectly safe, since the interesting entries in the spares
         * array cannot change under us; and it beats rereading the metapage for
         * each bucket.
         */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
        metap = HashPageGetMeta(BufferGetPage(metabuf));
        orig_maxbucket = metap->hashm_maxbucket;
        orig_ntuples = metap->hashm_ntuples;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
        _hash_relbuf(rel, metabuf);

        /* Scan the buckets that we know exist */
        cur_bucket = 0;
        cur_maxbucket = orig_maxbucket;

loop_top:
        while (cur_bucket <= cur_maxbucket)
        {
                BlockNumber bucket_blkno;
                BlockNumber blkno;
                bool            bucket_dirty = false;

                /* Get address of bucket's start page */
                bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

                /* Exclusive-lock the bucket so we can shrink it */
                _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);

                /* Shouldn't have any active scans locally, either */
                if (_hash_has_active_scan(rel, cur_bucket))
                        elog(ERROR, "hash index has active scan during VACUUM");

                /* Scan each page in bucket */
                blkno = bucket_blkno;
                while (BlockNumberIsValid(blkno))
                {
                        Buffer          buf;
                        Page            page;
                        HashPageOpaque opaque;
                        OffsetNumber offno;
                        OffsetNumber maxoffno;
                        bool            page_dirty = false;

                        vacuum_delay_point();

                        buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                                                                   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
                                                                                         info->strategy);
                        page = BufferGetPage(buf);
                        opaque = (HashPageOpaque) PageGetSpecialPointer(page);
                        Assert(opaque->hasho_bucket == cur_bucket);

                        /* Scan each tuple in page */
                        offno = FirstOffsetNumber;
                        maxoffno = PageGetMaxOffsetNumber(page);
                        while (offno <= maxoffno)
                        {
                                IndexTuple      itup;
                                ItemPointer htup;

                                itup = (IndexTuple) PageGetItem(page,
                                                                                                PageGetItemId(page, offno));
                                htup = &(itup->t_tid);
                                if (callback(htup, callback_state))
                                {
                                        /* delete the item from the page */
                                        PageIndexTupleDelete(page, offno);
                                        bucket_dirty = page_dirty = true;

                                        /* don't increment offno, instead decrement maxoffno */
                                        maxoffno = OffsetNumberPrev(maxoffno);

                                        tuples_removed += 1;
                                }
                                else
                                {
                                        offno = OffsetNumberNext(offno);

                                        num_index_tuples += 1;
                                }
                        }

                        /*
                         * Write page if needed, advance to next page.
                         */
                        blkno = opaque->hasho_nextblkno;

                        if (page_dirty)
                                _hash_wrtbuf(rel, buf);
                        else
                                _hash_relbuf(rel, buf);
                }

                /* If we deleted anything, try to compact free space */
                if (bucket_dirty)
                        _hash_squeezebucket(rel, cur_bucket, bucket_blkno,
                                                                info->strategy);

                /* Release bucket lock */
                _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);

                /* Advance to next bucket */
                cur_bucket++;
        }

        /* Write-lock metapage and check for split since we started */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE, LH_META_PAGE);
        metap = HashPageGetMeta(BufferGetPage(metabuf));

        if (cur_maxbucket != metap->hashm_maxbucket)
        {
                /* There's been a split, so process the additional bucket(s) */
                cur_maxbucket = metap->hashm_maxbucket;
                memcpy(&local_metapage, metap, sizeof(local_metapage));
                _hash_relbuf(rel, metabuf);
                goto loop_top;
        }

        /* Okay, we're really done.  Update tuple count in metapage. */

        if (orig_maxbucket == metap->hashm_maxbucket &&
                orig_ntuples == metap->hashm_ntuples)
        {
                /*
                 * No one has split or inserted anything since start of scan, so
                 * believe our count as gospel.
                 */
                metap->hashm_ntuples = num_index_tuples;
        }
        else
        {
                /*
                 * Otherwise, our count is untrustworthy since we may have
                 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
                 */
                if (metap->hashm_ntuples > tuples_removed)
                        metap->hashm_ntuples -= tuples_removed;
                else
                        metap->hashm_ntuples = 0;
                num_index_tuples = metap->hashm_ntuples;
        }

        _hash_wrtbuf(rel, metabuf);

        /* return statistics */
        if (stats == NULL)
                stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
        stats->num_index_tuples = num_index_tuples;
        stats->tuples_removed += tuples_removed;
        /* hashvacuumcleanup will fill in num_pages */

        PG_RETURN_POINTER(stats);
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashvacuumcleanup(PG_FUNCTION_ARGS)
{
        IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
        IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
        Relation        rel = info->index;
        BlockNumber num_pages;

        /* If hashbulkdelete wasn't called, return NULL signifying no change */
        /* Note: this covers the analyze_only case too */
        if (stats == NULL)
                PG_RETURN_POINTER(NULL);

        /* update statistics */
        num_pages = RelationGetNumberOfBlocks(rel);
        stats->num_pages = num_pages;

        PG_RETURN_POINTER(stats);
}


void
hash_redo(XLogRecPtr lsn, XLogRecord *record)
{
        elog(PANIC, "hash_redo: unimplemented");
}

void
hash_desc(StringInfo buf, uint8 xl_info, char *rec)
{
}