/*-------------------------------------------------------------------------
 *
 * nodeWindowAgg.c
 *	  routines to handle WindowAgg nodes.
 *
 * A WindowAgg node evaluates "window functions" across suitable partitions
 * of the input tuple set.  Any one WindowAgg works for just a single window
 * specification, though it can evaluate multiple window functions sharing
 * identical window specifications.  The input tuples are required to be
 * delivered in sorted order, with the PARTITION BY columns (if any) as
 * major sort keys and the ORDER BY columns (if any) as minor sort keys.
 * (The planner generates a stack of WindowAggs with intervening Sort nodes
 * as needed, if a query involves more than one window specification.)
 *
 * Since window functions can require access to any or all of the rows in
 * the current partition, we accumulate rows of the partition into a
 * tuplestore.  The window functions are called using the WindowObject API
 * so that they can access those rows as needed.
 *
 * We also support using plain aggregate functions as window functions.
 * For these, the regular Agg-node environment is emulated for each partition.
 * As required by the SQL spec, the output represents the value of the
 * aggregate function over all rows in the current row's window frame.
 *
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeWindowAgg.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/nodeWindowAgg.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "windowapi.h"

55
* All the window function APIs are called with this object, which is passed
56
* to window functions as fcinfo->context.
58
typedef struct WindowObjectData
61
WindowAggState *winstate; /* parent WindowAggState */
62
List *argstates; /* ExprState trees for fn's arguments */
63
void *localmem; /* WinGetPartitionLocalMemory's chunk */
64
int markptr; /* tuplestore mark pointer for this fn */
65
int readptr; /* tuplestore read pointer for this fn */
66
int64 markpos; /* row that markptr is positioned on */
67
int64 seekpos; /* row that readptr is positioned on */
71
* We have one WindowStatePerFunc struct for each window function and
72
* window aggregate handled by this node.
74
typedef struct WindowStatePerFuncData
76
/* Links to WindowFunc expr and state nodes this working state is for */
77
WindowFuncExprState *wfuncstate;
80
int numArguments; /* number of arguments */
82
FmgrInfo flinfo; /* fmgr lookup data for window function */
84
Oid winCollation; /* collation derived for window function */
87
* We need the len and byval info for the result of each function in order
88
* to know how to copy/delete values.
93
bool plain_agg; /* is it just a plain aggregate function? */
94
int aggno; /* if so, index of its PerAggData */
96
WindowObject winobj; /* object used in window function API */
97
} WindowStatePerFuncData;
100
* For plain aggregate window functions, we also have one of these.
102
typedef struct WindowStatePerAggData
104
/* Oids of transfer functions */
106
Oid finalfn_oid; /* may be InvalidOid */
109
* fmgr lookup data for transfer functions --- only valid when
110
* corresponding oid is not InvalidOid. Note in particular that fn_strict
111
* flags are kept here.
117
* initial value from pg_aggregate entry
120
bool initValueIsNull;
123
* cached value for current frame boundaries
126
bool resultValueIsNull;
129
* We need the len and byval info for the agg's input, result, and
130
* transition data types in order to know how to copy/delete values.
139
int wfuncno; /* index of associated PerFuncData */
141
/* Current transition value */
142
Datum transValue; /* current transition value */
143
bool transValueIsNull;
145
bool noTransValue; /* true if transValue not set yet */
146
} WindowStatePerAggData;
148
static void initialize_windowaggregate(WindowAggState *winstate,
149
WindowStatePerFunc perfuncstate,
150
WindowStatePerAgg peraggstate);
151
static void advance_windowaggregate(WindowAggState *winstate,
152
WindowStatePerFunc perfuncstate,
153
WindowStatePerAgg peraggstate);
154
static void finalize_windowaggregate(WindowAggState *winstate,
155
WindowStatePerFunc perfuncstate,
156
WindowStatePerAgg peraggstate,
157
Datum *result, bool *isnull);
159
static void eval_windowaggregates(WindowAggState *winstate);
160
static void eval_windowfunction(WindowAggState *winstate,
161
WindowStatePerFunc perfuncstate,
162
Datum *result, bool *isnull);
164
static void begin_partition(WindowAggState *winstate);
165
static void spool_tuples(WindowAggState *winstate, int64 pos);
166
static void release_partition(WindowAggState *winstate);
168
static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
169
TupleTableSlot *slot);
170
static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
171
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
173
static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
175
WindowStatePerAgg peraggstate);
176
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
178
static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
179
TupleTableSlot *slot2);
180
static bool window_gettupleslot(WindowObject winobj, int64 pos,
181
TupleTableSlot *slot);
185
* initialize_windowaggregate
186
* parallel to initialize_aggregates in nodeAgg.c
189
initialize_windowaggregate(WindowAggState *winstate,
190
WindowStatePerFunc perfuncstate,
191
WindowStatePerAgg peraggstate)
193
MemoryContext oldContext;
195
if (peraggstate->initValueIsNull)
196
peraggstate->transValue = peraggstate->initValue;
199
oldContext = MemoryContextSwitchTo(winstate->aggcontext);
200
peraggstate->transValue = datumCopy(peraggstate->initValue,
201
peraggstate->transtypeByVal,
202
peraggstate->transtypeLen);
203
MemoryContextSwitchTo(oldContext);
205
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
206
peraggstate->noTransValue = peraggstate->initValueIsNull;
207
peraggstate->resultValueIsNull = true;
211
* advance_windowaggregate
212
* parallel to advance_aggregates in nodeAgg.c
215
advance_windowaggregate(WindowAggState *winstate,
216
WindowStatePerFunc perfuncstate,
217
WindowStatePerAgg peraggstate)
219
WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
220
int numArguments = perfuncstate->numArguments;
221
FunctionCallInfoData fcinfodata;
222
FunctionCallInfo fcinfo = &fcinfodata;
226
MemoryContext oldContext;
227
ExprContext *econtext = winstate->tmpcontext;
229
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
231
/* We start from 1, since the 0th arg will be the transition value */
233
foreach(arg, wfuncstate->args)
235
ExprState *argstate = (ExprState *) lfirst(arg);
237
fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
238
&fcinfo->argnull[i], NULL);
242
if (peraggstate->transfn.fn_strict)
245
* For a strict transfn, nothing happens when there's a NULL input; we
246
* just keep the prior transValue.
248
for (i = 1; i <= numArguments; i++)
250
if (fcinfo->argnull[i])
252
MemoryContextSwitchTo(oldContext);
256
if (peraggstate->noTransValue)
259
* transValue has not been initialized. This is the first non-NULL
260
* input value. We use it as the initial value for transValue. (We
261
* already checked that the agg's input type is binary-compatible
262
* with its transtype, so straight copy here is OK.)
264
* We must copy the datum into aggcontext if it is pass-by-ref. We
265
* do not need to pfree the old transValue, since it's NULL.
267
MemoryContextSwitchTo(winstate->aggcontext);
268
peraggstate->transValue = datumCopy(fcinfo->arg[1],
269
peraggstate->transtypeByVal,
270
peraggstate->transtypeLen);
271
peraggstate->transValueIsNull = false;
272
peraggstate->noTransValue = false;
273
MemoryContextSwitchTo(oldContext);
276
if (peraggstate->transValueIsNull)
279
* Don't call a strict function with NULL inputs. Note it is
280
* possible to get here despite the above tests, if the transfn is
281
* strict *and* returned a NULL on a prior cycle. If that happens
282
* we will propagate the NULL all the way to the end.
284
MemoryContextSwitchTo(oldContext);
290
* OK to call the transition function
292
InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
294
perfuncstate->winCollation,
295
(void *) winstate, NULL);
296
fcinfo->arg[0] = peraggstate->transValue;
297
fcinfo->argnull[0] = peraggstate->transValueIsNull;
298
newVal = FunctionCallInvoke(fcinfo);
301
* If pass-by-ref datatype, must copy the new value into aggcontext and
302
* pfree the prior transValue. But if transfn returned a pointer to its
303
* first input, we don't need to do anything.
305
if (!peraggstate->transtypeByVal &&
306
DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
310
MemoryContextSwitchTo(winstate->aggcontext);
311
newVal = datumCopy(newVal,
312
peraggstate->transtypeByVal,
313
peraggstate->transtypeLen);
315
if (!peraggstate->transValueIsNull)
316
pfree(DatumGetPointer(peraggstate->transValue));
319
MemoryContextSwitchTo(oldContext);
320
peraggstate->transValue = newVal;
321
peraggstate->transValueIsNull = fcinfo->isnull;
325
* finalize_windowaggregate
326
* parallel to finalize_aggregate in nodeAgg.c
329
finalize_windowaggregate(WindowAggState *winstate,
330
WindowStatePerFunc perfuncstate,
331
WindowStatePerAgg peraggstate,
332
Datum *result, bool *isnull)
334
MemoryContext oldContext;
336
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
339
* Apply the agg's finalfn if one is provided, else return transValue.
341
if (OidIsValid(peraggstate->finalfn_oid))
343
FunctionCallInfoData fcinfo;
345
InitFunctionCallInfoData(fcinfo, &(peraggstate->finalfn), 1,
346
perfuncstate->winCollation,
347
(void *) winstate, NULL);
348
fcinfo.arg[0] = peraggstate->transValue;
349
fcinfo.argnull[0] = peraggstate->transValueIsNull;
350
if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull)
352
/* don't call a strict function with NULL inputs */
358
*result = FunctionCallInvoke(&fcinfo);
359
*isnull = fcinfo.isnull;
364
*result = peraggstate->transValue;
365
*isnull = peraggstate->transValueIsNull;
369
* If result is pass-by-ref, make sure it is in the right context.
371
if (!peraggstate->resulttypeByVal && !*isnull &&
372
!MemoryContextContains(CurrentMemoryContext,
373
DatumGetPointer(*result)))
374
*result = datumCopy(*result,
375
peraggstate->resulttypeByVal,
376
peraggstate->resulttypeLen);
377
MemoryContextSwitchTo(oldContext);
381
* eval_windowaggregates
382
* evaluate plain aggregates being used as window functions
384
* Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be
385
* able to call aggregate final functions repeatedly after aggregating more
386
* data onto the same transition value. This is not a behavior required by
390
eval_windowaggregates(WindowAggState *winstate)
392
WindowStatePerAgg peraggstate;
396
MemoryContext oldContext;
397
ExprContext *econtext;
398
WindowObject agg_winobj;
399
TupleTableSlot *agg_row_slot;
401
numaggs = winstate->numaggs;
403
return; /* nothing to do */
405
/* final output execution is in ps_ExprContext */
406
econtext = winstate->ss.ps.ps_ExprContext;
407
agg_winobj = winstate->agg_winobj;
408
agg_row_slot = winstate->agg_row_slot;
411
* Currently, we support only a subset of the SQL-standard window framing
414
* If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
415
* a contiguous group of rows extending forward from the start of the
416
* partition, and rows only enter the frame, never exit it, as the current
417
* row advances forward. This makes it possible to use an incremental
418
* strategy for evaluating aggregates: we run the transition function for
419
* each row added to the frame, and run the final function whenever we
420
* need the current aggregate value. This is considerably more efficient
421
* than the naive approach of re-running the entire aggregate calculation
422
* for each current row. It does assume that the final function doesn't
423
* damage the running transition value, but we have the same assumption in
424
* nodeAgg.c too (when it rescans an existing hash table).
426
* For other frame start rules, we discard the aggregate state and re-run
427
* the aggregates whenever the frame head row moves. We can still
428
* optimize as above whenever successive rows share the same frame head.
430
* In many common cases, multiple rows share the same frame and hence the
431
* same aggregate value. (In particular, if there's no ORDER BY in a RANGE
432
* window, then all rows are peers and so they all have window frame equal
433
* to the whole partition.) We optimize such cases by calculating the
434
* aggregate value once when we reach the first row of a peer group, and
435
* then returning the saved value for all subsequent rows.
437
* 'aggregatedupto' keeps track of the first row that has not yet been
438
* accumulated into the aggregate transition values. Whenever we start a
439
* new peer group, we accumulate forward to the end of the peer group.
441
* TODO: Rerunning aggregates from the frame start can be pretty slow. For
442
* some aggregates like SUM and COUNT we could avoid that by implementing
443
* a "negative transition function" that would be called for each row as
444
* it exits the frame. We'd have to think about avoiding recalculation of
445
* volatile arguments of aggregate functions, too.
449
* First, update the frame head position.
451
update_frameheadpos(agg_winobj, winstate->temp_slot_1);
454
* Initialize aggregates on first call for partition, or if the frame head
455
* position moved since last time.
457
if (winstate->currentpos == 0 ||
458
winstate->frameheadpos != winstate->aggregatedbase)
461
* Discard transient aggregate values
463
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
465
for (i = 0; i < numaggs; i++)
467
peraggstate = &winstate->peragg[i];
468
wfuncno = peraggstate->wfuncno;
469
initialize_windowaggregate(winstate,
470
&winstate->perfunc[wfuncno],
475
* If we created a mark pointer for aggregates, keep it pushed up to
476
* frame head, so that tuplestore can discard unnecessary rows.
478
if (agg_winobj->markptr >= 0)
479
WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
482
* Initialize for loop below
484
ExecClearTuple(agg_row_slot);
485
winstate->aggregatedbase = winstate->frameheadpos;
486
winstate->aggregatedupto = winstate->frameheadpos;
490
* In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
491
* except when the frame head moves. In END_CURRENT_ROW mode, we only
492
* have to recalculate when the frame head moves or currentpos has
493
* advanced past the place we'd aggregated up to. Check for these cases
494
* and if so, reuse the saved result values.
496
if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
497
FRAMEOPTION_END_CURRENT_ROW)) &&
498
winstate->aggregatedbase <= winstate->currentpos &&
499
winstate->aggregatedupto > winstate->currentpos)
501
for (i = 0; i < numaggs; i++)
503
peraggstate = &winstate->peragg[i];
504
wfuncno = peraggstate->wfuncno;
505
econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
506
econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
512
* Advance until we reach a row not in frame (or end of partition).
514
* Note the loop invariant: agg_row_slot is either empty or holds the row
515
* at position aggregatedupto. We advance aggregatedupto after processing
520
/* Fetch next row if we didn't already */
521
if (TupIsNull(agg_row_slot))
523
if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
525
break; /* must be end of partition */
528
/* Exit loop (for now) if not in frame */
529
if (!row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot))
532
/* Set tuple context for evaluation of aggregate arguments */
533
winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
535
/* Accumulate row into the aggregates */
536
for (i = 0; i < numaggs; i++)
538
peraggstate = &winstate->peragg[i];
539
wfuncno = peraggstate->wfuncno;
540
advance_windowaggregate(winstate,
541
&winstate->perfunc[wfuncno],
545
/* Reset per-input-tuple context after each tuple */
546
ResetExprContext(winstate->tmpcontext);
548
/* And advance the aggregated-row state */
549
winstate->aggregatedupto++;
550
ExecClearTuple(agg_row_slot);
554
* finalize aggregates and fill result/isnull fields.
556
for (i = 0; i < numaggs; i++)
561
peraggstate = &winstate->peragg[i];
562
wfuncno = peraggstate->wfuncno;
563
result = &econtext->ecxt_aggvalues[wfuncno];
564
isnull = &econtext->ecxt_aggnulls[wfuncno];
565
finalize_windowaggregate(winstate,
566
&winstate->perfunc[wfuncno],
571
* save the result in case next row shares the same frame.
573
* XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in
574
* advance that the next row can't possibly share the same frame. Is
575
* it worth detecting that and skipping this code?
577
if (!peraggstate->resulttypeByVal)
580
* clear old resultValue in order not to leak memory. (Note: the
581
* new result can't possibly be the same datum as old resultValue,
582
* because we never passed it to the trans function.)
584
if (!peraggstate->resultValueIsNull)
585
pfree(DatumGetPointer(peraggstate->resultValue));
588
* If pass-by-ref, copy it into our aggregate context.
592
oldContext = MemoryContextSwitchTo(winstate->aggcontext);
593
peraggstate->resultValue =
595
peraggstate->resulttypeByVal,
596
peraggstate->resulttypeLen);
597
MemoryContextSwitchTo(oldContext);
602
peraggstate->resultValue = *result;
604
peraggstate->resultValueIsNull = *isnull;
609
* eval_windowfunction
611
* Arguments of window functions are not evaluated here, because a window
612
* function can need random access to arbitrary rows in the partition.
613
* The window function uses the special WinGetFuncArgInPartition and
614
* WinGetFuncArgInFrame functions to evaluate the arguments for the rows
618
eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
619
Datum *result, bool *isnull)
621
FunctionCallInfoData fcinfo;
622
MemoryContext oldContext;
624
oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
627
* We don't pass any normal arguments to a window function, but we do pass
628
* it the number of arguments, in order to permit window function
629
* implementations to support varying numbers of arguments. The real info
630
* goes through the WindowObject, which is passed via fcinfo->context.
632
InitFunctionCallInfoData(fcinfo, &(perfuncstate->flinfo),
633
perfuncstate->numArguments,
634
perfuncstate->winCollation,
635
(void *) perfuncstate->winobj, NULL);
636
/* Just in case, make all the regular argument slots be null */
637
memset(fcinfo.argnull, true, perfuncstate->numArguments);
639
*result = FunctionCallInvoke(&fcinfo);
640
*isnull = fcinfo.isnull;
643
* Make sure pass-by-ref data is allocated in the appropriate context. (We
644
* need this in case the function returns a pointer into some short-lived
645
* tuple, as is entirely possible.)
647
if (!perfuncstate->resulttypeByVal && !fcinfo.isnull &&
648
!MemoryContextContains(CurrentMemoryContext,
649
DatumGetPointer(*result)))
650
*result = datumCopy(*result,
651
perfuncstate->resulttypeByVal,
652
perfuncstate->resulttypeLen);
654
MemoryContextSwitchTo(oldContext);
659
* Start buffering rows of the next partition.
662
begin_partition(WindowAggState *winstate)
664
PlanState *outerPlan = outerPlanState(winstate);
665
int numfuncs = winstate->numfuncs;
668
winstate->partition_spooled = false;
669
winstate->framehead_valid = false;
670
winstate->frametail_valid = false;
671
winstate->spooled_rows = 0;
672
winstate->currentpos = 0;
673
winstate->frameheadpos = 0;
674
winstate->frametailpos = -1;
675
ExecClearTuple(winstate->agg_row_slot);
678
* If this is the very first partition, we need to fetch the first input
679
* row to store in first_part_slot.
681
if (TupIsNull(winstate->first_part_slot))
683
TupleTableSlot *outerslot = ExecProcNode(outerPlan);
685
if (!TupIsNull(outerslot))
686
ExecCopySlot(winstate->first_part_slot, outerslot);
689
/* outer plan is empty, so we have nothing to do */
690
winstate->partition_spooled = true;
691
winstate->more_partitions = false;
696
/* Create new tuplestore for this partition */
697
winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
700
* Set up read pointers for the tuplestore. The current pointer doesn't
701
* need BACKWARD capability, but the per-window-function read pointers do,
702
* and the aggregate pointer does if frame start is movable.
704
winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
706
/* reset default REWIND capability bit for current ptr */
707
tuplestore_set_eflags(winstate->buffer, 0);
709
/* create read pointers for aggregates, if needed */
710
if (winstate->numaggs > 0)
712
WindowObject agg_winobj = winstate->agg_winobj;
713
int readptr_flags = 0;
715
/* If the frame head is potentially movable ... */
716
if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
718
/* ... create a mark pointer to track the frame head */
719
agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
720
/* and the read pointer will need BACKWARD capability */
721
readptr_flags |= EXEC_FLAG_BACKWARD;
724
agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
726
agg_winobj->markpos = -1;
727
agg_winobj->seekpos = -1;
729
/* Also reset the row counters for aggregates */
730
winstate->aggregatedbase = 0;
731
winstate->aggregatedupto = 0;
734
/* create mark and read pointers for each real window function */
735
for (i = 0; i < numfuncs; i++)
737
WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
739
if (!perfuncstate->plain_agg)
741
WindowObject winobj = perfuncstate->winobj;
743
winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer,
745
winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
747
winobj->markpos = -1;
748
winobj->seekpos = -1;
753
* Store the first tuple into the tuplestore (it's always available now;
754
* we either read it above, or saved it at the end of previous partition)
756
tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot);
757
winstate->spooled_rows++;
761
* Read tuples from the outer node, up to and including position 'pos', and
762
* store them into the tuplestore. If pos is -1, reads the whole partition.
765
spool_tuples(WindowAggState *winstate, int64 pos)
767
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
768
PlanState *outerPlan;
769
TupleTableSlot *outerslot;
770
MemoryContext oldcontext;
772
if (!winstate->buffer)
773
return; /* just a safety check */
774
if (winstate->partition_spooled)
775
return; /* whole partition done already */
778
* If the tuplestore has spilled to disk, alternate reading and writing
779
* becomes quite expensive due to frequent buffer flushes. It's cheaper
780
* to force the entire partition to get spooled in one go.
782
* XXX this is a horrid kluge --- it'd be better to fix the performance
783
* problem inside tuplestore. FIXME
785
if (!tuplestore_in_memory(winstate->buffer))
788
outerPlan = outerPlanState(winstate);
790
/* Must be in query context to call outerplan */
791
oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
793
while (winstate->spooled_rows <= pos || pos == -1)
795
outerslot = ExecProcNode(outerPlan);
796
if (TupIsNull(outerslot))
798
/* reached the end of the last partition */
799
winstate->partition_spooled = true;
800
winstate->more_partitions = false;
804
if (node->partNumCols > 0)
806
/* Check if this tuple still belongs to the current partition */
807
if (!execTuplesMatch(winstate->first_part_slot,
809
node->partNumCols, node->partColIdx,
810
winstate->partEqfunctions,
811
winstate->tmpcontext->ecxt_per_tuple_memory))
814
* end of partition; copy the tuple for the next cycle.
816
ExecCopySlot(winstate->first_part_slot, outerslot);
817
winstate->partition_spooled = true;
818
winstate->more_partitions = true;
823
/* Still in partition, so save it into the tuplestore */
824
tuplestore_puttupleslot(winstate->buffer, outerslot);
825
winstate->spooled_rows++;
828
MemoryContextSwitchTo(oldcontext);
833
* clear information kept within a partition, including
834
* tuplestore and aggregate results.
837
release_partition(WindowAggState *winstate)
841
for (i = 0; i < winstate->numfuncs; i++)
843
WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
845
/* Release any partition-local state of this window function */
846
if (perfuncstate->winobj)
847
perfuncstate->winobj->localmem = NULL;
851
* Release all partition-local memory (in particular, any partition-local
852
* state that we might have trashed our pointers to in the above loop, and
853
* any aggregate temp data). We don't rely on retail pfree because some
854
* aggregates might have allocated data we don't have direct pointers to.
856
MemoryContextResetAndDeleteChildren(winstate->partcontext);
857
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
859
if (winstate->buffer)
860
tuplestore_end(winstate->buffer);
861
winstate->buffer = NULL;
862
winstate->partition_spooled = false;
867
* Determine whether a row is in the current row's window frame according
868
* to our window framing rule
870
* The caller must have already determined that the row is in the partition
871
* and fetched it into a slot. This function just encapsulates the framing
875
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
877
int frameOptions = winstate->frameOptions;
879
Assert(pos >= 0); /* else caller error */
881
/* First, check frame starting conditions */
882
if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
884
if (frameOptions & FRAMEOPTION_ROWS)
886
/* rows before current row are out of frame */
887
if (pos < winstate->currentpos)
890
else if (frameOptions & FRAMEOPTION_RANGE)
892
/* preceding row that is not peer is out of frame */
893
if (pos < winstate->currentpos &&
894
!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
900
else if (frameOptions & FRAMEOPTION_START_VALUE)
902
if (frameOptions & FRAMEOPTION_ROWS)
904
int64 offset = DatumGetInt64(winstate->startOffsetValue);
906
/* rows before current row + offset are out of frame */
907
if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
910
if (pos < winstate->currentpos + offset)
913
else if (frameOptions & FRAMEOPTION_RANGE)
915
/* parser should have rejected this */
916
elog(ERROR, "window frame with value offset is not implemented");
922
/* Okay so far, now check frame ending conditions */
923
if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
925
if (frameOptions & FRAMEOPTION_ROWS)
927
/* rows after current row are out of frame */
928
if (pos > winstate->currentpos)
931
else if (frameOptions & FRAMEOPTION_RANGE)
933
/* following row that is not peer is out of frame */
934
if (pos > winstate->currentpos &&
935
!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
941
else if (frameOptions & FRAMEOPTION_END_VALUE)
943
if (frameOptions & FRAMEOPTION_ROWS)
945
int64 offset = DatumGetInt64(winstate->endOffsetValue);
947
/* rows after current row + offset are out of frame */
948
if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
951
if (pos > winstate->currentpos + offset)
954
else if (frameOptions & FRAMEOPTION_RANGE)
956
/* parser should have rejected this */
957
elog(ERROR, "window frame with value offset is not implemented");
963
/* If we get here, it's in frame */
968
* update_frameheadpos
969
* make frameheadpos valid for the current row
971
* Uses the winobj's read pointer for any required fetches; hence, if the
972
* frame mode is one that requires row comparisons, the winobj's mark must
973
* not be past the currently known frame head. Also uses the specified slot
974
* for any required fetches.
977
update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
979
WindowAggState *winstate = winobj->winstate;
980
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
981
int frameOptions = winstate->frameOptions;
983
if (winstate->framehead_valid)
984
return; /* already known for current row */
986
if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
988
/* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
989
winstate->frameheadpos = 0;
990
winstate->framehead_valid = true;
992
else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
994
if (frameOptions & FRAMEOPTION_ROWS)
996
/* In ROWS mode, frame head is the same as current */
997
winstate->frameheadpos = winstate->currentpos;
998
winstate->framehead_valid = true;
1000
else if (frameOptions & FRAMEOPTION_RANGE)
1004
/* If no ORDER BY, all rows are peers with each other */
1005
if (node->ordNumCols == 0)
1007
winstate->frameheadpos = 0;
1008
winstate->framehead_valid = true;
1013
* In RANGE START_CURRENT mode, frame head is the first row that
1014
* is a peer of current row. We search backwards from current,
1015
* which could be a bit inefficient if peer sets are large. Might
1016
* be better to have a separate read pointer that moves forward
1017
* tracking the frame head.
1019
fhprev = winstate->currentpos - 1;
1022
/* assume the frame head can't go backwards */
1023
if (fhprev < winstate->frameheadpos)
1025
if (!window_gettupleslot(winobj, fhprev, slot))
1026
break; /* start of partition */
1027
if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1028
break; /* not peer of current row */
1031
winstate->frameheadpos = fhprev + 1;
1032
winstate->framehead_valid = true;
1037
else if (frameOptions & FRAMEOPTION_START_VALUE)
1039
if (frameOptions & FRAMEOPTION_ROWS)
1041
/* In ROWS mode, bound is physically n before/after current */
1042
int64 offset = DatumGetInt64(winstate->startOffsetValue);
1044
if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
1047
winstate->frameheadpos = winstate->currentpos + offset;
1048
/* frame head can't go before first row */
1049
if (winstate->frameheadpos < 0)
1050
winstate->frameheadpos = 0;
1051
else if (winstate->frameheadpos > winstate->currentpos)
1053
/* make sure frameheadpos is not past end of partition */
1054
spool_tuples(winstate, winstate->frameheadpos - 1);
1055
if (winstate->frameheadpos > winstate->spooled_rows)
1056
winstate->frameheadpos = winstate->spooled_rows;
1058
winstate->framehead_valid = true;
1060
else if (frameOptions & FRAMEOPTION_RANGE)
1062
/* parser should have rejected this */
1063
elog(ERROR, "window frame with value offset is not implemented");
1073
* update_frametailpos
1074
* make frametailpos valid for the current row
1076
* Uses the winobj's read pointer for any required fetches; hence, if the
1077
* frame mode is one that requires row comparisons, the winobj's mark must
1078
* not be past the currently known frame tail. Also uses the specified slot
1079
* for any required fetches.
1082
update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
1084
WindowAggState *winstate = winobj->winstate;
1085
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1086
int frameOptions = winstate->frameOptions;
1088
if (winstate->frametail_valid)
1089
return; /* already known for current row */
1091
if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
1093
/* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
1094
spool_tuples(winstate, -1);
1095
winstate->frametailpos = winstate->spooled_rows - 1;
1096
winstate->frametail_valid = true;
1098
else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
1100
if (frameOptions & FRAMEOPTION_ROWS)
1102
/* In ROWS mode, exactly the rows up to current are in frame */
1103
winstate->frametailpos = winstate->currentpos;
1104
winstate->frametail_valid = true;
1106
else if (frameOptions & FRAMEOPTION_RANGE)
1110
/* If no ORDER BY, all rows are peers with each other */
1111
if (node->ordNumCols == 0)
1113
spool_tuples(winstate, -1);
1114
winstate->frametailpos = winstate->spooled_rows - 1;
1115
winstate->frametail_valid = true;
1120
* Else we have to search for the first non-peer of the current
1121
* row. We assume the current value of frametailpos is a lower
1122
* bound on the possible frame tail location, ie, frame tail never
1123
* goes backward, and that currentpos is also a lower bound, ie,
1124
* frame end always >= current row.
1126
ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
1129
if (!window_gettupleslot(winobj, ftnext, slot))
1130
break; /* end of partition */
1131
if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
1132
break; /* not peer of current row */
1135
winstate->frametailpos = ftnext - 1;
1136
winstate->frametail_valid = true;
1141
else if (frameOptions & FRAMEOPTION_END_VALUE)
1143
if (frameOptions & FRAMEOPTION_ROWS)
1145
/* In ROWS mode, bound is physically n before/after current */
1146
int64 offset = DatumGetInt64(winstate->endOffsetValue);
1148
if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
1151
winstate->frametailpos = winstate->currentpos + offset;
1152
/* smallest allowable value of frametailpos is -1 */
1153
if (winstate->frametailpos < 0)
1154
winstate->frametailpos = -1;
1155
else if (winstate->frametailpos > winstate->currentpos)
1157
/* make sure frametailpos is not past last row of partition */
1158
spool_tuples(winstate, winstate->frametailpos);
1159
if (winstate->frametailpos >= winstate->spooled_rows)
1160
winstate->frametailpos = winstate->spooled_rows - 1;
1162
winstate->frametail_valid = true;
1164
else if (frameOptions & FRAMEOPTION_RANGE)
1166
/* parser should have rejected this */
1167
elog(ERROR, "window frame with value offset is not implemented");
1177
/* -----------------
1180
* ExecWindowAgg receives tuples from its outer subplan and
1181
* stores them into a tuplestore, then processes window functions.
1182
* This node doesn't reduce nor qualify any row so the number of
1183
* returned rows is exactly the same as its outer subplan's result
1184
* (ignoring the case of SRFs in the targetlist, that is).
1188
ExecWindowAgg(WindowAggState *winstate)
1190
TupleTableSlot *result;
1191
ExprDoneCond isDone;
1192
ExprContext *econtext;
1196
if (winstate->all_done)
1200
* Check to see if we're still projecting out tuples from a previous
1201
* output tuple (because there is a function-returning-set in the
1202
* projection expressions). If so, try to project another one.
1204
if (winstate->ss.ps.ps_TupFromTlist)
1206
TupleTableSlot *result;
1207
ExprDoneCond isDone;
1209
result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1210
if (isDone == ExprMultipleResult)
1212
/* Done with that source tuple... */
1213
winstate->ss.ps.ps_TupFromTlist = false;
1217
* Compute frame offset values, if any, during first call.
1219
if (winstate->all_first)
1221
int frameOptions = winstate->frameOptions;
1222
ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
1228
if (frameOptions & FRAMEOPTION_START_VALUE)
1230
Assert(winstate->startOffset != NULL);
1231
value = ExecEvalExprSwitchContext(winstate->startOffset,
1237
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1238
errmsg("frame starting offset must not be null")));
1239
/* copy value into query-lifespan context */
1240
get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
1242
winstate->startOffsetValue = datumCopy(value, byval, len);
1243
if (frameOptions & FRAMEOPTION_ROWS)
1245
/* value is known to be int8 */
1246
int64 offset = DatumGetInt64(value);
1250
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1251
errmsg("frame starting offset must not be negative")));
1254
if (frameOptions & FRAMEOPTION_END_VALUE)
1256
Assert(winstate->endOffset != NULL);
1257
value = ExecEvalExprSwitchContext(winstate->endOffset,
1263
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
1264
errmsg("frame ending offset must not be null")));
1265
/* copy value into query-lifespan context */
1266
get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
1268
winstate->endOffsetValue = datumCopy(value, byval, len);
1269
if (frameOptions & FRAMEOPTION_ROWS)
1271
/* value is known to be int8 */
1272
int64 offset = DatumGetInt64(value);
1276
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1277
errmsg("frame ending offset must not be negative")));
1280
winstate->all_first = false;
1284
if (winstate->buffer == NULL)
1286
/* Initialize for first partition and set current row = 0 */
1287
begin_partition(winstate);
1288
/* If there are no input rows, we'll detect that and exit below */
1292
/* Advance current row within partition */
1293
winstate->currentpos++;
1294
/* This might mean that the frame moves, too */
1295
winstate->framehead_valid = false;
1296
winstate->frametail_valid = false;
1300
* Spool all tuples up to and including the current row, if we haven't
1303
spool_tuples(winstate, winstate->currentpos);
1305
/* Move to the next partition if we reached the end of this partition */
1306
if (winstate->partition_spooled &&
1307
winstate->currentpos >= winstate->spooled_rows)
1309
release_partition(winstate);
1311
if (winstate->more_partitions)
1313
begin_partition(winstate);
1314
Assert(winstate->spooled_rows > 0);
1318
winstate->all_done = true;
1323
/* final output execution is in ps_ExprContext */
1324
econtext = winstate->ss.ps.ps_ExprContext;
1326
/* Clear the per-output-tuple context for current row */
1327
ResetExprContext(econtext);
1330
* Read the current row from the tuplestore, and save in ScanTupleSlot.
1331
* (We can't rely on the outerplan's output slot because we may have to
1332
* read beyond the current row. Also, we have to actually copy the row
1333
* out of the tuplestore, since window function evaluation might cause the
1334
* tuplestore to dump its state to disk.)
1336
* Current row must be in the tuplestore, since we spooled it above.
1338
tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr);
1339
if (!tuplestore_gettupleslot(winstate->buffer, true, true,
1340
winstate->ss.ss_ScanTupleSlot))
1341
elog(ERROR, "unexpected end of tuplestore");
1344
* Evaluate true window functions
1346
numfuncs = winstate->numfuncs;
1347
for (i = 0; i < numfuncs; i++)
1349
WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
1351
if (perfuncstate->plain_agg)
1353
eval_windowfunction(winstate, perfuncstate,
1354
&(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]),
1355
&(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno]));
1359
* Evaluate aggregates
1361
if (winstate->numaggs > 0)
1362
eval_windowaggregates(winstate);
1365
* Truncate any no-longer-needed rows from the tuplestore.
1367
tuplestore_trim(winstate->buffer);
1370
* Form and return a projection tuple using the windowfunc results and the
1371
* current row. Setting ecxt_outertuple arranges that any Vars will be
1372
* evaluated with respect to that row.
1374
econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
1375
result = ExecProject(winstate->ss.ps.ps_ProjInfo, &isDone);
1377
if (isDone == ExprEndResult)
1379
/* SRF in tlist returned no rows, so advance to next input tuple */
1383
winstate->ss.ps.ps_TupFromTlist =
1384
(isDone == ExprMultipleResult);
1388
/* -----------------
1391
* Creates the run-time information for the WindowAgg node produced by the
1392
* planner and initializes its outer subtree
1396
ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
1398
WindowAggState *winstate;
1400
ExprContext *econtext;
1401
ExprContext *tmpcontext;
1402
WindowStatePerFunc perfunc;
1403
WindowStatePerAgg peragg;
1410
/* check for unsupported flags */
1411
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
1414
* create state structure
1416
winstate = makeNode(WindowAggState);
1417
winstate->ss.ps.plan = (Plan *) node;
1418
winstate->ss.ps.state = estate;
1421
* Create expression contexts. We need two, one for per-input-tuple
1422
* processing and one for per-output-tuple processing. We cheat a little
1423
* by using ExecAssignExprContext() to build both.
1425
ExecAssignExprContext(estate, &winstate->ss.ps);
1426
tmpcontext = winstate->ss.ps.ps_ExprContext;
1427
winstate->tmpcontext = tmpcontext;
1428
ExecAssignExprContext(estate, &winstate->ss.ps);
1430
/* Create long-lived context for storage of partition-local memory etc */
1431
winstate->partcontext =
1432
AllocSetContextCreate(CurrentMemoryContext,
1433
"WindowAgg_Partition",
1434
ALLOCSET_DEFAULT_MINSIZE,
1435
ALLOCSET_DEFAULT_INITSIZE,
1436
ALLOCSET_DEFAULT_MAXSIZE);
1438
/* Create mid-lived context for aggregate trans values etc */
1439
winstate->aggcontext =
1440
AllocSetContextCreate(CurrentMemoryContext,
1441
"WindowAgg_Aggregates",
1442
ALLOCSET_DEFAULT_MINSIZE,
1443
ALLOCSET_DEFAULT_INITSIZE,
1444
ALLOCSET_DEFAULT_MAXSIZE);
1447
* tuple table initialization
1449
ExecInitScanTupleSlot(estate, &winstate->ss);
1450
ExecInitResultTupleSlot(estate, &winstate->ss.ps);
1451
winstate->first_part_slot = ExecInitExtraTupleSlot(estate);
1452
winstate->agg_row_slot = ExecInitExtraTupleSlot(estate);
1453
winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate);
1454
winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate);
1456
winstate->ss.ps.targetlist = (List *)
1457
ExecInitExpr((Expr *) node->plan.targetlist,
1458
(PlanState *) winstate);
1461
* WindowAgg nodes never have quals, since they can only occur at the
1462
* logical top level of a query (ie, after any WHERE or HAVING filters)
1464
Assert(node->plan.qual == NIL);
1465
winstate->ss.ps.qual = NIL;
1468
* initialize child nodes
1470
outerPlan = outerPlan(node);
1471
outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags);
1474
* initialize source tuple type (which is also the tuple type that we'll
1475
* store in the tuplestore and use in all our working slots).
1477
ExecAssignScanTypeFromOuterPlan(&winstate->ss);
1479
ExecSetSlotDescriptor(winstate->first_part_slot,
1480
winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1481
ExecSetSlotDescriptor(winstate->agg_row_slot,
1482
winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1483
ExecSetSlotDescriptor(winstate->temp_slot_1,
1484
winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1485
ExecSetSlotDescriptor(winstate->temp_slot_2,
1486
winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
1489
* Initialize result tuple type and projection info.
1491
ExecAssignResultTypeFromTL(&winstate->ss.ps);
1492
ExecAssignProjectionInfo(&winstate->ss.ps, NULL);
1494
winstate->ss.ps.ps_TupFromTlist = false;
1496
/* Set up data for comparing tuples */
1497
if (node->partNumCols > 0)
1498
winstate->partEqfunctions = execTuplesMatchPrepare(node->partNumCols,
1499
node->partOperators);
1500
if (node->ordNumCols > 0)
1501
winstate->ordEqfunctions = execTuplesMatchPrepare(node->ordNumCols,
1502
node->ordOperators);
1505
* WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes.
1507
numfuncs = winstate->numfuncs;
1508
numaggs = winstate->numaggs;
1509
econtext = winstate->ss.ps.ps_ExprContext;
1510
econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs);
1511
econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs);
1514
* allocate per-wfunc/per-agg state information.
1516
perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs);
1517
peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs);
1518
winstate->perfunc = perfunc;
1519
winstate->peragg = peragg;
1523
foreach(l, winstate->funcs)
1525
WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l);
1526
WindowFunc *wfunc = (WindowFunc *) wfuncstate->xprstate.expr;
1527
WindowStatePerFunc perfuncstate;
1528
AclResult aclresult;
1531
if (wfunc->winref != node->winref) /* planner screwed up? */
1532
elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u",
1533
wfunc->winref, node->winref);
1535
/* Look for a previous duplicate window function */
1536
for (i = 0; i <= wfuncno; i++)
1538
if (equal(wfunc, perfunc[i].wfunc) &&
1539
!contain_volatile_functions((Node *) wfunc))
1544
/* Found a match to an existing entry, so just mark it */
1545
wfuncstate->wfuncno = i;
1549
/* Nope, so assign a new PerAgg record */
1550
perfuncstate = &perfunc[++wfuncno];
1552
/* Mark WindowFunc state node with assigned index in the result array */
1553
wfuncstate->wfuncno = wfuncno;
1555
/* Check permission to call window function */
1556
aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(),
1558
if (aclresult != ACLCHECK_OK)
1559
aclcheck_error(aclresult, ACL_KIND_PROC,
1560
get_func_name(wfunc->winfnoid));
1562
/* Fill in the perfuncstate data */
1563
perfuncstate->wfuncstate = wfuncstate;
1564
perfuncstate->wfunc = wfunc;
1565
perfuncstate->numArguments = list_length(wfuncstate->args);
1567
fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
1568
econtext->ecxt_per_query_memory);
1569
fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo);
1571
perfuncstate->winCollation = wfunc->inputcollid;
1573
get_typlenbyval(wfunc->wintype,
1574
&perfuncstate->resulttypeLen,
1575
&perfuncstate->resulttypeByVal);
1578
* If it's really just a plain aggregate function, we'll emulate the
1579
* Agg environment for it.
1581
perfuncstate->plain_agg = wfunc->winagg;
1584
WindowStatePerAgg peraggstate;
1586
perfuncstate->aggno = ++aggno;
1587
peraggstate = &winstate->peragg[aggno];
1588
initialize_peragg(winstate, wfunc, peraggstate);
1589
peraggstate->wfuncno = wfuncno;
1593
WindowObject winobj = makeNode(WindowObjectData);
1595
winobj->winstate = winstate;
1596
winobj->argstates = wfuncstate->args;
1597
winobj->localmem = NULL;
1598
perfuncstate->winobj = winobj;
1602
/* Update numfuncs, numaggs to match number of unique functions found */
1603
winstate->numfuncs = wfuncno + 1;
1604
winstate->numaggs = aggno + 1;
1606
/* Set up WindowObject for aggregates, if needed */
1607
if (winstate->numaggs > 0)
1609
WindowObject agg_winobj = makeNode(WindowObjectData);
1611
agg_winobj->winstate = winstate;
1612
agg_winobj->argstates = NIL;
1613
agg_winobj->localmem = NULL;
1614
/* make sure markptr = -1 to invalidate. It may not get used */
1615
agg_winobj->markptr = -1;
1616
agg_winobj->readptr = -1;
1617
winstate->agg_winobj = agg_winobj;
1620
/* copy frame options to state node for easy access */
1621
winstate->frameOptions = node->frameOptions;
1623
/* initialize frame bound offset expressions */
1624
winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
1625
(PlanState *) winstate);
1626
winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
1627
(PlanState *) winstate);
1629
winstate->all_first = true;
1630
winstate->partition_spooled = false;
1631
winstate->more_partitions = false;
1636
/* -----------------
1641
ExecEndWindowAgg(WindowAggState *node)
1643
PlanState *outerPlan;
1645
release_partition(node);
1647
pfree(node->perfunc);
1648
pfree(node->peragg);
1650
ExecClearTuple(node->ss.ss_ScanTupleSlot);
1651
ExecClearTuple(node->first_part_slot);
1652
ExecClearTuple(node->agg_row_slot);
1653
ExecClearTuple(node->temp_slot_1);
1654
ExecClearTuple(node->temp_slot_2);
1657
* Free both the expr contexts.
1659
ExecFreeExprContext(&node->ss.ps);
1660
node->ss.ps.ps_ExprContext = node->tmpcontext;
1661
ExecFreeExprContext(&node->ss.ps);
1663
MemoryContextDelete(node->partcontext);
1664
MemoryContextDelete(node->aggcontext);
1666
outerPlan = outerPlanState(node);
1667
ExecEndNode(outerPlan);
1670
/* -----------------
1671
* ExecRescanWindowAgg
1675
ExecReScanWindowAgg(WindowAggState *node)
1677
ExprContext *econtext = node->ss.ps.ps_ExprContext;
1679
node->all_done = false;
1681
node->ss.ps.ps_TupFromTlist = false;
1682
node->all_first = true;
1684
/* release tuplestore et al */
1685
release_partition(node);
1687
/* release all temp tuples, but especially first_part_slot */
1688
ExecClearTuple(node->ss.ss_ScanTupleSlot);
1689
ExecClearTuple(node->first_part_slot);
1690
ExecClearTuple(node->agg_row_slot);
1691
ExecClearTuple(node->temp_slot_1);
1692
ExecClearTuple(node->temp_slot_2);
1694
/* Forget current wfunc values */
1695
MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs);
1696
MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs);
1699
* if chgParam of subnode is not null then plan will be re-scanned by
1700
* first ExecProcNode.
1702
if (node->ss.ps.lefttree->chgParam == NULL)
1703
ExecReScan(node->ss.ps.lefttree);
1709
* Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
1711
static WindowStatePerAggData *
1712
initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
1713
WindowStatePerAgg peraggstate)
1715
Oid inputTypes[FUNC_MAX_ARGS];
1718
Form_pg_aggregate aggform;
1720
AclResult aclresult;
1729
numArguments = list_length(wfunc->args);
1732
foreach(lc, wfunc->args)
1734
inputTypes[i++] = exprType((Node *) lfirst(lc));
1737
aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid));
1738
if (!HeapTupleIsValid(aggTuple))
1739
elog(ERROR, "cache lookup failed for aggregate %u",
1741
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
1744
* ExecInitWindowAgg already checked permission to call aggregate function
1745
* ... but we still need to check the component functions
1748
peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
1749
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
1751
/* Check that aggregate owner has permission to call component fns */
1753
HeapTuple procTuple;
1756
procTuple = SearchSysCache1(PROCOID,
1757
ObjectIdGetDatum(wfunc->winfnoid));
1758
if (!HeapTupleIsValid(procTuple))
1759
elog(ERROR, "cache lookup failed for function %u",
1761
aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
1762
ReleaseSysCache(procTuple);
1764
aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
1766
if (aclresult != ACLCHECK_OK)
1767
aclcheck_error(aclresult, ACL_KIND_PROC,
1768
get_func_name(transfn_oid));
1769
if (OidIsValid(finalfn_oid))
1771
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
1773
if (aclresult != ACLCHECK_OK)
1774
aclcheck_error(aclresult, ACL_KIND_PROC,
1775
get_func_name(finalfn_oid));
1779
/* resolve actual type of transition state, if polymorphic */
1780
aggtranstype = aggform->aggtranstype;
1781
if (IsPolymorphicType(aggtranstype))
1783
/* have to fetch the agg's declared input types... */
1784
Oid *declaredArgTypes;
1787
get_func_signature(wfunc->winfnoid,
1788
&declaredArgTypes, &agg_nargs);
1789
Assert(agg_nargs == numArguments);
1790
aggtranstype = enforce_generic_type_consistency(inputTypes,
1795
pfree(declaredArgTypes);
1798
/* build expression trees using actual argument & result types */
1799
build_aggregate_fnexprs(inputTypes,
1809
fmgr_info(transfn_oid, &peraggstate->transfn);
1810
fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
1812
if (OidIsValid(finalfn_oid))
1814
fmgr_info(finalfn_oid, &peraggstate->finalfn);
1815
fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn);
1818
get_typlenbyval(wfunc->wintype,
1819
&peraggstate->resulttypeLen,
1820
&peraggstate->resulttypeByVal);
1821
get_typlenbyval(aggtranstype,
1822
&peraggstate->transtypeLen,
1823
&peraggstate->transtypeByVal);
1826
* initval is potentially null, so don't try to access it as a struct
1827
* field. Must do it the hard way with SysCacheGetAttr.
1829
textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
1830
Anum_pg_aggregate_agginitval,
1831
&peraggstate->initValueIsNull);
1833
if (peraggstate->initValueIsNull)
1834
peraggstate->initValue = (Datum) 0;
1836
peraggstate->initValue = GetAggInitVal(textInitVal,
1840
* If the transfn is strict and the initval is NULL, make sure input type
1841
* and transtype are the same (or at least binary-compatible), so that
1842
* it's OK to use the first input value as the initial transValue. This
1843
* should have been checked at agg definition time, but just in case...
1845
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
1847
if (numArguments < 1 ||
1848
!IsBinaryCoercible(inputTypes[0], aggtranstype))
1850
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
1851
errmsg("aggregate %u needs to have compatible input type and transition type",
1855
ReleaseSysCache(aggTuple);
1861
GetAggInitVal(Datum textInitVal, Oid transtype)
1868
getTypeInputInfo(transtype, &typinput, &typioparam);
1869
strInitVal = TextDatumGetCString(textInitVal);
1870
initVal = OidInputFunctionCall(typinput, strInitVal,
1878
* compare two rows to see if they are equal according to the ORDER BY clause
1880
* NB: this does not consider the window frame mode.
1883
are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
1884
TupleTableSlot *slot2)
1886
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
1888
/* If no ORDER BY, all rows are peers with each other */
1889
if (node->ordNumCols == 0)
1892
return execTuplesMatch(slot1, slot2,
1893
node->ordNumCols, node->ordColIdx,
1894
winstate->ordEqfunctions,
1895
winstate->tmpcontext->ecxt_per_tuple_memory);
1899
* window_gettupleslot
1900
* Fetch the pos'th tuple of the current partition into the slot,
1901
* using the winobj's read pointer
1903
* Returns true if successful, false if no such row
1906
window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
1908
WindowAggState *winstate = winobj->winstate;
1909
MemoryContext oldcontext;
1911
/* Don't allow passing -1 to spool_tuples here */
1915
/* If necessary, fetch the tuple into the spool */
1916
spool_tuples(winstate, pos);
1918
if (pos >= winstate->spooled_rows)
1921
if (pos < winobj->markpos)
1922
elog(ERROR, "cannot fetch row before WindowObject's mark position");
1924
oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
1926
tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
1929
* There's no API to refetch the tuple at the current position. We have to
1930
* move one tuple forward, and then one backward. (We don't do it the
1931
* other way because we might try to fetch the row before our mark, which
1932
* isn't allowed.) XXX this case could stand to be optimized.
1934
if (winobj->seekpos == pos)
1936
tuplestore_advance(winstate->buffer, true);
1940
while (winobj->seekpos > pos)
1942
if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot))
1943
elog(ERROR, "unexpected end of tuplestore");
1947
while (winobj->seekpos < pos)
1949
if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot))
1950
elog(ERROR, "unexpected end of tuplestore");
1954
MemoryContextSwitchTo(oldcontext);
/***********************************************************************
 * API exposed to window functions
 ***********************************************************************/
1966
* WinGetPartitionLocalMemory
1967
* Get working memory that lives till end of partition processing
1969
* On first call within a given partition, this allocates and zeroes the
1970
* requested amount of space. Subsequent calls just return the same chunk.
1972
* Memory obtained this way is normally used to hold state that should be
1973
* automatically reset for each new partition. If a window function wants
1974
* to hold state across the whole query, fcinfo->fn_extra can be used in the
1975
* usual way for that.
1978
WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
1980
Assert(WindowObjectIsValid(winobj));
1981
if (winobj->localmem == NULL)
1983
MemoryContextAllocZero(winobj->winstate->partcontext, sz);
1984
return winobj->localmem;
1988
* WinGetCurrentPosition
1989
* Return the current row's position (counting from 0) within the current
1993
WinGetCurrentPosition(WindowObject winobj)
1995
Assert(WindowObjectIsValid(winobj));
1996
return winobj->winstate->currentpos;
2000
* WinGetPartitionRowCount
2001
* Return total number of rows contained in the current partition.
2003
* Note: this is a relatively expensive operation because it forces the
2004
* whole partition to be "spooled" into the tuplestore at once. Once
2005
* executed, however, additional calls within the same partition are cheap.
2008
WinGetPartitionRowCount(WindowObject winobj)
2010
Assert(WindowObjectIsValid(winobj));
2011
spool_tuples(winobj->winstate, -1);
2012
return winobj->winstate->spooled_rows;
2016
* WinSetMarkPosition
2017
* Set the "mark" position for the window object, which is the oldest row
2018
* number (counting from 0) it is allowed to fetch during all subsequent
2019
* operations within the current partition.
2021
* Window functions do not have to call this, but are encouraged to move the
2022
* mark forward when possible to keep the tuplestore size down and prevent
2023
* having to spill rows to disk.
2026
WinSetMarkPosition(WindowObject winobj, int64 markpos)
2028
WindowAggState *winstate;
2030
Assert(WindowObjectIsValid(winobj));
2031
winstate = winobj->winstate;
2033
if (markpos < winobj->markpos)
2034
elog(ERROR, "cannot move WindowObject's mark position backward");
2035
tuplestore_select_read_pointer(winstate->buffer, winobj->markptr);
2036
while (markpos > winobj->markpos)
2038
tuplestore_advance(winstate->buffer, true);
2041
tuplestore_select_read_pointer(winstate->buffer, winobj->readptr);
2042
while (markpos > winobj->seekpos)
2044
tuplestore_advance(winstate->buffer, true);
2051
* Compare two rows (specified by absolute position in window) to see
2052
* if they are equal according to the ORDER BY clause.
2054
* NB: this does not consider the window frame mode.
2057
WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2)
2059
WindowAggState *winstate;
2061
TupleTableSlot *slot1;
2062
TupleTableSlot *slot2;
2065
Assert(WindowObjectIsValid(winobj));
2066
winstate = winobj->winstate;
2067
node = (WindowAgg *) winstate->ss.ps.plan;
2069
/* If no ORDER BY, all rows are peers; don't bother to fetch them */
2070
if (node->ordNumCols == 0)
2073
slot1 = winstate->temp_slot_1;
2074
slot2 = winstate->temp_slot_2;
2076
if (!window_gettupleslot(winobj, pos1, slot1))
2077
elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2079
if (!window_gettupleslot(winobj, pos2, slot2))
2080
elog(ERROR, "specified position is out of window: " INT64_FORMAT,
2083
res = are_peers(winstate, slot1, slot2);
2085
ExecClearTuple(slot1);
2086
ExecClearTuple(slot2);
2092
* WinGetFuncArgInPartition
2093
* Evaluate a window function's argument expression on a specified
2094
* row of the partition. The row is identified in lseek(2) style,
2095
* i.e. relative to the current, first, or last row.
2097
* argno: argument number to evaluate (counted from 0)
2098
* relpos: signed rowcount offset from the seek position
2099
* seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2100
* set_mark: If the row is found and set_mark is true, the mark is moved to
2101
* the row as a side-effect.
2102
* isnull: output argument, receives isnull status of result
2103
* isout: output argument, set to indicate whether target row position
2104
* is out of partition (can pass NULL if caller doesn't care about this)
2106
* Specifying a nonexistent row is not an error, it just causes a null result
2107
* (plus setting *isout true, if isout isn't NULL).
2110
WinGetFuncArgInPartition(WindowObject winobj, int argno,
2111
int relpos, int seektype, bool set_mark,
2112
bool *isnull, bool *isout)
2114
WindowAggState *winstate;
2115
ExprContext *econtext;
2116
TupleTableSlot *slot;
2120
Assert(WindowObjectIsValid(winobj));
2121
winstate = winobj->winstate;
2122
econtext = winstate->ss.ps.ps_ExprContext;
2123
slot = winstate->temp_slot_1;
2127
case WINDOW_SEEK_CURRENT:
2128
abs_pos = winstate->currentpos + relpos;
2130
case WINDOW_SEEK_HEAD:
2133
case WINDOW_SEEK_TAIL:
2134
spool_tuples(winstate, -1);
2135
abs_pos = winstate->spooled_rows - 1 + relpos;
2138
elog(ERROR, "unrecognized window seek type: %d", seektype);
2139
abs_pos = 0; /* keep compiler quiet */
2143
gottuple = window_gettupleslot(winobj, abs_pos, slot);
2158
int frameOptions = winstate->frameOptions;
2159
int64 mark_pos = abs_pos;
2162
* In RANGE mode with a moving frame head, we must not let the
2163
* mark advance past frameheadpos, since that row has to be
2164
* fetchable during future update_frameheadpos calls.
2166
* XXX it is very ugly to pollute window functions' marks with
2167
* this consideration; it could for instance mask a logic bug that
2168
* lets a window function fetch rows before what it had claimed
2169
* was its mark. Perhaps use a separate mark for frame head
2172
if ((frameOptions & FRAMEOPTION_RANGE) &&
2173
!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2175
update_frameheadpos(winobj, winstate->temp_slot_2);
2176
if (mark_pos > winstate->frameheadpos)
2177
mark_pos = winstate->frameheadpos;
2179
WinSetMarkPosition(winobj, mark_pos);
2181
econtext->ecxt_outertuple = slot;
2182
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2183
econtext, isnull, NULL);
2188
* WinGetFuncArgInFrame
2189
* Evaluate a window function's argument expression on a specified
2190
* row of the window frame. The row is identified in lseek(2) style,
2191
* i.e. relative to the current, first, or last row.
2193
* argno: argument number to evaluate (counted from 0)
2194
* relpos: signed rowcount offset from the seek position
2195
* seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL
2196
* set_mark: If the row is found and set_mark is true, the mark is moved to
2197
* the row as a side-effect.
2198
* isnull: output argument, receives isnull status of result
2199
* isout: output argument, set to indicate whether target row position
2200
* is out of frame (can pass NULL if caller doesn't care about this)
2202
* Specifying a nonexistent row is not an error, it just causes a null result
2203
* (plus setting *isout true, if isout isn't NULL).
2206
WinGetFuncArgInFrame(WindowObject winobj, int argno,
2207
int relpos, int seektype, bool set_mark,
2208
bool *isnull, bool *isout)
2210
WindowAggState *winstate;
2211
ExprContext *econtext;
2212
TupleTableSlot *slot;
2216
Assert(WindowObjectIsValid(winobj));
2217
winstate = winobj->winstate;
2218
econtext = winstate->ss.ps.ps_ExprContext;
2219
slot = winstate->temp_slot_1;
2223
case WINDOW_SEEK_CURRENT:
2224
abs_pos = winstate->currentpos + relpos;
2226
case WINDOW_SEEK_HEAD:
2227
update_frameheadpos(winobj, slot);
2228
abs_pos = winstate->frameheadpos + relpos;
2230
case WINDOW_SEEK_TAIL:
2231
update_frametailpos(winobj, slot);
2232
abs_pos = winstate->frametailpos + relpos;
2235
elog(ERROR, "unrecognized window seek type: %d", seektype);
2236
abs_pos = 0; /* keep compiler quiet */
2240
gottuple = window_gettupleslot(winobj, abs_pos, slot);
2242
gottuple = row_is_in_frame(winstate, abs_pos, slot);
2257
int frameOptions = winstate->frameOptions;
2258
int64 mark_pos = abs_pos;
2261
* In RANGE mode with a moving frame head, we must not let the
2262
* mark advance past frameheadpos, since that row has to be
2263
* fetchable during future update_frameheadpos calls.
2265
* XXX it is very ugly to pollute window functions' marks with
2266
* this consideration; it could for instance mask a logic bug that
2267
* lets a window function fetch rows before what it had claimed
2268
* was its mark. Perhaps use a separate mark for frame head
2271
if ((frameOptions & FRAMEOPTION_RANGE) &&
2272
!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
2274
update_frameheadpos(winobj, winstate->temp_slot_2);
2275
if (mark_pos > winstate->frameheadpos)
2276
mark_pos = winstate->frameheadpos;
2278
WinSetMarkPosition(winobj, mark_pos);
2280
econtext->ecxt_outertuple = slot;
2281
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2282
econtext, isnull, NULL);
2287
* WinGetFuncArgCurrent
2288
* Evaluate a window function's argument expression on the current row.
2290
* argno: argument number to evaluate (counted from 0)
2291
* isnull: output argument, receives isnull status of result
2293
* Note: this isn't quite equivalent to WinGetFuncArgInPartition or
2294
* WinGetFuncArgInFrame targeting the current row, because it will succeed
2295
* even if the WindowObject's mark has been set beyond the current row.
2296
* This should generally be used for "ordinary" arguments of a window
2297
* function, such as the offset argument of lead() or lag().
2300
WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull)
2302
WindowAggState *winstate;
2303
ExprContext *econtext;
2305
Assert(WindowObjectIsValid(winobj));
2306
winstate = winobj->winstate;
2308
econtext = winstate->ss.ps.ps_ExprContext;
2310
econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot;
2311
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
2312
econtext, isnull, NULL);