1
diff --git a/src/core/core-default.xml b/src/core/core-default.xml
2
index 8bc3b99..26543bc 100644
3
--- a/src/core/core-default.xml
4
+++ b/src/core/core-default.xml
9
+ <name>fs.ceph.impl</name>
10
+ <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
11
+ <description>The file system for ceph: uris.</description>
15
<name>fs.har.impl.disable.cache</name>
17
<description>Don't cache 'har' filesystem instances.</description>
18
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFS.java b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
20
index 0000000..5d51eb2
22
+++ b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
24
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
28
+ * Licensed under the Apache License, Version 2.0
29
+ * (the "License"); you may not use this file except in compliance with
30
+ * the License. You may obtain a copy of the License at
32
+ * http://www.apache.org/licenses/LICENSE-2.0
34
+ * Unless required by applicable law or agreed to in writing, software
35
+ * distributed under the License is distributed on an "AS IS" BASIS,
36
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
37
+ * implied. See the License for the specific language governing
38
+ * permissions and limitations under the License.
41
+ * Abstract base class for communicating with a Ceph filesystem and its
42
+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
43
+ * As only the Ceph package should be using this directly, all methods
46
+package org.apache.hadoop.fs.ceph;
48
+import org.apache.hadoop.conf.Configuration;
50
+abstract class CephFS {
52
+ protected static final int ENOTDIR = 20;
53
+ protected static final int EEXIST = 17;
54
+ protected static final int ENOENT = 2;
57
+ * Performs any necessary setup to allow general use of the filesystem.
59
+ * String argsuments -- a command-line style input of Ceph config params
60
+ * int block_size -- the size in bytes to use for blocks
61
+ * Returns: true on success, false otherwise
63
+ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
66
+ * Returns the current working directory (absolute) as a String
68
+ abstract protected String ceph_getcwd();
71
+ * Changes the working directory.
73
+ * String path: The path (relative or absolute) to switch to
74
+ * Returns: true on success, false otherwise.
76
+ abstract protected boolean ceph_setcwd(String path);
79
+ * Given a path to a directory, removes the directory if empty.
81
+ * jstring j_path: The path (relative or absolute) to the directory
82
+ * Returns: true on successful delete; false otherwise
84
+ abstract protected boolean ceph_rmdir(String path);
87
+ * Given a path, unlinks it.
89
+ * String path: The path (relative or absolute) to the file or empty dir
90
+ * Returns: true if the unlink occurred, false otherwise.
92
+ abstract protected boolean ceph_unlink(String path);
95
+ * Changes a given path name to a new name, assuming new_path doesn't exist.
97
+ * jstring j_from: The path whose name you want to change.
98
+ * jstring j_to: The new name for the path.
99
+ * Returns: true if the rename occurred, false otherwise
101
+ abstract protected boolean ceph_rename(String old_path, String new_path);
104
+ * Returns true if it the input path exists, false
105
+ * if it does not or there is an unexpected failure.
107
+ abstract protected boolean ceph_exists(String path);
110
+ * Get the block size for a given path.
112
+ * String path: The path (relative or absolute) you want
113
+ * the block size for.
114
+ * Returns: block size if the path exists, otherwise a negative number
115
+ * corresponding to the standard C++ error codes (which are positive).
117
+ abstract protected long ceph_getblocksize(String path);
120
+ * Returns true if the given path is a directory, false otherwise.
122
+ abstract protected boolean ceph_isdirectory(String path);
125
+ * Returns true if the given path is a file; false otherwise.
127
+ abstract protected boolean ceph_isfile(String path);
130
+ * Get the contents of a given directory.
132
+ * String path: The path (relative or absolute) to the directory.
133
+ * Returns: A Java String[] of the contents of the directory, or
134
+ * NULL if there is an error (ie, path is not a dir). This listing
135
+ * will not contain . or .. entries.
137
+ abstract protected String[] ceph_getdir(String path);
140
+ * Create the specified directory and any required intermediate ones with the
143
+ abstract protected int ceph_mkdirs(String path, int mode);
146
+ * Open a file to append. If the file does not exist, it will be created.
147
+ * Opening a dir is possible but may have bad results.
149
+ * String path: The path to open.
150
+ * Returns: an int filehandle, or a number<0 if an error occurs.
152
+ abstract protected int ceph_open_for_append(String path);
155
+ * Open a file for reading.
156
+ * Opening a dir is possible but may have bad results.
158
+ * String path: The path to open.
159
+ * Returns: an int filehandle, or a number<0 if an error occurs.
161
+ abstract protected int ceph_open_for_read(String path);
164
+ * Opens a file for overwriting; creates it if necessary.
165
+ * Opening a dir is possible but may have bad results.
167
+ * String path: The path to open.
168
+ * int mode: The mode to open with.
169
+ * Returns: an int filehandle, or a number<0 if an error occurs.
171
+ abstract protected int ceph_open_for_overwrite(String path, int mode);
174
+ * Closes the given file. Returns 0 on success, or a negative
175
+ * error code otherwise.
177
+ abstract protected int ceph_close(int filehandle);
180
+ * Change the mode on a path.
182
+ * String path: The path to change mode on.
183
+ * int mode: The mode to apply.
184
+ * Returns: true if the mode is properly applied, false if there
187
+ abstract protected boolean ceph_setPermission(String path, int mode);
190
+ * Closes the Ceph client. This should be called before shutting down
191
+ * (multiple times is okay but redundant).
193
+ abstract protected boolean ceph_kill_client();
196
+ * Get the statistics on a path returned in a custom format defined
197
+ * in CephFileSystem.
199
+ * String path: The path to stat.
200
+ * Stat fill: The stat object to fill.
201
+ * Returns: true if the stat is successful, false otherwise.
203
+ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
206
+ * Check how many times a file should be replicated. If it is,
207
+ * degraded it may not actually be replicated this often.
209
+ * int fh: a file descriptor
210
+ * Returns: an int containing the number of times replicated.
212
+ abstract protected int ceph_replication(String path);
215
+ * Find the IP address of the primary OSD for a given file and offset.
217
+ * int fh: The filehandle for the file.
218
+ * long offset: The offset to get the location of.
219
+ * Returns: an array of String of the location as IP, or NULL if there is an error.
221
+ abstract protected String[] ceph_hosts(int fh, long offset);
224
+ * Set the mtime and atime for a given path.
226
+ * String path: The path to set the times for.
227
+ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
228
+ * long atime: The atime to set, in millis since epoch (-1 to not set)
229
+ * Returns: 0 if successful, an error code otherwise.
231
+ abstract protected int ceph_setTimes(String path, long mtime, long atime);
234
+ * Get the current position in a file (as a long) of a given filehandle.
235
+ * Returns: (long) current file position on success, or a
236
+ * negative error code on failure.
238
+ abstract protected long ceph_getpos(int fh);
241
+ * Write the given buffer contents to the given filehandle.
243
+ * int fh: The filehandle to write to.
244
+ * byte[] buffer: The buffer to write from
245
+ * int buffer_offset: The position in the buffer to write from
246
+ * int length: The number of (sequential) bytes to write.
247
+ * Returns: int, on success the number of bytes written, on failure
248
+ * a negative error code.
250
+ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
253
+ * Reads into the given byte array from the current position.
255
+ * int fh: the filehandle to read from
256
+ * byte[] buffer: the byte array to read into
257
+ * int buffer_offset: where in the buffer to start writing
258
+ * int length: how much to read.
259
+ * There'd better be enough space in the buffer to write all
260
+ * the data from the given offset!
261
+ * Returns: the number of bytes read on success (as an int),
262
+ * or an error code otherwise. */
263
+ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
266
+ * Seeks to the given position in the given file.
268
+ * int fh: The filehandle to seek in.
269
+ * long pos: The position to seek to.
270
+ * Returns: the new position (as a long) of the filehandle on success,
271
+ * or a negative error code on failure. */
272
+ abstract protected long ceph_seek_from_start(int fh, long pos);
274
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFaker.java b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
276
index 0000000..c598f53
278
+++ b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
280
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
284
+ * Licensed under the Apache License, Version 2.0
285
+ * (the "License"); you may not use this file except in compliance with
286
+ * the License. You may obtain a copy of the License at
288
+ * http://www.apache.org/licenses/LICENSE-2.0
290
+ * Unless required by applicable law or agreed to in writing, software
291
+ * distributed under the License is distributed on an "AS IS" BASIS,
292
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
293
+ * implied. See the License for the specific language governing
294
+ * permissions and limitations under the License.
297
+ * This uses the local Filesystem but pretends to be communicating
298
+ * with a Ceph deployment, for unit testing the CephFileSystem.
301
+package org.apache.hadoop.fs.ceph;
304
+import java.net.URI;
305
+import java.util.Hashtable;
306
+import java.io.Closeable;
307
+import java.io.FileNotFoundException;
308
+import java.io.IOException;
310
+import org.apache.commons.logging.Log;
311
+import org.apache.commons.logging.LogFactory;
312
+import org.apache.hadoop.conf.Configuration;
313
+import org.apache.hadoop.fs.BlockLocation;
314
+import org.apache.hadoop.fs.FileStatus;
315
+import org.apache.hadoop.fs.FileSystem;
316
+import org.apache.hadoop.fs.FSDataInputStream;
317
+import org.apache.hadoop.fs.FSDataOutputStream;
318
+import org.apache.hadoop.fs.Path;
319
+import org.apache.hadoop.fs.permission.FsPermission;
322
+class CephFaker extends CephFS {
323
+ private static final Log LOG = LogFactory.getLog(CephFaker.class);
324
+ FileSystem localFS;
325
+ String localPrefix;
327
+ Configuration conf;
328
+ Hashtable<Integer, Object> files;
329
+ Hashtable<Integer, String> filenames;
331
+ boolean initialized = false;
333
+ public CephFaker(Configuration con, Log log) {
335
+ files = new Hashtable<Integer, Object>();
336
+ filenames = new Hashtable<Integer, String>();
339
+ protected boolean ceph_initializeClient(String args, int block_size) {
340
+ if (!initialized) {
341
+ // let's remember the default block_size
342
+ blockSize = block_size;
344
+ /* for a real Ceph deployment, this starts up the client,
345
+ * sets debugging levels, etc. We just need to get the
346
+ * local FileSystem to use, and we'll ignore any
347
+ * command-line arguments. */
349
+ localFS = FileSystem.getLocal(conf);
350
+ localFS.initialize(URI.create("file://localhost"), conf);
351
+ localFS.setVerifyChecksum(false);
352
+ String testDir = conf.get("hadoop.tmp.dir");
354
+ localPrefix = localFS.getWorkingDirectory().toString();
355
+ int testDirLoc = localPrefix.indexOf(testDir) - 1;
357
+ if (-2 == testDirLoc) {
358
+ testDirLoc = localPrefix.length();
360
+ localPrefix = localPrefix.substring(0, testDirLoc) + "/"
361
+ + conf.get("hadoop.tmp.dir");
363
+ localFS.setWorkingDirectory(
364
+ new Path(localPrefix + "/user/" + System.getProperty("user.name")));
365
+ // I don't know why, but the unit tests expect the default
366
+ // working dir to be /user/username, so satisfy them!
367
+ // debug("localPrefix is " + localPrefix, INFO);
368
+ } catch (IOException e) {
371
+ initialized = true;
376
+ protected String ceph_getcwd() {
377
+ return sanitize_path(localFS.getWorkingDirectory().toString());
380
+ protected boolean ceph_setcwd(String path) {
381
+ localFS.setWorkingDirectory(new Path(prepare_path(path)));
385
+ // the caller is responsible for ensuring empty dirs
386
+ protected boolean ceph_rmdir(String pth) {
387
+ Path path = new Path(prepare_path(pth));
388
+ boolean ret = false;
391
+ if (localFS.listStatus(path).length <= 1) {
392
+ ret = localFS.delete(path, true);
394
+ } catch (IOException e) {}
398
+ // this needs to work on (empty) directories too
399
+ protected boolean ceph_unlink(String path) {
400
+ path = prepare_path(path);
401
+ boolean ret = false;
403
+ if (ceph_isdirectory(path)) {
404
+ ret = ceph_rmdir(path);
407
+ ret = localFS.delete(new Path(path), false);
408
+ } catch (IOException e) {}
413
+ protected boolean ceph_rename(String oldName, String newName) {
414
+ oldName = prepare_path(oldName);
415
+ newName = prepare_path(newName);
417
+ Path parent = new Path(newName).getParent();
418
+ Path newPath = new Path(newName);
420
+ if (localFS.exists(parent) && !localFS.exists(newPath)) {
421
+ return localFS.rename(new Path(oldName), newPath);
424
+ } catch (IOException e) {
429
+ protected boolean ceph_exists(String path) {
430
+ path = prepare_path(path);
431
+ boolean ret = false;
434
+ ret = localFS.exists(new Path(path));
435
+ } catch (IOException e) {}
439
+ protected long ceph_getblocksize(String path) {
440
+ path = prepare_path(path);
442
+ FileStatus status = localFS.getFileStatus(new Path(path));
444
+ return status.getBlockSize();
445
+ } catch (FileNotFoundException e) {
446
+ return -CephFS.ENOENT;
447
+ } catch (IOException e) {
448
+ return -1; // just fail generically
452
+ protected boolean ceph_isdirectory(String path) {
453
+ path = prepare_path(path);
455
+ FileStatus status = localFS.getFileStatus(new Path(path));
457
+ return status.isDir();
458
+ } catch (IOException e) {
463
+ protected boolean ceph_isfile(String path) {
464
+ path = prepare_path(path);
465
+ boolean ret = false;
468
+ FileStatus status = localFS.getFileStatus(new Path(path));
470
+ ret = !status.isDir();
471
+ } catch (Exception e) {}
475
+ protected String[] ceph_getdir(String path) {
476
+ path = prepare_path(path);
477
+ if (!ceph_isdirectory(path)) {
481
+ FileStatus[] stats = localFS.listStatus(new Path(path));
482
+ String[] names = new String[stats.length];
485
+ for (int i = 0; i < stats.length; ++i) {
486
+ name = stats[i].getPath().toString();
487
+ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
490
+ } catch (IOException e) {}
494
+ protected int ceph_mkdirs(String path, int mode) {
495
+ path = prepare_path(path);
496
+ // debug("ceph_mkdirs on " + path, INFO);
498
+ if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
501
+ } catch (IOException e) {}
502
+ if (ceph_isdirectory(path)) { // apparently it already existed
504
+ } else if (ceph_isfile(path)) {
511
+ * Unlike a real Ceph deployment, you can't do opens on a directory.
512
+ * Since that has unpredictable behavior and you shouldn't do it anyway,
515
+ protected int ceph_open_for_append(String path) {
516
+ path = prepare_path(path);
517
+ FSDataOutputStream stream;
520
+ stream = localFS.append(new Path(path));
521
+ files.put(new Integer(fileCount), stream);
522
+ filenames.put(new Integer(fileCount), path);
523
+ return fileCount++;
524
+ } catch (IOException e) {}
525
+ return -1; // failure
528
+ protected int ceph_open_for_read(String path) {
529
+ path = prepare_path(path);
530
+ FSDataInputStream stream;
533
+ stream = localFS.open(new Path(path));
534
+ files.put(new Integer(fileCount), stream);
535
+ filenames.put(new Integer(fileCount), path);
536
+ LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
537
+ return fileCount++;
538
+ } catch (IOException e) {}
539
+ return -1; // failure
542
+ protected int ceph_open_for_overwrite(String path, int mode) {
543
+ path = prepare_path(path);
544
+ FSDataOutputStream stream;
547
+ stream = localFS.create(new Path(path));
548
+ files.put(new Integer(fileCount), stream);
549
+ filenames.put(new Integer(fileCount), path);
550
+ LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
551
+ return fileCount++;
552
+ } catch (IOException e) {}
553
+ return -1; // failure
556
+ protected int ceph_close(int filehandle) {
557
+ LOG.info("ceph_close(filehandle " + filehandle + ")");
559
+ ((Closeable) files.get(new Integer(filehandle))).close();
560
+ if (null == files.get(new Integer(filehandle))) {
561
+ return -ENOENT; // this isn't quite the right error code,
562
+ // but the important part is it's negative
564
+ return 0; // hurray, success
565
+ } catch (NullPointerException ne) {
566
+ LOG.warn("ceph_close caught NullPointerException!" + ne);
568
+ catch (IOException ie) {
569
+ LOG.warn("ceph_close caught IOException!" + ie);
571
+ return -1; // failure
574
+ protected boolean ceph_setPermission(String pth, int mode) {
575
+ pth = prepare_path(pth);
576
+ Path path = new Path(pth);
577
+ boolean ret = false;
580
+ localFS.setPermission(path, new FsPermission((short) mode));
582
+ } catch (IOException e) {}
586
+ // rather than try and match a Ceph deployment's behavior exactly,
587
+ // just make bad things happen if they try and call methods after this
588
+ protected boolean ceph_kill_client() {
589
+ // debug("ceph_kill_client", INFO);
590
+ localFS.setWorkingDirectory(new Path(localPrefix));
591
+ // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
594
+ } catch (Exception e) {}
601
+ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
602
+ pth = prepare_path(pth);
603
+ Path path = new Path(pth);
604
+ boolean ret = false;
607
+ FileStatus status = localFS.getFileStatus(path);
609
+ fill.size = status.getLen();
610
+ fill.is_dir = status.isDir();
611
+ fill.block_size = status.getBlockSize();
612
+ fill.mod_time = status.getModificationTime();
613
+ fill.access_time = status.getAccessTime();
614
+ fill.mode = status.getPermission().toShort();
616
+ } catch (IOException e) {}
620
+ protected int ceph_replication(String path) {
621
+ path = prepare_path(path);
622
+ int ret = -1; // -1 for failure
625
+ ret = localFS.getFileStatus(new Path(path)).getReplication();
626
+ } catch (IOException e) {}
630
+ protected String[] ceph_hosts(int fh, long offset) {
631
+ String[] ret = null;
634
+ BlockLocation[] locs = localFS.getFileBlockLocations(
635
+ localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
638
+ ret = locs[0].getNames();
639
+ } catch (IOException e) {} catch (NullPointerException f) {}
643
+ protected int ceph_setTimes(String pth, long mtime, long atime) {
644
+ pth = prepare_path(pth);
645
+ Path path = new Path(pth);
646
+ int ret = -1; // generic fail
649
+ localFS.setTimes(path, mtime, atime);
651
+ } catch (IOException e) {}
655
+ protected long ceph_getpos(int fh) {
656
+ long ret = -1; // generic fail
659
+ Object stream = files.get(new Integer(fh));
661
+ if (stream instanceof FSDataInputStream) {
662
+ ret = ((FSDataInputStream) stream).getPos();
663
+ } else if (stream instanceof FSDataOutputStream) {
664
+ ret = ((FSDataOutputStream) stream).getPos();
666
+ } catch (IOException e) {} catch (NullPointerException f) {}
670
+ protected int ceph_write(int fh, byte[] buffer,
671
+ int buffer_offset, int length) {
673
+ "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
675
+ long ret = -1; // generic fail
678
+ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
680
+ LOG.info("ceph_write got outputstream");
681
+ long startPos = os.getPos();
683
+ os.write(buffer, buffer_offset, length);
684
+ ret = os.getPos() - startPos;
685
+ } catch (IOException e) {
686
+ LOG.warn("ceph_write caught IOException!");
687
+ } catch (NullPointerException f) {
688
+ LOG.warn("ceph_write caught NullPointerException!");
693
+ protected int ceph_read(int fh, byte[] buffer,
694
+ int buffer_offset, int length) {
695
+ long ret = -1; // generic fail
698
+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
699
+ long startPos = is.getPos();
701
+ is.read(buffer, buffer_offset, length);
702
+ ret = is.getPos() - startPos;
703
+ } catch (IOException e) {} catch (NullPointerException f) {}
707
+ protected long ceph_seek_from_start(int fh, long pos) {
708
+ LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
709
+ long ret = -1; // generic fail
712
+ LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
713
+ if (null == files.get(new Integer(fh))) {
714
+ LOG.warn("ceph_seek_from_start: is is null!");
716
+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
718
+ LOG.info("ceph_seek_from_start retrieved is!");
721
+ } catch (IOException e) {
722
+ LOG.warn("ceph_seek_from_start caught IOException!");
723
+ } catch (NullPointerException f) {
724
+ LOG.warn("ceph_seek_from_start caught NullPointerException!");
730
+ * We need to remove the localFS file prefix before returning to Ceph
732
+ private String sanitize_path(String path) {
733
+ // debug("sanitize_path(" + path + ")", INFO);
734
+ /* if (path.startsWith("file:"))
735
+ path = path.substring("file:".length()); */
736
+ if (path.startsWith(localPrefix)) {
737
+ path = path.substring(localPrefix.length());
738
+ if (path.length() == 0) { // it was a root path
742
+ // debug("sanitize_path returning " + path, INFO);
747
+ * If it's an absolute path we need to shove the
748
+ * test dir onto the front as a prefix.
750
+ private String prepare_path(String path) {
751
+ // debug("prepare_path(" + path + ")", INFO);
752
+ if (path.startsWith("/")) {
753
+ path = localPrefix + path;
754
+ } else if (path.equals("..")) {
755
+ if (ceph_getcwd().equals("/")) {
757
+ } // you can't go up past root!
759
+ // debug("prepare_path returning" + path, INFO);
763
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
765
index 0000000..95f2223
767
+++ b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
769
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
773
+ * Licensed under the Apache License, Version 2.0
774
+ * (the "License"); you may not use this file except in compliance with
775
+ * the License. You may obtain a copy of the License at
777
+ * http://www.apache.org/licenses/LICENSE-2.0
779
+ * Unless required by applicable law or agreed to in writing, software
780
+ * distributed under the License is distributed on an "AS IS" BASIS,
781
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
782
+ * implied. See the License for the specific language governing
783
+ * permissions and limitations under the License.
786
+ * Implements the Hadoop FS interfaces to allow applications to store
789
+package org.apache.hadoop.fs.ceph;
792
+import java.io.IOException;
793
+import java.io.FileNotFoundException;
794
+import java.io.OutputStream;
795
+import java.net.URI;
796
+import java.net.InetAddress;
797
+import java.util.EnumSet;
798
+import java.lang.Math;
799
+import java.util.ArrayList;
801
+import org.apache.commons.logging.Log;
802
+import org.apache.commons.logging.LogFactory;
803
+import org.apache.hadoop.conf.Configuration;
804
+import org.apache.hadoop.fs.BlockLocation;
805
+import org.apache.hadoop.fs.FSDataInputStream;
806
+import org.apache.hadoop.fs.FSInputStream;
807
+import org.apache.hadoop.fs.FSDataOutputStream;
808
+import org.apache.hadoop.fs.FileSystem;
809
+import org.apache.hadoop.fs.FileUtil;
810
+import org.apache.hadoop.fs.Path;
811
+import org.apache.hadoop.fs.permission.FsPermission;
812
+import org.apache.hadoop.util.Progressable;
813
+import org.apache.hadoop.fs.FileStatus;
814
+import org.apache.hadoop.net.DNS;
819
+ * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>.
820
+ * This will not start a Ceph instance; one must already be running.
822
+ * Configuration of the CephFileSystem is handled via a few Hadoop
823
+ * Configuration properties: <br>
824
+ * fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br>
825
+ * fs.ceph.libDir -- the directory that libcephfs and libhadoopceph are
826
+ * located in. This assumes Hadoop is being run on a linux-style machine
827
+ * with names like libcephfs.so.
828
+ * fs.ceph.commandLine -- if you prefer you can fill in this property
829
+ * just as you would when starting Ceph up from the command line. Specific
830
+ * properties override any configuration specified here.
832
+ * You can also enable debugging of the CephFileSystem and Ceph itself: <br>
833
+ * fs.ceph.debug -- if 'true' will print out method enter/exit messages,
834
+ * plus a little more.
835
+ * fs.ceph.clientDebug/fs.ceph.messengerDebug -- will print out debugging
836
+ * from the respective Ceph system of at least that importance.
838
+public class CephFileSystem extends FileSystem {
839
+ private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
842
+ private Path workingDir;
843
+ private final Path root;
844
+ private CephFS ceph = null;
846
+ private static String CEPH_NAMESERVER;
847
+ private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
848
+ private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
851
+ * Create a new CephFileSystem.
853
+ public CephFileSystem() {
854
+ root = new Path("/");
858
+ * Used for testing purposes, this constructor
859
+ * sets the given CephFS instead of defaulting to a
860
+ * CephTalker (with its assumed real Ceph instance to talk to).
862
+ public CephFileSystem(CephFS ceph_fs) {
864
+ root = new Path("/");
869
+ * Lets you get the URI of this CephFileSystem.
872
+ public URI getUri() {
873
+ LOG.debug("getUri:exit with return " + uri);
878
+ * Should be called after constructing a CephFileSystem but before calling
879
+ * any other methods.
880
+ * Starts up the connection to Ceph, reads in configuraton options, etc.
881
+ * @param uri The URI for this filesystem.
882
+ * @param conf The Hadoop Configuration to retrieve properties from.
883
+ * @throws IOException if necessary properties are unset.
886
+ public void initialize(URI uri, Configuration conf) throws IOException {
887
+ super.initialize(uri, conf);
889
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
890
+ if (ceph == null) {
891
+ ceph = new CephTalker(conf, LOG);
894
+ CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
896
+ // build up the arguments for Ceph
897
+ String arguments = "CephFSInterface";
899
+ arguments += conf.get("fs.ceph.commandLine", "");
900
+ if (conf.get("fs.ceph.clientDebug") != null) {
901
+ arguments += " --debug_client ";
902
+ arguments += conf.get("fs.ceph.clientDebug");
904
+ if (conf.get("fs.ceph.messengerDebug") != null) {
905
+ arguments += " --debug_ms ";
906
+ arguments += conf.get("fs.ceph.messengerDebug");
908
+ if (conf.get("fs.ceph.monAddr") != null) {
909
+ arguments += " -m ";
910
+ arguments += conf.get("fs.ceph.monAddr");
912
+ arguments += " --client-readahead-max-periods="
913
+ + conf.get("fs.ceph.readahead", "1");
914
+ // make sure they gave us a ceph monitor address or conf file
915
+ LOG.info("initialize:Ceph initialization arguments: " + arguments);
916
+ if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
917
+ && (arguments.indexOf("-c") == -1)) {
918
+ LOG.fatal("initialize:You need to specify a Ceph monitor address.");
919
+ throw new IOException(
920
+ "You must specify a Ceph monitor address or config file!");
922
+ // Initialize the client
923
+ if (!ceph.ceph_initializeClient(arguments,
924
+ conf.getInt("fs.ceph.blockSize", 1 << 26))) {
925
+ LOG.fatal("initialize:Ceph initialization failed!");
926
+ throw new IOException("Ceph initialization failed!");
928
+ LOG.info("initialize:Ceph initialized client. Setting cwd to /");
929
+ ceph.ceph_setcwd("/");
930
+ LOG.debug("initialize:exit");
932
+ this.workingDir = getHomeDirectory();
936
+ * Close down the CephFileSystem. Runs the base-class close method
937
+ * and then kills the Ceph client itself.
940
+ public void close() throws IOException {
941
+ LOG.debug("close:enter");
942
+ super.close(); // this method does stuff, make sure it's run!
943
+ LOG.trace("close: Calling ceph_kill_client from Java");
944
+ ceph.ceph_kill_client();
945
+ LOG.debug("close:exit");
949
+ * Get an FSDataOutputStream to append onto a file.
950
+ * @param file The File you want to append onto
951
+ * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
952
+ * @param progress The Progressable to report progress to.
953
+ * Reporting is limited but exists.
954
+ * @return An FSDataOutputStream that connects to the file on Ceph.
955
+ * @throws IOException If the file cannot be found or appended to.
957
+ public FSDataOutputStream append(Path file, int bufferSize,
958
+ Progressable progress) throws IOException {
959
+ LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
960
+ Path abs_path = makeAbsolute(file);
962
+ if (progress != null) {
963
+ progress.progress();
965
+ LOG.trace("append: Entering ceph_open_for_append from Java");
966
+ int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
968
+ LOG.trace("append: Returned to Java");
969
+ if (progress != null) {
970
+ progress.progress();
972
+ if (fd < 0) { // error in open
973
+ throw new IOException(
974
+ "append: Open for append failed on path \"" + abs_path.toString()
977
+ CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
980
+ LOG.debug("append:exit");
981
+ return new FSDataOutputStream(cephOStream, statistics);
985
+ * Get the current working directory for the given file system
986
+ * @return the directory Path
988
+ public Path getWorkingDirectory() {
993
+ * Set the current working directory for the given file system. All relative
994
+ * paths will be resolved relative to it.
996
+ * @param dir The directory to change to.
999
+ public void setWorkingDirectory(Path dir) {
1000
+ workingDir = makeAbsolute(dir);
1004
+ * Return only the path component from a potentially fully qualified path.
1006
+ private String getCephPath(Path path) {
1007
+ if (!path.isAbsolute()) {
1008
+ throw new IllegalArgumentException("Path must be absolute: " + path);
1010
+ return path.toUri().getPath();
1014
+ * Check if a path exists.
1015
+ * Overriden because it's moderately faster than the generic implementation.
1016
+ * @param path The file to check existence on.
1017
+ * @return true if the file exists, false otherwise.
1020
+ public boolean exists(Path path) throws IOException {
1021
+ LOG.debug("exists:enter with path " + path);
1023
+ Path abs_path = makeAbsolute(path);
1025
+ if (abs_path.equals(root)) {
1029
+ "exists:Calling ceph_exists from Java on path " + abs_path.toString());
1030
+ result = ceph.ceph_exists(getCephPath(abs_path));
1031
+ LOG.trace("exists:Returned from ceph_exists to Java");
1033
+ LOG.debug("exists:exit with value " + result);
1038
+ * Create a directory and any nonexistent parents. Any portion
1039
+ * of the directory tree can exist without error.
1040
+ * @param path The directory path to create
1041
+ * @param perms The permissions to apply to the created directories.
1042
+ * @return true if successful, false otherwise
1043
+ * @throws IOException if the path is a child of a file.
1046
+ public boolean mkdirs(Path path, FsPermission perms) throws IOException {
1047
+ LOG.debug("mkdirs:enter with path " + path);
1048
+ Path abs_path = makeAbsolute(path);
1050
+ LOG.trace("mkdirs:calling ceph_mkdirs from Java");
1051
+ int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
1053
+ if (result != 0) {
1055
+ "mkdirs: make directory " + abs_path + "Failing with result " + result);
1056
+ if (-ceph.ENOTDIR == result) {
1057
+ throw new IOException("Parent path is not a directory");
1061
+ LOG.debug("mkdirs:exiting succesfully");
1067
+ * Check if a path is a file. This is moderately faster than the
1068
+ * generic implementation.
1069
+ * @param path The path to check.
1070
+ * @return true if the path is definitely a file, false otherwise.
1073
+ public boolean isFile(Path path) throws IOException {
1074
+ LOG.debug("isFile:enter with path " + path);
1075
+ Path abs_path = makeAbsolute(path);
1078
+ if (abs_path.equals(root)) {
1081
+ LOG.trace("isFile:entering ceph_isfile from Java");
1082
+ result = ceph.ceph_isfile(getCephPath(abs_path));
1084
+ LOG.debug("isFile:exit with result " + result);
1089
+ * Get stat information on a file. This does not fill owner or group, as
1090
+ * Ceph's support for these is a bit different than HDFS'.
1091
+ * @param path The path to stat.
1092
+ * @return FileStatus object containing the stat information.
1093
+ * @throws FileNotFoundException if the path could not be resolved.
1095
+ public FileStatus getFileStatus(Path path) throws IOException {
1096
+ LOG.debug("getFileStatus:enter with path " + path);
1097
+ Path abs_path = makeAbsolute(path);
1098
+ // sadly, Ceph doesn't really do uids/gids just yet, but
1099
+ // everything else is filled
1100
+ FileStatus status;
1101
+ Stat lstat = new Stat();
1103
+ LOG.trace("getFileStatus: calling ceph_stat from Java");
1104
+ if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
1105
+ status = new FileStatus(lstat.size, lstat.is_dir,
1106
+ ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
1107
+ lstat.mod_time, lstat.access_time,
1108
+ new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
1109
+ path.makeQualified(this));
1110
+ } else { // fail out
1111
+ throw new FileNotFoundException(
1112
+ "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
1113
+ + " does not exist or could not be accessed");
1116
+ LOG.debug("getFileStatus:exit");
1121
+ * Get the FileStatus for each listing in a directory.
1122
+ * @param path The directory to get listings from.
1123
+ * @return FileStatus[] containing one FileStatus for each directory listing;
1124
+ * null if path does not exist.
1126
+ public FileStatus[] listStatus(Path path) throws IOException {
1127
+ LOG.debug("listStatus:enter with path " + path);
1128
+ Path abs_path = makeAbsolute(path);
1129
+ Path[] paths = listPaths(abs_path);
1131
+ if (paths != null) {
1132
+ FileStatus[] statuses = new FileStatus[paths.length];
1134
+ for (int i = 0; i < paths.length; ++i) {
1135
+ statuses[i] = getFileStatus(paths[i]);
1137
+ LOG.debug("listStatus:exit");
1141
+ if (isFile(path)) {
1142
+ return new FileStatus[] { getFileStatus(path) };
1149
+ public void setPermission(Path p, FsPermission permission) throws IOException {
1151
+ "setPermission:enter with path " + p + " and permissions " + permission);
1152
+ Path abs_path = makeAbsolute(p);
1154
+ LOG.trace("setPermission:calling ceph_setpermission from Java");
1155
+ ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
1156
+ LOG.debug("setPermission:exit");
1160
+ * Set access/modification times of a file.
1161
+ * @param p The path
1162
+ * @param mtime Set modification time in number of millis since Jan 1, 1970.
1163
+ * @param atime Set access time in number of millis since Jan 1, 1970.
1166
+ public void setTimes(Path p, long mtime, long atime) throws IOException {
1168
+ "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
1169
+ Path abs_path = makeAbsolute(p);
1171
+ LOG.trace("setTimes:calling ceph_setTimes from Java");
1172
+ int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
1175
+ throw new IOException(
1176
+ "Failed to set times on path " + abs_path.toString() + " Error code: "
1179
+ LOG.debug("setTimes:exit");
1183
+ * Create a new file and open an FSDataOutputStream that's connected to it.
1184
+ * @param path The file to create.
1185
+ * @param permission The permissions to apply to the file.
1186
+ * @param overwrite If true, overwrite any existing file with
1187
+ * this name; otherwise don't.
1188
+ * @param bufferSize Ceph does internal buffering, but you can buffer
1189
+ * in the Java code too if you like.
1190
+ * @param replication Ignored by Ceph. This can be
1191
+ * configured via Ceph configuration.
1192
+ * @param blockSize Ignored by Ceph. You can set client-wide block sizes
1193
+ * via the fs.ceph.blockSize param if you like.
1194
+ * @param progress A Progressable to report back to.
1195
+ * Reporting is limited but exists.
1196
+ * @return An FSDataOutputStream pointing to the created file.
1197
+ * @throws IOException if the path is an
1198
+ * existing directory, or the path exists but overwrite is false, or there is a
1199
+ * failure in attempting to open for append with Ceph.
1201
+ public FSDataOutputStream create(Path path,
1202
+ FsPermission permission,
1203
+ boolean overwrite,
1205
+ short replication,
1207
+ Progressable progress) throws IOException {
1208
+ LOG.debug("create:enter with path " + path);
1209
+ Path abs_path = makeAbsolute(path);
1211
+ if (progress != null) {
1212
+ progress.progress();
1214
+ // We ignore replication since that's not configurable here, and
1215
+ // progress reporting is quite limited.
1216
+ // Required semantics: if the file exists, overwrite if 'overwrite' is set;
1217
+ // otherwise, throw an exception
1219
+ // Step 1: existence test
1220
+ boolean exists = exists(abs_path);
1223
+ if (getFileStatus(abs_path).isDir()) {
1224
+ throw new IOException(
1225
+ "create: Cannot overwrite existing directory \"" + path.toString()
1226
+ + "\" with a file");
1229
+ throw new IOException(
1230
+ "createRaw: Cannot open existing file \"" + abs_path.toString()
1231
+ + "\" for writing without overwrite flag");
1235
+ if (progress != null) {
1236
+ progress.progress();
1239
+ // Step 2: create any nonexistent directories in the path
1241
+ Path parent = abs_path.getParent();
1243
+ if (parent != null) { // if parent is root, we're done
1244
+ int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
1246
+ if (!(r == 0 || r == -ceph.EEXIST)) {
1247
+ throw new IOException("Error creating parent directory; code: " + r);
1250
+ if (progress != null) {
1251
+ progress.progress();
1254
+ // Step 3: open the file
1255
+ LOG.trace("calling ceph_open_for_overwrite from Java");
1256
+ int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
1257
+ (int) permission.toShort());
1259
+ if (progress != null) {
1260
+ progress.progress();
1262
+ LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
1264
+ throw new IOException(
1265
+ "create: Open for overwrite failed on path \"" + path.toString()
1269
+ // Step 4: create the stream
1270
+ OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
1273
+ LOG.debug("create:exit");
1274
+ return new FSDataOutputStream(cephOStream, statistics);
1278
+ * Open a Ceph file and attach the file handle to an FSDataInputStream.
1279
+ * @param path The file to open
1280
+ * @param bufferSize Ceph does internal buffering; but you can buffer in
1281
+ * the Java code too if you like.
1282
+ * @return FSDataInputStream reading from the given path.
1283
+ * @throws IOException if the path DNE or is a
1284
+ * directory, or there is an error getting data to set up the FSDataInputStream.
1286
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
1287
+ LOG.debug("open:enter with path " + path);
1288
+ Path abs_path = makeAbsolute(path);
1290
+ int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
1292
+ if (fh < 0) { // uh-oh, something's bad!
1293
+ if (fh == -ceph.ENOENT) { // well that was a stupid open
1294
+ throw new IOException(
1295
+ "open: absolute path \"" + abs_path.toString()
1296
+ + "\" does not exist");
1297
+ } else { // hrm...the file exists but we can't open it :(
1298
+ throw new IOException("open: Failed to open file " + abs_path.toString());
1302
+ if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
1303
+ // but that doesn't mean you should in Hadoop!
1304
+ ceph.ceph_close(fh);
1305
+ throw new IOException(
1306
+ "open: absolute path \"" + abs_path.toString() + "\" is a directory!");
1308
+ Stat lstat = new Stat();
1310
+ LOG.trace("open:calling ceph_stat from Java");
1311
+ ceph.ceph_stat(getCephPath(abs_path), lstat);
1312
+ LOG.trace("open:returned to Java");
1313
+ long size = lstat.size;
1316
+ throw new IOException(
1317
+ "Failed to get file size for file " + abs_path.toString()
1318
+ + " but succeeded in opening file. Something bizarre is going on.");
1320
+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
1323
+ LOG.debug("open:exit");
1324
+ return new FSDataInputStream(cephIStream);
1328
+ * Rename a file or directory.
1329
+ * @param src The current path of the file/directory
1330
+ * @param dst The new name for the path.
1331
+ * @return true if the rename succeeded, false otherwise.
1334
+ public boolean rename(Path src, Path dst) throws IOException {
1335
+ LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
1336
+ Path abs_src = makeAbsolute(src);
1337
+ Path abs_dst = makeAbsolute(dst);
1339
+ LOG.trace("calling ceph_rename from Java");
1340
+ boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
1343
+ boolean isDir = false;
1345
+ isDir = getFileStatus(abs_dst).isDir();
1346
+ } catch (FileNotFoundException e) {}
1347
+ if (isDir) { // move the srcdir into destdir
1348
+ LOG.debug("ceph_rename failed but dst is a directory!");
1349
+ Path new_dst = new Path(abs_dst, abs_src.getName());
1351
+ result = rename(abs_src, new_dst);
1353
+ "attempt to move " + abs_src.toString() + " to "
1354
+ + new_dst.toString() + "has result:" + result);
1357
+ LOG.debug("rename:exit with result: " + result);
1362
+ * Attempt to convert an IP into its hostname
1364
+ private String[] ips2Hosts(String[] ips) {
1365
+ ArrayList<String> hosts = new ArrayList<String>();
1366
+ for (String ip : ips) {
1368
+ String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
1369
+ if (host.charAt(host.length()-1) == '.') {
1370
+ host = host.substring(0, host.length()-1);
1372
+ hosts.add(host); /* append */
1373
+ } catch (Exception e) {
1374
+ LOG.error("reverseDns ["+ip+"] failed: "+ e);
1377
+ return hosts.toArray(new String[hosts.size()]);
1381
+ * Get a BlockLocation object for each block in a file.
1383
+ * Note that this doesn't include port numbers in the name field as
1384
+ * Ceph handles slow/down servers internally. This data should be used
1385
+ * only for selecting which servers to run which jobs on.
1387
+ * @param file A FileStatus object corresponding to the file you want locations for.
1388
+ * @param start The offset of the first part of the file you are interested in.
1389
+ * @param len The amount of the file past the offset you are interested in.
1390
+ * @return A BlockLocation[] where each object corresponds to a block within
1391
+ * the given range.
1394
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
1395
+ Path abs_path = makeAbsolute(file.getPath());
1397
+ int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
1399
+ LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
1403
+ long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
1404
+ BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
1406
+ for (int i = 0; i < locations.length; ++i) {
1407
+ long offset = start + i * blockSize;
1408
+ long blockStart = start + i * blockSize - (start % blockSize);
1409
+ String ips[] = ceph.ceph_hosts(fh, offset);
1410
+ String hosts[] = ips2Hosts(ips);
1411
+ locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
1412
+ LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
1415
+ ceph.ceph_close(fh);
1420
+ public boolean delete(Path path) throws IOException {
1421
+ return delete(path, false);
1425
+ * Delete the given path, and optionally its children.
1426
+ * @param path the path to delete.
1427
+ * @param recursive If the path is a non-empty directory and this is false,
1428
+ * delete will throw an IOException. If path is a file this is ignored.
1429
+ * @return true if the delete succeeded, false otherwise (including if
1430
+ * path doesn't exist).
1431
+ * @throws IOException if you attempt to non-recursively delete a directory,
1432
+ * or you attempt to delete the root directory.
1434
+ public boolean delete(Path path, boolean recursive) throws IOException {
1435
+ LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
1436
+ Path abs_path = makeAbsolute(path);
1439
+ if (abs_path.equals(root)) {
1440
+ throw new IOException("Error: deleting the root directory is a Bad Idea.");
1442
+ if (!exists(abs_path)) {
1446
+ // if the path is a file, try to delete it.
1447
+ if (isFile(abs_path)) {
1448
+ LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
1449
+ boolean result = ceph.ceph_unlink(getCephPath(abs_path));
1453
+ "delete: failed to delete file \"" + abs_path.toString() + "\".");
1455
+ LOG.debug("delete:exit with success=" + result);
1459
+ /* The path is a directory, so recursively try to delete its contents,
1460
+ and then delete the directory. */
1461
+ // get the entries; listPaths will remove . and .. for us
1462
+ Path[] contents = listPaths(abs_path);
1464
+ if (contents == null) {
1466
+ "delete: Failed to read contents of directory \""
1467
+ + abs_path.toString() + "\" while trying to delete it, BAILING");
1470
+ if (!recursive && contents.length > 0) {
1471
+ throw new IOException("Directories must be deleted recursively!");
1473
+ // delete the entries
1474
+ LOG.debug("delete: recursively calling delete on contents of " + abs_path);
1475
+ for (Path p : contents) {
1476
+ if (!delete(p, true)) {
1478
+ "delete: Failed to delete file \"" + p.toString()
1479
+ + "\" while recursively deleting \"" + abs_path.toString()
1484
+ // if we've come this far it's a now-empty directory, so delete it!
1485
+ boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
1489
+ "delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
1491
+ LOG.debug("delete:exit");
1496
+ * Returns the default replication value of 1. This may
1497
+ * NOT be the actual value, as replication is controlled
1498
+ * by a separate Ceph configuration.
1501
+ public short getDefaultReplication() {
1506
+ * Get the default block size.
1507
+ * @return the default block size, in bytes, as a long.
1510
+ public long getDefaultBlockSize() {
1511
+ return getConf().getInt("fs.ceph.blockSize", 1 << 26);
1515
+ * Adds the working directory to path if path is not already
1516
+ * an absolute path. The URI scheme is not removed here. It
1517
+ * is removed only when users (e.g. ceph native calls) need
1518
+ * the path-only portion.
1520
+ private Path makeAbsolute(Path path) {
1521
+ if (path.isAbsolute()) {
1524
+ return new Path(workingDir, path);
1527
+ private Path[] listPaths(Path path) throws IOException {
1528
+ LOG.debug("listPaths:enter with path " + path);
1531
+ Path abs_path = makeAbsolute(path);
1533
+ // If it's a directory, get the listing. Otherwise, complain and give up.
1534
+ LOG.debug("calling ceph_getdir from Java with path " + abs_path);
1535
+ dirlist = ceph.ceph_getdir(getCephPath(abs_path));
1536
+ LOG.debug("returning from ceph_getdir to Java");
1538
+ if (dirlist == null) {
1542
+ // convert the strings to Paths
1543
+ Path[] paths = new Path[dirlist.length];
1545
+ for (int i = 0; i < dirlist.length; ++i) {
1547
+ "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
1548
+ + dirlist[i] + "\"");
1549
+ // convert each listing to an absolute path
1550
+ Path raw_path = new Path(dirlist[i]);
1552
+ if (raw_path.isAbsolute()) {
1553
+ paths[i] = raw_path;
1555
+ paths[i] = new Path(abs_path, raw_path);
1558
+ LOG.debug("listPaths:exit");
1562
+ static class Stat {
1564
+ public boolean is_dir;
1565
+ public long block_size;
1566
+ public long mod_time;
1567
+ public long access_time;
1573
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
1574
new file mode 100644
1575
index 0000000..d9668d0
1577
+++ b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
1579
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
1583
+ * Licensed under the Apache License, Version 2.0
1584
+ * (the "License"); you may not use this file except in compliance with
1585
+ * the License. You may obtain a copy of the License at
1587
+ * http://www.apache.org/licenses/LICENSE-2.0
1589
+ * Unless required by applicable law or agreed to in writing, software
1590
+ * distributed under the License is distributed on an "AS IS" BASIS,
1591
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1592
+ * implied. See the License for the specific language governing
1593
+ * permissions and limitations under the License.
1596
+ * Implements the Hadoop FS interfaces to allow applications to store
1599
+package org.apache.hadoop.fs.ceph;
1602
+import java.io.IOException;
1604
+import org.apache.commons.logging.Log;
1605
+import org.apache.commons.logging.LogFactory;
1606
+import org.apache.hadoop.conf.Configuration;
1607
+import org.apache.hadoop.fs.FSInputStream;
1612
+ * An {@link FSInputStream} for a CephFileSystem and corresponding
1615
+public class CephInputStream extends FSInputStream {
1616
+ private static final Log LOG = LogFactory.getLog(CephInputStream.class);
1617
+ private boolean closed;
1619
+ private int fileHandle;
1621
+ private long fileLength;
1623
+ private CephFS ceph;
1625
+ private byte[] buffer;
1626
+ private int bufPos = 0;
1627
+ private int bufValid = 0;
1628
+ private long cephPos = 0;
1631
+ * Create a new CephInputStream.
1632
+ * @param conf The system configuration. Unused.
1633
+ * @param fh The filehandle provided by Ceph to reference.
1634
+ * @param flength The current length of the file. If the length changes
1635
+ * you will need to close and re-open it to access the new data.
1637
+ public CephInputStream(Configuration conf, CephFS cephfs,
1638
+ int fh, long flength, int bufferSize) {
1639
+ // Whoever's calling the constructor is responsible for doing the actual ceph_open
1640
+ // call and providing the file handle.
1641
+ fileLength = flength;
1645
+ buffer = new byte[bufferSize];
1647
+ "CephInputStream constructor: initializing stream with fh " + fh
1648
+ + " and file length " + flength);
1652
+ /** Ceph likes things to be closed before it shuts down,
1653
+ * so closing the IOStream stuff voluntarily in a finalizer is good
1655
+ protected void finalize() throws Throwable {
1665
+ private synchronized boolean fillBuffer() throws IOException {
1666
+ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
1668
+ if (bufValid < 0) {
1669
+ int err = bufValid;
1672
+ // attempt to reset to old position. If it fails, too bad.
1673
+ ceph.ceph_seek_from_start(fileHandle, cephPos);
1674
+ throw new IOException("Failed to fill read buffer! Error code:" + err);
1676
+ cephPos += bufValid;
1677
+ return (bufValid != 0);
1681
+ * Get the current position of the stream.
1683
+ public synchronized long getPos() throws IOException {
1684
+ return cephPos - bufValid + bufPos;
1688
+ * Find the number of bytes remaining in the file.
1691
+ public synchronized int available() throws IOException {
1692
+ return (int) (fileLength - getPos());
1695
+ public synchronized void seek(long targetPos) throws IOException {
1697
+ "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
1699
+ if (targetPos > fileLength) {
1700
+ throw new IOException(
1701
+ "CephInputStream.seek: failed seek to position " + targetPos
1702
+ + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
1704
+ long oldPos = cephPos;
1706
+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
1709
+ if (cephPos < 0) {
1711
+ throw new IOException("Ceph failed to seek to new position!");
1716
+ * Failovers are handled by the Ceph code at a very low level;
1717
+ * if there are issues that can be solved by changing sources
1718
+ * they'll be dealt with before anybody even tries to call this method!
1721
+ public synchronized boolean seekToNewSource(long targetPos) {
1726
+ * Read a byte from the file.
1727
+ * @return the next byte.
1730
+ public synchronized int read() throws IOException {
1732
+ "CephInputStream.read: Reading a single byte from fd " + fileHandle
1733
+ + " by calling general read function");
1735
+ byte result[] = new byte[1];
1737
+ if (getPos() >= fileLength) {
1740
+ if (-1 == read(result, 0, 1)) {
1743
+ if (result[0] < 0) {
1744
+ return 256 + (int) result[0];
1751
+ * Read a specified number of bytes from the file into a byte[].
1752
+ * @param buf the byte array to read into.
1753
+ * @param off the offset to start at in the file
1754
+ * @param len the number of bytes to read
1755
+ * @return 0 if successful, otherwise an error code.
1756
+ * @throws IOException on bad input.
1759
+ public synchronized int read(byte buf[], int off, int len)
1760
+ throws IOException {
1762
+ "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
1765
+ throw new IOException(
1766
+ "CephInputStream.read: cannot read " + len + " bytes from fd "
1767
+ + fileHandle + ": stream closed");
1770
+ // ensure we're not past the end of the file
1771
+ if (getPos() >= fileLength) {
1773
+ "CephInputStream.read: cannot read " + len + " bytes from fd "
1774
+ + fileHandle + ": current position is " + getPos()
1775
+ + " and file length is " + fileLength);
1780
+ int totalRead = 0;
1781
+ int initialLen = len;
1785
+ read = Math.min(len, bufValid - bufPos);
1787
+ System.arraycopy(buffer, bufPos, buf, off, read);
1788
+ } catch (IndexOutOfBoundsException ie) {
1789
+ throw new IOException(
1790
+ "CephInputStream.read: Indices out of bounds:" + "read length is "
1791
+ + len + ", buffer offset is " + off + ", and buffer size is "
1793
+ } catch (ArrayStoreException ae) {
1794
+ throw new IOException(
1795
+ "Uh-oh, CephInputStream failed to do an array"
1796
+ + "copy due to type mismatch...");
1797
+ } catch (NullPointerException ne) {
1798
+ throw new IOException(
1799
+ "CephInputStream.read: cannot read " + len + "bytes from fd:"
1800
+ + fileHandle + ": buf is null");
1805
+ totalRead += read;
1806
+ } while (len > 0 && fillBuffer());
1809
+ "CephInputStream.read: Reading " + initialLen + " bytes from fd "
1810
+ + fileHandle + ": succeeded in reading " + totalRead + " bytes");
1815
+ * Close the CephInputStream and release the associated filehandle.
1818
+ public void close() throws IOException {
1819
+ LOG.trace("CephOutputStream.close:enter");
1821
+ int result = ceph.ceph_close(fileHandle);
1824
+ if (result != 0) {
1825
+ throw new IOException(
1826
+ "Close somehow failed!"
1827
+ + "Don't try and use this stream again, though");
1829
+ LOG.trace("CephOutputStream.close:exit");
1833
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
1834
new file mode 100644
1835
index 0000000..4c50f88
1837
+++ b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
1839
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
1843
+ * Licensed under the Apache License, Version 2.0
1844
+ * (the "License"); you may not use this file except in compliance with
1845
+ * the License. You may obtain a copy of the License at
1847
+ * http://www.apache.org/licenses/LICENSE-2.0
1849
+ * Unless required by applicable law or agreed to in writing, software
1850
+ * distributed under the License is distributed on an "AS IS" BASIS,
1851
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1852
+ * implied. See the License for the specific language governing
1853
+ * permissions and limitations under the License.
1856
+ * Implements the Hadoop FS interfaces to allow applications to store
1860
+package org.apache.hadoop.fs.ceph;
1863
+import java.io.IOException;
1864
+import java.io.OutputStream;
1866
+import org.apache.commons.logging.Log;
1867
+import org.apache.commons.logging.LogFactory;
1868
+import org.apache.hadoop.conf.Configuration;
1869
+import org.apache.hadoop.util.Progressable;
1874
+ * An {@link OutputStream} for a CephFileSystem and corresponding
1877
+public class CephOutputStream extends OutputStream {
1878
+ private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
1879
+ private boolean closed;
1881
+ private CephFS ceph;
1883
+ private int fileHandle;
1885
+ private byte[] buffer;
1886
+ private int bufUsed = 0;
1889
+ * Construct the CephOutputStream.
1890
+ * @param conf The FileSystem configuration.
1891
+ * @param fh The Ceph filehandle to connect to.
1893
+ public CephOutputStream(Configuration conf, CephFS cephfs,
1894
+ int fh, int bufferSize) {
1898
+ buffer = new byte[bufferSize];
1901
+ /** Ceph likes things to be closed before it shuts down,
1902
+ *so closing the IOStream stuff voluntarily is good
1904
+ protected void finalize() throws Throwable {
1915
+ * Get the current position in the file.
1916
+ * @return The file offset in bytes.
1918
+ public long getPos() throws IOException {
1919
+ return ceph.ceph_getpos(fileHandle);
1924
+ * @param b The byte to write.
1925
+ * @throws IOException If you have closed the CephOutputStream or the
1929
+ public synchronized void write(int b) throws IOException {
1931
+ "CephOutputStream.write: writing a single byte to fd " + fileHandle);
1934
+ throw new IOException(
1935
+ "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
1936
+ + ": stream closed");
1938
+ // Stick the byte in a buffer and write it
1939
+ byte buf[] = new byte[1];
1941
+ buf[0] = (byte) b;
1947
+ * Write a byte buffer into the Ceph file.
1948
+ * @param buf the byte array to write from
1949
+ * @param off the position in the file to start writing at.
1950
+ * @param len The number of bytes to actually write.
1951
+ * @throws IOException if you have closed the CephOutputStream, or
1952
+ * if buf is null or off + len > buf.length, or
1953
+ * if the write fails due to a Ceph error.
1956
+ public synchronized void write(byte buf[], int off, int len) throws IOException {
1958
+ "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
1959
+ // make sure stream is open
1961
+ throw new IOException(
1962
+ "CephOutputStream.write: cannot write " + len + "bytes to fd "
1963
+ + fileHandle + ": stream closed");
1970
+ write = Math.min(len, buffer.length - bufUsed);
1972
+ System.arraycopy(buf, off, buffer, bufUsed, write);
1973
+ } catch (IndexOutOfBoundsException ie) {
1974
+ throw new IOException(
1975
+ "CephOutputStream.write: Indices out of bounds: "
1976
+ + "write length is " + len + ", buffer offset is " + off
1977
+ + ", and buffer size is " + buf.length);
1978
+ } catch (ArrayStoreException ae) {
1979
+ throw new IOException(
1980
+ "Uh-oh, CephOutputStream failed to do an array"
1981
+ + " copy due to type mismatch...");
1982
+ } catch (NullPointerException ne) {
1983
+ throw new IOException(
1984
+ "CephOutputStream.write: cannot write " + len + "bytes to fd "
1985
+ + fileHandle + ": buffer is null");
1990
+ if (bufUsed == buffer.length) {
1991
+ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
1993
+ throw new IOException(
1994
+ "CephOutputStream.write: Buffered write of " + bufUsed
1995
+ + " bytes failed!");
1997
+ if (result != bufUsed) {
1998
+ throw new IOException(
1999
+ "CephOutputStream.write: Wrote only " + result + " bytes of "
2000
+ + bufUsed + " in buffer! Data may be lost or written"
2001
+ + " twice to Ceph!");
2011
+ * Flush the buffered data.
2012
+ * @throws IOException if you've closed the stream or the write fails.
2015
+ public synchronized void flush() throws IOException {
2017
+ if (bufUsed == 0) {
2020
+ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
2023
+ throw new IOException(
2024
+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
2025
+ + fileHandle + " failed");
2027
+ if (result != bufUsed) {
2028
+ throw new IOException(
2029
+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
2030
+ + fileHandle + "was incomplete: only " + result + " of " + bufUsed
2031
+ + " bytes were written.");
2039
+ * Close the CephOutputStream.
2040
+ * @throws IOException if Ceph somehow returns an error. In current code it can't.
2043
+ public synchronized void close() throws IOException {
2044
+ LOG.trace("CephOutputStream.close:enter");
2047
+ int result = ceph.ceph_close(fileHandle);
2049
+ if (result != 0) {
2050
+ throw new IOException("Close failed!");
2054
+ LOG.trace("CephOutputStream.close:exit");
2058
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephTalker.java b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
2059
new file mode 100644
2060
index 0000000..569652f
2062
+++ b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
2064
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
2068
+ * Licensed under the Apache License, Version 2.0
2069
+ * (the "License"); you may not use this file except in compliance with
2070
+ * the License. You may obtain a copy of the License at
2072
+ * http://www.apache.org/licenses/LICENSE-2.0
2074
+ * Unless required by applicable law or agreed to in writing, software
2075
+ * distributed under the License is distributed on an "AS IS" BASIS,
2076
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
2077
+ * implied. See the License for the specific language governing
2078
+ * permissions and limitations under the License.
2081
+ * Wraps a number of native function calls to communicate with the Ceph
2084
+package org.apache.hadoop.fs.ceph;
2087
+import org.apache.hadoop.conf.Configuration;
2088
+import org.apache.commons.logging.Log;
2091
+class CephTalker extends CephFS {
2092
+ // JNI doesn't give us any way to store pointers, so use a long.
2093
+ // Here we're assuming pointers aren't longer than 8 bytes.
2096
+ // we write a constructor so we can load the libraries
2097
+ public CephTalker(Configuration conf, Log log) {
2098
+ System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
2099
+ System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
2103
+ protected native boolean ceph_initializeClient(String arguments, int block_size);
2105
+ protected native String ceph_getcwd();
2107
+ protected native boolean ceph_setcwd(String path);
2109
+ protected native boolean ceph_rmdir(String path);
2111
+ protected native boolean ceph_unlink(String path);
2113
+ protected native boolean ceph_rename(String old_path, String new_path);
2115
+ protected native boolean ceph_exists(String path);
2117
+ protected native long ceph_getblocksize(String path);
2119
+ protected native boolean ceph_isdirectory(String path);
2121
+ protected native boolean ceph_isfile(String path);
2123
+ protected native String[] ceph_getdir(String path);
2125
+ protected native int ceph_mkdirs(String path, int mode);
2127
+ protected native int ceph_open_for_append(String path);
2129
+ protected native int ceph_open_for_read(String path);
2131
+ protected native int ceph_open_for_overwrite(String path, int mode);
2133
+ protected native int ceph_close(int filehandle);
2135
+ protected native boolean ceph_setPermission(String path, int mode);
2137
+ protected native boolean ceph_kill_client();
2139
+ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
2141
+ protected native int ceph_replication(String Path);
2143
+ protected native String[] ceph_hosts(int fh, long offset);
2145
+ protected native int ceph_setTimes(String path, long mtime, long atime);
2147
+ protected native long ceph_getpos(int fh);
2149
+ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
2151
+ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
2153
+ protected native long ceph_seek_from_start(int fh, long pos);
2155
diff --git a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
2156
index 9e22f1f..cd55361 100644
2157
--- a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
2158
+++ b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
2159
@@ -386,10 +386,12 @@ public class TrackerDistributedCacheManager {
2160
if (modifiedTime != desiredTimestamp) {
2161
DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT,
2164
throw new IOException("The distributed cache object " + source +
2165
" changed during the job from " +
2166
df.format(new Date(desiredTimestamp)) + " to " +
2167
df.format(new Date(modifiedTime)));
2171
Path parchive = null;
2172
diff --git a/src/test/commit-tests b/src/test/commit-tests
2173
index 1148c8b..85fa53d 100644
2174
--- a/src/test/commit-tests
2175
+++ b/src/test/commit-tests
2178
**/TestS3Credentials.java
2179
**/TestS3FileSystem.java
2182
**/TestScriptBasedMapping.java
2183
**/TestSequenceFileSerialization.java
2184
diff --git a/src/test/org/apache/hadoop/fs/ceph/TestCeph.java b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
2185
new file mode 100644
2186
index 0000000..e46b0ee
2188
+++ b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
2190
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
2193
+ * Licensed to the Apache Software Foundation (ASF) under one
2194
+ * or more contributor license agreements. See the NOTICE file
2195
+ * distributed with this work for additional information
2196
+ * regarding copyright ownership. The ASF licenses this file
2197
+ * to you under the Apache License, Version 2.0 (the
2198
+ * "License"); you may not use this file except in compliance
2199
+ * with the License. You may obtain a copy of the License at
2201
+ * http://www.apache.org/licenses/LICENSE-2.0
2203
+ * Unless required by applicable law or agreed to in writing, software
2204
+ * distributed under the License is distributed on an "AS IS" BASIS,
2205
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
2206
+ * See the License for the specific language governing permissions and
2207
+ * limitations under the License.
2209
+ * Unit tests for the CephFileSystem API implementation.
2212
+package org.apache.hadoop.fs.ceph;
2215
+import java.io.IOException;
2216
+import java.net.URI;
2217
+import org.apache.hadoop.conf.Configuration;
2218
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
2219
+import org.apache.hadoop.fs.FileSystem;
2220
+import org.apache.hadoop.fs.Path;
2223
+public class TestCeph extends FileSystemContractBaseTest {
2226
+ protected void setUp() throws IOException {
2227
+ Configuration conf = new Configuration();
2228
+ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
2229
+ CephFileSystem cephfs = new CephFileSystem(cephfaker);
2231
+ cephfs.initialize(URI.create("ceph://null"), conf);