~ubuntu-branches/ubuntu/wily/gs-collections/wily

« back to all changes in this revision

Viewing changes to gs-collections-forkjoin/src/main/java/com/gs/collections/impl/forkjoin/FJListProcedureRunner.java

  • Committer: Package Import Robot
  • Author(s): Emmanuel Bourg
  • Date: 2015-07-23 12:42:30 UTC
  • Revision ID: package-import@ubuntu.com-20150723124230-2rjvfv6elyn2m7d4
Tags: upstream-5.1.0
ImportĀ upstreamĀ versionĀ 5.1.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright 2013 Goldman Sachs.
 
3
 *
 
4
 * Licensed under the Apache License, Version 2.0 (the "License");
 
5
 * you may not use this file except in compliance with the License.
 
6
 * You may obtain a copy of the License at
 
7
 *
 
8
 *     http://www.apache.org/licenses/LICENSE-2.0
 
9
 *
 
10
 * Unless required by applicable law or agreed to in writing, software
 
11
 * distributed under the License is distributed on an "AS IS" BASIS,
 
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
 * See the License for the specific language governing permissions and
 
14
 * limitations under the License.
 
15
 */
 
16
 
 
17
package com.gs.collections.impl.forkjoin;
 
18
 
 
19
import java.io.Serializable;
 
20
import java.util.List;
 
21
import java.util.concurrent.ArrayBlockingQueue;
 
22
import java.util.concurrent.BlockingQueue;
 
23
import java.util.concurrent.ExecutionException;
 
24
import java.util.concurrent.ForkJoinPool;
 
25
import java.util.concurrent.ForkJoinTask;
 
26
 
 
27
import com.gs.collections.api.block.function.Function;
 
28
import com.gs.collections.api.block.procedure.Procedure;
 
29
import com.gs.collections.impl.list.mutable.FastList;
 
30
import com.gs.collections.impl.parallel.Combiner;
 
31
import com.gs.collections.impl.parallel.ProcedureFactory;
 
32
 
 
33
public class FJListProcedureRunner<T, PT extends Procedure<? super T>> implements Serializable
 
34
{
 
35
    private static final long serialVersionUID = 1L;
 
36
 
 
37
    private Throwable error;
 
38
    private final Combiner<PT> combiner;
 
39
    private final int taskCount;
 
40
    private final BlockingQueue<PT> outputQueue;
 
41
 
 
42
    public FJListProcedureRunner(Combiner<PT> newCombiner, int taskCount)
 
43
    {
 
44
        this.combiner = newCombiner;
 
45
        this.taskCount = taskCount;
 
46
        this.outputQueue = this.combiner.useCombineOne() ? new ArrayBlockingQueue<PT>(taskCount) : null;
 
47
    }
 
48
 
 
49
    private FastList<ForkJoinTask<PT>> createAndExecuteTasks(ForkJoinPool executor, ProcedureFactory<PT> procedureFactory, List<T> list)
 
50
    {
 
51
        FastList<ForkJoinTask<PT>> tasks = FastList.newList(this.taskCount);
 
52
        int sectionSize = list.size() / this.taskCount;
 
53
        int taskCountMinusOne = this.taskCount - 1;
 
54
        for (int index = 0; index < this.taskCount; index++)
 
55
        {
 
56
            ForkJoinTask<PT> task = this.createTask(procedureFactory, list, sectionSize, taskCountMinusOne, index);
 
57
            tasks.add(task);
 
58
            executor.execute(task);
 
59
        }
 
60
        return tasks;
 
61
    }
 
62
 
 
63
    protected FJListProcedureTask<T, PT> createTask(ProcedureFactory<PT> procedureFactory, List<T> list, int sectionSize, int taskCountMinusOne, int index)
 
64
    {
 
65
        return new FJListProcedureTask<>(this, procedureFactory, list, index, sectionSize, index == taskCountMinusOne);
 
66
    }
 
67
 
 
68
    public void setFailed(Throwable newError)
 
69
    {
 
70
        this.error = newError;
 
71
    }
 
72
 
 
73
    public void taskCompleted(ForkJoinTask<PT> task)
 
74
    {
 
75
        if (this.combiner.useCombineOne())
 
76
        {
 
77
            this.outputQueue.add(task.getRawResult());
 
78
        }
 
79
    }
 
80
 
 
81
    public void executeAndCombine(ForkJoinPool executor, ProcedureFactory<PT> procedureFactory, List<T> list)
 
82
    {
 
83
        FastList<ForkJoinTask<PT>> tasks = this.createAndExecuteTasks(executor, procedureFactory, list);
 
84
        if (this.combiner.useCombineOne())
 
85
        {
 
86
            this.join();
 
87
        }
 
88
        if (this.error != null)
 
89
        {
 
90
            throw new RuntimeException("One or more parallel tasks failed", this.error);
 
91
        }
 
92
        if (!this.combiner.useCombineOne())
 
93
        {
 
94
            this.combiner.combineAll(tasks.asLazy().collect(new ProcedureExtractor()));
 
95
        }
 
96
    }
 
97
 
 
98
    private void join()
 
99
    {
 
100
        try
 
101
        {
 
102
            int remainingTaskCount = this.taskCount;
 
103
            while (remainingTaskCount > 0)
 
104
            {
 
105
                this.combiner.combineOne(this.outputQueue.take());
 
106
                remainingTaskCount--;
 
107
            }
 
108
        }
 
109
        catch (InterruptedException e)
 
110
        {
 
111
            throw new RuntimeException("Combine failed", e);
 
112
        }
 
113
    }
 
114
 
 
115
    private final class ProcedureExtractor implements Function<ForkJoinTask<PT>, PT>
 
116
    {
 
117
        private static final long serialVersionUID = 1L;
 
118
 
 
119
        @Override
 
120
        public PT valueOf(ForkJoinTask<PT> object)
 
121
        {
 
122
            try
 
123
            {
 
124
                return object.get();
 
125
            }
 
126
            catch (InterruptedException | ExecutionException e)
 
127
            {
 
128
                throw new RuntimeException(e);
 
129
            }
 
130
        }
 
131
    }
 
132
}