~ubuntu-branches/ubuntu/oneiric/postgresql-9.1/oneiric-security

Viewing changes to src/backend/access/hash/hash.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
Import upstream version 9.1~beta1

/*-------------------------------------------------------------------------
 *
 * hash.c
 *        Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        src/backend/access/hash/hash.c
 *
 * NOTES
 *        This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "optimizer/cost.h"
#include "optimizer/plancat.h"
#include "storage/bufmgr.h"


/* Working state for hashbuild and its callback */
typedef struct
{
        HSpool     *spool;                      /* NULL if not using spooling */
        double          indtuples;              /* # tuples accepted into index */
} HashBuildState;

static void hashbuildCallback(Relation index,
                                  HeapTuple htup,
                                  Datum *values,
                                  bool *isnull,
                                  bool tupleIsAlive,
                                  void *state);


/*
 *      hashbuild() -- build a new hash index.
 */
Datum
hashbuild(PG_FUNCTION_ARGS)
{
        Relation        heap = (Relation) PG_GETARG_POINTER(0);
        Relation        index = (Relation) PG_GETARG_POINTER(1);
        IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
        IndexBuildResult *result;
        BlockNumber relpages;
        double          reltuples;
        uint32          num_buckets;
        HashBuildState buildstate;

        /*
         * We expect to be called exactly once for any index relation. If that's
         * not the case, big trouble's what we have.
         */
        if (RelationGetNumberOfBlocks(index) != 0)
                elog(ERROR, "index \"%s\" already contains data",
                         RelationGetRelationName(index));

        /* Estimate the number of rows currently present in the table */
        estimate_rel_size(heap, NULL, &relpages, &reltuples);

        /* Initialize the hash index metadata page and initial buckets */
        num_buckets = _hash_metapinit(index, reltuples, MAIN_FORKNUM);

        /*
         * If we just insert the tuples into the index in scan order, then
         * (assuming their hash codes are pretty random) there will be no locality
         * of access to the index, and if the index is bigger than available RAM
         * then we'll thrash horribly.  To prevent that scenario, we can sort the
         * tuples by (expected) bucket number.  However, such a sort is useless
         * overhead when the index does fit in RAM.  We choose to sort if the
         * initial index size exceeds NBuffers.
         *
         * NOTE: this test will need adjustment if a bucket is ever different from
         * one page.
         */
        if (num_buckets >= (uint32) NBuffers)
                buildstate.spool = _h_spoolinit(index, num_buckets);
        else
                buildstate.spool = NULL;

        /* prepare to build the index */
        buildstate.indtuples = 0;

        /* do the heap scan */
        reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
                                                                   hashbuildCallback, (void *) &buildstate);

        if (buildstate.spool)
        {
                /* sort the tuples and insert them into the index */
                _h_indexbuild(buildstate.spool);
                _h_spooldestroy(buildstate.spool);
        }

        /*
         * Return statistics
         */
        result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

        result->heap_tuples = reltuples;
        result->index_tuples = buildstate.indtuples;

        PG_RETURN_POINTER(result);
}

/*
 *      hashbuildempty() -- build an empty hash index in the initialization fork
 */
Datum
hashbuildempty(PG_FUNCTION_ARGS)
{
        Relation        index = (Relation) PG_GETARG_POINTER(0);

        _hash_metapinit(index, 0, INIT_FORKNUM);

        PG_RETURN_VOID();
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                                  HeapTuple htup,
                                  Datum *values,
                                  bool *isnull,
                                  bool tupleIsAlive,
                                  void *state)
{
        HashBuildState *buildstate = (HashBuildState *) state;
        IndexTuple      itup;

        /* form an index tuple and point it at the heap tuple */
        itup = _hash_form_tuple(index, values, isnull);
        itup->t_tid = htup->t_self;

        /* Hash indexes don't index nulls, see notes in hashinsert */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                return;
        }

        /* Either spool the tuple for sorting, or just put it into the index */
        if (buildstate->spool)
                _h_spool(itup, buildstate->spool);
        else
                _hash_doinsert(index, itup);

        buildstate->indtuples += 1;

        pfree(itup);
}

/*
 *      hashinsert() -- insert an index tuple into a hash table.
 *
 *      Hash on the heap tuple's key, form an index tuple with hash code.
 *      Find the appropriate location for the new tuple, and put it there.
 */
Datum
hashinsert(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        Datum      *values = (Datum *) PG_GETARG_POINTER(1);
        bool       *isnull = (bool *) PG_GETARG_POINTER(2);
        ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);

#ifdef NOT_USED
        Relation        heapRel = (Relation) PG_GETARG_POINTER(4);
        IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
#endif
        IndexTuple      itup;

        /* generate an index tuple */
        itup = _hash_form_tuple(rel, values, isnull);
        itup->t_tid = *ht_ctid;

        /*
         * If the single index key is null, we don't insert it into the index.
         * Hash tables support scans on '='. Relational algebra says that A = B
         * returns null if either A or B is null.  This means that no
         * qualification used in an index scan could ever return true on a null
         * attribute.  It also means that indices can't be used by ISNULL or
         * NOTNULL scans, but that's an artifact of the strategy map architecture
         * chosen in 1986, not of the way nulls are handled here.
         */
        if (IndexTupleHasNulls(itup))
        {
                pfree(itup);
                PG_RETURN_BOOL(false);
        }

        _hash_doinsert(rel, itup);

        pfree(itup);

        PG_RETURN_BOOL(false);
}


/*
 *      hashgettuple() -- Get the next tuple in the scan.
 */
Datum
hashgettuple(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;
        Buffer          buf;
        Page            page;
        OffsetNumber offnum;
        ItemPointer current;
        bool            res;

        /* Hash indexes are always lossy since we store only the hash code */
        scan->xs_recheck = true;

        /*
         * We hold pin but not lock on current buffer while outside the hash AM.
         * Reacquire the read lock here.
         */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

        /*
         * If we've already initialized this scan, we can just advance it in the
         * appropriate direction.  If we haven't done so yet, we call a routine to
         * get the first item in the scan.
         */
        current = &(so->hashso_curpos);
        if (ItemPointerIsValid(current))
        {
                /*
                 * An insertion into the current index page could have happened while
                 * we didn't have read lock on it.  Re-find our position by looking
                 * for the TID we previously returned.  (Because we hold share lock on
                 * the bucket, no deletions or splits could have occurred; therefore
                 * we can expect that the TID still exists in the current index page,
                 * at an offset >= where we were.)
                 */
                OffsetNumber maxoffnum;

                buf = so->hashso_curbuf;
                Assert(BufferIsValid(buf));
                page = BufferGetPage(buf);
                maxoffnum = PageGetMaxOffsetNumber(page);
                for (offnum = ItemPointerGetOffsetNumber(current);
                         offnum <= maxoffnum;
                         offnum = OffsetNumberNext(offnum))
                {
                        IndexTuple      itup;

                        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
                        if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
                                break;
                }
                if (offnum > maxoffnum)
                        elog(ERROR, "failed to re-find scan position within index \"%s\"",
                                 RelationGetRelationName(rel));
                ItemPointerSetOffsetNumber(current, offnum);

                /*
                 * Check to see if we should kill the previously-fetched tuple.
                 */
                if (scan->kill_prior_tuple)
                {
                        /*
                         * Yes, so mark it by setting the LP_DEAD state in the item flags.
                         */
                        ItemIdMarkDead(PageGetItemId(page, offnum));

                        /*
                         * Since this can be redone later if needed, it's treated the same
                         * as a commit-hint-bit status update for heap tuples: we mark the
                         * buffer dirty but don't make a WAL log entry.
                         */
                        SetBufferCommitInfoNeedsSave(buf);
                }

                /*
                 * Now continue the scan.
                 */
                res = _hash_next(scan, dir);
        }
        else
                res = _hash_first(scan, dir);

        /*
         * Skip killed tuples if asked to.
         */
        if (scan->ignore_killed_tuples)
        {
                while (res)
                {
                        offnum = ItemPointerGetOffsetNumber(current);
                        page = BufferGetPage(so->hashso_curbuf);
                        if (!ItemIdIsDead(PageGetItemId(page, offnum)))
                                break;
                        res = _hash_next(scan, dir);
                }
        }

        /* Release read lock on current buffer, but keep it pinned */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

        /* Return current heap TID on success */
        scan->xs_ctup.t_self = so->hashso_heappos;

        PG_RETURN_BOOL(res);
}


/*
 *      hashgetbitmap() -- get all tuples at once
 */
Datum
hashgetbitmap(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        TIDBitmap  *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        bool            res;
        int64           ntids = 0;

        res = _hash_first(scan, ForwardScanDirection);

        while (res)
        {
                bool            add_tuple;

                /*
                 * Skip killed tuples if asked to.
                 */
                if (scan->ignore_killed_tuples)
                {
                        Page            page;
                        OffsetNumber offnum;

                        offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
                        page = BufferGetPage(so->hashso_curbuf);
                        add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
                }
                else
                        add_tuple = true;

                /* Save tuple ID, and continue scanning */
                if (add_tuple)
                {
                        /* Note we mark the tuple ID as requiring recheck */
                        tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
                        ntids++;
                }

                res = _hash_next(scan, ForwardScanDirection);
        }

        PG_RETURN_INT64(ntids);
}


/*
 *      hashbeginscan() -- start a scan on a hash index
 */
Datum
hashbeginscan(PG_FUNCTION_ARGS)
{
        Relation        rel = (Relation) PG_GETARG_POINTER(0);
        int                     nkeys = PG_GETARG_INT32(1);
        int                     norderbys = PG_GETARG_INT32(2);
        IndexScanDesc scan;
        HashScanOpaque so;

        /* no order by operators allowed */
        Assert(norderbys == 0);

        scan = RelationGetIndexScan(rel, nkeys, norderbys);

        so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
        so->hashso_bucket_valid = false;
        so->hashso_bucket_blkno = 0;
        so->hashso_curbuf = InvalidBuffer;
        /* set position invalid (this will cause _hash_first call) */
        ItemPointerSetInvalid(&(so->hashso_curpos));
        ItemPointerSetInvalid(&(so->hashso_heappos));

        scan->opaque = so;

        /* register scan in case we change pages it's using */
        _hash_regscan(scan);

        PG_RETURN_POINTER(scan);
}

/*
 *      hashrescan() -- rescan an index relation
 */
Datum
hashrescan(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        ScanKey         scankey = (ScanKey) PG_GETARG_POINTER(1);

        /* remaining arguments are ignored */
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* release any pin we still hold */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_dropbuf(rel, so->hashso_curbuf);
        so->hashso_curbuf = InvalidBuffer;

        /* release lock on bucket, too */
        if (so->hashso_bucket_blkno)
                _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
        so->hashso_bucket_blkno = 0;

        /* set position invalid (this will cause _hash_first call) */
        ItemPointerSetInvalid(&(so->hashso_curpos));
        ItemPointerSetInvalid(&(so->hashso_heappos));

        /* Update scan key, if a new one is given */
        if (scankey && scan->numberOfKeys > 0)
        {
                memmove(scan->keyData,
                                scankey,
                                scan->numberOfKeys * sizeof(ScanKeyData));
                so->hashso_bucket_valid = false;
        }

        PG_RETURN_VOID();
}

/*
 *      hashendscan() -- close down a scan
 */
Datum
hashendscan(PG_FUNCTION_ARGS)
{
        IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
        HashScanOpaque so = (HashScanOpaque) scan->opaque;
        Relation        rel = scan->indexRelation;

        /* don't need scan registered anymore */
        _hash_dropscan(scan);

        /* release any pin we still hold */
        if (BufferIsValid(so->hashso_curbuf))
                _hash_dropbuf(rel, so->hashso_curbuf);
        so->hashso_curbuf = InvalidBuffer;

        /* release lock on bucket, too */
        if (so->hashso_bucket_blkno)
                _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
        so->hashso_bucket_blkno = 0;

        pfree(so);
        scan->opaque = NULL;

        PG_RETURN_VOID();
}

/*
 *      hashmarkpos() -- save current scan position
 */
Datum
hashmarkpos(PG_FUNCTION_ARGS)
{
        elog(ERROR, "hash does not support mark/restore");
        PG_RETURN_VOID();
}

/*
 *      hashrestrpos() -- restore scan to last saved position
 */
Datum
hashrestrpos(PG_FUNCTION_ARGS)
{
        elog(ERROR, "hash does not support mark/restore");
        PG_RETURN_VOID();
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashbulkdelete(PG_FUNCTION_ARGS)
{
        IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
        IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
        IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
        void       *callback_state = (void *) PG_GETARG_POINTER(3);
        Relation        rel = info->index;
        double          tuples_removed;
        double          num_index_tuples;
        double          orig_ntuples;
        Bucket          orig_maxbucket;
        Bucket          cur_maxbucket;
        Bucket          cur_bucket;
        Buffer          metabuf;
        HashMetaPage metap;
        HashMetaPageData local_metapage;

        tuples_removed = 0;
        num_index_tuples = 0;

        /*
         * Read the metapage to fetch original bucket and tuple counts.  Also, we
         * keep a copy of the last-seen metapage so that we can use its
         * hashm_spares[] values to compute bucket page addresses.  This is a bit
         * hokey but perfectly safe, since the interesting entries in the spares
         * array cannot change under us; and it beats rereading the metapage for
         * each bucket.
         */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
        metap = HashPageGetMeta(BufferGetPage(metabuf));
        orig_maxbucket = metap->hashm_maxbucket;
        orig_ntuples = metap->hashm_ntuples;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
        _hash_relbuf(rel, metabuf);

        /* Scan the buckets that we know exist */
        cur_bucket = 0;
        cur_maxbucket = orig_maxbucket;

loop_top:
        while (cur_bucket <= cur_maxbucket)
        {
                BlockNumber bucket_blkno;
                BlockNumber blkno;
                bool            bucket_dirty = false;

                /* Get address of bucket's start page */
                bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

                /* Exclusive-lock the bucket so we can shrink it */
                _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);

                /* Shouldn't have any active scans locally, either */
                if (_hash_has_active_scan(rel, cur_bucket))
                        elog(ERROR, "hash index has active scan during VACUUM");

                /* Scan each page in bucket */
                blkno = bucket_blkno;
                while (BlockNumberIsValid(blkno))
                {
                        Buffer          buf;
                        Page            page;
                        HashPageOpaque opaque;
                        OffsetNumber offno;
                        OffsetNumber maxoffno;
                        OffsetNumber deletable[MaxOffsetNumber];
                        int                     ndeletable = 0;

                        vacuum_delay_point();

                        buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                                                                   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
                                                                                         info->strategy);
                        page = BufferGetPage(buf);
                        opaque = (HashPageOpaque) PageGetSpecialPointer(page);
                        Assert(opaque->hasho_bucket == cur_bucket);

                        /* Scan each tuple in page */
                        maxoffno = PageGetMaxOffsetNumber(page);
                        for (offno = FirstOffsetNumber;
                                 offno <= maxoffno;
                                 offno = OffsetNumberNext(offno))
                        {
                                IndexTuple      itup;
                                ItemPointer htup;

                                itup = (IndexTuple) PageGetItem(page,
                                                                                                PageGetItemId(page, offno));
                                htup = &(itup->t_tid);
                                if (callback(htup, callback_state))
                                {
                                        /* mark the item for deletion */
                                        deletable[ndeletable++] = offno;
                                        tuples_removed += 1;
                                }
                                else
                                        num_index_tuples += 1;
                        }

                        /*
                         * Apply deletions and write page if needed, advance to next page.
                         */
                        blkno = opaque->hasho_nextblkno;

                        if (ndeletable > 0)
                        {
                                PageIndexMultiDelete(page, deletable, ndeletable);
                                _hash_wrtbuf(rel, buf);
                                bucket_dirty = true;
                        }
                        else
                                _hash_relbuf(rel, buf);
                }

                /* If we deleted anything, try to compact free space */
                if (bucket_dirty)
                        _hash_squeezebucket(rel, cur_bucket, bucket_blkno,
                                                                info->strategy);

                /* Release bucket lock */
                _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);

                /* Advance to next bucket */
                cur_bucket++;
        }

        /* Write-lock metapage and check for split since we started */
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE, LH_META_PAGE);
        metap = HashPageGetMeta(BufferGetPage(metabuf));

        if (cur_maxbucket != metap->hashm_maxbucket)
        {
                /* There's been a split, so process the additional bucket(s) */
                cur_maxbucket = metap->hashm_maxbucket;
                memcpy(&local_metapage, metap, sizeof(local_metapage));
                _hash_relbuf(rel, metabuf);
                goto loop_top;
        }

        /* Okay, we're really done.  Update tuple count in metapage. */

        if (orig_maxbucket == metap->hashm_maxbucket &&
                orig_ntuples == metap->hashm_ntuples)
        {
                /*
                 * No one has split or inserted anything since start of scan, so
                 * believe our count as gospel.
                 */
                metap->hashm_ntuples = num_index_tuples;
        }
        else
        {
                /*
                 * Otherwise, our count is untrustworthy since we may have
                 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
                 * (Note: we still return estimated_count = false, because using this
                 * count is better than not updating reltuples at all.)
                 */
                if (metap->hashm_ntuples > tuples_removed)
                        metap->hashm_ntuples -= tuples_removed;
                else
                        metap->hashm_ntuples = 0;
                num_index_tuples = metap->hashm_ntuples;
        }

        _hash_wrtbuf(rel, metabuf);

        /* return statistics */
        if (stats == NULL)
                stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
        stats->estimated_count = false;
        stats->num_index_tuples = num_index_tuples;
        stats->tuples_removed += tuples_removed;
        /* hashvacuumcleanup will fill in num_pages */

        PG_RETURN_POINTER(stats);
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashvacuumcleanup(PG_FUNCTION_ARGS)
{
        IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
        IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
        Relation        rel = info->index;
        BlockNumber num_pages;

        /* If hashbulkdelete wasn't called, return NULL signifying no change */
        /* Note: this covers the analyze_only case too */
        if (stats == NULL)
                PG_RETURN_POINTER(NULL);

        /* update statistics */
        num_pages = RelationGetNumberOfBlocks(rel);
        stats->num_pages = num_pages;

        PG_RETURN_POINTER(stats);
}


void
hash_redo(XLogRecPtr lsn, XLogRecord *record)
{
        elog(PANIC, "hash_redo: unimplemented");
}

void
hash_desc(StringInfo buf, uint8 xl_info, char *rec)
{
}
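
The comment in hashbuild() above argues that inserting index tuples in heap-scan order gives essentially no locality of access once the index is larger than RAM, which is why the build spools tuples and sorts them by expected bucket when num_buckets exceeds NBuffers. The following is a minimal standalone sketch of that effect, separate from hash.c; it is not PostgreSQL code, and NKEYS, NBUCKETS, expected_bucket(), and the page-switch count are illustrative assumptions standing in for the real hash and buffer machinery.

/*
 * Standalone sketch (not part of hash.c): compare how often consecutive
 * insertions land on different "pages" when keys arrive in random order
 * versus sorted by their expected bucket.
 */
#include <stdio.h>
#include <stdlib.h>

#define NKEYS    100000
#define NBUCKETS 1024                   /* pretend each bucket is one page */

static int expected_bucket(unsigned int hashcode)
{
        return (int) (hashcode % NBUCKETS);
}

/* qsort comparator: order keys by the bucket they will end up in */
static int cmp_bucket(const void *a, const void *b)
{
        int ba = expected_bucket(*(const unsigned int *) a);
        int bb = expected_bucket(*(const unsigned int *) b);

        return (ba > bb) - (ba < bb);
}

/* Count how often consecutive insertions touch a different "page" */
static long page_switches(const unsigned int *keys, int n)
{
        long    switches = 0;
        int             prev = -1;
        int             i;

        for (i = 0; i < n; i++)
        {
                int bucket = expected_bucket(keys[i]);

                if (bucket != prev)
                        switches++;
                prev = bucket;
        }
        return switches;
}

int main(void)
{
        unsigned int *keys = malloc(NKEYS * sizeof(unsigned int));
        int             i;

        if (keys == NULL)
                return 1;

        for (i = 0; i < NKEYS; i++)
                keys[i] = (unsigned int) rand();        /* stand-in for hash codes */

        printf("unsorted insertion order: %ld page switches\n",
                   page_switches(keys, NKEYS));

        qsort(keys, NKEYS, sizeof(unsigned int), cmp_bucket);
        printf("sorted by bucket:         %ld page switches\n",
                   page_switches(keys, NKEYS));

        free(keys);
        return 0;
}

With random arrival order nearly every insertion jumps to a different page (on the order of NKEYS switches), while sorting by bucket reduces that to roughly NBUCKETS switches; that long-run locality is what the spool-and-sort path in hashbuild() is buying when the initial index size exceeds shared buffers.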