/*
 * Copyright 2013 Goldman Sachs.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17
package com.gs.collections.impl.forkjoin;
19
import java.io.Serializable;
20
import java.util.List;
21
import java.util.concurrent.ArrayBlockingQueue;
22
import java.util.concurrent.BlockingQueue;
23
import java.util.concurrent.ExecutionException;
24
import java.util.concurrent.ForkJoinPool;
25
import java.util.concurrent.ForkJoinTask;
27
import com.gs.collections.api.block.function.Function;
28
import com.gs.collections.api.block.procedure.Procedure;
29
import com.gs.collections.impl.list.mutable.FastList;
30
import com.gs.collections.impl.parallel.Combiner;
31
import com.gs.collections.impl.parallel.ProcedureFactory;
33
/**
 * Splits a list into {@code taskCount} sections, creates one {@link FJListProcedureTask} per
 * section index, runs the tasks on a {@link ForkJoinPool}, and merges the per-task results of
 * type {@code PT} through a {@link Combiner}.
 *
 * @param <T>  element type of the list being processed
 * @param <PT> procedure type produced by each task and handed to the combiner
 */
public class FJListProcedureRunner<T, PT extends Procedure<? super T>> implements Serializable
35
private static final long serialVersionUID = 1L;
37
// Last failure passed to setFailed; executeAndCombine rethrows it wrapped in a RuntimeException.
// NOTE(review): this is a plain (non-volatile) field. If setFailed is invoked from pool worker
// threads -- not visible in this file -- confirm that the read is properly synchronized.
private Throwable error;
38
// Merges task results, either one at a time (combineOne) or all at once (combineAll).
private final Combiner<PT> combiner;
39
// Number of tasks the list is divided into.
private final int taskCount;
40
// Results of completed tasks; allocated with capacity taskCount only when
// combiner.useCombineOne() is true, otherwise null.
private final BlockingQueue<PT> outputQueue;
42
/**
 * Creates a runner that divides work into {@code taskCount} tasks and merges their results
 * through {@code newCombiner}.
 *
 * @param newCombiner merges the procedures produced by the tasks; its {@code useCombineOne()}
 *                    answer decides whether a result queue is allocated
 * @param taskCount   number of tasks to create; must be at least 1
 * @throws IllegalArgumentException if {@code taskCount} is less than 1
 */
public FJListProcedureRunner(Combiner<PT> newCombiner, int taskCount)
{
    // Fail fast in both combiner modes. Previously only the combine-one path rejected a
    // non-positive count (via ArrayBlockingQueue); the other path failed later when the
    // section size was computed as list.size() / taskCount.
    if (taskCount < 1)
    {
        throw new IllegalArgumentException("taskCount must be at least 1 but was " + taskCount);
    }
    this.combiner = newCombiner;
    this.taskCount = taskCount;
    // The queue is only needed when results are combined one at a time as tasks complete.
    this.outputQueue = this.combiner.useCombineOne() ? new ArrayBlockingQueue<PT>(taskCount) : null;
}
49
private FastList<ForkJoinTask<PT>> createAndExecuteTasks(ForkJoinPool executor, ProcedureFactory<PT> procedureFactory, List<T> list)
51
FastList<ForkJoinTask<PT>> tasks = FastList.newList(this.taskCount);
52
int sectionSize = list.size() / this.taskCount;
53
int taskCountMinusOne = this.taskCount - 1;
54
for (int index = 0; index < this.taskCount; index++)
56
ForkJoinTask<PT> task = this.createTask(procedureFactory, list, sectionSize, taskCountMinusOne, index);
58
executor.execute(task);
63
protected FJListProcedureTask<T, PT> createTask(ProcedureFactory<PT> procedureFactory, List<T> list, int sectionSize, int taskCountMinusOne, int index)
65
return new FJListProcedureTask<>(this, procedureFactory, list, index, sectionSize, index == taskCountMinusOne);
68
public void setFailed(Throwable newError)
70
this.error = newError;
73
public void taskCompleted(ForkJoinTask<PT> task)
75
if (this.combiner.useCombineOne())
77
this.outputQueue.add(task.getRawResult());
81
public void executeAndCombine(ForkJoinPool executor, ProcedureFactory<PT> procedureFactory, List<T> list)
83
FastList<ForkJoinTask<PT>> tasks = this.createAndExecuteTasks(executor, procedureFactory, list);
84
if (this.combiner.useCombineOne())
88
if (this.error != null)
90
throw new RuntimeException("One or more parallel tasks failed", this.error);
92
if (!this.combiner.useCombineOne())
94
this.combiner.combineAll(tasks.asLazy().collect(new ProcedureExtractor()));
102
int remainingTaskCount = this.taskCount;
103
while (remainingTaskCount > 0)
105
this.combiner.combineOne(this.outputQueue.take());
106
remainingTaskCount--;
109
catch (InterruptedException e)
111
throw new RuntimeException("Combine failed", e);
115
private final class ProcedureExtractor implements Function<ForkJoinTask<PT>, PT>
117
private static final long serialVersionUID = 1L;
120
public PT valueOf(ForkJoinTask<PT> object)
126
catch (InterruptedException | ExecutionException e)
128
throw new RuntimeException(e);