~ubuntu-branches/ubuntu/precise/postgresql-9.1/precise-security

« back to all changes in this revision

Viewing changes to src/backend/executor/nodeTidscan.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
ImportĀ upstreamĀ versionĀ 9.1~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * nodeTidscan.c
 
4
 *        Routines to support direct tid scans of relations
 
5
 *
 
6
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 
7
 * Portions Copyright (c) 1994, Regents of the University of California
 
8
 *
 
9
 *
 
10
 * IDENTIFICATION
 
11
 *        src/backend/executor/nodeTidscan.c
 
12
 *
 
13
 *-------------------------------------------------------------------------
 
14
 */
 
15
/*
 
16
 * INTERFACE ROUTINES
 
17
 *
 
18
 *              ExecTidScan                     scans a relation using tids
 
19
 *              ExecInitTidScan         creates and initializes state info.
 
20
 *              ExecReScanTidScan       rescans the tid relation.
 
21
 *              ExecEndTidScan          releases all storage.
 
22
 *              ExecTidMarkPos          marks scan position.
 
23
 *              ExecTidRestrPos         restores scan position.
 
24
 */
 
25
#include "postgres.h"
 
26
 
 
27
#include "access/heapam.h"
 
28
#include "access/sysattr.h"
 
29
#include "catalog/pg_type.h"
 
30
#include "executor/execdebug.h"
 
31
#include "executor/nodeTidscan.h"
 
32
#include "optimizer/clauses.h"
 
33
#include "storage/bufmgr.h"
 
34
#include "utils/array.h"
 
35
 
 
36
 
 
37
#define IsCTIDVar(node)  \
 
38
        ((node) != NULL && \
 
39
         IsA((node), Var) && \
 
40
         ((Var *) (node))->varattno == SelfItemPointerAttributeNumber && \
 
41
         ((Var *) (node))->varlevelsup == 0)
 
42
 
 
43
static void TidListCreate(TidScanState *tidstate);
 
44
static int      itemptr_comparator(const void *a, const void *b);
 
45
static TupleTableSlot *TidNext(TidScanState *node);
 
46
 
 
47
 
 
48
/*
 
49
 * Compute the list of TIDs to be visited, by evaluating the expressions
 
50
 * for them.
 
51
 *
 
52
 * (The result is actually an array, not a list.)
 
53
 */
 
54
static void
 
55
TidListCreate(TidScanState *tidstate)
 
56
{
 
57
        List       *evalList = tidstate->tss_tidquals;
 
58
        ExprContext *econtext = tidstate->ss.ps.ps_ExprContext;
 
59
        BlockNumber nblocks;
 
60
        ItemPointerData *tidList;
 
61
        int                     numAllocTids;
 
62
        int                     numTids;
 
63
        ListCell   *l;
 
64
 
 
65
        /*
 
66
         * We silently discard any TIDs that are out of range at the time of scan
 
67
         * start.  (Since we hold at least AccessShareLock on the table, it won't
 
68
         * be possible for someone to truncate away the blocks we intend to
 
69
         * visit.)
 
70
         */
 
71
        nblocks = RelationGetNumberOfBlocks(tidstate->ss.ss_currentRelation);
 
72
 
 
73
        /*
 
74
         * We initialize the array with enough slots for the case that all quals
 
75
         * are simple OpExprs or CurrentOfExprs.  If there are any
 
76
         * ScalarArrayOpExprs, we may have to enlarge the array.
 
77
         */
 
78
        numAllocTids = list_length(evalList);
 
79
        tidList = (ItemPointerData *)
 
80
                palloc(numAllocTids * sizeof(ItemPointerData));
 
81
        numTids = 0;
 
82
        tidstate->tss_isCurrentOf = false;
 
83
 
 
84
        foreach(l, evalList)
 
85
        {
 
86
                ExprState  *exstate = (ExprState *) lfirst(l);
 
87
                Expr       *expr = exstate->expr;
 
88
                ItemPointer itemptr;
 
89
                bool            isNull;
 
90
 
 
91
                if (is_opclause(expr))
 
92
                {
 
93
                        FuncExprState *fexstate = (FuncExprState *) exstate;
 
94
                        Node       *arg1;
 
95
                        Node       *arg2;
 
96
 
 
97
                        arg1 = get_leftop(expr);
 
98
                        arg2 = get_rightop(expr);
 
99
                        if (IsCTIDVar(arg1))
 
100
                                exstate = (ExprState *) lsecond(fexstate->args);
 
101
                        else if (IsCTIDVar(arg2))
 
102
                                exstate = (ExprState *) linitial(fexstate->args);
 
103
                        else
 
104
                                elog(ERROR, "could not identify CTID variable");
 
105
 
 
106
                        itemptr = (ItemPointer)
 
107
                                DatumGetPointer(ExecEvalExprSwitchContext(exstate,
 
108
                                                                                                                  econtext,
 
109
                                                                                                                  &isNull,
 
110
                                                                                                                  NULL));
 
111
                        if (!isNull &&
 
112
                                ItemPointerIsValid(itemptr) &&
 
113
                                ItemPointerGetBlockNumber(itemptr) < nblocks)
 
114
                        {
 
115
                                if (numTids >= numAllocTids)
 
116
                                {
 
117
                                        numAllocTids *= 2;
 
118
                                        tidList = (ItemPointerData *)
 
119
                                                repalloc(tidList,
 
120
                                                                 numAllocTids * sizeof(ItemPointerData));
 
121
                                }
 
122
                                tidList[numTids++] = *itemptr;
 
123
                        }
 
124
                }
 
125
                else if (expr && IsA(expr, ScalarArrayOpExpr))
 
126
                {
 
127
                        ScalarArrayOpExprState *saexstate = (ScalarArrayOpExprState *) exstate;
 
128
                        Datum           arraydatum;
 
129
                        ArrayType  *itemarray;
 
130
                        Datum      *ipdatums;
 
131
                        bool       *ipnulls;
 
132
                        int                     ndatums;
 
133
                        int                     i;
 
134
 
 
135
                        exstate = (ExprState *) lsecond(saexstate->fxprstate.args);
 
136
                        arraydatum = ExecEvalExprSwitchContext(exstate,
 
137
                                                                                                   econtext,
 
138
                                                                                                   &isNull,
 
139
                                                                                                   NULL);
 
140
                        if (isNull)
 
141
                                continue;
 
142
                        itemarray = DatumGetArrayTypeP(arraydatum);
 
143
                        deconstruct_array(itemarray,
 
144
                                                          TIDOID, SizeOfIptrData, false, 's',
 
145
                                                          &ipdatums, &ipnulls, &ndatums);
 
146
                        if (numTids + ndatums > numAllocTids)
 
147
                        {
 
148
                                numAllocTids = numTids + ndatums;
 
149
                                tidList = (ItemPointerData *)
 
150
                                        repalloc(tidList,
 
151
                                                         numAllocTids * sizeof(ItemPointerData));
 
152
                        }
 
153
                        for (i = 0; i < ndatums; i++)
 
154
                        {
 
155
                                if (!ipnulls[i])
 
156
                                {
 
157
                                        itemptr = (ItemPointer) DatumGetPointer(ipdatums[i]);
 
158
                                        if (ItemPointerIsValid(itemptr) &&
 
159
                                                ItemPointerGetBlockNumber(itemptr) < nblocks)
 
160
                                                tidList[numTids++] = *itemptr;
 
161
                                }
 
162
                        }
 
163
                        pfree(ipdatums);
 
164
                        pfree(ipnulls);
 
165
                }
 
166
                else if (expr && IsA(expr, CurrentOfExpr))
 
167
                {
 
168
                        CurrentOfExpr *cexpr = (CurrentOfExpr *) expr;
 
169
                        ItemPointerData cursor_tid;
 
170
 
 
171
                        if (execCurrentOf(cexpr, econtext,
 
172
                                                   RelationGetRelid(tidstate->ss.ss_currentRelation),
 
173
                                                          &cursor_tid))
 
174
                        {
 
175
                                if (numTids >= numAllocTids)
 
176
                                {
 
177
                                        numAllocTids *= 2;
 
178
                                        tidList = (ItemPointerData *)
 
179
                                                repalloc(tidList,
 
180
                                                                 numAllocTids * sizeof(ItemPointerData));
 
181
                                }
 
182
                                tidList[numTids++] = cursor_tid;
 
183
                                tidstate->tss_isCurrentOf = true;
 
184
                        }
 
185
                }
 
186
                else
 
187
                        elog(ERROR, "could not identify CTID expression");
 
188
        }
 
189
 
 
190
        /*
 
191
         * Sort the array of TIDs into order, and eliminate duplicates.
 
192
         * Eliminating duplicates is necessary since we want OR semantics across
 
193
         * the list.  Sorting makes it easier to detect duplicates, and as a bonus
 
194
         * ensures that we will visit the heap in the most efficient way.
 
195
         */
 
196
        if (numTids > 1)
 
197
        {
 
198
                int                     lastTid;
 
199
                int                     i;
 
200
 
 
201
                /* CurrentOfExpr could never appear OR'd with something else */
 
202
                Assert(!tidstate->tss_isCurrentOf);
 
203
 
 
204
                qsort((void *) tidList, numTids, sizeof(ItemPointerData),
 
205
                          itemptr_comparator);
 
206
                lastTid = 0;
 
207
                for (i = 1; i < numTids; i++)
 
208
                {
 
209
                        if (!ItemPointerEquals(&tidList[lastTid], &tidList[i]))
 
210
                                tidList[++lastTid] = tidList[i];
 
211
                }
 
212
                numTids = lastTid + 1;
 
213
        }
 
214
 
 
215
        tidstate->tss_TidList = tidList;
 
216
        tidstate->tss_NumTids = numTids;
 
217
        tidstate->tss_TidPtr = -1;
 
218
}
 
219
 
 
220
/*
 
221
 * qsort comparator for ItemPointerData items
 
222
 */
 
223
static int
 
224
itemptr_comparator(const void *a, const void *b)
 
225
{
 
226
        const ItemPointerData *ipa = (const ItemPointerData *) a;
 
227
        const ItemPointerData *ipb = (const ItemPointerData *) b;
 
228
        BlockNumber ba = ItemPointerGetBlockNumber(ipa);
 
229
        BlockNumber bb = ItemPointerGetBlockNumber(ipb);
 
230
        OffsetNumber oa = ItemPointerGetOffsetNumber(ipa);
 
231
        OffsetNumber ob = ItemPointerGetOffsetNumber(ipb);
 
232
 
 
233
        if (ba < bb)
 
234
                return -1;
 
235
        if (ba > bb)
 
236
                return 1;
 
237
        if (oa < ob)
 
238
                return -1;
 
239
        if (oa > ob)
 
240
                return 1;
 
241
        return 0;
 
242
}
 
243
 
 
244
/* ----------------------------------------------------------------
 
245
 *              TidNext
 
246
 *
 
247
 *              Retrieve a tuple from the TidScan node's currentRelation
 
248
 *              using the tids in the TidScanState information.
 
249
 *
 
250
 * ----------------------------------------------------------------
 
251
 */
 
252
static TupleTableSlot *
 
253
TidNext(TidScanState *node)
 
254
{
 
255
        EState     *estate;
 
256
        ScanDirection direction;
 
257
        Snapshot        snapshot;
 
258
        Relation        heapRelation;
 
259
        HeapTuple       tuple;
 
260
        TupleTableSlot *slot;
 
261
        Buffer          buffer = InvalidBuffer;
 
262
        ItemPointerData *tidList;
 
263
        int                     numTids;
 
264
        bool            bBackward;
 
265
 
 
266
        /*
 
267
         * extract necessary information from tid scan node
 
268
         */
 
269
        estate = node->ss.ps.state;
 
270
        direction = estate->es_direction;
 
271
        snapshot = estate->es_snapshot;
 
272
        heapRelation = node->ss.ss_currentRelation;
 
273
        slot = node->ss.ss_ScanTupleSlot;
 
274
 
 
275
        /*
 
276
         * First time through, compute the list of TIDs to be visited
 
277
         */
 
278
        if (node->tss_TidList == NULL)
 
279
                TidListCreate(node);
 
280
 
 
281
        tidList = node->tss_TidList;
 
282
        numTids = node->tss_NumTids;
 
283
 
 
284
        tuple = &(node->tss_htup);
 
285
 
 
286
        /*
 
287
         * Initialize or advance scan position, depending on direction.
 
288
         */
 
289
        bBackward = ScanDirectionIsBackward(direction);
 
290
        if (bBackward)
 
291
        {
 
292
                if (node->tss_TidPtr < 0)
 
293
                {
 
294
                        /* initialize for backward scan */
 
295
                        node->tss_TidPtr = numTids - 1;
 
296
                }
 
297
                else
 
298
                        node->tss_TidPtr--;
 
299
        }
 
300
        else
 
301
        {
 
302
                if (node->tss_TidPtr < 0)
 
303
                {
 
304
                        /* initialize for forward scan */
 
305
                        node->tss_TidPtr = 0;
 
306
                }
 
307
                else
 
308
                        node->tss_TidPtr++;
 
309
        }
 
310
 
 
311
        while (node->tss_TidPtr >= 0 && node->tss_TidPtr < numTids)
 
312
        {
 
313
                tuple->t_self = tidList[node->tss_TidPtr];
 
314
 
 
315
                /*
 
316
                 * For WHERE CURRENT OF, the tuple retrieved from the cursor might
 
317
                 * since have been updated; if so, we should fetch the version that is
 
318
                 * current according to our snapshot.
 
319
                 */
 
320
                if (node->tss_isCurrentOf)
 
321
                        heap_get_latest_tid(heapRelation, snapshot, &tuple->t_self);
 
322
 
 
323
                if (heap_fetch(heapRelation, snapshot, tuple, &buffer, false, NULL))
 
324
                {
 
325
                        /*
 
326
                         * store the scanned tuple in the scan tuple slot of the scan
 
327
                         * state.  Eventually we will only do this and not return a tuple.
 
328
                         * Note: we pass 'false' because tuples returned by amgetnext are
 
329
                         * pointers onto disk pages and were not created with palloc() and
 
330
                         * so should not be pfree()'d.
 
331
                         */
 
332
                        ExecStoreTuple(tuple,           /* tuple to store */
 
333
                                                   slot,        /* slot to store in */
 
334
                                                   buffer,              /* buffer associated with tuple  */
 
335
                                                   false);              /* don't pfree */
 
336
 
 
337
                        /*
 
338
                         * At this point we have an extra pin on the buffer, because
 
339
                         * ExecStoreTuple incremented the pin count. Drop our local pin.
 
340
                         */
 
341
                        ReleaseBuffer(buffer);
 
342
 
 
343
                        return slot;
 
344
                }
 
345
                /* Bad TID or failed snapshot qual; try next */
 
346
                if (bBackward)
 
347
                        node->tss_TidPtr--;
 
348
                else
 
349
                        node->tss_TidPtr++;
 
350
        }
 
351
 
 
352
        /*
 
353
         * if we get here it means the tid scan failed so we are at the end of the
 
354
         * scan..
 
355
         */
 
356
        return ExecClearTuple(slot);
 
357
}
 
358
 
 
359
/*
 
360
 * TidRecheck -- access method routine to recheck a tuple in EvalPlanQual
 
361
 */
 
362
static bool
 
363
TidRecheck(TidScanState *node, TupleTableSlot *slot)
 
364
{
 
365
        /*
 
366
         * XXX shouldn't we check here to make sure tuple matches TID list? In
 
367
         * runtime-key case this is not certain, is it?  However, in the WHERE
 
368
         * CURRENT OF case it might not match anyway ...
 
369
         */
 
370
        return true;
 
371
}
 
372
 
 
373
 
 
374
/* ----------------------------------------------------------------
 
375
 *              ExecTidScan(node)
 
376
 *
 
377
 *              Scans the relation using tids and returns
 
378
 *                 the next qualifying tuple in the direction specified.
 
379
 *              We call the ExecScan() routine and pass it the appropriate
 
380
 *              access method functions.
 
381
 *
 
382
 *              Conditions:
 
383
 *                -- the "cursor" maintained by the AMI is positioned at the tuple
 
384
 *                       returned previously.
 
385
 *
 
386
 *              Initial States:
 
387
 *                -- the relation indicated is opened for scanning so that the
 
388
 *                       "cursor" is positioned before the first qualifying tuple.
 
389
 *                -- tidPtr is -1.
 
390
 * ----------------------------------------------------------------
 
391
 */
 
392
TupleTableSlot *
 
393
ExecTidScan(TidScanState *node)
 
394
{
 
395
        return ExecScan(&node->ss,
 
396
                                        (ExecScanAccessMtd) TidNext,
 
397
                                        (ExecScanRecheckMtd) TidRecheck);
 
398
}
 
399
 
 
400
/* ----------------------------------------------------------------
 
401
 *              ExecReScanTidScan(node)
 
402
 * ----------------------------------------------------------------
 
403
 */
 
404
void
 
405
ExecReScanTidScan(TidScanState *node)
 
406
{
 
407
        if (node->tss_TidList)
 
408
                pfree(node->tss_TidList);
 
409
        node->tss_TidList = NULL;
 
410
        node->tss_NumTids = 0;
 
411
        node->tss_TidPtr = -1;
 
412
 
 
413
        ExecScanReScan(&node->ss);
 
414
}
 
415
 
 
416
/* ----------------------------------------------------------------
 
417
 *              ExecEndTidScan
 
418
 *
 
419
 *              Releases any storage allocated through C routines.
 
420
 *              Returns nothing.
 
421
 * ----------------------------------------------------------------
 
422
 */
 
423
void
 
424
ExecEndTidScan(TidScanState *node)
 
425
{
 
426
        /*
 
427
         * Free the exprcontext
 
428
         */
 
429
        ExecFreeExprContext(&node->ss.ps);
 
430
 
 
431
        /*
 
432
         * clear out tuple table slots
 
433
         */
 
434
        ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 
435
        ExecClearTuple(node->ss.ss_ScanTupleSlot);
 
436
 
 
437
        /*
 
438
         * close the heap relation.
 
439
         */
 
440
        ExecCloseScanRelation(node->ss.ss_currentRelation);
 
441
}
 
442
 
 
443
/* ----------------------------------------------------------------
 
444
 *              ExecTidMarkPos
 
445
 *
 
446
 *              Marks scan position by marking the current tid.
 
447
 *              Returns nothing.
 
448
 * ----------------------------------------------------------------
 
449
 */
 
450
void
 
451
ExecTidMarkPos(TidScanState *node)
 
452
{
 
453
        node->tss_MarkTidPtr = node->tss_TidPtr;
 
454
}
 
455
 
 
456
/* ----------------------------------------------------------------
 
457
 *              ExecTidRestrPos
 
458
 *
 
459
 *              Restores scan position by restoring the current tid.
 
460
 *              Returns nothing.
 
461
 *
 
462
 *              XXX Assumes previously marked scan position belongs to current tid
 
463
 * ----------------------------------------------------------------
 
464
 */
 
465
void
 
466
ExecTidRestrPos(TidScanState *node)
 
467
{
 
468
        node->tss_TidPtr = node->tss_MarkTidPtr;
 
469
}
 
470
 
 
471
/* ----------------------------------------------------------------
 
472
 *              ExecInitTidScan
 
473
 *
 
474
 *              Initializes the tid scan's state information, creates
 
475
 *              scan keys, and opens the base and tid relations.
 
476
 *
 
477
 *              Parameters:
 
478
 *                node: TidNode node produced by the planner.
 
479
 *                estate: the execution state initialized in InitPlan.
 
480
 * ----------------------------------------------------------------
 
481
 */
 
482
TidScanState *
 
483
ExecInitTidScan(TidScan *node, EState *estate, int eflags)
 
484
{
 
485
        TidScanState *tidstate;
 
486
        Relation        currentRelation;
 
487
 
 
488
        /*
 
489
         * create state structure
 
490
         */
 
491
        tidstate = makeNode(TidScanState);
 
492
        tidstate->ss.ps.plan = (Plan *) node;
 
493
        tidstate->ss.ps.state = estate;
 
494
 
 
495
        /*
 
496
         * Miscellaneous initialization
 
497
         *
 
498
         * create expression context for node
 
499
         */
 
500
        ExecAssignExprContext(estate, &tidstate->ss.ps);
 
501
 
 
502
        tidstate->ss.ps.ps_TupFromTlist = false;
 
503
 
 
504
        /*
 
505
         * initialize child expressions
 
506
         */
 
507
        tidstate->ss.ps.targetlist = (List *)
 
508
                ExecInitExpr((Expr *) node->scan.plan.targetlist,
 
509
                                         (PlanState *) tidstate);
 
510
        tidstate->ss.ps.qual = (List *)
 
511
                ExecInitExpr((Expr *) node->scan.plan.qual,
 
512
                                         (PlanState *) tidstate);
 
513
 
 
514
        tidstate->tss_tidquals = (List *)
 
515
                ExecInitExpr((Expr *) node->tidquals,
 
516
                                         (PlanState *) tidstate);
 
517
 
 
518
        /*
 
519
         * tuple table initialization
 
520
         */
 
521
        ExecInitResultTupleSlot(estate, &tidstate->ss.ps);
 
522
        ExecInitScanTupleSlot(estate, &tidstate->ss);
 
523
 
 
524
        /*
 
525
         * mark tid list as not computed yet
 
526
         */
 
527
        tidstate->tss_TidList = NULL;
 
528
        tidstate->tss_NumTids = 0;
 
529
        tidstate->tss_TidPtr = -1;
 
530
 
 
531
        /*
 
532
         * open the base relation and acquire appropriate lock on it.
 
533
         */
 
534
        currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
 
535
 
 
536
        tidstate->ss.ss_currentRelation = currentRelation;
 
537
        tidstate->ss.ss_currentScanDesc = NULL;         /* no heap scan here */
 
538
 
 
539
        /*
 
540
         * get the scan type from the relation descriptor.
 
541
         */
 
542
        ExecAssignScanType(&tidstate->ss, RelationGetDescr(currentRelation));
 
543
 
 
544
        /*
 
545
         * Initialize result tuple type and projection info.
 
546
         */
 
547
        ExecAssignResultTypeFromTL(&tidstate->ss.ps);
 
548
        ExecAssignScanProjectionInfo(&tidstate->ss);
 
549
 
 
550
        /*
 
551
         * all done.
 
552
         */
 
553
        return tidstate;
 
554
}