~psmay/+junk/java-iomux

« back to all changes in this revision

Viewing changes to src/us/wxy/iomux/BytePipe.java

  • Committer: Peter S. May
  • Date: 2013-04-16 02:34:06 UTC
  • Revision ID: peter_s._may_httppsmay.com-20130416023406-07hq7qqw2j26jbhj
BytePipe, and nuances to BlockingByteInput to support it.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2013 Peter S. May
 
3
 * 
 
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
 
5
 * associated documentation files (the "Software"), to deal in the Software without restriction,
 
6
 * including without limitation the rights to use, copy, modify, merge, publish, distribute,
 
7
 * sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
 
8
 * furnished to do so, subject to the following conditions:
 
9
 * 
 
10
 * The above copyright notice and this permission notice shall be included in all copies or
 
11
 * substantial portions of the Software.
 
12
 * 
 
13
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
 
14
 * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 
15
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 
16
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 
17
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
18
 */
 
19
 
 
20
package us.wxy.iomux;
 
21
 
 
22
import java.io.IOException;
 
23
import java.io.InputStream;
 
24
import java.io.OutputStream;
 
25
import java.util.concurrent.atomic.AtomicBoolean;
 
26
 
 
27
/**
 
28
 * Combines a non-blocking pair, {@link ByteOutput} and {@link ByteInput}, into a conceptual pipe
 
29
 * that accepts non-blocking writes and produces blocking reads. Non-empty writes signal a blocking
 
30
 * read to unblock. It is possible to close this object for writing; the reader remains open to read
 
31
 * out the remaining buffer, then produces only non-blocked zero-length reads.
 
32
 */
 
33
public class BytePipe implements ByteOutput, ByteIOInput {
 
34
  private final ByteOutput output;
 
35
  private final BlockingByteInput input;
 
36
  private final AtomicBoolean closedForWrite = new AtomicBoolean(false);
 
37
 
 
38
  public BytePipe(ByteOutput output, ByteInput input) {
 
39
    if (output == null || input == null) throw new NullPointerException();
 
40
    this.output = output;
 
41
    this.input = getInput(input);
 
42
  }
 
43
 
 
44
  /**
 
45
   * Signals that no more data will be written to this pipe. The blocking mechanism will no longer
 
46
   * block once this has been called, and any further attempted writes will result in an
 
47
   * {@link IllegalStateException}.
 
48
   * <p>
 
49
   * This method may be called multiple times. The first time, it returns {@code true}; any
 
50
   * subsequent times it is a no-op and returns {@code false}.
 
51
   * 
 
52
   * @return {@code true} if the pipe was not yet closed; {@code false} otherwise
 
53
   */
 
54
  public boolean closeForWrite() {
 
55
    boolean previous = closedForWrite.getAndSet(true);
 
56
    if (!previous) {
 
57
      input.attentionAll();
 
58
    }
 
59
    // True if a change occurred
 
60
    return !previous;
 
61
  }
 
62
 
 
63
  private BlockingByteInput getInput(final ByteInput basis) {
 
64
    return new BlockingByteInput(basis) {
 
65
      @Override
 
66
      public boolean isClosed() {
 
67
        return closedForWrite.get();
 
68
      }
 
69
    };
 
70
  }
 
71
 
 
72
  private void mustBeWriteOpen() {
 
73
    if (closedForWrite.get()) throw new IllegalStateException();
 
74
  }
 
75
 
 
76
  @Override
 
77
  public int putBytes(byte[] src, int offset, int length) {
 
78
    mustBeWriteOpen();
 
79
    return reportPutBytes(output.putBytes(src, offset, length));
 
80
  }
 
81
 
 
82
  @Override
 
83
  public int putBytes(ByteArrayInput src) {
 
84
    mustBeWriteOpen();
 
85
    return reportPutBytes(output.putBytes(src));
 
86
  }
 
87
 
 
88
  @Override
 
89
  public int putBytes(ByteArrayInput src, int length) {
 
90
    mustBeWriteOpen();
 
91
    return reportPutBytes(output.putBytes(src, length));
 
92
  }
 
93
 
 
94
  @Override
 
95
  public int putBytes(InputStream src) throws IOException {
 
96
    mustBeWriteOpen();
 
97
    return reportPutBytes(output.putBytes(src));
 
98
  }
 
99
 
 
100
  @Override
 
101
  public int putBytes(InputStream src, int length) throws IOException {
 
102
    mustBeWriteOpen();
 
103
    return reportPutBytes(output.putBytes(src, length));
 
104
  }
 
105
 
 
106
  @Override
 
107
  public int readBytes(byte[] dest, int offset, int length) throws IOException {
 
108
    return input.readBytes(dest, offset, length);
 
109
  }
 
110
 
 
111
  @Override
 
112
  public int readBytes(ByteArrayOutput dest) throws IOException {
 
113
    return input.readBytes(dest);
 
114
  }
 
115
 
 
116
  @Override
 
117
  public int readBytes(ByteArrayOutput dest, int length) throws IOException {
 
118
    return input.readBytes(dest, length);
 
119
  }
 
120
 
 
121
  @Override
 
122
  public int readBytes(OutputStream dest) throws IOException {
 
123
    return input.readBytes(dest);
 
124
  }
 
125
 
 
126
  @Override
 
127
  public int readBytes(OutputStream dest, int length) throws IOException {
 
128
    return input.readBytes(dest, length);
 
129
  }
 
130
 
 
131
  private int reportPutBytes(int count) {
 
132
    if (count > 0) input.attention();
 
133
    return count;
 
134
  }
 
135
}