~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/client/hadoop/ceph/CephOutputStream.java

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2012-07-16 09:56:24 UTC
  • mfrom: (0.3.11)
  • mto: This revision was merged to the branch mainline in revision 17.
  • Revision ID: package-import@ubuntu.com-20120716095624-azr2w4hbhei1rxmx
Tags: upstream-0.48
ImportĀ upstreamĀ versionĀ 0.48

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
2
 
 
3
/**
 
4
 *
 
5
 * Licensed under the Apache License, Version 2.0
 
6
 * (the "License"); you may not use this file except in compliance with
 
7
 * the License. You may obtain a copy of the License at
 
8
 *
 
9
 * http://www.apache.org/licenses/LICENSE-2.0
 
10
 *
 
11
 * Unless required by applicable law or agreed to in writing, software
 
12
 * distributed under the License is distributed on an "AS IS" BASIS,
 
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
14
 * implied. See the License for the specific language governing
 
15
 * permissions and limitations under the License.
 
16
 *
 
17
 * 
 
18
 * Implements the Hadoop FS interfaces to allow applications to store
 
19
 * files in Ceph.
 
20
 */
 
21
 
 
22
package org.apache.hadoop.fs.ceph;
 
23
 
 
24
 
 
25
import java.io.IOException;
 
26
import java.io.OutputStream;
 
27
 
 
28
import org.apache.commons.logging.Log;
 
29
import org.apache.commons.logging.LogFactory;
 
30
import org.apache.hadoop.conf.Configuration;
 
31
import org.apache.hadoop.util.Progressable;
 
32
 
 
33
 
 
34
/**
 
35
 * <p>
 
36
 * An {@link OutputStream} for a CephFileSystem and corresponding
 
37
 * Ceph instance.
 
38
 */
 
39
public class CephOutputStream extends OutputStream {
 
40
  private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
 
41
  private boolean closed;
 
42
 
 
43
  private CephFS ceph;
 
44
 
 
45
  private int fileHandle;
 
46
 
 
47
  private byte[] buffer;
 
48
  private int bufUsed = 0;
 
49
 
 
50
  /**
 
51
   * Construct the CephOutputStream.
 
52
   * @param conf The FileSystem configuration.
 
53
   * @param fh The Ceph filehandle to connect to.
 
54
   */
 
55
  public CephOutputStream(Configuration conf, CephFS cephfs,
 
56
      int fh, int bufferSize) {
 
57
    ceph = cephfs;
 
58
    fileHandle = fh;
 
59
    closed = false;
 
60
    buffer = new byte[bufferSize];
 
61
  }
 
62
 
 
63
  /** Ceph likes things to be closed before it shuts down,
 
64
   *so closing the IOStream stuff voluntarily is good
 
65
   */
 
66
  protected void finalize() throws Throwable {
 
67
    try {
 
68
      if (!closed) {
 
69
        close();
 
70
      }
 
71
    } finally {
 
72
      super.finalize();
 
73
    }
 
74
  }
 
75
 
 
76
  /**
 
77
   * Get the current position in the file.
 
78
   * @return The file offset in bytes.
 
79
   */
 
80
  public long getPos() throws IOException {
 
81
    return ceph.ceph_getpos(fileHandle);
 
82
  }
 
83
 
 
84
  /**
 
85
   * Write a byte.
 
86
   * @param b The byte to write.
 
87
   * @throws IOException If you have closed the CephOutputStream or the
 
88
   * write fails.
 
89
   */
 
90
  @Override
 
91
  public synchronized void write(int b) throws IOException {
 
92
    LOG.trace(
 
93
        "CephOutputStream.write: writing a single byte to fd " + fileHandle);
 
94
 
 
95
    if (closed) {
 
96
      throw new IOException(
 
97
          "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
 
98
          + ": stream closed");
 
99
    }
 
100
    // Stick the byte in a buffer and write it
 
101
    byte buf[] = new byte[1];
 
102
 
 
103
    buf[0] = (byte) b;    
 
104
    write(buf, 0, 1);
 
105
    return;
 
106
  }
 
107
 
 
108
  /**
 
109
   * Write a byte buffer into the Ceph file.
 
110
   * @param buf the byte array to write from
 
111
   * @param off the position in the file to start writing at.
 
112
   * @param len The number of bytes to actually write.
 
113
   * @throws IOException if you have closed the CephOutputStream, or
 
114
   * if buf is null or off + len > buf.length, or
 
115
   * if the write fails due to a Ceph error.
 
116
   */
 
117
  @Override
 
118
  public synchronized void write(byte buf[], int off, int len) throws IOException {
 
119
    LOG.trace(
 
120
        "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
 
121
    // make sure stream is open
 
122
    if (closed) {
 
123
      throw new IOException(
 
124
          "CephOutputStream.write: cannot write " + len + "bytes to fd "
 
125
          + fileHandle + ": stream closed");
 
126
    }
 
127
                
 
128
    int result;
 
129
    int write;
 
130
 
 
131
    while (len > 0) {
 
132
      write = Math.min(len, buffer.length - bufUsed);
 
133
      try {
 
134
        System.arraycopy(buf, off, buffer, bufUsed, write);
 
135
      } catch (IndexOutOfBoundsException ie) {
 
136
        throw new IOException(
 
137
            "CephOutputStream.write: Indices out of bounds: "
 
138
                + "write length is " + len + ", buffer offset is " + off
 
139
                + ", and buffer size is " + buf.length);
 
140
      } catch (ArrayStoreException ae) {
 
141
        throw new IOException(
 
142
            "Uh-oh, CephOutputStream failed to do an array"
 
143
                + " copy due to type mismatch...");
 
144
      } catch (NullPointerException ne) {
 
145
        throw new IOException(
 
146
            "CephOutputStream.write: cannot write " + len + "bytes to fd "
 
147
            + fileHandle + ": buffer is null");
 
148
      }
 
149
      bufUsed += write;
 
150
      len -= write;
 
151
      off += write;
 
152
      if (bufUsed == buffer.length) {
 
153
        result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
 
154
        if (result < 0) {
 
155
          throw new IOException(
 
156
              "CephOutputStream.write: Buffered write of " + bufUsed
 
157
              + " bytes failed!");
 
158
        }
 
159
        if (result != bufUsed) {
 
160
          throw new IOException(
 
161
              "CephOutputStream.write: Wrote only " + result + " bytes of "
 
162
              + bufUsed + " in buffer! Data may be lost or written"
 
163
              + " twice to Ceph!");
 
164
        }
 
165
        bufUsed = 0;
 
166
      }
 
167
 
 
168
    }
 
169
    return; 
 
170
  }
 
171
   
 
172
  /**
 
173
   * Flush the buffered data.
 
174
   * @throws IOException if you've closed the stream or the write fails.
 
175
   */
 
176
  @Override
 
177
  public synchronized void flush() throws IOException {
 
178
    if (!closed) {
 
179
      if (bufUsed == 0) {
 
180
        return;
 
181
      }
 
182
      int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
 
183
 
 
184
      if (result < 0) {
 
185
        throw new IOException(
 
186
            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
 
187
            + fileHandle + " failed");
 
188
      }
 
189
      if (result != bufUsed) {
 
190
        throw new IOException(
 
191
            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
 
192
            + fileHandle + "was incomplete:  only " + result + " of " + bufUsed
 
193
            + " bytes were written.");
 
194
      }
 
195
      bufUsed = 0;
 
196
      return;
 
197
    }
 
198
  }
 
199
  
 
200
  /**
 
201
   * Close the CephOutputStream.
 
202
   * @throws IOException if Ceph somehow returns an error. In current code it can't.
 
203
   */
 
204
  @Override
 
205
  public synchronized void close() throws IOException {
 
206
    LOG.trace("CephOutputStream.close:enter");
 
207
    if (!closed) {
 
208
      flush();
 
209
      int result = ceph.ceph_close(fileHandle);
 
210
 
 
211
      if (result != 0) {
 
212
        throw new IOException("Close failed!");
 
213
      }
 
214
                                
 
215
      closed = true;
 
216
      LOG.trace("CephOutputStream.close:exit");
 
217
    }
 
218
  }
 
219
}