~akiban-technologies/akiban-server/trunk

« back to all changes in this revision

Viewing changes to src/main/java/com/akiban/qp/operator/BranchLookup_Nested.java

merge mmcm: Add lookahead option to BranchLookup_Nested.

https://code.launchpad.net/~mmcm/akiban-server/pipeline-branch-lookup-nested/+merge/176825

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
import com.akiban.qp.rowtype.RowType;
26
26
import com.akiban.qp.rowtype.UserTableRowType;
27
27
import com.akiban.qp.rowtype.*;
 
28
import com.akiban.server.api.dml.ColumnSelector;
28
29
import com.akiban.server.explain.*;
29
30
import com.akiban.server.explain.std.LookUpOperatorExplainer;
30
31
import com.akiban.util.ArgumentValidation;
33
34
import org.slf4j.Logger;
34
35
import org.slf4j.LoggerFactory;
35
36
 
 
37
import java.util.ArrayList;
 
38
import java.util.Collections;
 
39
import java.util.Comparator;
 
40
import java.util.Collection;
 
41
import java.util.List;
36
42
import java.util.Set;
37
43
 
38
44
import static java.lang.Math.min;
53
59
 
54
60
 <ul>
55
61
 
56
 
 <li><b>GroupTable groupTable:</b> The group table containing the
57
 
 ancestors of interest.
 
62
 <li><b>Group group:</b> The group containing the ancestors of interest.
58
63
 
59
64
 <li><b>RowType inputRowType:</b> Bound row will be of this type.
60
65
 
61
66
 <li><b>RowType sourceRowType:</b> Branches will be located for input
62
 
 rows of this type.
 
67
 rows of this type. Possibly a subrow of inputRowType.
63
68
 
64
69
 <li><b>UserTableRowType ancestorRowType:</b> Identifies the table in the group at which branching occurs.
65
 
 Must be an ancestor of both inputRowType's table and outputRowType's table.
 
70
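 Must be an ancestor of both inputRowType's table and outputRowTypes' tables. May be null, in which case the branching table is computed as the common ancestor of the input table and all output tables.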
66
71
 
67
 
 <li><b>UserTableRowType outputRowType:</b> Type at the root of the branch to be
 
72
 <li><b>Collection&lt;UserTableRowType&gt; outputRowTypes:</b> Types within the branch to be
68
73
 retrieved.
69
74
 
70
75
 <li><b>API.InputPreservationOption flag:</b> Indicates whether rows of type rowType
74
79
 <li><b>int inputBindingPosition:</b> Indicates input row's position in the query context. The hkey
75
80
 of this row will be used to locate ancestors.
76
81
 
 
82
 <li><b>int lookaheadQuantum:</b> Number of cursors to try to keep open by looking
 
83
  ahead in bindings stream. A value of 1 or less disables lookahead.
 
84
 
77
85
 </ul>
78
86
 
79
87
 inputRowType may be an index row type, a user table row type, or an hkey row type. flag = KEEP_INPUT is permitted
80
88
 only for user table row types.
81
89
 
82
 
 The groupTable, inputRowType, and outputRowType must belong to the
 
90
 The group, inputRowType, and outputRowTypes must belong to the
83
91
 same group.
84
92
 
85
93
 ancestorRowType's table must be an ancestor of
86
 
 inputRowType's table and outputRowType's table. outputRowType's table must be the parent
87
 
 of outputRowType's table.
 
94
 inputRowType's table and outputRowTypes' tables.
88
95
 
89
96
 <h1>Behavior</h1>
90
97
 
139
146
                             getClass().getSimpleName(),
140
147
                             group.getRoot().getName(),
141
148
                             sourceRowType,
142
 
                             outputRowType);
 
149
                             outputRowTypes);
143
150
    }
144
151
 
145
152
    // Operator interface
152
159
    @Override
153
160
    public Cursor cursor(QueryContext context, QueryBindingsCursor bindingsCursor)
154
161
    {
155
 
        return new Execution(context, bindingsCursor);
 
162
        if (lookaheadQuantum <= 1) {
 
163
            return new Execution(context, bindingsCursor);
 
164
        }
 
165
        else {
 
166
            return new LookaheadExecution(context, bindingsCursor, 
 
167
                                          context.getStore(commonAncestor),
 
168
                                          lookaheadQuantum);
 
169
        }
156
170
    }
157
171
 
158
172
    @Override
167
181
                               RowType inputRowType,
168
182
                               RowType sourceRowType,
169
183
                               UserTableRowType ancestorRowType,
170
 
                               UserTableRowType outputRowType,
 
184
                               Collection<UserTableRowType> outputRowTypes,
171
185
                               API.InputPreservationOption flag,
172
 
                               int inputBindingPosition)
 
186
                               int inputBindingPosition,
 
187
                               int lookaheadQuantum)
173
188
    {
174
189
        ArgumentValidation.notNull("group", group);
175
190
        ArgumentValidation.notNull("inputRowType", inputRowType);
176
191
        ArgumentValidation.notNull("sourceRowType", sourceRowType);
177
 
        ArgumentValidation.notNull("outputRowType", outputRowType);
 
192
        ArgumentValidation.notEmpty("outputRowTypes", outputRowTypes);
178
193
        ArgumentValidation.notNull("flag", flag);
179
194
        ArgumentValidation.isTrue("sourceRowType instanceof UserTableRowType || flag == API.InputPreservationOption.DISCARD_INPUT",
180
195
                                  sourceRowType instanceof UserTableRowType || flag == API.InputPreservationOption.DISCARD_INPUT);
185
200
        } else if (sourceRowType instanceof IndexRowType) {
186
201
            inputTableType = ((IndexRowType) sourceRowType).tableType();
187
202
        } else if (sourceRowType instanceof HKeyRowType) {
188
 
            Schema schema = outputRowType.schema();
 
203
            Schema schema = outputRowTypes.iterator().next().schema();
189
204
            inputTableType = schema.userTableRowType(sourceRowType.hKey().userTable());
190
205
        }
191
206
        assert inputTableType != null : sourceRowType;
192
207
        UserTable inputTable = inputTableType.userTable();
193
 
        UserTable outputTable = outputRowType.userTable();
194
 
        ArgumentValidation.isSame("inputTable.getGroup()",
195
 
                                  inputTable.getGroup(),
196
 
                                  "outputTable.getGroup()",
197
 
                                  outputTable.getGroup());
 
208
        ArgumentValidation.isSame("inputTable.getGroup()", inputTable.getGroup(), 
 
209
                                  "group", group);
 
210
        UserTable commonAncestor;
 
211
        if (ancestorRowType == null) {
 
212
            commonAncestor = inputTable;
 
213
        } else {
 
214
            commonAncestor = ancestorRowType.userTable();
 
215
            ArgumentValidation.isTrue("ancestorRowType.ancestorOf(inputTableType)",
 
216
                                      ancestorRowType.ancestorOf(inputTableType));
 
217
        }
 
218
        for (UserTableRowType outputRowType : outputRowTypes) {
 
219
            UserTable outputTable = outputRowType.userTable();
 
220
            ArgumentValidation.isSame("outputTable.getGroup()", outputTable.getGroup(), 
 
221
                                      "group", group);
 
222
            if (ancestorRowType == null) {
 
223
                commonAncestor = commonAncestor(commonAncestor, outputTable);
 
224
            }
 
225
            else {
 
226
                ArgumentValidation.isTrue("ancestorRowType.ancestorOf(outputRowType)",
 
227
                                          ancestorRowType.ancestorOf(outputRowType));
 
228
            }
 
229
        }
198
230
        this.group = group;
199
231
        this.inputRowType = inputRowType;
200
232
        this.sourceRowType = sourceRowType;
201
 
        this.outputRowType = outputRowType;
 
233
        this.outputRowTypes = new ArrayList<>(outputRowTypes);
 
234
        Collections.sort(this.outputRowTypes, 
 
235
                         new Comparator<UserTableRowType>() 
 
236
                         {
 
237
                             @Override
 
238
                             public int compare(UserTableRowType x, UserTableRowType y)
 
239
                                 {
 
240
                                     return x.userTable().getDepth() - y.userTable().getDepth();
 
241
                                 }
 
242
                         });
 
243
        this.commonAncestor = commonAncestor;
202
244
        this.keepInput = flag == API.InputPreservationOption.KEEP_INPUT;
203
245
        this.inputBindingPosition = inputBindingPosition;
204
 
        if (ancestorRowType == null) {
205
 
            this.commonAncestor = commonAncestor(inputTable, outputTable);
206
 
        } else {
207
 
            this.commonAncestor = ancestorRowType.userTable();
208
 
            ArgumentValidation.isTrue("ancestorRowType.ancestorOf(inputTableType)",
209
 
                                      ancestorRowType.ancestorOf(inputTableType));
210
 
            ArgumentValidation.isTrue("ancestorRowType.ancestorOf(outputRowType)",
211
 
                                      ancestorRowType.ancestorOf(outputRowType));
212
 
        }
213
 
        switch (outputTable.getDepth() - commonAncestor.getDepth()) {
214
 
            case 0:
215
 
                branchRootOrdinal = -1;
216
 
                break;
217
 
            case 1:
218
 
                branchRootOrdinal = ordinal(outputTable);
219
 
                break;
220
 
            default:
221
 
                branchRootOrdinal = -1;
222
 
                ArgumentValidation.isTrue("false", false);
223
 
                break;
 
246
        this.lookaheadQuantum = lookaheadQuantum;
 
247
        // See whether there is a single branch beneath commonAncestor
 
248
        // with all output row types.
 
249
        UserTable outputTable = this.outputRowTypes.get(0).userTable();
 
250
        boolean allOneBranch;
 
251
        if (outputTable == commonAncestor) {
 
252
            allOneBranch = false;
 
253
        }
 
254
        else {
 
255
            while (outputTable.parentTable() != commonAncestor) {
 
256
                outputTable = outputTable.parentTable();
 
257
            }
 
258
            UserTableRowType outputTableRowType = this.outputRowTypes.get(0).schema().userTableRowType(outputTable);
 
259
            allOneBranch = true;
 
260
            for (int i = 1; i < this.outputRowTypes.size(); i++) {
 
261
                if (!outputTableRowType.ancestorOf(this.outputRowTypes.get(i))) {
 
262
                    allOneBranch = false;
 
263
                    break;
 
264
                }
 
265
            }
 
266
        }
 
267
        if (allOneBranch) {
 
268
            branchRootOrdinal = ordinal(outputTable);
 
269
        }
 
270
        else {
 
271
            branchRootOrdinal = -1;
224
272
        }
225
273
        // branchRootOrdinal = -1 means that outputTable is an ancestor of inputTable. In this case, inputPrecedesBranch
226
274
        // is false. Otherwise, branchRoot's parent is the common ancestor. Find inputTable's ancestor that is also
268
316
 
269
317
    private final Group group;
270
318
    private final RowType inputRowType, sourceRowType;
271
 
    private final UserTableRowType outputRowType;
 
319
    private final List<UserTableRowType> outputRowTypes;
272
320
    private final boolean keepInput;
273
321
    // If keepInput is true, inputPrecedesBranch controls whether input row appears before the retrieved branch.
274
322
    private final boolean inputPrecedesBranch;
275
323
    private final int inputBindingPosition;
 
324
    private final int lookaheadQuantum;
276
325
    private final UserTable commonAncestor;
277
326
    private final int branchRootOrdinal;
278
327
 
281
330
    {
282
331
        Attributes atts = new Attributes();
283
332
        atts.put(Label.BINDING_POSITION, PrimitiveExplainer.getInstance(inputBindingPosition));
284
 
        atts.put(Label.OUTPUT_TYPE, outputRowType.getExplainer(context));
 
333
        for (UserTableRowType outputRowType : outputRowTypes) {
 
334
            atts.put(Label.OUTPUT_TYPE, outputRowType.getExplainer(context));
 
335
        }
 
336
        UserTableRowType outputRowType = outputRowTypes.get(0);
285
337
        UserTableRowType ancestorRowType = outputRowType.schema().userTableRowType(commonAncestor);
286
 
        if ((ancestorRowType != sourceRowType) && (ancestorRowType != outputRowType))
 
338
        if ((ancestorRowType != sourceRowType) && (ancestorRowType != outputRowType)) {
287
339
            atts.put(Label.ANCESTOR_TYPE, ancestorRowType.getExplainer(context));
 
340
        }
 
341
        atts.put(Label.PIPELINE, PrimitiveExplainer.getInstance(lookaheadQuantum));
288
342
        return new LookUpOperatorExplainer(getName(), atts, sourceRowType, false, null, context);
289
343
    }
290
344
 
334
388
                    row = inputRow.get();
335
389
                    inputRow.release();
336
390
                } else {
337
 
                    row = cursor.next();
 
391
                    do {
 
392
                        row = cursor.next();
 
393
                    } while ((row != null) && !outputRowTypes.contains(row.rowType()));
338
394
                    if (row == null) {
339
395
                        if (keepInput && !inputPrecedesBranch) {
340
396
                            assert inputRow.isHolding();
395
451
        {
396
452
            super(context, bindingsCursor);
397
453
            this.cursor = adapter().newGroupCursor(group);
398
 
            this.hKey = adapter().newHKey(outputRowType.hKey());
 
454
            this.hKey = adapter().newHKey(outputRowTypes.get(0).hKey());
399
455
        }
400
456
 
401
457
        // For use by this class
416
472
        private ShareHolder<Row> inputRow = new ShareHolder<>();
417
473
        private boolean idle = true;
418
474
    }
 
475
 
 
476
    private class BranchCursor implements BindingsAwareCursor
 
477
    {
 
478
        // BindingsAwareCursor interface
 
479
 
 
480
        @Override
 
481
        public void open() {
 
482
            Row rowFromBindings = bindings.getRow(inputBindingPosition);
 
483
            assert rowFromBindings.rowType() == inputRowType : rowFromBindings;
 
484
            if (inputRowType != sourceRowType) {
 
485
                rowFromBindings = rowFromBindings.subRow(sourceRowType);
 
486
            }
 
487
            computeLookupRowHKey(rowFromBindings);
 
488
            cursor.rebind(hKey, true);
 
489
            cursor.open();
 
490
            inputRow.hold(rowFromBindings);
 
491
        }
 
492
 
 
493
        @Override
 
494
        public Row next() {
 
495
            Row row = null;
 
496
            if (keepInput && inputPrecedesBranch && inputRow.isHolding()) {
 
497
                row = inputRow.get();
 
498
                inputRow.release();
 
499
            } else {
 
500
                do {
 
501
                    row = cursor.next();
 
502
                } while ((row != null) && !outputRowTypes.contains(row.rowType()));
 
503
                if (row == null) {
 
504
                    if (keepInput && !inputPrecedesBranch) {
 
505
                        assert inputRow.isHolding();
 
506
                        row = inputRow.get();
 
507
                        inputRow.release();
 
508
                    }
 
509
                    close();
 
510
                }
 
511
            }
 
512
            return row;
 
513
        }
 
514
 
 
515
        @Override
 
516
        public void jump(Row row, ColumnSelector columnSelector) {
 
517
            cursor.jump(row, columnSelector);
 
518
        }
 
519
 
 
520
        @Override
 
521
        public void close() {
 
522
            inputRow.release();
 
523
            cursor.close();
 
524
        }
 
525
 
 
526
        @Override
 
527
        public void destroy() {
 
528
            close();
 
529
            cursor.destroy();
 
530
        }
 
531
 
 
532
        @Override
 
533
        public boolean isIdle() {
 
534
            return cursor.isIdle();
 
535
        }
 
536
 
 
537
        @Override
 
538
        public boolean isActive() {
 
539
            return cursor.isActive();
 
540
        }
 
541
 
 
542
        @Override
 
543
        public boolean isDestroyed() {
 
544
            return cursor.isDestroyed();
 
545
        }
 
546
        
 
547
        @Override
 
548
        public void rebind(QueryBindings bindings) {
 
549
            this.bindings = bindings;
 
550
        }
 
551
 
 
552
        // BranchCursor interface
 
553
        public BranchCursor(StoreAdapter adapter) {
 
554
            this.cursor = adapter.newGroupCursor(group);
 
555
            this.hKey = adapter.newHKey(outputRowTypes.get(0).hKey());
 
556
        }
 
557
 
 
558
        // For use by this class
 
559
 
 
560
        private void computeLookupRowHKey(Row row)
 
561
        {
 
562
            HKey ancestorHKey = row.ancestorHKey(commonAncestor);
 
563
            ancestorHKey.copyTo(hKey);
 
564
            if (branchRootOrdinal != -1) {
 
565
                hKey.extendWithOrdinal(branchRootOrdinal);
 
566
            }
 
567
        }
 
568
 
 
569
        // Object state
 
570
 
 
571
        private final GroupCursor cursor;
 
572
        private final HKey hKey;
 
573
        private ShareHolder<Row> inputRow = new ShareHolder<>();
 
574
        private QueryBindings bindings;
 
575
    }
 
576
 
 
577
    private class LookaheadExecution extends LookaheadLeafCursor<BranchCursor>
 
578
    {
 
579
        // Cursor interface
 
580
 
 
581
        @Override
 
582
        public void open() {
 
583
            TAP_OPEN.in();
 
584
            try {
 
585
                super.open();
 
586
            } finally {
 
587
                TAP_OPEN.out();
 
588
            }
 
589
        }
 
590
 
 
591
        @Override
 
592
        public Row next() {
 
593
            if (TAP_NEXT_ENABLED) {
 
594
                TAP_NEXT.in();
 
595
            }
 
596
            try {
 
597
                Row row = super.next();
 
598
                if (LOG_EXECUTION) {
 
599
                    LOG.debug("BranchLookup_Nested: yield {}", row);
 
600
                }
 
601
                return row;
 
602
            } finally {
 
603
                if (TAP_NEXT_ENABLED) {
 
604
                    TAP_NEXT.out();
 
605
                }
 
606
            }
 
607
        }
 
608
 
 
609
        // LookaheadLeafCursor interface
 
610
 
 
611
        @Override
 
612
        protected BranchCursor newCursor(QueryContext context, StoreAdapter adapter) {
 
613
            return new BranchCursor(adapter);
 
614
        }
 
615
 
 
616
        @Override
 
617
        protected BranchCursor openACursor(QueryBindings bindings, boolean lookahead) {
 
618
            BranchCursor cursor = super.openACursor(bindings, lookahead);
 
619
            if (LOG_EXECUTION) {
 
620
                LOG.debug("BranchLookup_Nested: open{} using {}", lookahead ? " lookahead" : "", cursor.inputRow.get());
 
621
            }
 
622
            return cursor;
 
623
        }
 
624
        
 
625
        // LookaheadExecution interface
 
626
 
 
627
        LookaheadExecution(QueryContext context, QueryBindingsCursor bindingsCursor, 
 
628
                           StoreAdapter adapter, int quantum) {
 
629
            super(context, bindingsCursor, adapter, quantum);
 
630
        }
 
631
    }
419
632
}