2
* Copyright (c) 2013 Peter S. May
4
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
5
* associated documentation files (the "Software"), to deal in the Software without restriction,
6
* including without limitation the rights to use, copy, modify, merge, publish, distribute,
7
* sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
8
* furnished to do so, subject to the following conditions:
10
* The above copyright notice and this permission notice shall be included in all copies or
11
* substantial portions of the Software.
13
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
14
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
15
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
16
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
17
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22
import java.io.IOException;
23
import java.io.InputStream;
24
import java.io.OutputStream;
25
import java.util.concurrent.atomic.AtomicBoolean;
28
* Combines a non-blocking pair, {@link ByteOutput} and {@link ByteInput}, into a conceptual pipe
29
* that accepts non-blocking writes and produces blocking reads. Non-empty writes signal a blocking
30
* read to unblock. It is possible to close this object for writing; the reader remains open to read
31
* out the remaining buffer, then produces only non-blocked zero-length reads.
33
public class BytePipe implements ByteOutput, ByteIOInput {
34
private final ByteOutput output;
35
private final BlockingByteInput input;
36
private final AtomicBoolean closedForWrite = new AtomicBoolean(false);
38
public BytePipe(ByteOutput output, ByteInput input) {
39
if (output == null || input == null) throw new NullPointerException();
41
this.input = getInput(input);
45
* Signals that no more data will be written to this pipe. The blocking mechanism will no longer
46
* block once this has been called, and any further attempted writes will result in an
47
* {@link IllegalStateException}.
49
* This method may be called multiple times. The first time, it returns {@code true}; any
50
* subsequent times it is a no-op and returns {@code false}.
52
* @return {@code true} if the pipe was not yet closed; {@code false} otherwise
54
public boolean closeForWrite() {
55
boolean previous = closedForWrite.getAndSet(true);
59
// True if a change occurred
63
private BlockingByteInput getInput(final ByteInput basis) {
64
return new BlockingByteInput(basis) {
66
public boolean isClosed() {
67
return closedForWrite.get();
72
private void mustBeWriteOpen() {
73
if (closedForWrite.get()) throw new IllegalStateException();
77
public int putBytes(byte[] src, int offset, int length) {
79
return reportPutBytes(output.putBytes(src, offset, length));
83
public int putBytes(ByteArrayInput src) {
85
return reportPutBytes(output.putBytes(src));
89
public int putBytes(ByteArrayInput src, int length) {
91
return reportPutBytes(output.putBytes(src, length));
95
public int putBytes(InputStream src) throws IOException {
97
return reportPutBytes(output.putBytes(src));
101
public int putBytes(InputStream src, int length) throws IOException {
103
return reportPutBytes(output.putBytes(src, length));
107
public int readBytes(byte[] dest, int offset, int length) throws IOException {
108
return input.readBytes(dest, offset, length);
112
public int readBytes(ByteArrayOutput dest) throws IOException {
113
return input.readBytes(dest);
117
public int readBytes(ByteArrayOutput dest, int length) throws IOException {
118
return input.readBytes(dest, length);
122
public int readBytes(OutputStream dest) throws IOException {
123
return input.readBytes(dest);
127
public int readBytes(OutputStream dest, int length) throws IOException {
128
return input.readBytes(dest, length);
131
private int reportPutBytes(int count) {
132
if (count > 0) input.attention();