1
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
5
* Licensed under the Apache License, Version 2.0
6
* (the "License"); you may not use this file except in compliance with
7
* the License. You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14
* implied. See the License for the specific language governing
15
* permissions and limitations under the License.
18
* Implements the Hadoop FS interfaces to allow applications to store
22
package org.apache.hadoop.fs.ceph;
25
import java.io.IOException;
26
import java.io.OutputStream;
28
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.LogFactory;
30
import org.apache.hadoop.conf.Configuration;
31
import org.apache.hadoop.util.Progressable;
36
* An {@link OutputStream} for a CephFileSystem and corresponding
39
public class CephOutputStream extends OutputStream {
40
private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
41
private boolean closed;
45
private int fileHandle;
47
private byte[] buffer;
48
private int bufUsed = 0;
51
* Construct the CephOutputStream.
52
* @param conf The FileSystem configuration.
53
* @param fh The Ceph filehandle to connect to.
55
public CephOutputStream(Configuration conf, CephFS cephfs,
56
int fh, int bufferSize) {
60
buffer = new byte[bufferSize];
63
/** Ceph likes things to be closed before it shuts down,
64
*so closing the IOStream stuff voluntarily is good
66
protected void finalize() throws Throwable {
77
* Get the current position in the file.
78
* @return The file offset in bytes.
80
public long getPos() throws IOException {
81
return ceph.ceph_getpos(fileHandle);
86
* @param b The byte to write.
87
* @throws IOException If you have closed the CephOutputStream or the
91
public synchronized void write(int b) throws IOException {
93
"CephOutputStream.write: writing a single byte to fd " + fileHandle);
96
throw new IOException(
97
"CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
100
// Stick the byte in a buffer and write it
101
byte buf[] = new byte[1];
109
* Write a byte buffer into the Ceph file.
110
* @param buf the byte array to write from
111
* @param off the position in the file to start writing at.
112
* @param len The number of bytes to actually write.
113
* @throws IOException if you have closed the CephOutputStream, or
114
* if buf is null or off + len > buf.length, or
115
* if the write fails due to a Ceph error.
118
public synchronized void write(byte buf[], int off, int len) throws IOException {
120
"CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
121
// make sure stream is open
123
throw new IOException(
124
"CephOutputStream.write: cannot write " + len + "bytes to fd "
125
+ fileHandle + ": stream closed");
132
write = Math.min(len, buffer.length - bufUsed);
134
System.arraycopy(buf, off, buffer, bufUsed, write);
135
} catch (IndexOutOfBoundsException ie) {
136
throw new IOException(
137
"CephOutputStream.write: Indices out of bounds: "
138
+ "write length is " + len + ", buffer offset is " + off
139
+ ", and buffer size is " + buf.length);
140
} catch (ArrayStoreException ae) {
141
throw new IOException(
142
"Uh-oh, CephOutputStream failed to do an array"
143
+ " copy due to type mismatch...");
144
} catch (NullPointerException ne) {
145
throw new IOException(
146
"CephOutputStream.write: cannot write " + len + "bytes to fd "
147
+ fileHandle + ": buffer is null");
152
if (bufUsed == buffer.length) {
153
result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
155
throw new IOException(
156
"CephOutputStream.write: Buffered write of " + bufUsed
159
if (result != bufUsed) {
160
throw new IOException(
161
"CephOutputStream.write: Wrote only " + result + " bytes of "
162
+ bufUsed + " in buffer! Data may be lost or written"
163
+ " twice to Ceph!");
173
* Flush the buffered data.
174
* @throws IOException if you've closed the stream or the write fails.
177
public synchronized void flush() throws IOException {
182
int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
185
throw new IOException(
186
"CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
187
+ fileHandle + " failed");
189
if (result != bufUsed) {
190
throw new IOException(
191
"CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
192
+ fileHandle + "was incomplete: only " + result + " of " + bufUsed
193
+ " bytes were written.");
201
* Close the CephOutputStream.
202
* @throws IOException if Ceph somehow returns an error. In current code it can't.
205
public synchronized void close() throws IOException {
206
LOG.trace("CephOutputStream.close:enter");
209
int result = ceph.ceph_close(fileHandle);
212
throw new IOException("Close failed!");
216
LOG.trace("CephOutputStream.close:exit");