1
//////////////////////////////////////////////////////////////////////
3
// JCSP ("CSP for Java") Libraries //
4
// Copyright (C) 1996-2008 Peter Welch and Paul Austin. //
5
// 2001-2004 Quickstone Technologies Limited. //
7
// This library is free software; you can redistribute it and/or //
8
// modify it under the terms of the GNU Lesser General Public //
9
// License as published by the Free Software Foundation; either //
10
// version 2.1 of the License, or (at your option) any later //
13
// This library is distributed in the hope that it will be //
14
// useful, but WITHOUT ANY WARRANTY; without even the implied //
15
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR //
16
// PURPOSE. See the GNU Lesser General Public License for more //
19
// You should have received a copy of the GNU Lesser General //
20
// Public License along with this library; if not, write to the //
21
// Free Software Foundation, Inc., 59 Temple Place, Suite 330, //
22
// Boston, MA 02111-1307, USA. //
24
// Author contact: P.H.Welch@kent.ac.uk //
27
//////////////////////////////////////////////////////////////////////
29
package org.jcsp.plugNplay;
31
import org.jcsp.lang.*;
34
* <I>Parallel</I> multiplexes its input Object stream array onto one output stream.
35
* <H2>Process Diagram</H2>
36
* <p><img src="doc-files/Paraplex1.gif"></p>
37
* <H2>Description</H2>
38
* <TT>Paraplex</TT> is a process to convert multiple streams of
39
* <TT>Object</TT>s to a single stream. It assumes data will always be available
40
* on all its input streams. In each cycle, it inputs <I>in parallel</I> one
41
* <TT>Object</TT> from each of its inputs, packs them into an array and outputs
42
* that array as a single communication.
44
* The parallel input means that the process will wait until something arrives
45
* from every input channel, accepting items in whatever order they turn up.
46
* The ordering of the channels in the <TT>in</TT> array, therefore, makes
47
* no difference to the functionality of this process.
49
* <B>Caution:</B> the process receiving packets from <TT>Paraplex</TT>
50
* must agree to the following contract:
52
* <I>Input of an array
53
* packet means that previously input arrays must not be looked at any more (neither
54
* by itself nor by any other processes to which they may have been passed).</I>
56
* Supporting the above, there is one more rule:
58
* <I>There must be only one process receiving array packets
59
* from <TT>Paraplex</TT> (i.e. its output channel must <I>not</I> be connected
60
* to a {@link org.jcsp.lang.One2AnyChannel} or {@link org.jcsp.lang.Any2AnyChannel}).</I>
62
* The reason for these obligations is to remove the need for <TT>Paraplex</TT>
63
* to generate a <TT>new</TT> array packet for each <I>paraplexed</I> communication
64
* -- an array that will normally be discarded by the receiving process after
65
* dealing with its contents.
66
* Instead of that, <TT>Paraplex</TT> operates a <I>double-buffered</I> protocol,
67
* constructing and reusing just two array packets. It switches between them
68
* after every cycle. In this way, it fills one packet while the process
69
* receiving its output consumes the other one. This is safe so long as that
70
* receiving process agrees to the above rules.
71
* See the <I>Low Level</I> example in {@link org.jcsp.lang.Parallel} for the details
72
* of this implementation.
74
* <I>Note:</I> the above two constraints should work naturally with most applications.
75
* However, by converting the first rule into a protocol where the receiving process
76
* explicitly returns the packet (through an <I>acknowledgment</I> channel)
77
* when it has finished using it and before inputting the next one, the second
78
* rule could be dropped. This is trivial to do by piping the output from
79
* <TT>Paraplex</TT> through a simple cyclic process that inputs a packet,
80
* forwards it (down a {@link org.jcsp.lang.One2AnyChannel} or
81
* {@link org.jcsp.lang.Any2AnyChannel}) and waits for the acknowledgment
82
* (for which only a {@link org.jcsp.lang.One2OneChannel} is needed).
84
* Of course, uncontrolled sharing of the <TT>Object</TT>s passing
85
* through this process must be avoided. But that is not
86
* the responsibility of this process and must be arranged between the
87
* originator and recipient (or recipients).
89
* <H2>Channel Protocols</H2>
92
* <TH COLSPAN="3">Input Channels</TH>
98
* Most channels in this package carry integers, but these input channels carry <TT>Object</TT>s.
102
* <TH COLSPAN="3">Output Channels</TH>
108
* A packet carrying the <I>paraplexed</I> data.
115
* import org.jcsp.lang.*;
116
* import org.jcsp.plugNplay.*;
118
* class ParaplexExample {
120
* public static void main (String[] args) {
122
* final One2OneChannel[] a = Channel.one2oneArray (3);
123
* final One2OneChannel b = Channel.one2one ();
127
* new Numbers (a[0].out ()),
128
* new Squares (a[1].out ()),
129
* new Fibonacci (a[2].out ()),
130
* new Paraplex (Channel.getInputArray (a), b.out ()),
132
* public void run () {
133
* ChannelInput in = b.in ();
134
* System.out.println ("\n\t\tNumbers\t\tSquares\t\tFibonacci\n");
136
* Object[] data = (Object[]) in.read ();
137
* for (int i = 0; i < data.length; i++) {
138
* System.out.print ("\t\t" + data[i]);
140
* System.out.println ();
151
* @see org.jcsp.plugNplay.Deparaplex
152
* @see org.jcsp.plugNplay.Multiplex
153
* @see org.jcsp.plugNplay.Demultiplex
158
public final class Paraplex implements CSProcess
160
/** The input channels */
161
private final ChannelInput[] in;
163
/** The output channel */
164
private final ChannelOutput out;
167
* Construct a new Paraplex process with the input Channels in and the output
168
* Channel out. The ordering of the Channels in the in array makes
169
* no difference to the functionality of this process.
171
* @param in the input channels
172
* @param out the output channel
174
public Paraplex(final ChannelInput[] in, final ChannelOutput out)
181
* The main body of this process.
185
final ProcessRead[] inputProcess = new ProcessRead[in.length];
186
for (int i = 0; i < in.length; i++)
187
inputProcess[i] = new ProcessRead(in[i]);
188
Parallel parInput = new Parallel(inputProcess);
190
Object[][] data = new Object[2][in.length];
195
Object[] buffer = data[index];
196
for (int i = 0; i < in.length; i++)
197
buffer[i] = inputProcess[i].value;