~ubuntu-branches/ubuntu/oneiric/postgresql-9.1/oneiric-security

« back to all changes in this revision

Viewing changes to src/backend/executor/nodeWindowAgg.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Pitt
  • Date: 2011-05-11 10:41:53 UTC
  • Revision ID: james.westby@ubuntu.com-20110511104153-psbh2o58553fv1m0
Tags: upstream-9.1~beta1
Import upstream version 9.1~beta1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 *
 * nodeWindowAgg.c
 *	  routines to handle WindowAgg nodes.
 *
 * A WindowAgg node evaluates "window functions" across suitable partitions
 * of the input tuple set.  Any one WindowAgg works for just a single window
 * specification, though it can evaluate multiple window functions sharing
 * identical window specifications.  The input tuples are required to be
 * delivered in sorted order, with the PARTITION BY columns (if any) as
 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
 * (The planner generates a stack of WindowAggs with intervening Sort nodes
 * as needed, if a query involves more than one window specification.)
 *
 * Since window functions can require access to any or all of the rows in
 * the current partition, we accumulate rows of the partition into a
 * tuplestore.  The window functions are called using the WindowObject API
 * so that they can access those rows as needed.
 *
 * We also support using plain aggregate functions as window functions.
 * For these, the regular Agg-node environment is emulated for each partition.
 * As required by the SQL spec, the output represents the value of the
 * aggregate function over all rows in the current row's window frame.
 *
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeWindowAgg.c
 *
 *-------------------------------------------------------------------------
 */
 
34
#include "postgres.h"
 
35
 
 
36
#include "catalog/pg_aggregate.h"
 
37
#include "catalog/pg_proc.h"
 
38
#include "catalog/pg_type.h"
 
39
#include "executor/executor.h"
 
40
#include "executor/nodeWindowAgg.h"
 
41
#include "miscadmin.h"
 
42
#include "nodes/nodeFuncs.h"
 
43
#include "optimizer/clauses.h"
 
44
#include "parser/parse_agg.h"
 
45
#include "parser/parse_coerce.h"
 
46
#include "utils/acl.h"
 
47
#include "utils/builtins.h"
 
48
#include "utils/datum.h"
 
49
#include "utils/lsyscache.h"
 
50
#include "utils/memutils.h"
 
51
#include "utils/syscache.h"
 
52
#include "windowapi.h"
 
53
 
 
54
/*
 
55
 * All the window function APIs are called with this object, which is passed
 
56
 * to window functions as fcinfo->context.
 
57
 */
 
58
typedef struct WindowObjectData
 
59
{
 
60
        NodeTag         type;
 
61
        WindowAggState *winstate;       /* parent WindowAggState */
 
62
        List       *argstates;          /* ExprState trees for fn's arguments */
 
63
        void       *localmem;           /* WinGetPartitionLocalMemory's chunk */
 
64
        int                     markptr;                /* tuplestore mark pointer for this fn */
 
65
        int                     readptr;                /* tuplestore read pointer for this fn */
 
66
        int64           markpos;                /* row that markptr is positioned on */
 
67
        int64           seekpos;                /* row that readptr is positioned on */
 
68
} WindowObjectData;
 
69
 
 
70
/*
 
71
 * We have one WindowStatePerFunc struct for each window function and
 
72
 * window aggregate handled by this node.
 
73
 */
 
74
typedef struct WindowStatePerFuncData
 
75
{
 
76
        /* Links to WindowFunc expr and state nodes this working state is for */
 
77
        WindowFuncExprState *wfuncstate;
 
78
        WindowFunc *wfunc;
 
79
 
 
80
        int                     numArguments;   /* number of arguments */
 
81
 
 
82
        FmgrInfo        flinfo;                 /* fmgr lookup data for window function */
 
83
 
 
84
        Oid                     winCollation;   /* collation derived for window function */
 
85
 
 
86
        /*
 
87
         * We need the len and byval info for the result of each function in order
 
88
         * to know how to copy/delete values.
 
89
         */
 
90
        int16           resulttypeLen;
 
91
        bool            resulttypeByVal;
 
92
 
 
93
        bool            plain_agg;              /* is it just a plain aggregate function? */
 
94
        int                     aggno;                  /* if so, index of its PerAggData */
 
95
 
 
96
        WindowObject winobj;            /* object used in window function API */
 
97
}       WindowStatePerFuncData;
 
98
 
 
99
/*
 
100
 * For plain aggregate window functions, we also have one of these.
 
101
 */
 
102
typedef struct WindowStatePerAggData
 
103
{
 
104
        /* Oids of transfer functions */
 
105
        Oid                     transfn_oid;
 
106
        Oid                     finalfn_oid;    /* may be InvalidOid */
 
107
 
 
108
        /*
 
109
         * fmgr lookup data for transfer functions --- only valid when
 
110
         * corresponding oid is not InvalidOid.  Note in particular that fn_strict
 
111
         * flags are kept here.
 
112
         */
 
113
        FmgrInfo        transfn;
 
114
        FmgrInfo        finalfn;
 
115
 
 
116
        /*
 
117
         * initial value from pg_aggregate entry
 
118
         */
 
119
        Datum           initValue;
 
120
        bool            initValueIsNull;
 
121
 
 
122
        /*
 
123
         * cached value for current frame boundaries
 
124
         */
 
125
        Datum           resultValue;
 
126
        bool            resultValueIsNull;
 
127
 
 
128
        /*
 
129
         * We need the len and byval info for the agg's input, result, and
 
130
         * transition data types in order to know how to copy/delete values.
 
131
         */
 
132
        int16           inputtypeLen,
 
133
                                resulttypeLen,
 
134
                                transtypeLen;
 
135
        bool            inputtypeByVal,
 
136
                                resulttypeByVal,
 
137
                                transtypeByVal;
 
138
 
 
139
        int                     wfuncno;                /* index of associated PerFuncData */
 
140
 
 
141
        /* Current transition value */
 
142
        Datum           transValue;             /* current transition value */
 
143
        bool            transValueIsNull;
 
144
 
 
145
        bool            noTransValue;   /* true if transValue not set yet */
 
146
} WindowStatePerAggData;
 
147
 
 
148
static void initialize_windowaggregate(WindowAggState *winstate,
 
149
                                                   WindowStatePerFunc perfuncstate,
 
150
                                                   WindowStatePerAgg peraggstate);
 
151
static void advance_windowaggregate(WindowAggState *winstate,
 
152
                                                WindowStatePerFunc perfuncstate,
 
153
                                                WindowStatePerAgg peraggstate);
 
154
static void finalize_windowaggregate(WindowAggState *winstate,
 
155
                                                 WindowStatePerFunc perfuncstate,
 
156
                                                 WindowStatePerAgg peraggstate,
 
157
                                                 Datum *result, bool *isnull);
 
158
 
 
159
static void eval_windowaggregates(WindowAggState *winstate);
 
160
static void eval_windowfunction(WindowAggState *winstate,
 
161
                                        WindowStatePerFunc perfuncstate,
 
162
                                        Datum *result, bool *isnull);
 
163
 
 
164
static void begin_partition(WindowAggState *winstate);
 
165
static void spool_tuples(WindowAggState *winstate, int64 pos);
 
166
static void release_partition(WindowAggState *winstate);
 
167
 
 
168
static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
 
169
                                TupleTableSlot *slot);
 
170
static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
 
171
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
 
172
 
 
173
static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
 
174
                                  WindowFunc *wfunc,
 
175
                                  WindowStatePerAgg peraggstate);
 
176
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
 
177
 
 
178
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
 
179
                  TupleTableSlot *slot2);
 
180
static bool window_gettupleslot(WindowObject winobj, int64 pos,
 
181
                                        TupleTableSlot *slot);
 
182
 
 
183
 
 
184
/*
 
185
 * initialize_windowaggregate
 
186
 * parallel to initialize_aggregates in nodeAgg.c
 
187
 */
 
188
static void
 
189
initialize_windowaggregate(WindowAggState *winstate,
 
190
                                                   WindowStatePerFunc perfuncstate,
 
191
                                                   WindowStatePerAgg peraggstate)
 
192
{
 
193
        MemoryContext oldContext;
 
194
 
 
195
        if (peraggstate->initValueIsNull)
 
196
                peraggstate->transValue = peraggstate->initValue;
 
197
        else
 
198
        {
 
199
                oldContext = MemoryContextSwitchTo(winstate->aggcontext);
 
200
                peraggstate->transValue = datumCopy(peraggstate->initValue,
 
201
                                                                                        peraggstate->transtypeByVal,
 
202
                                                                                        peraggstate->transtypeLen);
 
203
                MemoryContextSwitchTo(oldContext);
 
204
        }
 
205
        peraggstate->transValueIsNull = peraggstate->initValueIsNull;
 
206
        peraggstate->noTransValue = peraggstate->initValueIsNull;
 
207
        peraggstate->resultValueIsNull = true;
 
208
}
 
209
 
 
210
/*
 
211
 * advance_windowaggregate
 
212
 * parallel to advance_aggregates in nodeAgg.c
 
213
 */
 
214
static void
 
215
advance_windowaggregate(WindowAggState *winstate,
 
216
                                                WindowStatePerFunc perfuncstate,
 
217
                                                WindowStatePerAgg peraggstate)
 
218
{
 
219
        WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
 
220
        int                     numArguments = perfuncstate->numArguments;
 
221
        FunctionCallInfoData fcinfodata;
 
222
        FunctionCallInfo fcinfo = &fcinfodata;
 
223
        Datum           newVal;
 
224
        ListCell   *arg;
 
225
        int                     i;
 
226
        MemoryContext oldContext;
 
227
        ExprContext *econtext = winstate->tmpcontext;
 
228
 
 
229
        oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
 
230
 
 
231
        /* We start from 1, since the 0th arg will be the transition value */
 
232
        i = 1;
 
233
        foreach(arg, wfuncstate->args)
 
234
        {
 
235
                ExprState  *argstate = (ExprState *) lfirst(arg);
 
236
 
 
237
                fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
 
238
                                                                          &fcinfo->argnull[i], NULL);
 
239
                i++;
 
240
        }
 
241
 
 
242
        if (peraggstate->transfn.fn_strict)
 
243
        {
 
244
                /*
 
245
                 * For a strict transfn, nothing happens when there's a NULL input; we
 
246
                 * just keep the prior transValue.
 
247
                 */
 
248
                for (i = 1; i <= numArguments; i++)
 
249
                {
 
250
                        if (fcinfo->argnull[i])
 
251
                        {
 
252
                                MemoryContextSwitchTo(oldContext);
 
253
                                return;
 
254
                        }
 
255
                }
 
256
                if (peraggstate->noTransValue)
 
257
                {
 
258
                        /*
 
259
                         * transValue has not been initialized. This is the first non-NULL
 
260
                         * input value. We use it as the initial value for transValue. (We
 
261
                         * already checked that the agg's input type is binary-compatible
 
262
                         * with its transtype, so straight copy here is OK.)
 
263
                         *
 
264
                         * We must copy the datum into aggcontext if it is pass-by-ref. We
 
265
                         * do not need to pfree the old transValue, since it's NULL.
 
266
                         */
 
267
                        MemoryContextSwitchTo(winstate->aggcontext);
 
268
                        peraggstate->transValue = datumCopy(fcinfo->arg[1],
 
269
                                                                                                peraggstate->transtypeByVal,
 
270
                                                                                                peraggstate->transtypeLen);
 
271
                        peraggstate->transValueIsNull = false;
 
272
                        peraggstate->noTransValue = false;
 
273
                        MemoryContextSwitchTo(oldContext);
 
274
                        return;
 
275
                }
 
276
                if (peraggstate->transValueIsNull)
 
277
                {
 
278
                        /*
 
279
                         * Don't call a strict function with NULL inputs.  Note it is
 
280
                         * possible to get here despite the above tests, if the transfn is
 
281
                         * strict *and* returned a NULL on a prior cycle. If that happens
 
282
                         * we will propagate the NULL all the way to the end.
 
283
                         */
 
284
                        MemoryContextSwitchTo(oldContext);
 
285
                        return;
 
286
                }
 
287
        }
 
288
 
 
289
        /*
 
290
         * OK to call the transition function
 
291
         */
 
292
        InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
 
293
                                                         numArguments + 1,
 
294
                                                         perfuncstate->winCollation,
 
295
                                                         (void *) winstate, NULL);
 
296
        fcinfo->arg[0] = peraggstate->transValue;
 
297
        fcinfo->argnull[0] = peraggstate->transValueIsNull;
 
298
        newVal = FunctionCallInvoke(fcinfo);
 
299
 
 
300
        /*
 
301
         * If pass-by-ref datatype, must copy the new value into aggcontext and
 
302
         * pfree the prior transValue.  But if transfn returned a pointer to its
 
303
         * first input, we don't need to do anything.
 
304
         */
 
305
        if (!peraggstate->transtypeByVal &&
 
306
                DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
 
307
        {
 
308
                if (!fcinfo->isnull)
 
309
                {
 
310
                        MemoryContextSwitchTo(winstate->aggcontext);
 
311
                        newVal = datumCopy(newVal,
 
312
                                                           peraggstate->transtypeByVal,
 
313
                                                           peraggstate->transtypeLen);
 
314
                }
 
315
                if (!peraggstate->transValueIsNull)
 
316
                        pfree(DatumGetPointer(peraggstate->transValue));
 
317
        }
 
318
 
 
319
        MemoryContextSwitchTo(oldContext);
 
320
        peraggstate->transValue = newVal;
 
321
        peraggstate->transValueIsNull = fcinfo->isnull;
 
322
}
 
323
 
 
324
/*
 
325
 * finalize_windowaggregate
 
326
 * parallel to finalize_aggregate in nodeAgg.c
 
327
 */
 
328
static void
 
329
finalize_windowaggregate(WindowAggState *winstate,
 
330
                                                 WindowStatePerFunc perfuncstate,
 
331
                                                 WindowStatePerAgg peraggstate,
 
332
                                                 Datum *result, bool *isnull)
 
333
{
 
334
        MemoryContext oldContext;
 
335
 
 
336
        oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
 
337
 
 
338
        /*
 
339
         * Apply the agg's finalfn if one is provided, else return transValue.
 
340
         */
 
341
        if (OidIsValid(peraggstate->finalfn_oid))
 
342
        {
 
343
                FunctionCallInfoData fcinfo;
 
344
 
 
345
                InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
 
346
                                                                 perfuncstate->winCollation,
 
347
                                                                 (void *) winstate, NULL);
 
348
                fcinfo.arg[0] = peraggstate->transValue;
 
349
                fcinfo.argnull[0] = peraggstate->transValueIsNull;
 
350
                if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
 
351
                {
 
352
                        /* don't call a strict function with NULL inputs */
 
353
                        *result = (Datum) 0;
 
354
                        *isnull = true;
 
355
                }
 
356
                else
 
357
                {
 
358
                        *result = FunctionCallInvoke(&fcinfo);
 
359
                        *isnull = fcinfo.isnull;
 
360
                }
 
361
        }
 
362
        else
 
363
        {
 
364
                *result = peraggstate->transValue;
 
365
                *isnull = peraggstate->transValueIsNull;
 
366
        }
 
367
 
 
368
        /*
 
369
         * If result is pass-by-ref, make sure it is in the right context.
 
370
         */
 
371
        if (!peraggstate->resulttypeByVal && !*isnull &&
 
372
                !MemoryContextContains(CurrentMemoryContext,
 
373
                                                           DatumGetPointer(*result)))
 
374
                *result = datumCopy(*result,
 
375
                                                        peraggstate->resulttypeByVal,
 
376
                                                        peraggstate->resulttypeLen);
 
377
        MemoryContextSwitchTo(oldContext);
 
378
}
 
379
 
 
380
/*
 
381
 * eval_windowaggregates
 
382
 * evaluate plain aggregates being used as window functions
 
383
 *
 
384
 * Much of this is duplicated from nodeAgg.c.  But NOTE that we expect to be
 
385
 * able to call aggregate final functions repeatedly after aggregating more
 
386
 * data onto the same transition value.  This is not a behavior required by
 
387
 * nodeAgg.c.
 
388
 */
 
389
static void
 
390
eval_windowaggregates(WindowAggState *winstate)
 
391
{
 
392
        WindowStatePerAgg peraggstate;
 
393
        int                     wfuncno,
 
394
                                numaggs;
 
395
        int                     i;
 
396
        MemoryContext oldContext;
 
397
        ExprContext *econtext;
 
398
        WindowObject agg_winobj;
 
399
        TupleTableSlot *agg_row_slot;
 
400
 
 
401
        numaggs = winstate->numaggs;
 
402
        if (numaggs == 0)
 
403
                return;                                 /* nothing to do */
 
404
 
 
405
        /* final output execution is in ps_ExprContext */
 
406
        econtext = winstate->ss.ps.ps_ExprContext;
 
407
        agg_winobj = winstate->agg_winobj;
 
408
        agg_row_slot = winstate->agg_row_slot;
 
409
 
 
410
        /*
 
411
         * Currently, we support only a subset of the SQL-standard window framing
 
412
         * rules.
 
413
         *
 
414
         * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
 
415
         * a contiguous group of rows extending forward from the start of the
 
416
         * partition, and rows only enter the frame, never exit it, as the current
 
417
         * row advances forward.  This makes it possible to use an incremental
 
418
         * strategy for evaluating aggregates: we run the transition function for
 
419
         * each row added to the frame, and run the final function whenever we
 
420
         * need the current aggregate value.  This is considerably more efficient
 
421
         * than the naive approach of re-running the entire aggregate calculation
 
422
         * for each current row.  It does assume that the final function doesn't
 
423
         * damage the running transition value, but we have the same assumption in
 
424
         * nodeAgg.c too (when it rescans an existing hash table).
 
425
         *
 
426
         * For other frame start rules, we discard the aggregate state and re-run
 
427
         * the aggregates whenever the frame head row moves.  We can still
 
428
         * optimize as above whenever successive rows share the same frame head.
 
429
         *
 
430
         * In many common cases, multiple rows share the same frame and hence the
 
431
         * same aggregate value. (In particular, if there's no ORDER BY in a RANGE
 
432
         * window, then all rows are peers and so they all have window frame equal
 
433
         * to the whole partition.)  We optimize such cases by calculating the
 
434
         * aggregate value once when we reach the first row of a peer group, and
 
435
         * then returning the saved value for all subsequent rows.
 
436
         *
 
437
         * 'aggregatedupto' keeps track of the first row that has not yet been
 
438
         * accumulated into the aggregate transition values.  Whenever we start a
 
439
         * new peer group, we accumulate forward to the end of the peer group.
 
440
         *
 
441
         * TODO: Rerunning aggregates from the frame start can be pretty slow. For
 
442
         * some aggregates like SUM and COUNT we could avoid that by implementing
 
443
         * a "negative transition function" that would be called for each row as
 
444
         * it exits the frame.  We'd have to think about avoiding recalculation of
 
445
         * volatile arguments of aggregate functions, too.
 
446
         */
 
447
 
 
448
        /*
 
449
         * First, update the frame head position.
 
450
         */
 
451
        update_frameheadpos(agg_winobj, winstate->temp_slot_1);
 
452
 
 
453
        /*
 
454
         * Initialize aggregates on first call for partition, or if the frame head
 
455
         * position moved since last time.
 
456
         */
 
457
        if (winstate->currentpos == 0 ||
 
458
                winstate->frameheadpos != winstate->aggregatedbase)
 
459
        {
 
460
                /*
 
461
                 * Discard transient aggregate values
 
462
                 */
 
463
                MemoryContextResetAndDeleteChildren(winstate->aggcontext);
 
464
 
 
465
                for (i = 0; i < numaggs; i++)
 
466
                {
 
467
                        peraggstate = &winstate->peragg[i];
 
468
                        wfuncno = peraggstate->wfuncno;
 
469
                        initialize_windowaggregate(winstate,
 
470
                                                                           &winstate->perfunc[wfuncno],
 
471
                                                                           peraggstate);
 
472
                }
 
473
 
 
474
                /*
 
475
                 * If we created a mark pointer for aggregates, keep it pushed up to
 
476
                 * frame head, so that tuplestore can discard unnecessary rows.
 
477
                 */
 
478
                if (agg_winobj->markptr >= 0)
 
479
                        WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
 
480
 
 
481
                /*
 
482
                 * Initialize for loop below
 
483
                 */
 
484
                ExecClearTuple(agg_row_slot);
 
485
                winstate->aggregatedbase = winstate->frameheadpos;
 
486
                winstate->aggregatedupto = winstate->frameheadpos;
 
487
        }
 
488
 
 
489
        /*
 
490
         * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
 
491
         * except when the frame head moves.  In END_CURRENT_ROW mode, we only
 
492
         * have to recalculate when the frame head moves or currentpos has
 
493
         * advanced past the place we'd aggregated up to.  Check for these cases
 
494
         * and if so, reuse the saved result values.
 
495
         */
 
496
        if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
 
497
                                                                   FRAMEOPTION_END_CURRENT_ROW)) &&
 
498
                winstate->aggregatedbase <= winstate->currentpos &&
 
499
                winstate->aggregatedupto > winstate->currentpos)
 
500
        {
 
501
                for (i = 0; i < numaggs; i++)
 
502
                {
 
503
                        peraggstate = &winstate->peragg[i];
 
504
                        wfuncno = peraggstate->wfuncno;
 
505
                        econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
 
506
                        econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
 
507
                }
 
508
                return;
 
509
        }
 
510
 
 
511
        /*
 
512
         * Advance until we reach a row not in frame (or end of partition).
 
513
         *
 
514
         * Note the loop invariant: agg_row_slot is either empty or holds the row
 
515
         * at position aggregatedupto.  We advance aggregatedupto after processing
 
516
         * a row.
 
517
         */
 
518
        for (;;)
 
519
        {
 
520
                /* Fetch next row if we didn't already */
 
521
                if (TupIsNull(agg_row_slot))
 
522
                {
 
523
                        if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
 
524
                                                                         agg_row_slot))
 
525
                                break;                  /* must be end of partition */
 
526
                }
 
527
 
 
528
                /* Exit loop (for now) if not in frame */
 
529
                if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
 
530
                        break;
 
531
 
 
532
                /* Set tuple context for evaluation of aggregate arguments */
 
533
                winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
 
534
 
 
535
                /* Accumulate row into the aggregates */
 
536
                for (i = 0; i < numaggs; i++)
 
537
                {
 
538
                        peraggstate = &winstate->peragg[i];
 
539
                        wfuncno = peraggstate->wfuncno;
 
540
                        advance_windowaggregate(winstate,
 
541
                                                                        &winstate->perfunc[wfuncno],
 
542
                                                                        peraggstate);
 
543
                }
 
544
 
 
545
                /* Reset per-input-tuple context after each tuple */
 
546
                ResetExprContext(winstate->tmpcontext);
 
547
 
 
548
                /* And advance the aggregated-row state */
 
549
                winstate->aggregatedupto++;
 
550
                ExecClearTuple(agg_row_slot);
 
551
        }
 
552
 
 
553
        /*
 
554
         * finalize aggregates and fill result/isnull fields.
 
555
         */
 
556
        for (i = 0; i < numaggs; i++)
 
557
        {
 
558
                Datum      *result;
 
559
                bool       *isnull;
 
560
 
 
561
                peraggstate = &winstate->peragg[i];
 
562
                wfuncno = peraggstate->wfuncno;
 
563
                result = &econtext->ecxt_aggvalues[wfuncno];
 
564
                isnull = &econtext->ecxt_aggnulls[wfuncno];
 
565
                finalize_windowaggregate(winstate,
 
566
                                                                 &winstate->perfunc[wfuncno],
 
567
                                                                 peraggstate,
 
568
                                                                 result, isnull);
 
569
 
 
570
                /*
 
571
                 * save the result in case next row shares the same frame.
 
572
                 *
 
573
                 * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
 
574
                 * advance that the next row can't possibly share the same frame. Is
 
575
                 * it worth detecting that and skipping this code?
 
576
                 */
 
577
                if (!peraggstate->resulttypeByVal)
 
578
                {
 
579
                        /*
 
580
                         * clear old resultValue in order not to leak memory.  (Note: the
 
581
                         * new result can't possibly be the same datum as old resultValue,
 
582
                         * because we never passed it to the trans function.)
 
583
                         */
 
584
                        if (!peraggstate->resultValueIsNull)
 
585
                                pfree(DatumGetPointer(peraggstate->resultValue));
 
586
 
 
587
                        /*
 
588
                         * If pass-by-ref, copy it into our aggregate context.
 
589
                         */
 
590
                        if (!*isnull)
 
591
                        {
 
592
                                oldContext = MemoryContextSwitchTo(winstate->aggcontext);
 
593
                                peraggstate->resultValue =
 
594
                                        datumCopy(*result,
 
595
                                                          peraggstate->resulttypeByVal,
 
596
                                                          peraggstate->resulttypeLen);
 
597
                                MemoryContextSwitchTo(oldContext);
 
598
                        }
 
599
                }
 
600
                else
 
601
                {
 
602
                        peraggstate->resultValue = *result;
 
603
                }
 
604
                peraggstate->resultValueIsNull = *isnull;
 
605
        }
 
606
}
 
607
 
 
608
/*
 
609
 * eval_windowfunction
 
610
 *
 
611
 * Arguments of window functions are not evaluated here, because a window
 
612
 * function can need random access to arbitrary rows in the partition.
 
613
 * The window function uses the special WinGetFuncArgInPartition and
 
614
 * WinGetFuncArgInFrame functions to evaluate the arguments for the rows
 
615
 * it wants.
 
616
 */
 
617
static void
 
618
eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
 
619
                                        Datum *result, bool *isnull)
 
620
{
 
621
        FunctionCallInfoData fcinfo;
 
622
        MemoryContext oldContext;
 
623
 
 
624
        oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
 
625
 
 
626
        /*
 
627
         * We don't pass any normal arguments to a window function, but we do pass
 
628
         * it the number of arguments, in order to permit window function
 
629
         * implementations to support varying numbers of arguments.  The real info
 
630
         * goes through the WindowObject, which is passed via fcinfo->context.
 
631
         */
 
632
        InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
 
633
                                                         perfuncstate->numArguments,
 
634
                                                         perfuncstate->winCollation,
 
635
                                                         (void *) perfuncstate->winobj, NULL);
 
636
        /* Just in case, make all the regular argument slots be null */
 
637
        memset(fcinfo.argnull, true, perfuncstate->numArguments);
 
638
 
 
639
        *result = FunctionCallInvoke(&fcinfo);
 
640
        *isnull = fcinfo.isnull;
 
641
 
 
642
        /*
 
643
         * Make sure pass-by-ref data is allocated in the appropriate context. (We
 
644
         * need this in case the function returns a pointer into some short-lived
 
645
         * tuple, as is entirely possible.)
 
646
         */
 
647
        if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
 
648
                !MemoryContextContains(CurrentMemoryContext,
 
649
                                                           DatumGetPointer(*result)))
 
650
                *result = datumCopy(*result,
 
651
                                                        perfuncstate->resulttypeByVal,
 
652
                                                        perfuncstate->resulttypeLen);
 
653
 
 
654
        MemoryContextSwitchTo(oldContext);
 
655
}
 
656
 
 
657
/*
 
658
 * begin_partition
 
659
 * Start buffering rows of the next partition.
 
660
 */
 
661
static void
 
662
begin_partition(WindowAggState *winstate)
 
663
{
 
664
        PlanState  *outerPlan = outerPlanState(winstate);
 
665
        int                     numfuncs = winstate->numfuncs;
 
666
        int                     i;
 
667
 
 
668
        winstate->partition_spooled = false;
 
669
        winstate->framehead_valid = false;
 
670
        winstate->frametail_valid = false;
 
671
        winstate->spooled_rows = 0;
 
672
        winstate->currentpos = 0;
 
673
        winstate->frameheadpos = 0;
 
674
        winstate->frametailpos = -1;
 
675
        ExecClearTuple(winstate->agg_row_slot);
 
676
 
 
677
        /*
 
678
         * If this is the very first partition, we need to fetch the first input
 
679
         * row to store in first_part_slot.
 
680
         */
 
681
        if (TupIsNull(winstate->first_part_slot))
 
682
        {
 
683
                TupleTableSlot *outerslot = ExecProcNode(outerPlan);
 
684
 
 
685
                if (!TupIsNull(outerslot))
 
686
                        ExecCopySlot(winstate->first_part_slot, outerslot);
 
687
                else
 
688
                {
 
689
                        /* outer plan is empty, so we have nothing to do */
 
690
                        winstate->partition_spooled = true;
 
691
                        winstate->more_partitions = false;
 
692
                        return;
 
693
                }
 
694
        }
 
695
 
 
696
        /* Create new tuplestore for this partition */
 
697
        winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
 
698
 
 
699
        /*
 
700
         * Set up read pointers for the tuplestore.  The current pointer doesn't
 
701
         * need BACKWARD capability, but the per-window-function read pointers do,
 
702
         * and the aggregate pointer does if frame start is movable.
 
703
         */
 
704
        winstate->current_ptr = 0;      /* read pointer 0 is pre-allocated */
 
705
 
 
706
        /* reset default REWIND capability bit for current ptr */
 
707
        tuplestore_set_eflags(winstate->buffer, 0);
 
708
 
 
709
        /* create read pointers for aggregates, if needed */
 
710
        if (winstate->numaggs > 0)
 
711
        {
 
712
                WindowObject agg_winobj = winstate->agg_winobj;
 
713
                int                     readptr_flags = 0;
 
714
 
 
715
                /* If the frame head is potentially movable ... */
 
716
                if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
 
717
                {
 
718
                        /* ... create a mark pointer to track the frame head */
 
719
                        agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
 
720
                        /* and the read pointer will need BACKWARD capability */
 
721
                        readptr_flags |= EXEC_FLAG_BACKWARD;
 
722
                }
 
723
 
 
724
                agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
 
725
                                                                                                                        readptr_flags);
 
726
                agg_winobj->markpos = -1;
 
727
                agg_winobj->seekpos = -1;
 
728
 
 
729
                /* Also reset the row counters for aggregates */
 
730
                winstate->aggregatedbase = 0;
 
731
                winstate->aggregatedupto = 0;
 
732
        }
 
733
 
 
734
        /* create mark and read pointers for each real window function */
 
735
        for (i = 0; i < numfuncs; i++)
 
736
        {
 
737
                WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
 
738
 
 
739
                if (!perfuncstate->plain_agg)
 
740
                {
 
741
                        WindowObject winobj = perfuncstate->winobj;
 
742
 
 
743
                        winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
 
744
                                                                                                                        0);
 
745
                        winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
 
746
                                                                                                                 EXEC_FLAG_BACKWARD);
 
747
                        winobj->markpos = -1;
 
748
                        winobj->seekpos = -1;
 
749
                }
 
750
        }
 
751
 
 
752
        /*
 
753
         * Store the first tuple into the tuplestore (it's always available now;
 
754
         * we either read it above, or saved it at the end of previous partition)
 
755
         */
 
756
        tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
 
757
        winstate->spooled_rows++;
 
758
}
 
759
 
 
760
/*
 
761
 * Read tuples from the outer node, up to and including position 'pos', and
 
762
 * store them into the tuplestore. If pos is -1, reads the whole partition.
 
763
 */
 
764
static void
 
765
spool_tuples(WindowAggState *winstate, int64 pos)
 
766
{
 
767
        WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
 
768
        PlanState  *outerPlan;
 
769
        TupleTableSlot *outerslot;
 
770
        MemoryContext oldcontext;
 
771
 
 
772
        if (!winstate->buffer)
 
773
                return;                                 /* just a safety check */
 
774
        if (winstate->partition_spooled)
 
775
                return;                                 /* whole partition done already */
 
776
 
 
777
        /*
 
778
         * If the tuplestore has spilled to disk, alternate reading and writing
 
779
         * becomes quite expensive due to frequent buffer flushes.      It's cheaper
 
780
         * to force the entire partition to get spooled in one go.
 
781
         *
 
782
         * XXX this is a horrid kluge --- it'd be better to fix the performance
 
783
         * problem inside tuplestore.  FIXME
 
784
         */
 
785
        if (!tuplestore_in_memory(winstate->buffer))
 
786
                pos = -1;
 
787
 
 
788
        outerPlan = outerPlanState(winstate);
 
789
 
 
790
        /* Must be in query context to call outerplan */
 
791
        oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
 
792
 
 
793
        while (winstate->spooled_rows <= pos || pos == -1)
 
794
        {
 
795
                outerslot = ExecProcNode(outerPlan);
 
796
                if (TupIsNull(outerslot))
 
797
                {
 
798
                        /* reached the end of the last partition */
 
799
                        winstate->partition_spooled = true;
 
800
                        winstate->more_partitions = false;
 
801
                        break;
 
802
                }
 
803
 
 
804
                if (node->partNumCols > 0)
 
805
                {
 
806
                        /* Check if this tuple still belongs to the current partition */
 
807
                        if (!execTuplesMatch(winstate->first_part_slot,
 
808
                                                                 outerslot,
 
809
                                                                 node->partNumCols, node->partColIdx,
 
810
                                                                 winstate->partEqfunctions,
 
811
                                                                 winstate->tmpcontext->ecxt_per_tuple_memory))
 
812
                        {
 
813
                                /*
 
814
                                 * end of partition; copy the tuple for the next cycle.
 
815
                                 */
 
816
                                ExecCopySlot(winstate->first_part_slot, outerslot);
 
817
                                winstate->partition_spooled = true;
 
818
                                winstate->more_partitions = true;
 
819
                                break;
 
820
                        }
 
821
                }
 
822
 
 
823
                /* Still in partition, so save it into the tuplestore */
 
824
                tuplestore_puttupleslot(winstate->buffer, outerslot);
 
825
                winstate->spooled_rows++;
 
826
        }
 
827
 
 
828
        MemoryContextSwitchTo(oldcontext);
 
829
}
 
830
 
 
831
/*
 
832
 * release_partition
 
833
 * clear information kept within a partition, including
 
834
 * tuplestore and aggregate results.
 
835
 */
 
836
static void
 
837
release_partition(WindowAggState *winstate)
 
838
{
 
839
        int                     i;
 
840
 
 
841
        for (i = 0; i < winstate->numfuncs; i++)
 
842
        {
 
843
                WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
 
844
 
 
845
                /* Release any partition-local state of this window function */
 
846
                if (perfuncstate->winobj)
 
847
                        perfuncstate->winobj->localmem = NULL;
 
848
        }
 
849
 
 
850
        /*
 
851
         * Release all partition-local memory (in particular, any partition-local
 
852
         * state that we might have trashed our pointers to in the above loop, and
 
853
         * any aggregate temp data).  We don't rely on retail pfree because some
 
854
         * aggregates might have allocated data we don't have direct pointers to.
 
855
         */
 
856
        MemoryContextResetAndDeleteChildren(winstate->partcontext);
 
857
        MemoryContextResetAndDeleteChildren(winstate->aggcontext);
 
858
 
 
859
        if (winstate->buffer)
 
860
                tuplestore_end(winstate->buffer);
 
861
        winstate->buffer = NULL;
 
862
        winstate->partition_spooled = false;
 
863
}
 
864
 
 
865
/*
 
866
 * row_is_in_frame
 
867
 * Determine whether a row is in the current row's window frame according
 
868
 * to our window framing rule
 
869
 *
 
870
 * The caller must have already determined that the row is in the partition
 
871
 * and fetched it into a slot.  This function just encapsulates the framing
 
872
 * rules.
 
873
 */
 
874
static bool
 
875
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
 
876
{
 
877
        int                     frameOptions = winstate->frameOptions;
 
878
 
 
879
        Assert(pos >= 0);                       /* else caller error */
 
880
 
 
881
        /* First, check frame starting conditions */
 
882
        if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
 
883
        {
 
884
                if (frameOptions & FRAMEOPTION_ROWS)
 
885
                {
 
886
                        /* rows before current row are out of frame */
 
887
                        if (pos < winstate->currentpos)
 
888
                                return false;
 
889
                }
 
890
                else if (frameOptions & FRAMEOPTION_RANGE)
 
891
                {
 
892
                        /* preceding row that is not peer is out of frame */
 
893
                        if (pos < winstate->currentpos &&
 
894
                                !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
 
895
                                return false;
 
896
                }
 
897
                else
 
898
                        Assert(false);
 
899
        }
 
900
        else if (frameOptions & FRAMEOPTION_START_VALUE)
 
901
        {
 
902
                if (frameOptions & FRAMEOPTION_ROWS)
 
903
                {
 
904
                        int64           offset = DatumGetInt64(winstate->startOffsetValue);
 
905
 
 
906
                        /* rows before current row + offset are out of frame */
 
907
                        if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
 
908
                                offset = -offset;
 
909
 
 
910
                        if (pos < winstate->currentpos + offset)
 
911
                                return false;
 
912
                }
 
913
                else if (frameOptions & FRAMEOPTION_RANGE)
 
914
                {
 
915
                        /* parser should have rejected this */
 
916
                        elog(ERROR, "window frame with value offset is not implemented");
 
917
                }
 
918
                else
 
919
                        Assert(false);
 
920
        }
 
921
 
 
922
        /* Okay so far, now check frame ending conditions */
 
923
        if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
 
924
        {
 
925
                if (frameOptions & FRAMEOPTION_ROWS)
 
926
                {
 
927
                        /* rows after current row are out of frame */
 
928
                        if (pos > winstate->currentpos)
 
929
                                return false;
 
930
                }
 
931
                else if (frameOptions & FRAMEOPTION_RANGE)
 
932
                {
 
933
                        /* following row that is not peer is out of frame */
 
934
                        if (pos > winstate->currentpos &&
 
935
                                !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
 
936
                                return false;
 
937
                }
 
938
                else
 
939
                        Assert(false);
 
940
        }
 
941
        else if (frameOptions & FRAMEOPTION_END_VALUE)
 
942
        {
 
943
                if (frameOptions & FRAMEOPTION_ROWS)
 
944
                {
 
945
                        int64           offset = DatumGetInt64(winstate->endOffsetValue);
 
946
 
 
947
                        /* rows after current row + offset are out of frame */
 
948
                        if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
 
949
                                offset = -offset;
 
950
 
 
951
                        if (pos > winstate->currentpos + offset)
 
952
                                return false;
 
953
                }
 
954
                else if (frameOptions & FRAMEOPTION_RANGE)
 
955
                {
 
956
                        /* parser should have rejected this */
 
957
                        elog(ERROR, "window frame with value offset is not implemented");
 
958
                }
 
959
                else
 
960
                        Assert(false);
 
961
        }
 
962
 
 
963
        /* If we get here, it's in frame */
 
964
        return true;
 
965
}
 
966
 
 
967
/*
 
968
 * update_frameheadpos
 
969
 * make frameheadpos valid for the current row
 
970
 *
 
971
 * Uses the winobj's read pointer for any required fetches; hence, if the
 
972
 * frame mode is one that requires row comparisons, the winobj's mark must
 
973
 * not be past the currently known frame head.  Also uses the specified slot
 
974
 * for any required fetches.
 
975
 */
 
976
static void
 
977
update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
 
978
{
 
979
        WindowAggState *winstate = winobj->winstate;
 
980
        WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
 
981
        int                     frameOptions = winstate->frameOptions;
 
982
 
 
983
        if (winstate->framehead_valid)
 
984
                return;                                 /* already known for current row */
 
985
 
 
986
        if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
 
987
        {
 
988
                /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
 
989
                winstate->frameheadpos = 0;
 
990
                winstate->framehead_valid = true;
 
991
        }
 
992
        else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
 
993
        {
 
994
                if (frameOptions & FRAMEOPTION_ROWS)
 
995
                {
 
996
                        /* In ROWS mode, frame head is the same as current */
 
997
                        winstate->frameheadpos = winstate->currentpos;
 
998
                        winstate->framehead_valid = true;
 
999
                }
 
1000
                else if (frameOptions & FRAMEOPTION_RANGE)
 
1001
                {
 
1002
                        int64           fhprev;
 
1003
 
 
1004
                        /* If no ORDER BY, all rows are peers with each other */
 
1005
                        if (node->ordNumCols == 0)
 
1006
                        {
 
1007
                                winstate->frameheadpos = 0;
 
1008
                                winstate->framehead_valid = true;
 
1009
                                return;
 
1010
                        }
 
1011
 
 
1012
                        /*
 
1013
                         * In RANGE START_CURRENT mode, frame head is the first row that
 
1014
                         * is a peer of current row.  We search backwards from current,
 
1015
                         * which could be a bit inefficient if peer sets are large. Might
 
1016
                         * be better to have a separate read pointer that moves forward
 
1017
                         * tracking the frame head.
 
1018
                         */
 
1019
                        fhprev = winstate->currentpos - 1;
 
1020
                        for (;;)
 
1021
                        {
 
1022
                                /* assume the frame head can't go backwards */
 
1023
                                if (fhprev < winstate->frameheadpos)
 
1024
                                        break;
 
1025
                                if (!window_gettupleslot(winobj, fhprev, slot))
 
1026
                                        break;          /* start of partition */
 
1027
                                if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
 
1028
                                        break;          /* not peer of current row */
 
1029
                                fhprev--;
 
1030
                        }
 
1031
                        winstate->frameheadpos = fhprev + 1;
 
1032
                        winstate->framehead_valid = true;
 
1033
                }
 
1034
                else
 
1035
                        Assert(false);
 
1036
        }
 
1037
        else if (frameOptions & FRAMEOPTION_START_VALUE)
 
1038
        {
 
1039
                if (frameOptions & FRAMEOPTION_ROWS)
 
1040
                {
 
1041
                        /* In ROWS mode, bound is physically n before/after current */
 
1042
                        int64           offset = DatumGetInt64(winstate->startOffsetValue);
 
1043
 
 
1044
                        if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
 
1045
                                offset = -offset;
 
1046
 
 
1047
                        winstate->frameheadpos = winstate->currentpos + offset;
 
1048
                        /* frame head can't go before first row */
 
1049
                        if (winstate->frameheadpos < 0)
 
1050
                                winstate->frameheadpos = 0;
 
1051
                        else if (winstate->frameheadpos > winstate->currentpos)
 
1052
                        {
 
1053
                                /* make sure frameheadpos is not past end of partition */
 
1054
                                spool_tuples(winstate, winstate->frameheadpos - 1);
 
1055
                                if (winstate->frameheadpos > winstate->spooled_rows)
 
1056
                                        winstate->frameheadpos = winstate->spooled_rows;
 
1057
                        }
 
1058
                        winstate->framehead_valid = true;
 
1059
                }
 
1060
                else if (frameOptions & FRAMEOPTION_RANGE)
 
1061
                {
 
1062
                        /* parser should have rejected this */
 
1063
                        elog(ERROR, "window frame with value offset is not implemented");
 
1064
                }
 
1065
                else
 
1066
                        Assert(false);
 
1067
        }
 
1068
        else
 
1069
                Assert(false);
 
1070
}
 
1071
 
 
1072
/*
 
1073
 * update_frametailpos
 
1074
 * make frametailpos valid for the current row
 
1075
 *
 
1076
 * Uses the winobj's read pointer for any required fetches; hence, if the
 
1077
 * frame mode is one that requires row comparisons, the winobj's mark must
 
1078
 * not be past the currently known frame tail.  Also uses the specified slot
 
1079
 * for any required fetches.
 
1080
 */
 
1081
static void
 
1082
update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
 
1083
{
 
1084
        WindowAggState *winstate = winobj->winstate;
 
1085
        WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
 
1086
        int                     frameOptions = winstate->frameOptions;
 
1087
 
 
1088
        if (winstate->frametail_valid)
 
1089
                return;                                 /* already known for current row */
 
1090
 
 
1091
        if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
 
1092
        {
 
1093
                /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
 
1094
                spool_tuples(winstate, -1);
 
1095
                winstate->frametailpos = winstate->spooled_rows - 1;
 
1096
                winstate->frametail_valid = true;
 
1097
        }
 
1098
        else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
 
1099
        {
 
1100
                if (frameOptions & FRAMEOPTION_ROWS)
 
1101
                {
 
1102
                        /* In ROWS mode, exactly the rows up to current are in frame */
 
1103
                        winstate->frametailpos = winstate->currentpos;
 
1104
                        winstate->frametail_valid = true;
 
1105
                }
 
1106
                else if (frameOptions & FRAMEOPTION_RANGE)
 
1107
                {
 
1108
                        int64           ftnext;
 
1109
 
 
1110
                        /* If no ORDER BY, all rows are peers with each other */
 
1111
                        if (node->ordNumCols == 0)
 
1112
                        {
 
1113
                                spool_tuples(winstate, -1);
 
1114
                                winstate->frametailpos = winstate->spooled_rows - 1;
 
1115
                                winstate->frametail_valid = true;
 
1116
                                return;
 
1117
                        }
 
1118
 
 
1119
                        /*
 
1120
                         * Else we have to search for the first non-peer of the current
 
1121
                         * row.  We assume the current value of frametailpos is a lower
 
1122
                         * bound on the possible frame tail location, ie, frame tail never
 
1123
                         * goes backward, and that currentpos is also a lower bound, ie,
 
1124
                         * frame end always >= current row.
 
1125
                         */
 
1126
                        ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
 
1127
                        for (;;)
 
1128
                        {
 
1129
                                if (!window_gettupleslot(winobj, ftnext, slot))
 
1130
                                        break;          /* end of partition */
 
1131
                                if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
 
1132
                                        break;          /* not peer of current row */
 
1133
                                ftnext++;
 
1134
                        }
 
1135
                        winstate->frametailpos = ftnext - 1;
 
1136
                        winstate->frametail_valid = true;
 
1137
                }
 
1138
                else
 
1139
                        Assert(false);
 
1140
        }
 
1141
        else if (frameOptions & FRAMEOPTION_END_VALUE)
 
1142
        {
 
1143
                if (frameOptions & FRAMEOPTION_ROWS)
 
1144
                {
 
1145
                        /* In ROWS mode, bound is physically n before/after current */
 
1146
                        int64           offset = DatumGetInt64(winstate->endOffsetValue);
 
1147
 
 
1148
                        if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
 
1149
                                offset = -offset;
 
1150
 
 
1151
                        winstate->frametailpos = winstate->currentpos + offset;
 
1152
                        /* smallest allowable value of frametailpos is -1 */
 
1153
                        if (winstate->frametailpos < 0)
 
1154
                                winstate->frametailpos = -1;
 
1155
                        else if (winstate->frametailpos > winstate->currentpos)
 
1156
                        {
 
1157
                                /* make sure frametailpos is not past last row of partition */
 
1158
                                spool_tuples(winstate, winstate->frametailpos);
 
1159
                                if (winstate->frametailpos >= winstate->spooled_rows)
 
1160
                                        winstate->frametailpos = winstate->spooled_rows - 1;
 
1161
                        }
 
1162
                        winstate->frametail_valid = true;
 
1163
                }
 
1164
                else if (frameOptions & FRAMEOPTION_RANGE)
 
1165
                {
 
1166
                        /* parser should have rejected this */
 
1167
                        elog(ERROR, "window frame with value offset is not implemented");
 
1168
                }
 
1169
                else
 
1170
                        Assert(false);
 
1171
        }
 
1172
        else
 
1173
                Assert(false);
 
1174
}
 
1175
 
 
1176
 
 
1177
/* -----------------
 
1178
 * ExecWindowAgg
 
1179
 *
 
1180
 *      ExecWindowAgg receives tuples from its outer subplan and
 
1181
 *      stores them into a tuplestore, then processes window functions.
 
1182
 *      This node doesn't reduce nor qualify any row so the number of
 
1183
 *      returned rows is exactly the same as its outer subplan's result
 
1184
 *      (ignoring the case of SRFs in the targetlist, that is).
 
1185
 * -----------------
 
1186
 */
 
1187
TupleTableSlot *
 
1188
ExecWindowAgg(WindowAggState *winstate)
 
1189
{
 
1190
        TupleTableSlot *result;
 
1191
        ExprDoneCond isDone;
 
1192
        ExprContext *econtext;
 
1193
        int                     i;
 
1194
        int                     numfuncs;
 
1195
 
 
1196
        if (winstate->all_done)
 
1197
                return NULL;
 
1198
 
 
1199
        /*
 
1200
         * Check to see if we're still projecting out tuples from a previous
 
1201
         * output tuple (because there is a function-returning-set in the
 
1202
         * projection expressions).  If so, try to project another one.
 
1203
         */
 
1204
        if (winstate->ss.ps.ps_TupFromTlist)
 
1205
        {
 
1206
                TupleTableSlot *result;
 
1207
                ExprDoneCond isDone;
 
1208
 
 
1209
                result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
 
1210
                if (isDone == ExprMultipleResult)
 
1211
                        return result;
 
1212
                /* Done with that source tuple... */
 
1213
                winstate->ss.ps.ps_TupFromTlist = false;
 
1214
        }
 
1215
 
 
1216
        /*
 
1217
         * Compute frame offset values, if any, during first call.
 
1218
         */
 
1219
        if (winstate->all_first)
 
1220
        {
 
1221
                int                     frameOptions = winstate->frameOptions;
 
1222
                ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
 
1223
                Datum           value;
 
1224
                bool            isnull;
 
1225
                int16           len;
 
1226
                bool            byval;
 
1227
 
 
1228
                if (frameOptions & FRAMEOPTION_START_VALUE)
 
1229
                {
 
1230
                        Assert(winstate->startOffset != NULL);
 
1231
                        value = ExecEvalExprSwitchContext(winstate->startOffset,
 
1232
                                                                                          econtext,
 
1233
                                                                                          &isnull,
 
1234
                                                                                          NULL);
 
1235
                        if (isnull)
 
1236
                                ereport(ERROR,
 
1237
                                                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
 
1238
                                                 errmsg("frame starting offset must not be null")));
 
1239
                        /* copy value into query-lifespan context */
 
1240
                        get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
 
1241
                                                        &len, &byval);
 
1242
                        winstate->startOffsetValue = datumCopy(value, byval, len);
 
1243
                        if (frameOptions & FRAMEOPTION_ROWS)
 
1244
                        {
 
1245
                                /* value is known to be int8 */
 
1246
                                int64           offset = DatumGetInt64(value);
 
1247
 
 
1248
                                if (offset < 0)
 
1249
                                        ereport(ERROR,
 
1250
                                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 
1251
                                          errmsg("frame starting offset must not be negative")));
 
1252
                        }
 
1253
                }
 
1254
                if (frameOptions & FRAMEOPTION_END_VALUE)
 
1255
                {
 
1256
                        Assert(winstate->endOffset != NULL);
 
1257
                        value = ExecEvalExprSwitchContext(winstate->endOffset,
 
1258
                                                                                          econtext,
 
1259
                                                                                          &isnull,
 
1260
                                                                                          NULL);
 
1261
                        if (isnull)
 
1262
                                ereport(ERROR,
 
1263
                                                (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
 
1264
                                                 errmsg("frame ending offset must not be null")));
 
1265
                        /* copy value into query-lifespan context */
 
1266
                        get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
 
1267
                                                        &len, &byval);
 
1268
                        winstate->endOffsetValue = datumCopy(value, byval, len);
 
1269
                        if (frameOptions & FRAMEOPTION_ROWS)
 
1270
                        {
 
1271
                                /* value is known to be int8 */
 
1272
                                int64           offset = DatumGetInt64(value);
 
1273
 
 
1274
                                if (offset < 0)
 
1275
                                        ereport(ERROR,
 
1276
                                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 
1277
                                                errmsg("frame ending offset must not be negative")));
 
1278
                        }
 
1279
                }
 
1280
                winstate->all_first = false;
 
1281
        }
 
1282
 
 
1283
restart:
 
1284
        if (winstate->buffer == NULL)
 
1285
        {
 
1286
                /* Initialize for first partition and set current row = 0 */
 
1287
                begin_partition(winstate);
 
1288
                /* If there are no input rows, we'll detect that and exit below */
 
1289
        }
 
1290
        else
 
1291
        {
 
1292
                /* Advance current row within partition */
 
1293
                winstate->currentpos++;
 
1294
                /* This might mean that the frame moves, too */
 
1295
                winstate->framehead_valid = false;
 
1296
                winstate->frametail_valid = false;
 
1297
        }
 
1298
 
 
1299
        /*
 
1300
         * Spool all tuples up to and including the current row, if we haven't
 
1301
         * already
 
1302
         */
 
1303
        spool_tuples(winstate, winstate->currentpos);
 
1304
 
 
1305
        /* Move to the next partition if we reached the end of this partition */
 
1306
        if (winstate->partition_spooled &&
 
1307
                winstate->currentpos >= winstate->spooled_rows)
 
1308
        {
 
1309
                release_partition(winstate);
 
1310
 
 
1311
                if (winstate->more_partitions)
 
1312
                {
 
1313
                        begin_partition(winstate);
 
1314
                        Assert(winstate->spooled_rows > 0);
 
1315
                }
 
1316
                else
 
1317
                {
 
1318
                        winstate->all_done = true;
 
1319
                        return NULL;
 
1320
                }
 
1321
        }
 
1322
 
 
1323
        /* final output execution is in ps_ExprContext */
 
1324
        econtext = winstate->ss.ps.ps_ExprContext;
 
1325
 
 
1326
        /* Clear the per-output-tuple context for current row */
 
1327
        ResetExprContext(econtext);
 
1328
 
 
1329
        /*
 
1330
         * Read the current row from the tuplestore, and save in ScanTupleSlot.
 
1331
         * (We can't rely on the outerplan's output slot because we may have to
 
1332
         * read beyond the current row.  Also, we have to actually copy the row
 
1333
         * out of the tuplestore, since window function evaluation might cause the
 
1334
         * tuplestore to dump its state to disk.)
 
1335
         *
 
1336
         * Current row must be in the tuplestore, since we spooled it above.
 
1337
         */
 
1338
        tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
 
1339
        if (!tuplestore_gettupleslot(winstate->buffer, true, true,
 
1340
                                                                 winstate->ss.ss_ScanTupleSlot))
 
1341
                elog(ERROR, "unexpected end of tuplestore");
 
1342
 
 
1343
        /*
 
1344
         * Evaluate true window functions
 
1345
         */
 
1346
        numfuncs = winstate->numfuncs;
 
1347
        for (i = 0; i < numfuncs; i++)
 
1348
        {
 
1349
                WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
 
1350
 
 
1351
                if (perfuncstate->plain_agg)
 
1352
                        continue;
 
1353
                eval_windowfunction(winstate, perfuncstate,
 
1354
                          &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
 
1355
                          &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
 
1356
        }
 
1357
 
 
1358
        /*
 
1359
         * Evaluate aggregates
 
1360
         */
 
1361
        if (winstate->numaggs > 0)
 
1362
                eval_windowaggregates(winstate);
 
1363
 
 
1364
        /*
 
1365
         * Truncate any no-longer-needed rows from the tuplestore.
 
1366
         */
 
1367
        tuplestore_trim(winstate->buffer);
 
1368
 
 
1369
        /*
 
1370
         * Form and return a projection tuple using the windowfunc results and the
 
1371
         * current row.  Setting ecxt_outertuple arranges that any Vars will be
 
1372
         * evaluated with respect to that row.
 
1373
         */
 
1374
        econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
 
1375
        result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
 
1376
 
 
1377
        if (isDone == ExprEndResult)
 
1378
        {
 
1379
                /* SRF in tlist returned no rows, so advance to next input tuple */
 
1380
                goto restart;
 
1381
        }
 
1382
 
 
1383
        winstate->ss.ps.ps_TupFromTlist =
 
1384
                (isDone == ExprMultipleResult);
 
1385
        return result;
 
1386
}
 
1387
 
 
1388
/* -----------------
 
1389
 * ExecInitWindowAgg
 
1390
 *
 
1391
 *      Creates the run-time information for the WindowAgg node produced by the
 
1392
 *      planner and initializes its outer subtree
 
1393
 * -----------------
 
1394
 */
 
1395
WindowAggState *
 
1396
ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
 
1397
{
 
1398
        WindowAggState *winstate;
 
1399
        Plan       *outerPlan;
 
1400
        ExprContext *econtext;
 
1401
        ExprContext *tmpcontext;
 
1402
        WindowStatePerFunc perfunc;
 
1403
        WindowStatePerAgg peragg;
 
1404
        int                     numfuncs,
 
1405
                                wfuncno,
 
1406
                                numaggs,
 
1407
                                aggno;
 
1408
        ListCell   *l;
 
1409
 
 
1410
        /* check for unsupported flags */
 
1411
        Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
 
1412
 
 
1413
        /*
 
1414
         * create state structure
 
1415
         */
 
1416
        winstate = makeNode(WindowAggState);
 
1417
        winstate->ss.ps.plan = (Plan *) node;
 
1418
        winstate->ss.ps.state = estate;
 
1419
 
 
1420
        /*
 
1421
         * Create expression contexts.  We need two, one for per-input-tuple
 
1422
         * processing and one for per-output-tuple processing.  We cheat a little
 
1423
         * by using ExecAssignExprContext() to build both.
 
1424
         */
 
1425
        ExecAssignExprContext(estate, &winstate->ss.ps);
 
1426
        tmpcontext = winstate->ss.ps.ps_ExprContext;
 
1427
        winstate->tmpcontext = tmpcontext;
 
1428
        ExecAssignExprContext(estate, &winstate->ss.ps);
 
1429
 
 
1430
        /* Create long-lived context for storage of partition-local memory etc */
 
1431
        winstate->partcontext =
 
1432
                AllocSetContextCreate(CurrentMemoryContext,
 
1433
                                                          "WindowAgg_Partition",
 
1434
                                                          ALLOCSET_DEFAULT_MINSIZE,
 
1435
                                                          ALLOCSET_DEFAULT_INITSIZE,
 
1436
                                                          ALLOCSET_DEFAULT_MAXSIZE);
 
1437
 
 
1438
        /* Create mid-lived context for aggregate trans values etc */
 
1439
        winstate->aggcontext =
 
1440
                AllocSetContextCreate(CurrentMemoryContext,
 
1441
                                                          "WindowAgg_Aggregates",
 
1442
                                                          ALLOCSET_DEFAULT_MINSIZE,
 
1443
                                                          ALLOCSET_DEFAULT_INITSIZE,
 
1444
                                                          ALLOCSET_DEFAULT_MAXSIZE);
 
1445
 
 
1446
        /*
 
1447
         * tuple table initialization
 
1448
         */
 
1449
        ExecInitScanTupleSlot(estate, &winstate->ss);
 
1450
        ExecInitResultTupleSlot(estate, &winstate->ss.ps);
 
1451
        winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
 
1452
        winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
 
1453
        winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
 
1454
        winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
 
1455
 
 
1456
        winstate->ss.ps.targetlist = (List *)
 
1457
                ExecInitExpr((Expr *) node->plan.targetlist,
 
1458
                                         (PlanState *) winstate);
 
1459
 
 
1460
        /*
 
1461
         * WindowAgg nodes never have quals, since they can only occur at the
 
1462
         * logical top level of a query (ie, after any WHERE or HAVING filters)
 
1463
         */
 
1464
        Assert(node->plan.qual == NIL);
 
1465
        winstate->ss.ps.qual = NIL;
 
1466
 
 
1467
        /*
 
1468
         * initialize child nodes
 
1469
         */
 
1470
        outerPlan = outerPlan(node);
 
1471
        outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
 
1472
 
 
1473
        /*
 
1474
         * initialize source tuple type (which is also the tuple type that we'll
 
1475
         * store in the tuplestore and use in all our working slots).
 
1476
         */
 
1477
        ExecAssignScanTypeFromOuterPlan(&winstate->ss);
 
1478
 
 
1479
        ExecSetSlotDescriptor(winstate->first_part_slot,
 
1480
                                                  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
 
1481
        ExecSetSlotDescriptor(winstate->agg_row_slot,
 
1482
                                                  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
 
1483
        ExecSetSlotDescriptor(winstate->temp_slot_1,
 
1484
                                                  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
 
1485
        ExecSetSlotDescriptor(winstate->temp_slot_2,
 
1486
                                                  winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
 
1487
 
 
1488
        /*
 
1489
         * Initialize result tuple type and projection info.
 
1490
         */
 
1491
        ExecAssignResultTypeFromTL(&winstate->ss.ps);
 
1492
        ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
 
1493
 
 
1494
        winstate->ss.ps.ps_TupFromTlist = false;
 
1495
 
 
1496
        /* Set up data for comparing tuples */
 
1497
        if (node->partNumCols > 0)
 
1498
                winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
 
1499
                                                                                                                node->partOperators);
 
1500
        if (node->ordNumCols > 0)
 
1501
                winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
 
1502
                                                                                                                  node->ordOperators);
 
1503
 
 
1504
        /*
 
1505
         * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
 
1506
         */
 
1507
        numfuncs = winstate->numfuncs;
 
1508
        numaggs = winstate->numaggs;
 
1509
        econtext = winstate->ss.ps.ps_ExprContext;
 
1510
        econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
 
1511
        econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
 
1512
 
 
1513
        /*
 
1514
         * allocate per-wfunc/per-agg state information.
 
1515
         */
 
1516
        perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
 
1517
        peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
 
1518
        winstate->perfunc = perfunc;
 
1519
        winstate->peragg = peragg;
 
1520
 
 
1521
        wfuncno = -1;
 
1522
        aggno = -1;
 
1523
        foreach(l, winstate->funcs)
 
1524
        {
 
1525
                WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
 
1526
                WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
 
1527
                WindowStatePerFunc perfuncstate;
 
1528
                AclResult       aclresult;
 
1529
                int                     i;
 
1530
 
 
1531
                if (wfunc->winref != node->winref)              /* planner screwed up? */
 
1532
                        elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
 
1533
                                 wfunc->winref, node->winref);
 
1534
 
 
1535
                /* Look for a previous duplicate window function */
 
1536
                for (i = 0; i <= wfuncno; i++)
 
1537
                {
 
1538
                        if (equal(wfunc, perfunc[i].wfunc) &&
 
1539
                                !contain_volatile_functions((Node *) wfunc))
 
1540
                                break;
 
1541
                }
 
1542
                if (i <= wfuncno)
 
1543
                {
 
1544
                        /* Found a match to an existing entry, so just mark it */
 
1545
                        wfuncstate->wfuncno = i;
 
1546
                        continue;
 
1547
                }
 
1548
 
 
1549
                /* Nope, so assign a new PerAgg record */
 
1550
                perfuncstate = &perfunc[++wfuncno];
 
1551
 
 
1552
                /* Mark WindowFunc state node with assigned index in the result array */
 
1553
                wfuncstate->wfuncno = wfuncno;
 
1554
 
 
1555
                /* Check permission to call window function */
 
1556
                aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
 
1557
                                                                         ACL_EXECUTE);
 
1558
                if (aclresult != ACLCHECK_OK)
 
1559
                        aclcheck_error(aclresult, ACL_KIND_PROC,
 
1560
                                                   get_func_name(wfunc->winfnoid));
 
1561
 
 
1562
                /* Fill in the perfuncstate data */
 
1563
                perfuncstate->wfuncstate = wfuncstate;
 
1564
                perfuncstate->wfunc = wfunc;
 
1565
                perfuncstate->numArguments = list_length(wfuncstate->args);
 
1566
 
 
1567
                fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
 
1568
                                          econtext->ecxt_per_query_memory);
 
1569
                fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
 
1570
 
 
1571
                perfuncstate->winCollation = wfunc->inputcollid;
 
1572
 
 
1573
                get_typlenbyval(wfunc->wintype,
 
1574
                                                &perfuncstate->resulttypeLen,
 
1575
                                                &perfuncstate->resulttypeByVal);
 
1576
 
 
1577
                /*
 
1578
                 * If it's really just a plain aggregate function, we'll emulate the
 
1579
                 * Agg environment for it.
 
1580
                 */
 
1581
                perfuncstate->plain_agg = wfunc->winagg;
 
1582
                if (wfunc->winagg)
 
1583
                {
 
1584
                        WindowStatePerAgg peraggstate;
 
1585
 
 
1586
                        perfuncstate->aggno = ++aggno;
 
1587
                        peraggstate = &winstate->peragg[aggno];
 
1588
                        initialize_peragg(winstate, wfunc, peraggstate);
 
1589
                        peraggstate->wfuncno = wfuncno;
 
1590
                }
 
1591
                else
 
1592
                {
 
1593
                        WindowObject winobj = makeNode(WindowObjectData);
 
1594
 
 
1595
                        winobj->winstate = winstate;
 
1596
                        winobj->argstates = wfuncstate->args;
 
1597
                        winobj->localmem = NULL;
 
1598
                        perfuncstate->winobj = winobj;
 
1599
                }
 
1600
        }
 
1601
 
 
1602
        /* Update numfuncs, numaggs to match number of unique functions found */
 
1603
        winstate->numfuncs = wfuncno + 1;
 
1604
        winstate->numaggs = aggno + 1;
 
1605
 
 
1606
        /* Set up WindowObject for aggregates, if needed */
 
1607
        if (winstate->numaggs > 0)
 
1608
        {
 
1609
                WindowObject agg_winobj = makeNode(WindowObjectData);
 
1610
 
 
1611
                agg_winobj->winstate = winstate;
 
1612
                agg_winobj->argstates = NIL;
 
1613
                agg_winobj->localmem = NULL;
 
1614
                /* make sure markptr = -1 to invalidate. It may not get used */
 
1615
                agg_winobj->markptr = -1;
 
1616
                agg_winobj->readptr = -1;
 
1617
                winstate->agg_winobj = agg_winobj;
 
1618
        }
 
1619
 
 
1620
        /* copy frame options to state node for easy access */
 
1621
        winstate->frameOptions = node->frameOptions;
 
1622
 
 
1623
        /* initialize frame bound offset expressions */
 
1624
        winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
 
1625
                                                                                 (PlanState *) winstate);
 
1626
        winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
 
1627
                                                                           (PlanState *) winstate);
 
1628
 
 
1629
        winstate->all_first = true;
 
1630
        winstate->partition_spooled = false;
 
1631
        winstate->more_partitions = false;
 
1632
 
 
1633
        return winstate;
 
1634
}
 
1635
 
 
1636
/* -----------------
 
1637
 * ExecEndWindowAgg
 
1638
 * -----------------
 
1639
 */
 
1640
void
 
1641
ExecEndWindowAgg(WindowAggState *node)
 
1642
{
 
1643
        PlanState  *outerPlan;
 
1644
 
 
1645
        release_partition(node);
 
1646
 
 
1647
        pfree(node->perfunc);
 
1648
        pfree(node->peragg);
 
1649
 
 
1650
        ExecClearTuple(node->ss.ss_ScanTupleSlot);
 
1651
        ExecClearTuple(node->first_part_slot);
 
1652
        ExecClearTuple(node->agg_row_slot);
 
1653
        ExecClearTuple(node->temp_slot_1);
 
1654
        ExecClearTuple(node->temp_slot_2);
 
1655
 
 
1656
        /*
 
1657
         * Free both the expr contexts.
 
1658
         */
 
1659
        ExecFreeExprContext(&node->ss.ps);
 
1660
        node->ss.ps.ps_ExprContext = node->tmpcontext;
 
1661
        ExecFreeExprContext(&node->ss.ps);
 
1662
 
 
1663
        MemoryContextDelete(node->partcontext);
 
1664
        MemoryContextDelete(node->aggcontext);
 
1665
 
 
1666
        outerPlan = outerPlanState(node);
 
1667
        ExecEndNode(outerPlan);
 
1668
}
 
1669
 
 
1670
/* -----------------
 
1671
 * ExecRescanWindowAgg
 
1672
 * -----------------
 
1673
 */
 
1674
void
 
1675
ExecReScanWindowAgg(WindowAggState *node)
 
1676
{
 
1677
        ExprContext *econtext = node->ss.ps.ps_ExprContext;
 
1678
 
 
1679
        node->all_done = false;
 
1680
 
 
1681
        node->ss.ps.ps_TupFromTlist = false;
 
1682
        node->all_first = true;
 
1683
 
 
1684
        /* release tuplestore et al */
 
1685
        release_partition(node);
 
1686
 
 
1687
        /* release all temp tuples, but especially first_part_slot */
 
1688
        ExecClearTuple(node->ss.ss_ScanTupleSlot);
 
1689
        ExecClearTuple(node->first_part_slot);
 
1690
        ExecClearTuple(node->agg_row_slot);
 
1691
        ExecClearTuple(node->temp_slot_1);
 
1692
        ExecClearTuple(node->temp_slot_2);
 
1693
 
 
1694
        /* Forget current wfunc values */
 
1695
        MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
 
1696
        MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
 
1697
 
 
1698
        /*
 
1699
         * if chgParam of subnode is not null then plan will be re-scanned by
 
1700
         * first ExecProcNode.
 
1701
         */
 
1702
        if (node->ss.ps.lefttree->chgParam == NULL)
 
1703
                ExecReScan(node->ss.ps.lefttree);
 
1704
}
 
1705
 
 
1706
/*
 
1707
 * initialize_peragg
 
1708
 *
 
1709
 * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
 
1710
 */
 
1711
static WindowStatePerAggData *
 
1712
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 
1713
                                  WindowStatePerAgg peraggstate)
 
1714
{
 
1715
        Oid                     inputTypes[FUNC_MAX_ARGS];
 
1716
        int                     numArguments;
 
1717
        HeapTuple       aggTuple;
 
1718
        Form_pg_aggregate aggform;
 
1719
        Oid                     aggtranstype;
 
1720
        AclResult       aclresult;
 
1721
        Oid                     transfn_oid,
 
1722
                                finalfn_oid;
 
1723
        Expr       *transfnexpr,
 
1724
                           *finalfnexpr;
 
1725
        Datum           textInitVal;
 
1726
        int                     i;
 
1727
        ListCell   *lc;
 
1728
 
 
1729
        numArguments = list_length(wfunc->args);
 
1730
 
 
1731
        i = 0;
 
1732
        foreach(lc, wfunc->args)
 
1733
        {
 
1734
                inputTypes[i++] = exprType((Node *) lfirst(lc));
 
1735
        }
 
1736
 
 
1737
        aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
 
1738
        if (!HeapTupleIsValid(aggTuple))
 
1739
                elog(ERROR, "cache lookup failed for aggregate %u",
 
1740
                         wfunc->winfnoid);
 
1741
        aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
 
1742
 
 
1743
        /*
 
1744
         * ExecInitWindowAgg already checked permission to call aggregate function
 
1745
         * ... but we still need to check the component functions
 
1746
         */
 
1747
 
 
1748
        peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
 
1749
        peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
 
1750
 
 
1751
        /* Check that aggregate owner has permission to call component fns */
 
1752
        {
 
1753
                HeapTuple       procTuple;
 
1754
                Oid                     aggOwner;
 
1755
 
 
1756
                procTuple = SearchSysCache1(PROCOID,
 
1757
                                                                        ObjectIdGetDatum(wfunc->winfnoid));
 
1758
                if (!HeapTupleIsValid(procTuple))
 
1759
                        elog(ERROR, "cache lookup failed for function %u",
 
1760
                                 wfunc->winfnoid);
 
1761
                aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
 
1762
                ReleaseSysCache(procTuple);
 
1763
 
 
1764
                aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
 
1765
                                                                         ACL_EXECUTE);
 
1766
                if (aclresult != ACLCHECK_OK)
 
1767
                        aclcheck_error(aclresult, ACL_KIND_PROC,
 
1768
                                                   get_func_name(transfn_oid));
 
1769
                if (OidIsValid(finalfn_oid))
 
1770
                {
 
1771
                        aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
 
1772
                                                                                 ACL_EXECUTE);
 
1773
                        if (aclresult != ACLCHECK_OK)
 
1774
                                aclcheck_error(aclresult, ACL_KIND_PROC,
 
1775
                                                           get_func_name(finalfn_oid));
 
1776
                }
 
1777
        }
 
1778
 
 
1779
        /* resolve actual type of transition state, if polymorphic */
 
1780
        aggtranstype = aggform->aggtranstype;
 
1781
        if (IsPolymorphicType(aggtranstype))
 
1782
        {
 
1783
                /* have to fetch the agg's declared input types... */
 
1784
                Oid                *declaredArgTypes;
 
1785
                int                     agg_nargs;
 
1786
 
 
1787
                get_func_signature(wfunc->winfnoid,
 
1788
                                                   &declaredArgTypes, &agg_nargs);
 
1789
                Assert(agg_nargs == numArguments);
 
1790
                aggtranstype = enforce_generic_type_consistency(inputTypes,
 
1791
                                                                                                                declaredArgTypes,
 
1792
                                                                                                                agg_nargs,
 
1793
                                                                                                                aggtranstype,
 
1794
                                                                                                                false);
 
1795
                pfree(declaredArgTypes);
 
1796
        }
 
1797
 
 
1798
        /* build expression trees using actual argument & result types */
 
1799
        build_aggregate_fnexprs(inputTypes,
 
1800
                                                        numArguments,
 
1801
                                                        aggtranstype,
 
1802
                                                        wfunc->wintype,
 
1803
                                                        wfunc->inputcollid,
 
1804
                                                        transfn_oid,
 
1805
                                                        finalfn_oid,
 
1806
                                                        &transfnexpr,
 
1807
                                                        &finalfnexpr);
 
1808
 
 
1809
        fmgr_info(transfn_oid, &peraggstate->transfn);
 
1810
        fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
 
1811
 
 
1812
        if (OidIsValid(finalfn_oid))
 
1813
        {
 
1814
                fmgr_info(finalfn_oid, &peraggstate->finalfn);
 
1815
                fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
 
1816
        }
 
1817
 
 
1818
        get_typlenbyval(wfunc->wintype,
 
1819
                                        &peraggstate->resulttypeLen,
 
1820
                                        &peraggstate->resulttypeByVal);
 
1821
        get_typlenbyval(aggtranstype,
 
1822
                                        &peraggstate->transtypeLen,
 
1823
                                        &peraggstate->transtypeByVal);
 
1824
 
 
1825
        /*
 
1826
         * initval is potentially null, so don't try to access it as a struct
 
1827
         * field. Must do it the hard way with SysCacheGetAttr.
 
1828
         */
 
1829
        textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
 
1830
                                                                  Anum_pg_aggregate_agginitval,
 
1831
                                                                  &peraggstate->initValueIsNull);
 
1832
 
 
1833
        if (peraggstate->initValueIsNull)
 
1834
                peraggstate->initValue = (Datum) 0;
 
1835
        else
 
1836
                peraggstate->initValue = GetAggInitVal(textInitVal,
 
1837
                                                                                           aggtranstype);
 
1838
 
 
1839
        /*
 
1840
         * If the transfn is strict and the initval is NULL, make sure input type
 
1841
         * and transtype are the same (or at least binary-compatible), so that
 
1842
         * it's OK to use the first input value as the initial transValue.  This
 
1843
         * should have been checked at agg definition time, but just in case...
 
1844
         */
 
1845
        if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
 
1846
        {
 
1847
                if (numArguments < 1 ||
 
1848
                        !IsBinaryCoercible(inputTypes[0], aggtranstype))
 
1849
                        ereport(ERROR,
 
1850
                                        (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
 
1851
                                         errmsg("aggregate %u needs to have compatible input type and transition type",
 
1852
                                                        wfunc->winfnoid)));
 
1853
        }
 
1854
 
 
1855
        ReleaseSysCache(aggTuple);
 
1856
 
 
1857
        return peraggstate;
 
1858
}
 
1859
 
 
1860
static Datum
 
1861
GetAggInitVal(Datum textInitVal, Oid transtype)
 
1862
{
 
1863
        Oid                     typinput,
 
1864
                                typioparam;
 
1865
        char       *strInitVal;
 
1866
        Datum           initVal;
 
1867
 
 
1868
        getTypeInputInfo(transtype, &typinput, &typioparam);
 
1869
        strInitVal = TextDatumGetCString(textInitVal);
 
1870
        initVal = OidInputFunctionCall(typinput, strInitVal,
 
1871
                                                                   typioparam, -1);
 
1872
        pfree(strInitVal);
 
1873
        return initVal;
 
1874
}
 
1875
 
 
1876
/*
 
1877
 * are_peers
 
1878
 * compare two rows to see if they are equal according to the ORDER BY clause
 
1879
 *
 
1880
 * NB: this does not consider the window frame mode.
 
1881
 */
 
1882
static bool
 
1883
are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
 
1884
                  TupleTableSlot *slot2)
 
1885
{
 
1886
        WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
 
1887
 
 
1888
        /* If no ORDER BY, all rows are peers with each other */
 
1889
        if (node->ordNumCols == 0)
 
1890
                return true;
 
1891
 
 
1892
        return execTuplesMatch(slot1, slot2,
 
1893
                                                   node->ordNumCols, node->ordColIdx,
 
1894
                                                   winstate->ordEqfunctions,
 
1895
                                                   winstate->tmpcontext->ecxt_per_tuple_memory);
 
1896
}
 
1897
 
 
1898
/*
 
1899
 * window_gettupleslot
 
1900
 *      Fetch the pos'th tuple of the current partition into the slot,
 
1901
 *      using the winobj's read pointer
 
1902
 *
 
1903
 * Returns true if successful, false if no such row
 
1904
 */
 
1905
static bool
 
1906
window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
 
1907
{
 
1908
        WindowAggState *winstate = winobj->winstate;
 
1909
        MemoryContext oldcontext;
 
1910
 
 
1911
        /* Don't allow passing -1 to spool_tuples here */
 
1912
        if (pos < 0)
 
1913
                return false;
 
1914
 
 
1915
        /* If necessary, fetch the tuple into the spool */
 
1916
        spool_tuples(winstate, pos);
 
1917
 
 
1918
        if (pos >= winstate->spooled_rows)
 
1919
                return false;
 
1920
 
 
1921
        if (pos < winobj->markpos)
 
1922
                elog(ERROR, "cannot fetch row before WindowObject's mark position");
 
1923
 
 
1924
        oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
 
1925
 
 
1926
        tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
 
1927
 
 
1928
        /*
 
1929
         * There's no API to refetch the tuple at the current position. We have to
 
1930
         * move one tuple forward, and then one backward.  (We don't do it the
 
1931
         * other way because we might try to fetch the row before our mark, which
 
1932
         * isn't allowed.)  XXX this case could stand to be optimized.
 
1933
         */
 
1934
        if (winobj->seekpos == pos)
 
1935
        {
 
1936
                tuplestore_advance(winstate->buffer, true);
 
1937
                winobj->seekpos++;
 
1938
        }
 
1939
 
 
1940
        while (winobj->seekpos > pos)
 
1941
        {
 
1942
                if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
 
1943
                        elog(ERROR, "unexpected end of tuplestore");
 
1944
                winobj->seekpos--;
 
1945
        }
 
1946
 
 
1947
        while (winobj->seekpos < pos)
 
1948
        {
 
1949
                if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
 
1950
                        elog(ERROR, "unexpected end of tuplestore");
 
1951
                winobj->seekpos++;
 
1952
        }
 
1953
 
 
1954
        MemoryContextSwitchTo(oldcontext);
 
1955
 
 
1956
        return true;
 
1957
}
 
1958
 
 
1959
 
 
1960
/***********************************************************************
 
1961
 * API exposed to window functions
 
1962
 ***********************************************************************/
 
1963
 
 
1964
 
 
1965
/*
 
1966
 * WinGetPartitionLocalMemory
 
1967
 *              Get working memory that lives till end of partition processing
 
1968
 *
 
1969
 * On first call within a given partition, this allocates and zeroes the
 
1970
 * requested amount of space.  Subsequent calls just return the same chunk.
 
1971
 *
 
1972
 * Memory obtained this way is normally used to hold state that should be
 
1973
 * automatically reset for each new partition.  If a window function wants
 
1974
 * to hold state across the whole query, fcinfo->fn_extra can be used in the
 
1975
 * usual way for that.
 
1976
 */
 
1977
void *
 
1978
WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
 
1979
{
 
1980
        Assert(WindowObjectIsValid(winobj));
 
1981
        if (winobj->localmem == NULL)
 
1982
                winobj->localmem =
 
1983
                        MemoryContextAllocZero(winobj->winstate->partcontext, sz);
 
1984
        return winobj->localmem;
 
1985
}
 
1986
 
 
1987
/*
 
1988
 * WinGetCurrentPosition
 
1989
 *              Return the current row's position (counting from 0) within the current
 
1990
 *              partition.
 
1991
 */
 
1992
int64
 
1993
WinGetCurrentPosition(WindowObject winobj)
 
1994
{
 
1995
        Assert(WindowObjectIsValid(winobj));
 
1996
        return winobj->winstate->currentpos;
 
1997
}
 
1998
 
 
1999
/*
 
2000
 * WinGetPartitionRowCount
 
2001
 *              Return total number of rows contained in the current partition.
 
2002
 *
 
2003
 * Note: this is a relatively expensive operation because it forces the
 
2004
 * whole partition to be "spooled" into the tuplestore at once.  Once
 
2005
 * executed, however, additional calls within the same partition are cheap.
 
2006
 */
 
2007
int64
 
2008
WinGetPartitionRowCount(WindowObject winobj)
 
2009
{
 
2010
        Assert(WindowObjectIsValid(winobj));
 
2011
        spool_tuples(winobj->winstate, -1);
 
2012
        return winobj->winstate->spooled_rows;
 
2013
}
 
2014
 
 
2015
/*
 
2016
 * WinSetMarkPosition
 
2017
 *              Set the "mark" position for the window object, which is the oldest row
 
2018
 *              number (counting from 0) it is allowed to fetch during all subsequent
 
2019
 *              operations within the current partition.
 
2020
 *
 
2021
 * Window functions do not have to call this, but are encouraged to move the
 
2022
 * mark forward when possible to keep the tuplestore size down and prevent
 
2023
 * having to spill rows to disk.
 
2024
 */
 
2025
void
 
2026
WinSetMarkPosition(WindowObject winobj, int64 markpos)
 
2027
{
 
2028
        WindowAggState *winstate;
 
2029
 
 
2030
        Assert(WindowObjectIsValid(winobj));
 
2031
        winstate = winobj->winstate;
 
2032
 
 
2033
        if (markpos < winobj->markpos)
 
2034
                elog(ERROR, "cannot move WindowObject's mark position backward");
 
2035
        tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
 
2036
        while (markpos > winobj->markpos)
 
2037
        {
 
2038
                tuplestore_advance(winstate->buffer, true);
 
2039
                winobj->markpos++;
 
2040
        }
 
2041
        tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
 
2042
        while (markpos > winobj->seekpos)
 
2043
        {
 
2044
                tuplestore_advance(winstate->buffer, true);
 
2045
                winobj->seekpos++;
 
2046
        }
 
2047
}
 
2048
 
 
2049
/*
 
2050
 * WinRowsArePeers
 
2051
 *              Compare two rows (specified by absolute position in window) to see
 
2052
 *              if they are equal according to the ORDER BY clause.
 
2053
 *
 
2054
 * NB: this does not consider the window frame mode.
 
2055
 */
 
2056
bool
 
2057
WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
 
2058
{
 
2059
        WindowAggState *winstate;
 
2060
        WindowAgg  *node;
 
2061
        TupleTableSlot *slot1;
 
2062
        TupleTableSlot *slot2;
 
2063
        bool            res;
 
2064
 
 
2065
        Assert(WindowObjectIsValid(winobj));
 
2066
        winstate = winobj->winstate;
 
2067
        node = (WindowAgg *) winstate->ss.ps.plan;
 
2068
 
 
2069
        /* If no ORDER BY, all rows are peers; don't bother to fetch them */
 
2070
        if (node->ordNumCols == 0)
 
2071
                return true;
 
2072
 
 
2073
        slot1 = winstate->temp_slot_1;
 
2074
        slot2 = winstate->temp_slot_2;
 
2075
 
 
2076
        if (!window_gettupleslot(winobj, pos1, slot1))
 
2077
                elog(ERROR, "specified position is out of window: " INT64_FORMAT,
 
2078
                         pos1);
 
2079
        if (!window_gettupleslot(winobj, pos2, slot2))
 
2080
                elog(ERROR, "specified position is out of window: " INT64_FORMAT,
 
2081
                         pos2);
 
2082
 
 
2083
        res = are_peers(winstate, slot1, slot2);
 
2084
 
 
2085
        ExecClearTuple(slot1);
 
2086
        ExecClearTuple(slot2);
 
2087
 
 
2088
        return res;
 
2089
}
 
2090
 
 
2091
/*
 
2092
 * WinGetFuncArgInPartition
 
2093
 *              Evaluate a window function's argument expression on a specified
 
2094
 *              row of the partition.  The row is identified in lseek(2) style,
 
2095
 *              i.e. relative to the current, first, or last row.
 
2096
 *
 
2097
 * argno: argument number to evaluate (counted from 0)
 
2098
 * relpos: signed rowcount offset from the seek position
 
2099
 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
 
2100
 * set_mark: If the row is found and set_mark is true, the mark is moved to
 
2101
 *              the row as a side-effect.
 
2102
 * isnull: output argument, receives isnull status of result
 
2103
 * isout: output argument, set to indicate whether target row position
 
2104
 *              is out of partition (can pass NULL if caller doesn't care about this)
 
2105
 *
 
2106
 * Specifying a nonexistent row is not an error, it just causes a null result
 
2107
 * (plus setting *isout true, if isout isn't NULL).
 
2108
 */
 
2109
Datum
 
2110
WinGetFuncArgInPartition(WindowObject winobj, int argno,
 
2111
                                                 int relpos, int seektype, bool set_mark,
 
2112
                                                 bool *isnull, bool *isout)
 
2113
{
 
2114
        WindowAggState *winstate;
 
2115
        ExprContext *econtext;
 
2116
        TupleTableSlot *slot;
 
2117
        bool            gottuple;
 
2118
        int64           abs_pos;
 
2119
 
 
2120
        Assert(WindowObjectIsValid(winobj));
 
2121
        winstate = winobj->winstate;
 
2122
        econtext = winstate->ss.ps.ps_ExprContext;
 
2123
        slot = winstate->temp_slot_1;
 
2124
 
 
2125
        switch (seektype)
 
2126
        {
 
2127
                case WINDOW_SEEK_CURRENT:
 
2128
                        abs_pos = winstate->currentpos + relpos;
 
2129
                        break;
 
2130
                case WINDOW_SEEK_HEAD:
 
2131
                        abs_pos = relpos;
 
2132
                        break;
 
2133
                case WINDOW_SEEK_TAIL:
 
2134
                        spool_tuples(winstate, -1);
 
2135
                        abs_pos = winstate->spooled_rows - 1 + relpos;
 
2136
                        break;
 
2137
                default:
 
2138
                        elog(ERROR, "unrecognized window seek type: %d", seektype);
 
2139
                        abs_pos = 0;            /* keep compiler quiet */
 
2140
                        break;
 
2141
        }
 
2142
 
 
2143
        gottuple = window_gettupleslot(winobj, abs_pos, slot);
 
2144
 
 
2145
        if (!gottuple)
 
2146
        {
 
2147
                if (isout)
 
2148
                        *isout = true;
 
2149
                *isnull = true;
 
2150
                return (Datum) 0;
 
2151
        }
 
2152
        else
 
2153
        {
 
2154
                if (isout)
 
2155
                        *isout = false;
 
2156
                if (set_mark)
 
2157
                {
 
2158
                        int                     frameOptions = winstate->frameOptions;
 
2159
                        int64           mark_pos = abs_pos;
 
2160
 
 
2161
                        /*
 
2162
                         * In RANGE mode with a moving frame head, we must not let the
 
2163
                         * mark advance past frameheadpos, since that row has to be
 
2164
                         * fetchable during future update_frameheadpos calls.
 
2165
                         *
 
2166
                         * XXX it is very ugly to pollute window functions' marks with
 
2167
                         * this consideration; it could for instance mask a logic bug that
 
2168
                         * lets a window function fetch rows before what it had claimed
 
2169
                         * was its mark.  Perhaps use a separate mark for frame head
 
2170
                         * probes?
 
2171
                         */
 
2172
                        if ((frameOptions & FRAMEOPTION_RANGE) &&
 
2173
                                !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
 
2174
                        {
 
2175
                                update_frameheadpos(winobj, winstate->temp_slot_2);
 
2176
                                if (mark_pos > winstate->frameheadpos)
 
2177
                                        mark_pos = winstate->frameheadpos;
 
2178
                        }
 
2179
                        WinSetMarkPosition(winobj, mark_pos);
 
2180
                }
 
2181
                econtext->ecxt_outertuple = slot;
 
2182
                return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
 
2183
                                                        econtext, isnull, NULL);
 
2184
        }
 
2185
}
 
2186
 
 
2187
/*
 
2188
 * WinGetFuncArgInFrame
 
2189
 *              Evaluate a window function's argument expression on a specified
 
2190
 *              row of the window frame.  The row is identified in lseek(2) style,
 
2191
 *              i.e. relative to the current, first, or last row.
 
2192
 *
 
2193
 * argno: argument number to evaluate (counted from 0)
 
2194
 * relpos: signed rowcount offset from the seek position
 
2195
 * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
 
2196
 * set_mark: If the row is found and set_mark is true, the mark is moved to
 
2197
 *              the row as a side-effect.
 
2198
 * isnull: output argument, receives isnull status of result
 
2199
 * isout: output argument, set to indicate whether target row position
 
2200
 *              is out of frame (can pass NULL if caller doesn't care about this)
 
2201
 *
 
2202
 * Specifying a nonexistent row is not an error, it just causes a null result
 
2203
 * (plus setting *isout true, if isout isn't NULL).
 
2204
 */
 
2205
Datum
 
2206
WinGetFuncArgInFrame(WindowObject winobj, int argno,
 
2207
                                         int relpos, int seektype, bool set_mark,
 
2208
                                         bool *isnull, bool *isout)
 
2209
{
 
2210
        WindowAggState *winstate;
 
2211
        ExprContext *econtext;
 
2212
        TupleTableSlot *slot;
 
2213
        bool            gottuple;
 
2214
        int64           abs_pos;
 
2215
 
 
2216
        Assert(WindowObjectIsValid(winobj));
 
2217
        winstate = winobj->winstate;
 
2218
        econtext = winstate->ss.ps.ps_ExprContext;
 
2219
        slot = winstate->temp_slot_1;
 
2220
 
 
2221
        switch (seektype)
 
2222
        {
 
2223
                case WINDOW_SEEK_CURRENT:
 
2224
                        abs_pos = winstate->currentpos + relpos;
 
2225
                        break;
 
2226
                case WINDOW_SEEK_HEAD:
 
2227
                        update_frameheadpos(winobj, slot);
 
2228
                        abs_pos = winstate->frameheadpos + relpos;
 
2229
                        break;
 
2230
                case WINDOW_SEEK_TAIL:
 
2231
                        update_frametailpos(winobj, slot);
 
2232
                        abs_pos = winstate->frametailpos + relpos;
 
2233
                        break;
 
2234
                default:
 
2235
                        elog(ERROR, "unrecognized window seek type: %d", seektype);
 
2236
                        abs_pos = 0;            /* keep compiler quiet */
 
2237
                        break;
 
2238
        }
 
2239
 
 
2240
        gottuple = window_gettupleslot(winobj, abs_pos, slot);
 
2241
        if (gottuple)
 
2242
                gottuple = row_is_in_frame(winstate, abs_pos, slot);
 
2243
 
 
2244
        if (!gottuple)
 
2245
        {
 
2246
                if (isout)
 
2247
                        *isout = true;
 
2248
                *isnull = true;
 
2249
                return (Datum) 0;
 
2250
        }
 
2251
        else
 
2252
        {
 
2253
                if (isout)
 
2254
                        *isout = false;
 
2255
                if (set_mark)
 
2256
                {
 
2257
                        int                     frameOptions = winstate->frameOptions;
 
2258
                        int64           mark_pos = abs_pos;
 
2259
 
 
2260
                        /*
 
2261
                         * In RANGE mode with a moving frame head, we must not let the
 
2262
                         * mark advance past frameheadpos, since that row has to be
 
2263
                         * fetchable during future update_frameheadpos calls.
 
2264
                         *
 
2265
                         * XXX it is very ugly to pollute window functions' marks with
 
2266
                         * this consideration; it could for instance mask a logic bug that
 
2267
                         * lets a window function fetch rows before what it had claimed
 
2268
                         * was its mark.  Perhaps use a separate mark for frame head
 
2269
                         * probes?
 
2270
                         */
 
2271
                        if ((frameOptions & FRAMEOPTION_RANGE) &&
 
2272
                                !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
 
2273
                        {
 
2274
                                update_frameheadpos(winobj, winstate->temp_slot_2);
 
2275
                                if (mark_pos > winstate->frameheadpos)
 
2276
                                        mark_pos = winstate->frameheadpos;
 
2277
                        }
 
2278
                        WinSetMarkPosition(winobj, mark_pos);
 
2279
                }
 
2280
                econtext->ecxt_outertuple = slot;
 
2281
                return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
 
2282
                                                        econtext, isnull, NULL);
 
2283
        }
 
2284
}
 
2285
 
 
2286
/*
 
2287
 * WinGetFuncArgCurrent
 
2288
 *              Evaluate a window function's argument expression on the current row.
 
2289
 *
 
2290
 * argno: argument number to evaluate (counted from 0)
 
2291
 * isnull: output argument, receives isnull status of result
 
2292
 *
 
2293
 * Note: this isn't quite equivalent to WinGetFuncArgInPartition or
 
2294
 * WinGetFuncArgInFrame targeting the current row, because it will succeed
 
2295
 * even if the WindowObject's mark has been set beyond the current row.
 
2296
 * This should generally be used for "ordinary" arguments of a window
 
2297
 * function, such as the offset argument of lead() or lag().
 
2298
 */
 
2299
Datum
 
2300
WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
 
2301
{
 
2302
        WindowAggState *winstate;
 
2303
        ExprContext *econtext;
 
2304
 
 
2305
        Assert(WindowObjectIsValid(winobj));
 
2306
        winstate = winobj->winstate;
 
2307
 
 
2308
        econtext = winstate->ss.ps.ps_ExprContext;
 
2309
 
 
2310
        econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
 
2311
        return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
 
2312
                                                econtext, isnull, NULL);
 
2313
}