~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/client/hadoop/HADOOP-ceph.patch

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2012-07-16 09:56:24 UTC
  • mfrom: (0.3.11)
  • mto: This revision was merged to the branch mainline in revision 17.
  • Revision ID: package-import@ubuntu.com-20120716095624-azr2w4hbhei1rxmx
Tags: upstream-0.48
ImportĀ upstreamĀ versionĀ 0.48

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
diff --git a/src/core/core-default.xml b/src/core/core-default.xml
 
2
index 8bc3b99..26543bc 100644
 
3
--- a/src/core/core-default.xml
 
4
+++ b/src/core/core-default.xml
 
5
@@ -210,6 +210,12 @@
 
6
 </property>
 
7
 
 
8
 <property>
 
9
+  <name>fs.ceph.impl</name>
 
10
+  <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
 
11
+  <description>The file system for ceph: uris.</description>
 
12
+</property>
 
13
+
 
14
+<property>
 
15
   <name>fs.har.impl.disable.cache</name>
 
16
   <value>true</value>
 
17
   <description>Don't cache 'har' filesystem instances.</description>
 
18
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFS.java b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
 
19
new file mode 100644
 
20
index 0000000..5d51eb2
 
21
--- /dev/null
 
22
+++ b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
 
23
@@ -0,0 +1,250 @@
 
24
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
25
+
 
26
+/**
 
27
+ *
 
28
+ * Licensed under the Apache License, Version 2.0
 
29
+ * (the "License"); you may not use this file except in compliance with
 
30
+ * the License. You may obtain a copy of the License at
 
31
+ *
 
32
+ * http://www.apache.org/licenses/LICENSE-2.0
 
33
+ *
 
34
+ * Unless required by applicable law or agreed to in writing, software
 
35
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
36
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
37
+ * implied. See the License for the specific language governing
 
38
+ * permissions and limitations under the License.
 
39
+ *
 
40
+ * 
 
41
+ * Abstract base class for communicating with a Ceph filesystem and its
 
42
+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
 
43
+ * As only the Ceph package should be using this directly, all methods
 
44
+ * are protected.
 
45
+ */
 
46
+package org.apache.hadoop.fs.ceph;
 
47
+
 
48
+import org.apache.hadoop.conf.Configuration;
 
49
+
 
50
+abstract class CephFS {
 
51
+
 
52
+  protected static final int ENOTDIR = 20;
 
53
+  protected static final int EEXIST = 17;
 
54
+  protected static final int ENOENT = 2;
 
55
+
 
56
+  /*
 
57
+   * Performs any necessary setup to allow general use of the filesystem.
 
58
+   * Inputs:
 
59
+   *  String argsuments -- a command-line style input of Ceph config params
 
60
+   *  int block_size -- the size in bytes to use for blocks
 
61
+   * Returns: true on success, false otherwise
 
62
+   */
 
63
+  abstract protected boolean ceph_initializeClient(String arguments, int block_size);
 
64
+       
 
65
+  /*
 
66
+   * Returns the current working directory (absolute) as a String
 
67
+   */
 
68
+  abstract protected String ceph_getcwd();
 
69
+
 
70
+  /*
 
71
+   * Changes the working directory.
 
72
+   * Inputs:
 
73
+   *  String path: The path (relative or absolute) to switch to
 
74
+   * Returns: true on success, false otherwise.
 
75
+   */
 
76
+  abstract protected boolean ceph_setcwd(String path);
 
77
+
 
78
+  /*
 
79
+   * Given a path to a directory, removes the directory if empty.
 
80
+   * Inputs:
 
81
+   *  jstring j_path: The path (relative or absolute) to the directory
 
82
+   * Returns: true on successful delete; false otherwise
 
83
+   */
 
84
+  abstract protected boolean ceph_rmdir(String path);
 
85
+
 
86
+  /*
 
87
+   * Given a path, unlinks it.
 
88
+   * Inputs:
 
89
+   *  String path: The path (relative or absolute) to the file or empty dir
 
90
+   * Returns: true if the unlink occurred, false otherwise.
 
91
+   */
 
92
+  abstract protected boolean ceph_unlink(String path);
 
93
+
 
94
+  /*
 
95
+   * Changes a given path name to a new name, assuming new_path doesn't exist.
 
96
+   * Inputs:
 
97
+   *  jstring j_from: The path whose name you want to change.
 
98
+   *  jstring j_to: The new name for the path.
 
99
+   * Returns: true if the rename occurred, false otherwise
 
100
+   */
 
101
+  abstract protected boolean ceph_rename(String old_path, String new_path);
 
102
+
 
103
+  /*
 
104
+   * Returns true if it the input path exists, false
 
105
+   * if it does not or there is an unexpected failure.
 
106
+   */
 
107
+  abstract protected boolean ceph_exists(String path);
 
108
+
 
109
+  /*
 
110
+   * Get the block size for a given path.
 
111
+   * Input:
 
112
+   *  String path: The path (relative or absolute) you want
 
113
+   *  the block size for.
 
114
+   * Returns: block size if the path exists, otherwise a negative number
 
115
+   *  corresponding to the standard C++ error codes (which are positive).
 
116
+   */
 
117
+  abstract protected long ceph_getblocksize(String path);
 
118
+
 
119
+  /*
 
120
+   * Returns true if the given path is a directory, false otherwise.
 
121
+   */
 
122
+  abstract protected boolean ceph_isdirectory(String path);
 
123
+
 
124
+  /*
 
125
+   * Returns true if the given path is a file; false otherwise.
 
126
+   */
 
127
+  abstract protected boolean ceph_isfile(String path);
 
128
+
 
129
+  /*
 
130
+   * Get the contents of a given directory.
 
131
+   * Inputs:
 
132
+   *  String path: The path (relative or absolute) to the directory.
 
133
+   * Returns: A Java String[] of the contents of the directory, or
 
134
+   *  NULL if there is an error (ie, path is not a dir). This listing
 
135
+   *  will not contain . or .. entries.
 
136
+   */
 
137
+  abstract protected String[] ceph_getdir(String path);
 
138
+
 
139
+  /*
 
140
+   * Create the specified directory and any required intermediate ones with the
 
141
+   * given mode.
 
142
+   */
 
143
+  abstract protected int ceph_mkdirs(String path, int mode);
 
144
+
 
145
+  /*
 
146
+   * Open a file to append. If the file does not exist, it will be created.
 
147
+   * Opening a dir is possible but may have bad results.
 
148
+   * Inputs:
 
149
+   *  String path: The path to open.
 
150
+   * Returns: an int filehandle, or a number<0 if an error occurs.
 
151
+   */
 
152
+  abstract protected int ceph_open_for_append(String path);
 
153
+
 
154
+  /*
 
155
+   * Open a file for reading.
 
156
+   * Opening a dir is possible but may have bad results.
 
157
+   * Inputs:
 
158
+   *  String path: The path to open.
 
159
+   * Returns: an int filehandle, or a number<0 if an error occurs.
 
160
+   */
 
161
+  abstract protected int ceph_open_for_read(String path);
 
162
+
 
163
+  /*
 
164
+   * Opens a file for overwriting; creates it if necessary.
 
165
+   * Opening a dir is possible but may have bad results.
 
166
+   * Inputs:
 
167
+   *  String path: The path to open.
 
168
+   *  int mode: The mode to open with.
 
169
+   * Returns: an int filehandle, or a number<0 if an error occurs.
 
170
+   */
 
171
+  abstract protected int ceph_open_for_overwrite(String path, int mode);
 
172
+
 
173
+  /*
 
174
+   * Closes the given file. Returns 0 on success, or a negative
 
175
+   * error code otherwise.
 
176
+   */
 
177
+  abstract protected int ceph_close(int filehandle);
 
178
+
 
179
+  /*
 
180
+   * Change the mode on a path.
 
181
+   * Inputs:
 
182
+   *  String path: The path to change mode on.
 
183
+   *  int mode: The mode to apply.
 
184
+   * Returns: true if the mode is properly applied, false if there
 
185
+   *  is any error.
 
186
+   */
 
187
+  abstract protected boolean ceph_setPermission(String path, int mode);
 
188
+
 
189
+  /*
 
190
+   * Closes the Ceph client. This should be called before shutting down
 
191
+   * (multiple times is okay but redundant).
 
192
+   */
 
193
+  abstract protected boolean ceph_kill_client();
 
194
+
 
195
+  /*
 
196
+   * Get the statistics on a path returned in a custom format defined
 
197
+   * in CephFileSystem.
 
198
+   * Inputs:
 
199
+   *  String path: The path to stat.
 
200
+   *  Stat fill: The stat object to fill.
 
201
+   * Returns: true if the stat is successful, false otherwise.
 
202
+   */
 
203
+  abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
 
204
+
 
205
+  /*
 
206
+   * Check how many times a file should be replicated. If it is,
 
207
+   * degraded it may not actually be replicated this often.
 
208
+   * Inputs:
 
209
+   *  int fh: a file descriptor
 
210
+   * Returns: an int containing the number of times replicated.
 
211
+   */
 
212
+  abstract protected int ceph_replication(String path);
 
213
+
 
214
+  /*
 
215
+   * Find the IP address of the primary OSD for a given file and offset.
 
216
+   * Inputs:
 
217
+   *  int fh: The filehandle for the file.
 
218
+   *  long offset: The offset to get the location of.
 
219
+   * Returns: an array of String of the location as IP, or NULL if there is an error.
 
220
+   */
 
221
+  abstract protected String[] ceph_hosts(int fh, long offset);
 
222
+
 
223
+  /*
 
224
+   * Set the mtime and atime for a given path.
 
225
+   * Inputs:
 
226
+   *  String path: The path to set the times for.
 
227
+   *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
 
228
+   *  long atime: The atime to set, in millis since epoch (-1 to not set)
 
229
+   * Returns: 0 if successful, an error code otherwise.
 
230
+   */
 
231
+  abstract protected int ceph_setTimes(String path, long mtime, long atime);
 
232
+
 
233
+  /*
 
234
+   * Get the current position in a file (as a long) of a given filehandle.
 
235
+   * Returns: (long) current file position on success, or a
 
236
+   *  negative error code on failure.
 
237
+   */
 
238
+  abstract protected long ceph_getpos(int fh);
 
239
+
 
240
+  /*
 
241
+   * Write the given buffer contents to the given filehandle.
 
242
+   * Inputs:
 
243
+   *  int fh: The filehandle to write to.
 
244
+   *  byte[] buffer: The buffer to write from
 
245
+   *  int buffer_offset: The position in the buffer to write from
 
246
+   *  int length: The number of (sequential) bytes to write.
 
247
+   * Returns: int, on success the number of bytes written, on failure
 
248
+   *  a negative error code.
 
249
+   */
 
250
+  abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
 
251
+
 
252
+  /*
 
253
+   * Reads into the given byte array from the current position.
 
254
+   * Inputs:
 
255
+   *  int fh: the filehandle to read from
 
256
+   *  byte[] buffer: the byte array to read into
 
257
+   *  int buffer_offset: where in the buffer to start writing
 
258
+   *  int length: how much to read.
 
259
+   * There'd better be enough space in the buffer to write all
 
260
+   * the data from the given offset!
 
261
+   * Returns: the number of bytes read on success (as an int),
 
262
+   *  or an error code otherwise.       */
 
263
+  abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
 
264
+
 
265
+  /*
 
266
+   * Seeks to the given position in the given file.
 
267
+   * Inputs:
 
268
+   *  int fh: The filehandle to seek in.
 
269
+   *  long pos: The position to seek to.
 
270
+   * Returns: the new position (as a long) of the filehandle on success,
 
271
+   *  or a negative error code on failure.      */
 
272
+  abstract protected long ceph_seek_from_start(int fh, long pos);
 
273
+}
 
274
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFaker.java b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
 
275
new file mode 100644
 
276
index 0000000..c598f53
 
277
--- /dev/null
 
278
+++ b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
 
279
@@ -0,0 +1,483 @@
 
280
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
281
+
 
282
+/**
 
283
+ *
 
284
+ * Licensed under the Apache License, Version 2.0
 
285
+ * (the "License"); you may not use this file except in compliance with
 
286
+ * the License. You may obtain a copy of the License at
 
287
+ *
 
288
+ * http://www.apache.org/licenses/LICENSE-2.0
 
289
+ *
 
290
+ * Unless required by applicable law or agreed to in writing, software
 
291
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
292
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
293
+ * implied. See the License for the specific language governing
 
294
+ * permissions and limitations under the License.
 
295
+ *
 
296
+ * 
 
297
+ * This uses the local Filesystem but pretends to be communicating
 
298
+ * with a Ceph deployment, for unit testing the CephFileSystem.
 
299
+ */
 
300
+
 
301
+package org.apache.hadoop.fs.ceph;
 
302
+
 
303
+
 
304
+import java.net.URI;
 
305
+import java.util.Hashtable;
 
306
+import java.io.Closeable;
 
307
+import java.io.FileNotFoundException;
 
308
+import java.io.IOException;
 
309
+
 
310
+import org.apache.commons.logging.Log;
 
311
+import org.apache.commons.logging.LogFactory;
 
312
+import org.apache.hadoop.conf.Configuration;
 
313
+import org.apache.hadoop.fs.BlockLocation;
 
314
+import org.apache.hadoop.fs.FileStatus;
 
315
+import org.apache.hadoop.fs.FileSystem;
 
316
+import org.apache.hadoop.fs.FSDataInputStream;
 
317
+import org.apache.hadoop.fs.FSDataOutputStream;
 
318
+import org.apache.hadoop.fs.Path;
 
319
+import org.apache.hadoop.fs.permission.FsPermission;
 
320
+
 
321
+
 
322
+class CephFaker extends CephFS {
 
323
+  private static final Log LOG = LogFactory.getLog(CephFaker.class);
 
324
+  FileSystem localFS;
 
325
+  String localPrefix;
 
326
+  int blockSize;
 
327
+  Configuration conf;
 
328
+  Hashtable<Integer, Object> files;
 
329
+  Hashtable<Integer, String> filenames;
 
330
+  int fileCount = 0;
 
331
+  boolean initialized = false;
 
332
+       
 
333
+  public CephFaker(Configuration con, Log log) {
 
334
+    conf = con;
 
335
+    files = new Hashtable<Integer, Object>();
 
336
+    filenames = new Hashtable<Integer, String>();
 
337
+  }
 
338
+       
 
339
+  protected boolean ceph_initializeClient(String args, int block_size) {
 
340
+    if (!initialized) {
 
341
+      // let's remember the default block_size
 
342
+      blockSize = block_size;
 
343
+
 
344
+      /* for a real Ceph deployment, this starts up the client, 
 
345
+       * sets debugging levels, etc. We just need to get the
 
346
+       * local FileSystem to use, and we'll ignore any
 
347
+       * command-line arguments. */
 
348
+      try {
 
349
+        localFS = FileSystem.getLocal(conf);
 
350
+        localFS.initialize(URI.create("file://localhost"), conf);
 
351
+        localFS.setVerifyChecksum(false);
 
352
+        String testDir = conf.get("hadoop.tmp.dir");
 
353
+
 
354
+        localPrefix = localFS.getWorkingDirectory().toString();
 
355
+        int testDirLoc = localPrefix.indexOf(testDir) - 1;
 
356
+
 
357
+        if (-2 == testDirLoc) {
 
358
+          testDirLoc = localPrefix.length();
 
359
+        }
 
360
+        localPrefix = localPrefix.substring(0, testDirLoc) + "/"
 
361
+            + conf.get("hadoop.tmp.dir");
 
362
+
 
363
+        localFS.setWorkingDirectory(
 
364
+            new Path(localPrefix + "/user/" + System.getProperty("user.name")));
 
365
+        // I don't know why, but the unit tests expect the default
 
366
+        // working dir to be /user/username, so satisfy them!
 
367
+        // debug("localPrefix is " + localPrefix, INFO);
 
368
+      } catch (IOException e) {
 
369
+        return false;
 
370
+      }
 
371
+      initialized = true;
 
372
+    }
 
373
+    return true;
 
374
+  }
 
375
+
 
376
+  protected String ceph_getcwd() {
 
377
+    return sanitize_path(localFS.getWorkingDirectory().toString());
 
378
+  }
 
379
+
 
380
+  protected boolean ceph_setcwd(String path) {
 
381
+    localFS.setWorkingDirectory(new Path(prepare_path(path)));
 
382
+    return true;
 
383
+  }
 
384
+
 
385
+  // the caller is responsible for ensuring empty dirs
 
386
+  protected boolean ceph_rmdir(String pth) {
 
387
+    Path path = new Path(prepare_path(pth));
 
388
+    boolean ret = false;
 
389
+
 
390
+    try {
 
391
+      if (localFS.listStatus(path).length <= 1) {
 
392
+        ret = localFS.delete(path, true);
 
393
+      }
 
394
+    } catch (IOException e) {}
 
395
+    return ret;
 
396
+  }
 
397
+
 
398
+  // this needs to work on (empty) directories too
 
399
+  protected boolean ceph_unlink(String path) {
 
400
+    path = prepare_path(path);
 
401
+    boolean ret = false;
 
402
+
 
403
+    if (ceph_isdirectory(path)) {
 
404
+      ret = ceph_rmdir(path);
 
405
+    } else {
 
406
+      try {
 
407
+        ret = localFS.delete(new Path(path), false);
 
408
+      } catch (IOException e) {}
 
409
+    }
 
410
+    return ret;
 
411
+  }
 
412
+
 
413
+  protected boolean ceph_rename(String oldName, String newName) {
 
414
+    oldName = prepare_path(oldName);
 
415
+    newName = prepare_path(newName);
 
416
+    try {
 
417
+      Path parent = new Path(newName).getParent();
 
418
+      Path newPath = new Path(newName);
 
419
+
 
420
+      if (localFS.exists(parent) && !localFS.exists(newPath)) {
 
421
+        return localFS.rename(new Path(oldName), newPath);
 
422
+      }
 
423
+      return false;
 
424
+    } catch (IOException e) {
 
425
+      return false;
 
426
+    }
 
427
+  }
 
428
+
 
429
+  protected boolean ceph_exists(String path) {
 
430
+    path = prepare_path(path);
 
431
+    boolean ret = false;
 
432
+
 
433
+    try {
 
434
+      ret = localFS.exists(new Path(path));
 
435
+    } catch (IOException e) {}
 
436
+    return ret;
 
437
+  }
 
438
+
 
439
+  protected long ceph_getblocksize(String path) {
 
440
+    path = prepare_path(path);
 
441
+    try {
 
442
+      FileStatus status = localFS.getFileStatus(new Path(path));
 
443
+
 
444
+      return status.getBlockSize();
 
445
+    } catch (FileNotFoundException e) {
 
446
+      return -CephFS.ENOENT;
 
447
+    } catch (IOException e) {
 
448
+      return -1; // just fail generically
 
449
+    }
 
450
+  }
 
451
+
 
452
+  protected boolean ceph_isdirectory(String path) {
 
453
+    path = prepare_path(path);
 
454
+    try {
 
455
+      FileStatus status = localFS.getFileStatus(new Path(path));
 
456
+
 
457
+      return status.isDir();
 
458
+    } catch (IOException e) {
 
459
+      return false;
 
460
+    }
 
461
+  }
 
462
+
 
463
+  protected boolean ceph_isfile(String path) {
 
464
+    path = prepare_path(path);
 
465
+    boolean ret = false;
 
466
+
 
467
+    try {
 
468
+      FileStatus status = localFS.getFileStatus(new Path(path));
 
469
+
 
470
+      ret = !status.isDir();
 
471
+    } catch (Exception e) {}
 
472
+    return ret;
 
473
+  }
 
474
+
 
475
+  protected String[] ceph_getdir(String path) {
 
476
+    path = prepare_path(path);
 
477
+    if (!ceph_isdirectory(path)) {
 
478
+      return null;
 
479
+    }
 
480
+    try {
 
481
+      FileStatus[] stats = localFS.listStatus(new Path(path));
 
482
+      String[] names = new String[stats.length];
 
483
+      String name;
 
484
+
 
485
+      for (int i = 0; i < stats.length; ++i) {
 
486
+        name = stats[i].getPath().toString();
 
487
+        names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
 
488
+      }
 
489
+      return names;
 
490
+    } catch (IOException e) {}
 
491
+    return null;
 
492
+  }
 
493
+
 
494
+  protected int ceph_mkdirs(String path, int mode) {
 
495
+    path = prepare_path(path);
 
496
+    // debug("ceph_mkdirs on " + path, INFO);
 
497
+    try {
 
498
+      if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
 
499
+        return 0;
 
500
+      }
 
501
+    } catch (IOException e) {}
 
502
+    if (ceph_isdirectory(path)) { // apparently it already existed
 
503
+      return -EEXIST;
 
504
+    } else if (ceph_isfile(path)) {
 
505
+                       return -ENOTDIR;
 
506
+               }
 
507
+    return -1;
 
508
+  }
 
509
+
 
510
+  /*
 
511
+   * Unlike a real Ceph deployment, you can't do opens on a directory.
 
512
+   * Since that has unpredictable behavior and you shouldn't do it anyway,
 
513
+   * it's okay.
 
514
+   */
 
515
+  protected int ceph_open_for_append(String path) {
 
516
+    path = prepare_path(path);
 
517
+    FSDataOutputStream stream;
 
518
+
 
519
+    try {
 
520
+      stream = localFS.append(new Path(path));
 
521
+      files.put(new Integer(fileCount), stream);
 
522
+      filenames.put(new Integer(fileCount), path);
 
523
+      return fileCount++;
 
524
+    } catch (IOException e) {}
 
525
+    return -1; // failure
 
526
+  }
 
527
+
 
528
+  protected int ceph_open_for_read(String path) {
 
529
+    path = prepare_path(path);
 
530
+    FSDataInputStream stream;
 
531
+
 
532
+    try {
 
533
+      stream = localFS.open(new Path(path));
 
534
+      files.put(new Integer(fileCount), stream);
 
535
+      filenames.put(new Integer(fileCount), path);
 
536
+      LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
 
537
+      return fileCount++;
 
538
+    } catch (IOException e) {}
 
539
+    return -1; // failure
 
540
+  }
 
541
+
 
542
+  protected int ceph_open_for_overwrite(String path, int mode) {
 
543
+    path = prepare_path(path);
 
544
+    FSDataOutputStream stream;
 
545
+
 
546
+    try {
 
547
+      stream = localFS.create(new Path(path));
 
548
+      files.put(new Integer(fileCount), stream);
 
549
+      filenames.put(new Integer(fileCount), path);
 
550
+      LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
 
551
+      return fileCount++;
 
552
+    } catch (IOException e) {}
 
553
+    return -1; // failure
 
554
+  }
 
555
+
 
556
+  protected int ceph_close(int filehandle) {
 
557
+    LOG.info("ceph_close(filehandle " + filehandle + ")");
 
558
+    try {
 
559
+      ((Closeable) files.get(new Integer(filehandle))).close();
 
560
+      if (null == files.get(new Integer(filehandle))) {
 
561
+        return -ENOENT; // this isn't quite the right error code,
 
562
+        // but the important part is it's negative
 
563
+      }
 
564
+      return 0; // hurray, success
 
565
+    } catch (NullPointerException ne) {
 
566
+      LOG.warn("ceph_close caught NullPointerException!" + ne);
 
567
+    } // err, how?
 
568
+    catch (IOException ie) {
 
569
+      LOG.warn("ceph_close caught IOException!" + ie);
 
570
+    }
 
571
+    return -1; // failure
 
572
+  }
 
573
+
 
574
+  protected boolean ceph_setPermission(String pth, int mode) {
 
575
+    pth = prepare_path(pth);
 
576
+    Path path = new Path(pth);
 
577
+    boolean ret = false;
 
578
+
 
579
+    try {
 
580
+      localFS.setPermission(path, new FsPermission((short) mode));
 
581
+      ret = true;
 
582
+    } catch (IOException e) {}
 
583
+    return ret;
 
584
+  }
 
585
+
 
586
+  // rather than try and match a Ceph deployment's behavior exactly,
 
587
+  // just make bad things happen if they try and call methods after this
 
588
+  protected boolean ceph_kill_client() {
 
589
+    // debug("ceph_kill_client", INFO);
 
590
+    localFS.setWorkingDirectory(new Path(localPrefix));
 
591
+    // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
 
592
+    try {
 
593
+      localFS.close();
 
594
+    } catch (Exception e) {}
 
595
+    localFS = null;
 
596
+    files = null;
 
597
+    filenames = null;
 
598
+    return true;
 
599
+  }
 
600
+
 
601
+  protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
 
602
+    pth = prepare_path(pth);
 
603
+    Path path = new Path(pth);
 
604
+    boolean ret = false;
 
605
+
 
606
+    try {
 
607
+      FileStatus status = localFS.getFileStatus(path);
 
608
+
 
609
+      fill.size = status.getLen();
 
610
+      fill.is_dir = status.isDir();
 
611
+      fill.block_size = status.getBlockSize();
 
612
+      fill.mod_time = status.getModificationTime();
 
613
+      fill.access_time = status.getAccessTime();
 
614
+      fill.mode = status.getPermission().toShort();
 
615
+      ret = true;
 
616
+    } catch (IOException e) {}
 
617
+    return ret;
 
618
+  }
 
619
+
 
620
+  protected int ceph_replication(String path) {
 
621
+    path = prepare_path(path);
 
622
+    int ret = -1; // -1 for failure
 
623
+
 
624
+    try {
 
625
+      ret = localFS.getFileStatus(new Path(path)).getReplication();
 
626
+    } catch (IOException e) {}
 
627
+    return ret;
 
628
+  }
 
629
+
 
630
+  protected String[] ceph_hosts(int fh, long offset) {
 
631
+    String[] ret = null;
 
632
+
 
633
+    try {
 
634
+      BlockLocation[] locs = localFS.getFileBlockLocations(
 
635
+          localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
 
636
+          offset, 1);
 
637
+
 
638
+      ret = locs[0].getNames();
 
639
+    } catch (IOException e) {} catch (NullPointerException f) {}
 
640
+    return ret;
 
641
+  }
 
642
+
 
643
+  protected int ceph_setTimes(String pth, long mtime, long atime) {
 
644
+    pth = prepare_path(pth);
 
645
+    Path path = new Path(pth);
 
646
+    int ret = -1; // generic fail
 
647
+
 
648
+    try {
 
649
+      localFS.setTimes(path, mtime, atime);
 
650
+      ret = 0;
 
651
+    } catch (IOException e) {}
 
652
+    return ret;
 
653
+  }
 
654
+
 
655
+  protected long ceph_getpos(int fh) {
 
656
+    long ret = -1; // generic fail
 
657
+
 
658
+    try {
 
659
+      Object stream = files.get(new Integer(fh));
 
660
+
 
661
+      if (stream instanceof FSDataInputStream) {
 
662
+        ret = ((FSDataInputStream) stream).getPos();
 
663
+      } else if (stream instanceof FSDataOutputStream) {
 
664
+        ret = ((FSDataOutputStream) stream).getPos();
 
665
+      }
 
666
+    } catch (IOException e) {} catch (NullPointerException f) {}
 
667
+    return ret;
 
668
+  }
 
669
+
 
670
+  protected int ceph_write(int fh, byte[] buffer,
 
671
+      int buffer_offset, int length) {
 
672
+    LOG.info(
 
673
+        "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
 
674
+        + length);
 
675
+    long ret = -1; // generic fail
 
676
+
 
677
+    try {
 
678
+      FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
 
679
+
 
680
+      LOG.info("ceph_write got outputstream");
 
681
+      long startPos = os.getPos();
 
682
+
 
683
+      os.write(buffer, buffer_offset, length);
 
684
+      ret = os.getPos() - startPos;
 
685
+    } catch (IOException e) {
 
686
+      LOG.warn("ceph_write caught IOException!");
 
687
+    } catch (NullPointerException f) {
 
688
+      LOG.warn("ceph_write caught NullPointerException!");
 
689
+    }
 
690
+    return (int) ret;
 
691
+  }
 
692
+
 
693
+  protected int ceph_read(int fh, byte[] buffer,
 
694
+      int buffer_offset, int length) {
 
695
+    long ret = -1; // generic fail
 
696
+
 
697
+    try {
 
698
+      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
 
699
+      long startPos = is.getPos();
 
700
+
 
701
+      is.read(buffer, buffer_offset, length);
 
702
+      ret = is.getPos() - startPos;
 
703
+    } catch (IOException e) {} catch (NullPointerException f) {}
 
704
+    return (int) ret;
 
705
+  }
 
706
+
 
707
+  protected long ceph_seek_from_start(int fh, long pos) {
 
708
+    LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
 
709
+    long ret = -1; // generic fail
 
710
+
 
711
+    try {
 
712
+      LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
 
713
+      if (null == files.get(new Integer(fh))) {
 
714
+        LOG.warn("ceph_seek_from_start: is is null!");
 
715
+      }
 
716
+      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
 
717
+
 
718
+      LOG.info("ceph_seek_from_start retrieved is!");
 
719
+      is.seek(pos);
 
720
+      ret = is.getPos();
 
721
+    } catch (IOException e) {
 
722
+      LOG.warn("ceph_seek_from_start caught IOException!");
 
723
+    } catch (NullPointerException f) {
 
724
+      LOG.warn("ceph_seek_from_start caught NullPointerException!");
 
725
+    }
 
726
+    return (int) ret;
 
727
+  }
 
728
+
 
729
+  /*
 
730
+   * We need to remove the localFS file prefix before returning to Ceph
 
731
+   */
 
732
+  private String sanitize_path(String path) {
 
733
+    // debug("sanitize_path(" + path + ")", INFO);
 
734
+    /* if (path.startsWith("file:"))
 
735
+     path = path.substring("file:".length()); */
 
736
+    if (path.startsWith(localPrefix)) {
 
737
+      path = path.substring(localPrefix.length());
 
738
+      if (path.length() == 0) { // it was a root path
 
739
+        path = "/";
 
740
+      }
 
741
+    }
 
742
+    // debug("sanitize_path returning " + path, INFO);
 
743
+    return path;
 
744
+  }
 
745
+
 
746
+  /*
 
747
+   * If it's an absolute path we need to shove the
 
748
+   * test dir onto the front as a prefix.
 
749
+   */
 
750
+  private String prepare_path(String path) {
 
751
+    // debug("prepare_path(" + path + ")", INFO);
 
752
+    if (path.startsWith("/")) {
 
753
+      path = localPrefix + path;
 
754
+    } else if (path.equals("..")) {
 
755
+      if (ceph_getcwd().equals("/")) {
 
756
+        path = ".";
 
757
+      } // you can't go up past root!
 
758
+    }
 
759
+    // debug("prepare_path returning" + path, INFO);
 
760
+    return path;
 
761
+  }
 
762
+}
 
763
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
 
764
new file mode 100644
 
765
index 0000000..95f2223
 
766
--- /dev/null
 
767
+++ b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
 
768
@@ -0,0 +1,804 @@
 
769
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
 
770
+
 
771
+/**
 
772
+ *
 
773
+ * Licensed under the Apache License, Version 2.0
 
774
+ * (the "License"); you may not use this file except in compliance with
 
775
+ * the License. You may obtain a copy of the License at
 
776
+ *
 
777
+ * http://www.apache.org/licenses/LICENSE-2.0
 
778
+ *
 
779
+ * Unless required by applicable law or agreed to in writing, software
 
780
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
781
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
782
+ * implied. See the License for the specific language governing
 
783
+ * permissions and limitations under the License.
 
784
+ *
 
785
+ *
 
786
+ * Implements the Hadoop FS interfaces to allow applications to store
 
787
+ * files in Ceph.
 
788
+ */
 
789
+package org.apache.hadoop.fs.ceph;
 
790
+
 
791
+
 
792
+import java.io.IOException;
 
793
+import java.io.FileNotFoundException;
 
794
+import java.io.OutputStream;
 
795
+import java.net.URI;
 
796
+import java.net.InetAddress;
 
797
+import java.util.EnumSet;
 
798
+import java.lang.Math;
 
799
+import java.util.ArrayList;
 
800
+
 
801
+import org.apache.commons.logging.Log;
 
802
+import org.apache.commons.logging.LogFactory;
 
803
+import org.apache.hadoop.conf.Configuration;
 
804
+import org.apache.hadoop.fs.BlockLocation;
 
805
+import org.apache.hadoop.fs.FSDataInputStream;
 
806
+import org.apache.hadoop.fs.FSInputStream;
 
807
+import org.apache.hadoop.fs.FSDataOutputStream;
 
808
+import org.apache.hadoop.fs.FileSystem;
 
809
+import org.apache.hadoop.fs.FileUtil;
 
810
+import org.apache.hadoop.fs.Path;
 
811
+import org.apache.hadoop.fs.permission.FsPermission;
 
812
+import org.apache.hadoop.util.Progressable;
 
813
+import org.apache.hadoop.fs.FileStatus;
 
814
+import org.apache.hadoop.net.DNS;
 
815
+
 
816
+
 
817
+/**
 
818
+ * <p>
 
819
+ * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>.
 
820
+ * This will not start a Ceph instance; one must already be running.
 
821
+ * </p>
 
822
+ * Configuration of the CephFileSystem is handled via a few Hadoop
 
823
+ * Configuration properties: <br>
 
824
+ * fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br>
 
825
+ * fs.ceph.libDir -- the directory that libcephfs and libhadoopceph are
 
826
+ * located in. This assumes Hadoop is being run on a linux-style machine
 
827
+ * with names like libcephfs.so.
 
828
+ * fs.ceph.commandLine -- if you prefer you can fill in this property
 
829
+ * just as you would when starting Ceph up from the command line. Specific
 
830
+ * properties override any configuration specified here.
 
831
+ * <p>
 
832
+ * You can also enable debugging of the CephFileSystem and Ceph itself: <br>
 
833
+ * fs.ceph.debug -- if 'true' will print out method enter/exit messages,
 
834
+ * plus a little more.
 
835
+ * fs.ceph.clientDebug/fs.ceph.messengerDebug -- will print out debugging
 
836
+ * from the respective Ceph system of at least that importance.
 
837
+ */
 
838
+public class CephFileSystem extends FileSystem {
 
839
+  private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
 
840
+  private URI uri;
 
841
+
 
842
+  private Path workingDir;
 
843
+  private final Path root;
 
844
+  private CephFS ceph = null;
 
845
+
 
846
+  private static String CEPH_NAMESERVER;
 
847
+  private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
 
848
+  private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
 
849
+
 
850
+  /**
 
851
+   * Create a new CephFileSystem.
 
852
+   */
 
853
+  public CephFileSystem() {
 
854
+    root = new Path("/");
 
855
+  }
 
856
+
 
857
+  /**
 
858
+   * Used for testing purposes, this constructor
 
859
+   * sets the given CephFS instead of defaulting to a
 
860
+   * CephTalker (with its assumed real Ceph instance to talk to).
 
861
+   */
 
862
+  public CephFileSystem(CephFS ceph_fs) {
 
863
+    super();
 
864
+    root = new Path("/");
 
865
+    ceph = ceph_fs;
 
866
+  }
 
867
+
 
868
+  /**
 
869
+   * Lets you get the URI of this CephFileSystem.
 
870
+   * @return the URI.
 
871
+   */
 
872
+  public URI getUri() {
 
873
+    LOG.debug("getUri:exit with return " + uri);
 
874
+    return uri;
 
875
+  }
 
876
+
 
877
+  /**
 
878
+   * Should be called after constructing a CephFileSystem but before calling
 
879
+   * any other methods.
 
880
+   * Starts up the connection to Ceph, reads in configuraton options, etc.
 
881
+   * @param uri The URI for this filesystem.
 
882
+   * @param conf The Hadoop Configuration to retrieve properties from.
 
883
+   * @throws IOException if necessary properties are unset.
 
884
+   */
 
885
+  @Override
 
886
+  public void initialize(URI uri, Configuration conf) throws IOException {
 
887
+    super.initialize(uri, conf);
 
888
+    setConf(conf);
 
889
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
 
890
+    if (ceph == null) {
 
891
+      ceph = new CephTalker(conf, LOG);
 
892
+    }
 
893
+
 
894
+    CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
 
895
+
 
896
+    // build up the arguments for Ceph
 
897
+    String arguments = "CephFSInterface";
 
898
+
 
899
+    arguments += conf.get("fs.ceph.commandLine", "");
 
900
+    if (conf.get("fs.ceph.clientDebug") != null) {
 
901
+      arguments += " --debug_client ";
 
902
+      arguments += conf.get("fs.ceph.clientDebug");
 
903
+    }
 
904
+    if (conf.get("fs.ceph.messengerDebug") != null) {
 
905
+      arguments += " --debug_ms ";
 
906
+      arguments += conf.get("fs.ceph.messengerDebug");
 
907
+    }
 
908
+    if (conf.get("fs.ceph.monAddr") != null) {
 
909
+      arguments += " -m ";
 
910
+      arguments += conf.get("fs.ceph.monAddr");
 
911
+    }
 
912
+    arguments += " --client-readahead-max-periods="
 
913
+        + conf.get("fs.ceph.readahead", "1");
 
914
+    // make sure they gave us a ceph monitor address or conf file
 
915
+    LOG.info("initialize:Ceph initialization arguments: " + arguments);
 
916
+    if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
 
917
+        && (arguments.indexOf("-c") == -1)) {
 
918
+      LOG.fatal("initialize:You need to specify a Ceph monitor address.");
 
919
+      throw new IOException(
 
920
+          "You must specify a Ceph monitor address or config file!");
 
921
+    }
 
922
+    // Initialize the client
 
923
+    if (!ceph.ceph_initializeClient(arguments,
 
924
+        conf.getInt("fs.ceph.blockSize", 1 << 26))) {
 
925
+      LOG.fatal("initialize:Ceph initialization failed!");
 
926
+      throw new IOException("Ceph initialization failed!");
 
927
+    }
 
928
+    LOG.info("initialize:Ceph initialized client. Setting cwd to /");
 
929
+    ceph.ceph_setcwd("/");
 
930
+    LOG.debug("initialize:exit");
 
931
+
 
932
+    this.workingDir = getHomeDirectory();
 
933
+  }
 
934
+
 
935
+  /**
 
936
+   * Close down the CephFileSystem. Runs the base-class close method
 
937
+   * and then kills the Ceph client itself.
 
938
+   */
 
939
+  @Override
 
940
+  public void close() throws IOException {
 
941
+    LOG.debug("close:enter");
 
942
+    super.close(); // this method does stuff, make sure it's run!
 
943
+    LOG.trace("close: Calling ceph_kill_client from Java");
 
944
+    ceph.ceph_kill_client();
 
945
+    LOG.debug("close:exit");
 
946
+  }
 
947
+
 
948
+  /**
 
949
+   * Get an FSDataOutputStream to append onto a file.
 
950
+   * @param file The File you want to append onto
 
951
+   * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
 
952
+   * @param progress The Progressable to report progress to.
 
953
+   * Reporting is limited but exists.
 
954
+   * @return An FSDataOutputStream that connects to the file on Ceph.
 
955
+   * @throws IOException If the file cannot be found or appended to.
 
956
+   */
 
957
+  public FSDataOutputStream append(Path file, int bufferSize,
 
958
+      Progressable progress) throws IOException {
 
959
+    LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
 
960
+    Path abs_path = makeAbsolute(file);
 
961
+
 
962
+    if (progress != null) {
 
963
+      progress.progress();
 
964
+    }
 
965
+    LOG.trace("append: Entering ceph_open_for_append from Java");
 
966
+    int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
 
967
+
 
968
+    LOG.trace("append: Returned to Java");
 
969
+    if (progress != null) {
 
970
+      progress.progress();
 
971
+    }
 
972
+    if (fd < 0) { // error in open
 
973
+      throw new IOException(
 
974
+          "append: Open for append failed on path \"" + abs_path.toString()
 
975
+          + "\"");
 
976
+    }
 
977
+    CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
 
978
+        bufferSize);
 
979
+
 
980
+    LOG.debug("append:exit");
 
981
+    return new FSDataOutputStream(cephOStream, statistics);
 
982
+  }
 
983
+
 
984
+  /**
 
985
+   * Get the current working directory for the given file system
 
986
+   * @return the directory Path
 
987
+   */
 
988
+  public Path getWorkingDirectory() {
 
989
+    return workingDir;
 
990
+  }
 
991
+
 
992
+  /**
 
993
+   * Set the current working directory for the given file system. All relative
 
994
+   * paths will be resolved relative to it.
 
995
+   *
 
996
+   * @param dir The directory to change to.
 
997
+   */
 
998
+  @Override
 
999
+  public void setWorkingDirectory(Path dir) {
 
1000
+    workingDir = makeAbsolute(dir);
 
1001
+  }
 
1002
+
 
1003
+  /**
 
1004
+   * Return only the path component from a potentially fully qualified path.
 
1005
+   */
 
1006
+  private String getCephPath(Path path) {
 
1007
+    if (!path.isAbsolute()) {
 
1008
+      throw new IllegalArgumentException("Path must be absolute: " + path);
 
1009
+    }
 
1010
+    return path.toUri().getPath();
 
1011
+  }
 
1012
+
 
1013
+  /**
 
1014
+   * Check if a path exists.
 
1015
+   * Overriden because it's moderately faster than the generic implementation.
 
1016
+   * @param path The file to check existence on.
 
1017
+   * @return true if the file exists, false otherwise.
 
1018
+   */
 
1019
+  @Override
 
1020
+  public boolean exists(Path path) throws IOException {
 
1021
+    LOG.debug("exists:enter with path " + path);
 
1022
+    boolean result;
 
1023
+    Path abs_path = makeAbsolute(path);
 
1024
+
 
1025
+    if (abs_path.equals(root)) {
 
1026
+      result = true;
 
1027
+    } else {
 
1028
+      LOG.trace(
 
1029
+          "exists:Calling ceph_exists from Java on path " + abs_path.toString());
 
1030
+      result = ceph.ceph_exists(getCephPath(abs_path));
 
1031
+      LOG.trace("exists:Returned from ceph_exists to Java");
 
1032
+    }
 
1033
+    LOG.debug("exists:exit with value " + result);
 
1034
+    return result;
 
1035
+  }
 
1036
+
 
1037
+  /**
 
1038
+   * Create a directory and any nonexistent parents. Any portion
 
1039
+   * of the directory tree can exist without error.
 
1040
+   * @param path The directory path to create
 
1041
+   * @param perms The permissions to apply to the created directories.
 
1042
+   * @return true if successful, false otherwise
 
1043
+   * @throws IOException if the path is a child of a file.
 
1044
+   */
 
1045
+  @Override
 
1046
+  public boolean mkdirs(Path path, FsPermission perms) throws IOException {
 
1047
+    LOG.debug("mkdirs:enter with path " + path);
 
1048
+    Path abs_path = makeAbsolute(path);
 
1049
+
 
1050
+    LOG.trace("mkdirs:calling ceph_mkdirs from Java");
 
1051
+    int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
 
1052
+
 
1053
+    if (result != 0) {
 
1054
+      LOG.warn(
 
1055
+          "mkdirs: make directory " + abs_path + "Failing with result " + result);
 
1056
+      if (-ceph.ENOTDIR == result) {
 
1057
+        throw new IOException("Parent path is not a directory");
 
1058
+      }
 
1059
+      return false;
 
1060
+    } else {
 
1061
+      LOG.debug("mkdirs:exiting succesfully");
 
1062
+      return true;
 
1063
+    }
 
1064
+  }
 
1065
+
 
1066
+  /**
 
1067
+   * Check if a path is a file. This is moderately faster than the
 
1068
+   * generic implementation.
 
1069
+   * @param path The path to check.
 
1070
+   * @return true if the path is definitely a file, false otherwise.
 
1071
+   */
 
1072
+  @Override
 
1073
+  public boolean isFile(Path path) throws IOException {
 
1074
+    LOG.debug("isFile:enter with path " + path);
 
1075
+    Path abs_path = makeAbsolute(path);
 
1076
+    boolean result;
 
1077
+
 
1078
+    if (abs_path.equals(root)) {
 
1079
+      result = false;
 
1080
+    } else {
 
1081
+      LOG.trace("isFile:entering ceph_isfile from Java");
 
1082
+      result = ceph.ceph_isfile(getCephPath(abs_path));
 
1083
+    }
 
1084
+    LOG.debug("isFile:exit with result " + result);
 
1085
+    return result;
 
1086
+  }
 
1087
+
 
1088
+  /**
 
1089
+   * Get stat information on a file. This does not fill owner or group, as
 
1090
+   * Ceph's support for these is a bit different than HDFS'.
 
1091
+   * @param path The path to stat.
 
1092
+   * @return FileStatus object containing the stat information.
 
1093
+   * @throws FileNotFoundException if the path could not be resolved.
 
1094
+   */
 
1095
+  public FileStatus getFileStatus(Path path) throws IOException {
 
1096
+    LOG.debug("getFileStatus:enter with path " + path);
 
1097
+    Path abs_path = makeAbsolute(path);
 
1098
+    // sadly, Ceph doesn't really do uids/gids just yet, but
 
1099
+    // everything else is filled
 
1100
+    FileStatus status;
 
1101
+    Stat lstat = new Stat();
 
1102
+
 
1103
+    LOG.trace("getFileStatus: calling ceph_stat from Java");
 
1104
+    if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
 
1105
+      status = new FileStatus(lstat.size, lstat.is_dir,
 
1106
+          ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
 
1107
+          lstat.mod_time, lstat.access_time,
 
1108
+          new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
 
1109
+          path.makeQualified(this));
 
1110
+    } else { // fail out
 
1111
+      throw new FileNotFoundException(
 
1112
+          "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
 
1113
+          + " does not exist or could not be accessed");
 
1114
+    }
 
1115
+
 
1116
+    LOG.debug("getFileStatus:exit");
 
1117
+    return status;
 
1118
+  }
 
1119
+
 
1120
+  /**
 
1121
+   * Get the FileStatus for each listing in a directory.
 
1122
+   * @param path The directory to get listings from.
 
1123
+   * @return FileStatus[] containing one FileStatus for each directory listing;
 
1124
+   *         null if path does not exist.
 
1125
+   */
 
1126
+  public FileStatus[] listStatus(Path path) throws IOException {
 
1127
+    LOG.debug("listStatus:enter with path " + path);
 
1128
+    Path abs_path = makeAbsolute(path);
 
1129
+    Path[] paths = listPaths(abs_path);
 
1130
+
 
1131
+    if (paths != null) {
 
1132
+      FileStatus[] statuses = new FileStatus[paths.length];
 
1133
+
 
1134
+      for (int i = 0; i < paths.length; ++i) {
 
1135
+        statuses[i] = getFileStatus(paths[i]);
 
1136
+      }
 
1137
+      LOG.debug("listStatus:exit");
 
1138
+      return statuses;
 
1139
+    }
 
1140
+
 
1141
+    if (isFile(path)) {
 
1142
+      return new FileStatus[] { getFileStatus(path) };
 
1143
+    }
 
1144
+
 
1145
+    return null;
 
1146
+  }
 
1147
+
 
1148
+  @Override
 
1149
+  public void setPermission(Path p, FsPermission permission) throws IOException {
 
1150
+    LOG.debug(
 
1151
+        "setPermission:enter with path " + p + " and permissions " + permission);
 
1152
+    Path abs_path = makeAbsolute(p);
 
1153
+
 
1154
+    LOG.trace("setPermission:calling ceph_setpermission from Java");
 
1155
+    ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
 
1156
+    LOG.debug("setPermission:exit");
 
1157
+  }
 
1158
+
 
1159
+  /**
 
1160
+   * Set access/modification times of a file.
 
1161
+   * @param p The path
 
1162
+   * @param mtime Set modification time in number of millis since Jan 1, 1970.
 
1163
+   * @param atime Set access time in number of millis since Jan 1, 1970.
 
1164
+   */
 
1165
+  @Override
 
1166
+  public void setTimes(Path p, long mtime, long atime) throws IOException {
 
1167
+    LOG.debug(
 
1168
+        "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
 
1169
+    Path abs_path = makeAbsolute(p);
 
1170
+
 
1171
+    LOG.trace("setTimes:calling ceph_setTimes from Java");
 
1172
+    int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
 
1173
+
 
1174
+    if (r < 0) {
 
1175
+      throw new IOException(
 
1176
+          "Failed to set times on path " + abs_path.toString() + " Error code: "
 
1177
+          + r);
 
1178
+    }
 
1179
+    LOG.debug("setTimes:exit");
 
1180
+  }
 
1181
+
 
1182
+  /**
 
1183
+   * Create a new file and open an FSDataOutputStream that's connected to it.
 
1184
+   * @param path The file to create.
 
1185
+   * @param permission The permissions to apply to the file.
 
1186
+   * @param overwrite If true, overwrite any existing file with
 
1187
+        * this name; otherwise don't.
 
1188
+   * @param bufferSize Ceph does internal buffering, but you can buffer
 
1189
+   *   in the Java code too if you like.
 
1190
+   * @param replication Ignored by Ceph. This can be
 
1191
+   * configured via Ceph configuration.
 
1192
+   * @param blockSize Ignored by Ceph. You can set client-wide block sizes
 
1193
+   * via the fs.ceph.blockSize param if you like.
 
1194
+   * @param progress A Progressable to report back to.
 
1195
+   * Reporting is limited but exists.
 
1196
+   * @return An FSDataOutputStream pointing to the created file.
 
1197
+   * @throws IOException if the path is an
 
1198
+   * existing directory, or the path exists but overwrite is false, or there is a
 
1199
+   * failure in attempting to open for append with Ceph.
 
1200
+   */
 
1201
+  public FSDataOutputStream create(Path path,
 
1202
+      FsPermission permission,
 
1203
+      boolean overwrite,
 
1204
+      int bufferSize,
 
1205
+      short replication,
 
1206
+      long blockSize,
 
1207
+      Progressable progress) throws IOException {
 
1208
+    LOG.debug("create:enter with path " + path);
 
1209
+    Path abs_path = makeAbsolute(path);
 
1210
+
 
1211
+    if (progress != null) {
 
1212
+      progress.progress();
 
1213
+    }
 
1214
+    // We ignore replication since that's not configurable here, and
 
1215
+    // progress reporting is quite limited.
 
1216
+    // Required semantics: if the file exists, overwrite if 'overwrite' is set;
 
1217
+    // otherwise, throw an exception
 
1218
+
 
1219
+    // Step 1: existence test
 
1220
+    boolean exists = exists(abs_path);
 
1221
+
 
1222
+    if (exists) {
 
1223
+      if (getFileStatus(abs_path).isDir()) {
 
1224
+        throw new IOException(
 
1225
+            "create: Cannot overwrite existing directory \"" + path.toString()
 
1226
+            + "\" with a file");
 
1227
+      }
 
1228
+      if (!overwrite) {
 
1229
+        throw new IOException(
 
1230
+            "createRaw: Cannot open existing file \"" + abs_path.toString()
 
1231
+            + "\" for writing without overwrite flag");
 
1232
+      }
 
1233
+    }
 
1234
+
 
1235
+    if (progress != null) {
 
1236
+      progress.progress();
 
1237
+    }
 
1238
+
 
1239
+    // Step 2: create any nonexistent directories in the path
 
1240
+    if (!exists) {
 
1241
+      Path parent = abs_path.getParent();
 
1242
+
 
1243
+      if (parent != null) { // if parent is root, we're done
 
1244
+        int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
 
1245
+
 
1246
+        if (!(r == 0 || r == -ceph.EEXIST)) {
 
1247
+          throw new IOException("Error creating parent directory; code: " + r);
 
1248
+        }
 
1249
+      }
 
1250
+      if (progress != null) {
 
1251
+        progress.progress();
 
1252
+      }
 
1253
+    }
 
1254
+    // Step 3: open the file
 
1255
+    LOG.trace("calling ceph_open_for_overwrite from Java");
 
1256
+    int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
 
1257
+        (int) permission.toShort());
 
1258
+
 
1259
+    if (progress != null) {
 
1260
+      progress.progress();
 
1261
+    }
 
1262
+    LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
 
1263
+    if (fh < 0) {
 
1264
+      throw new IOException(
 
1265
+          "create: Open for overwrite failed on path \"" + path.toString()
 
1266
+          + "\"");
 
1267
+    }
 
1268
+
 
1269
+    // Step 4: create the stream
 
1270
+    OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
 
1271
+        bufferSize);
 
1272
+
 
1273
+    LOG.debug("create:exit");
 
1274
+    return new FSDataOutputStream(cephOStream, statistics);
 
1275
+  }
 
1276
+
 
1277
+  /**
 
1278
+   * Open a Ceph file and attach the file handle to an FSDataInputStream.
 
1279
+   * @param path The file to open
 
1280
+   * @param bufferSize Ceph does internal buffering; but you can buffer in
 
1281
+   *   the Java code too if you like.
 
1282
+   * @return FSDataInputStream reading from the given path.
 
1283
+   * @throws IOException if the path DNE or is a
 
1284
+   * directory, or there is an error getting data to set up the FSDataInputStream.
 
1285
+   */
 
1286
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
 
1287
+    LOG.debug("open:enter with path " + path);
 
1288
+    Path abs_path = makeAbsolute(path);
 
1289
+
 
1290
+    int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
 
1291
+
 
1292
+    if (fh < 0) { // uh-oh, something's bad!
 
1293
+      if (fh == -ceph.ENOENT) { // well that was a stupid open
 
1294
+        throw new IOException(
 
1295
+            "open:  absolute path \"" + abs_path.toString()
 
1296
+            + "\" does not exist");
 
1297
+      } else { // hrm...the file exists but we can't open it :(
 
1298
+        throw new IOException("open: Failed to open file " + abs_path.toString());
 
1299
+      }
 
1300
+    }
 
1301
+
 
1302
+    if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
 
1303
+      // but that doesn't mean you should in Hadoop!
 
1304
+      ceph.ceph_close(fh);
 
1305
+      throw new IOException(
 
1306
+          "open:  absolute path \"" + abs_path.toString() + "\" is a directory!");
 
1307
+    }
 
1308
+    Stat lstat = new Stat();
 
1309
+
 
1310
+    LOG.trace("open:calling ceph_stat from Java");
 
1311
+    ceph.ceph_stat(getCephPath(abs_path), lstat);
 
1312
+    LOG.trace("open:returned to Java");
 
1313
+    long size = lstat.size;
 
1314
+
 
1315
+    if (size < 0) {
 
1316
+      throw new IOException(
 
1317
+          "Failed to get file size for file " + abs_path.toString()
 
1318
+          + " but succeeded in opening file. Something bizarre is going on.");
 
1319
+    }
 
1320
+    FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
 
1321
+        bufferSize);
 
1322
+
 
1323
+    LOG.debug("open:exit");
 
1324
+    return new FSDataInputStream(cephIStream);
 
1325
+  }
 
1326
+
 
1327
+  /**
 
1328
+   * Rename a file or directory.
 
1329
+   * @param src The current path of the file/directory
 
1330
+   * @param dst The new name for the path.
 
1331
+   * @return true if the rename succeeded, false otherwise.
 
1332
+   */
 
1333
+  @Override
 
1334
+  public boolean rename(Path src, Path dst) throws IOException {
 
1335
+    LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
 
1336
+    Path abs_src = makeAbsolute(src);
 
1337
+    Path abs_dst = makeAbsolute(dst);
 
1338
+
 
1339
+    LOG.trace("calling ceph_rename from Java");
 
1340
+    boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
 
1341
+
 
1342
+    if (!result) {
 
1343
+      boolean isDir = false;
 
1344
+      try {
 
1345
+        isDir = getFileStatus(abs_dst).isDir();
 
1346
+      } catch (FileNotFoundException e) {}
 
1347
+      if (isDir) { // move the srcdir into destdir
 
1348
+        LOG.debug("ceph_rename failed but dst is a directory!");
 
1349
+        Path new_dst = new Path(abs_dst, abs_src.getName());
 
1350
+
 
1351
+        result = rename(abs_src, new_dst);
 
1352
+        LOG.debug(
 
1353
+            "attempt to move " + abs_src.toString() + " to "
 
1354
+            + new_dst.toString() + "has result:" + result);
 
1355
+      }
 
1356
+    }
 
1357
+    LOG.debug("rename:exit with result: " + result);
 
1358
+    return result;
 
1359
+  }
 
1360
+
 
1361
+  /*
 
1362
+   * Attempt to convert an IP into its hostname
 
1363
+   */
 
1364
+  private String[] ips2Hosts(String[] ips) {
 
1365
+    ArrayList<String> hosts = new ArrayList<String>();
 
1366
+    for (String ip : ips) {
 
1367
+      try {
 
1368
+        String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
 
1369
+        if (host.charAt(host.length()-1) == '.') {
 
1370
+          host = host.substring(0, host.length()-1);
 
1371
+        }
 
1372
+        hosts.add(host); /* append */
 
1373
+      } catch (Exception e) {
 
1374
+        LOG.error("reverseDns ["+ip+"] failed: "+ e);
 
1375
+      }
 
1376
+    }
 
1377
+    return hosts.toArray(new String[hosts.size()]);
 
1378
+  }
 
1379
+
 
1380
+  /**
 
1381
+   * Get a BlockLocation object for each block in a file.
 
1382
+   *
 
1383
+   * Note that this doesn't include port numbers in the name field as
 
1384
+   * Ceph handles slow/down servers internally. This data should be used
 
1385
+   * only for selecting which servers to run which jobs on.
 
1386
+   *
 
1387
+   * @param file A FileStatus object corresponding to the file you want locations for.
 
1388
+   * @param start The offset of the first part of the file you are interested in.
 
1389
+   * @param len The amount of the file past the offset you are interested in.
 
1390
+   * @return A BlockLocation[] where each object corresponds to a block within
 
1391
+   * the given range.
 
1392
+   */
 
1393
+  @Override
 
1394
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 
1395
+    Path abs_path = makeAbsolute(file.getPath());
 
1396
+
 
1397
+    int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
 
1398
+    if (fh < 0) {
 
1399
+      LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
 
1400
+      return null;
 
1401
+    }
 
1402
+
 
1403
+    long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
 
1404
+    BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
 
1405
+
 
1406
+    for (int i = 0; i < locations.length; ++i) {
 
1407
+      long offset = start + i * blockSize;
 
1408
+      long blockStart = start + i * blockSize - (start % blockSize);
 
1409
+      String ips[] = ceph.ceph_hosts(fh, offset);
 
1410
+      String hosts[] = ips2Hosts(ips);
 
1411
+      locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
 
1412
+      LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
 
1413
+    }
 
1414
+
 
1415
+    ceph.ceph_close(fh);
 
1416
+    return locations;
 
1417
+  }
 
1418
+
 
1419
+  @Deprecated
 
1420
+       public boolean delete(Path path) throws IOException {
 
1421
+               return delete(path, false);
 
1422
+       }
 
1423
+
 
1424
+  /**
 
1425
+   * Delete the given path, and optionally its children.
 
1426
+   * @param path the path to delete.
 
1427
+   * @param recursive If the path is a non-empty directory and this is false,
 
1428
+   * delete will throw an IOException. If path is a file this is ignored.
 
1429
+   * @return true if the delete succeeded, false otherwise (including if
 
1430
+   * path doesn't exist).
 
1431
+   * @throws IOException if you attempt to non-recursively delete a directory,
 
1432
+   * or you attempt to delete the root directory.
 
1433
+   */
 
1434
+  public boolean delete(Path path, boolean recursive) throws IOException {
 
1435
+    LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
 
1436
+    Path abs_path = makeAbsolute(path);
 
1437
+
 
1438
+    // sanity check
 
1439
+    if (abs_path.equals(root)) {
 
1440
+      throw new IOException("Error: deleting the root directory is a Bad Idea.");
 
1441
+    }
 
1442
+    if (!exists(abs_path)) {
 
1443
+      return false;
 
1444
+    }
 
1445
+
 
1446
+    // if the path is a file, try to delete it.
 
1447
+    if (isFile(abs_path)) {
 
1448
+      LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
 
1449
+      boolean result = ceph.ceph_unlink(getCephPath(abs_path));
 
1450
+
 
1451
+      if (!result) {
 
1452
+        LOG.error(
 
1453
+            "delete: failed to delete file \"" + abs_path.toString() + "\".");
 
1454
+      }
 
1455
+      LOG.debug("delete:exit with success=" + result);
 
1456
+      return result;
 
1457
+    }
 
1458
+
 
1459
+    /* The path is a directory, so recursively try to delete its contents,
 
1460
+     and then delete the directory. */
 
1461
+    // get the entries; listPaths will remove . and .. for us
 
1462
+    Path[] contents = listPaths(abs_path);
 
1463
+
 
1464
+    if (contents == null) {
 
1465
+      LOG.error(
 
1466
+          "delete: Failed to read contents of directory \""
 
1467
+              + abs_path.toString() + "\" while trying to delete it, BAILING");
 
1468
+      return false;
 
1469
+    }
 
1470
+    if (!recursive && contents.length > 0) {
 
1471
+      throw new IOException("Directories must be deleted recursively!");
 
1472
+    }
 
1473
+    // delete the entries
 
1474
+    LOG.debug("delete: recursively calling delete on contents of " + abs_path);
 
1475
+    for (Path p : contents) {
 
1476
+      if (!delete(p, true)) {
 
1477
+        LOG.error(
 
1478
+            "delete: Failed to delete file \"" + p.toString()
 
1479
+            + "\" while recursively deleting \"" + abs_path.toString()
 
1480
+            + "\", BAILING");
 
1481
+        return false;
 
1482
+      }
 
1483
+    }
 
1484
+    // if we've come this far it's a now-empty directory, so delete it!
 
1485
+    boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
 
1486
+
 
1487
+    if (!result) {
 
1488
+      LOG.error(
 
1489
+          "delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
 
1490
+    }
 
1491
+    LOG.debug("delete:exit");
 
1492
+    return result;
 
1493
+  }
 
1494
+
 
1495
+  /**
 
1496
+   * Returns the default replication value of 1. This may
 
1497
+   * NOT be the actual value, as replication is controlled
 
1498
+   * by a separate Ceph configuration.
 
1499
+   */
 
1500
+  @Override
 
1501
+  public short getDefaultReplication() {
 
1502
+    return 1;
 
1503
+  }
 
1504
+
 
1505
+  /**
 
1506
+   * Get the default block size.
 
1507
+   * @return the default block size, in bytes, as a long.
 
1508
+   */
 
1509
+  @Override
 
1510
+  public long getDefaultBlockSize() {
 
1511
+    return getConf().getInt("fs.ceph.blockSize", 1 << 26);
 
1512
+  }
 
1513
+
 
1514
+  /**
 
1515
+   * Adds the working directory to path if path is not already
 
1516
+   * an absolute path. The URI scheme is not removed here. It
 
1517
+   * is removed only when users (e.g. ceph native calls) need
 
1518
+   * the path-only portion.
 
1519
+   */
 
1520
+  private Path makeAbsolute(Path path) {
 
1521
+    if (path.isAbsolute()) {
 
1522
+      return path;
 
1523
+    }
 
1524
+    return new Path(workingDir, path);
 
1525
+  }
 
1526
+
 
1527
+  private Path[] listPaths(Path path) throws IOException {
 
1528
+    LOG.debug("listPaths:enter with path " + path);
 
1529
+    String dirlist[];
 
1530
+
 
1531
+    Path abs_path = makeAbsolute(path);
 
1532
+
 
1533
+    // If it's a directory, get the listing. Otherwise, complain and give up.
 
1534
+    LOG.debug("calling ceph_getdir from Java with path " + abs_path);
 
1535
+    dirlist = ceph.ceph_getdir(getCephPath(abs_path));
 
1536
+    LOG.debug("returning from ceph_getdir to Java");
 
1537
+
 
1538
+    if (dirlist == null) {
 
1539
+      return null;
 
1540
+    }
 
1541
+
 
1542
+    // convert the strings to Paths
 
1543
+    Path[] paths = new Path[dirlist.length];
 
1544
+
 
1545
+    for (int i = 0; i < dirlist.length; ++i) {
 
1546
+      LOG.trace(
 
1547
+          "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
 
1548
+          + dirlist[i] + "\"");
 
1549
+      // convert each listing to an absolute path
 
1550
+      Path raw_path = new Path(dirlist[i]);
 
1551
+
 
1552
+      if (raw_path.isAbsolute()) {
 
1553
+        paths[i] = raw_path;
 
1554
+      } else {
 
1555
+        paths[i] = new Path(abs_path, raw_path);
 
1556
+      }
 
1557
+    }
 
1558
+    LOG.debug("listPaths:exit");
 
1559
+    return paths;
 
1560
+  }
 
1561
+
 
1562
+  static class Stat {
 
1563
+    public long size;
 
1564
+    public boolean is_dir;
 
1565
+    public long block_size;
 
1566
+    public long mod_time;
 
1567
+    public long access_time;
 
1568
+    public int mode;
 
1569
+
 
1570
+    public Stat() {}
 
1571
+  }
 
1572
+}
 
1573
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
 
1574
new file mode 100644
 
1575
index 0000000..d9668d0
 
1576
--- /dev/null
 
1577
+++ b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
 
1578
@@ -0,0 +1,254 @@
 
1579
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
1580
+
 
1581
+/**
 
1582
+ *
 
1583
+ * Licensed under the Apache License, Version 2.0
 
1584
+ * (the "License"); you may not use this file except in compliance with
 
1585
+ * the License. You may obtain a copy of the License at
 
1586
+ *
 
1587
+ * http://www.apache.org/licenses/LICENSE-2.0
 
1588
+ *
 
1589
+ * Unless required by applicable law or agreed to in writing, software
 
1590
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
1591
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
1592
+ * implied. See the License for the specific language governing
 
1593
+ * permissions and limitations under the License.
 
1594
+ *
 
1595
+ * 
 
1596
+ * Implements the Hadoop FS interfaces to allow applications to store
 
1597
+ * files in Ceph.
 
1598
+ */
 
1599
+package org.apache.hadoop.fs.ceph;
 
1600
+
 
1601
+
 
1602
+import java.io.IOException;
 
1603
+
 
1604
+import org.apache.commons.logging.Log;
 
1605
+import org.apache.commons.logging.LogFactory;
 
1606
+import org.apache.hadoop.conf.Configuration;
 
1607
+import org.apache.hadoop.fs.FSInputStream;
 
1608
+
 
1609
+
 
1610
+/**
 
1611
+ * <p>
 
1612
+ * An {@link FSInputStream} for a CephFileSystem and corresponding
 
1613
+ * Ceph instance.
 
1614
+ */
 
1615
+public class CephInputStream extends FSInputStream {
 
1616
+  private static final Log LOG = LogFactory.getLog(CephInputStream.class);
 
1617
+  private boolean closed;
 
1618
+
 
1619
+  private int fileHandle;
 
1620
+
 
1621
+  private long fileLength;
 
1622
+
 
1623
+  private CephFS ceph;
 
1624
+
 
1625
+  private byte[] buffer;
 
1626
+  private int bufPos = 0;
 
1627
+  private int bufValid = 0;
 
1628
+  private long cephPos = 0;
 
1629
+
 
1630
+  /**
 
1631
+   * Create a new CephInputStream.
 
1632
+   * @param conf The system configuration. Unused.
 
1633
+   * @param fh The filehandle provided by Ceph to reference.
 
1634
+   * @param flength The current length of the file. If the length changes
 
1635
+   * you will need to close and re-open it to access the new data.
 
1636
+   */
 
1637
+  public CephInputStream(Configuration conf, CephFS cephfs,
 
1638
+      int fh, long flength, int bufferSize) {
 
1639
+    // Whoever's calling the constructor is responsible for doing the actual ceph_open
 
1640
+    // call and providing the file handle.
 
1641
+    fileLength = flength;
 
1642
+    fileHandle = fh;
 
1643
+    closed = false;
 
1644
+    ceph = cephfs;
 
1645
+    buffer = new byte[bufferSize];
 
1646
+    LOG.debug(
 
1647
+        "CephInputStream constructor: initializing stream with fh " + fh
 
1648
+        + " and file length " + flength);
 
1649
+      
 
1650
+  }
 
1651
+
 
1652
+  /** Ceph likes things to be closed before it shuts down,
 
1653
+   * so closing the IOStream stuff voluntarily in a finalizer is good
 
1654
+   */
 
1655
+  protected void finalize() throws Throwable {
 
1656
+    try {
 
1657
+      if (!closed) {
 
1658
+        close();
 
1659
+      }
 
1660
+    } finally {
 
1661
+      super.finalize();
 
1662
+    }
 
1663
+  }
 
1664
+
 
1665
+  private synchronized boolean fillBuffer() throws IOException {
 
1666
+    bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
 
1667
+    bufPos = 0;
 
1668
+    if (bufValid < 0) {
 
1669
+      int err = bufValid;
 
1670
+
 
1671
+      bufValid = 0;
 
1672
+      // attempt to reset to old position. If it fails, too bad.
 
1673
+      ceph.ceph_seek_from_start(fileHandle, cephPos);
 
1674
+      throw new IOException("Failed to fill read buffer! Error code:" + err);
 
1675
+    }
 
1676
+    cephPos += bufValid;
 
1677
+    return (bufValid != 0);
 
1678
+  }
 
1679
+
 
1680
+  /*
 
1681
+   * Get the current position of the stream.
 
1682
+   */
 
1683
+  public synchronized long getPos() throws IOException {
 
1684
+    return cephPos - bufValid + bufPos;
 
1685
+  }
 
1686
+
 
1687
+  /**
 
1688
+   * Find the number of bytes remaining in the file.
 
1689
+   */
 
1690
+  @Override
 
1691
+  public synchronized int available() throws IOException {
 
1692
+    return (int) (fileLength - getPos());
 
1693
+  }
 
1694
+
 
1695
+  public synchronized void seek(long targetPos) throws IOException {
 
1696
+    LOG.trace(
 
1697
+        "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
 
1698
+        + fileHandle);
 
1699
+    if (targetPos > fileLength) {
 
1700
+      throw new IOException(
 
1701
+          "CephInputStream.seek: failed seek to position " + targetPos
 
1702
+          + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
 
1703
+    }
 
1704
+    long oldPos = cephPos;
 
1705
+
 
1706
+    cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
 
1707
+    bufValid = 0;
 
1708
+    bufPos = 0;
 
1709
+    if (cephPos < 0) {
 
1710
+      cephPos = oldPos;
 
1711
+      throw new IOException("Ceph failed to seek to new position!");
 
1712
+    }
 
1713
+  }
 
1714
+
 
1715
+  /**
 
1716
+   * Failovers are handled by the Ceph code at a very low level;
 
1717
+   * if there are issues that can be solved by changing sources
 
1718
+   * they'll be dealt with before anybody even tries to call this method!
 
1719
+   * @return false.
 
1720
+   */
 
1721
+  public synchronized boolean seekToNewSource(long targetPos) {
 
1722
+    return false;
 
1723
+  }
 
1724
+    
 
1725
+  /**
 
1726
+   * Read a byte from the file.
 
1727
+   * @return the next byte.
 
1728
+   */
 
1729
+  @Override
 
1730
+  public synchronized int read() throws IOException {
 
1731
+    LOG.trace(
 
1732
+        "CephInputStream.read: Reading a single byte from fd " + fileHandle
 
1733
+        + " by calling general read function");
 
1734
+
 
1735
+    byte result[] = new byte[1];
 
1736
+
 
1737
+    if (getPos() >= fileLength) {
 
1738
+      return -1;
 
1739
+    }
 
1740
+    if (-1 == read(result, 0, 1)) {
 
1741
+      return -1;
 
1742
+    }
 
1743
+    if (result[0] < 0) {
 
1744
+      return 256 + (int) result[0];
 
1745
+    } else {
 
1746
+      return result[0];
 
1747
+    }
 
1748
+  }
 
1749
+
 
1750
+  /**
 
1751
+   * Read a specified number of bytes from the file into a byte[].
 
1752
+   * @param buf the byte array to read into.
 
1753
+   * @param off the offset to start at in the file
 
1754
+   * @param len the number of bytes to read
 
1755
+   * @return 0 if successful, otherwise an error code.
 
1756
+   * @throws IOException on bad input.
 
1757
+   */
 
1758
+  @Override
 
1759
+  public synchronized int read(byte buf[], int off, int len)
 
1760
+    throws IOException {
 
1761
+    LOG.trace(
 
1762
+        "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
 
1763
+      
 
1764
+    if (closed) {
 
1765
+      throw new IOException(
 
1766
+          "CephInputStream.read: cannot read " + len + " bytes from fd "
 
1767
+          + fileHandle + ": stream closed");
 
1768
+    }
 
1769
+                       
 
1770
+    // ensure we're not past the end of the file
 
1771
+    if (getPos() >= fileLength) {
 
1772
+      LOG.debug(
 
1773
+          "CephInputStream.read: cannot read " + len + " bytes from fd "
 
1774
+          + fileHandle + ": current position is " + getPos()
 
1775
+          + " and file length is " + fileLength);
 
1776
+                               
 
1777
+      return -1;
 
1778
+    }
 
1779
+
 
1780
+    int totalRead = 0;
 
1781
+    int initialLen = len;
 
1782
+    int read;
 
1783
+
 
1784
+    do {
 
1785
+      read = Math.min(len, bufValid - bufPos);
 
1786
+      try {
 
1787
+        System.arraycopy(buffer, bufPos, buf, off, read);
 
1788
+      } catch (IndexOutOfBoundsException ie) {
 
1789
+        throw new IOException(
 
1790
+            "CephInputStream.read: Indices out of bounds:" + "read length is "
 
1791
+            + len + ", buffer offset is " + off + ", and buffer size is "
 
1792
+            + buf.length);
 
1793
+      } catch (ArrayStoreException ae) {
 
1794
+        throw new IOException(
 
1795
+            "Uh-oh, CephInputStream failed to do an array"
 
1796
+                + "copy due to type mismatch...");
 
1797
+      } catch (NullPointerException ne) {
 
1798
+        throw new IOException(
 
1799
+            "CephInputStream.read: cannot read " + len + "bytes from fd:"
 
1800
+            + fileHandle + ": buf is null");
 
1801
+      }
 
1802
+      bufPos += read;
 
1803
+      len -= read;
 
1804
+      off += read;
 
1805
+      totalRead += read;
 
1806
+    } while (len > 0 && fillBuffer());
 
1807
+
 
1808
+    LOG.trace(
 
1809
+        "CephInputStream.read: Reading " + initialLen + " bytes from fd "
 
1810
+        + fileHandle + ": succeeded in reading " + totalRead + " bytes");
 
1811
+    return totalRead;
 
1812
+  }
 
1813
+
 
1814
+  /**
 
1815
+   * Close the CephInputStream and release the associated filehandle.
 
1816
+   */
 
1817
+  @Override
 
1818
+  public void close() throws IOException {
 
1819
+    LOG.trace("CephOutputStream.close:enter");
 
1820
+    if (!closed) {
 
1821
+      int result = ceph.ceph_close(fileHandle);
 
1822
+
 
1823
+      closed = true;
 
1824
+      if (result != 0) {
 
1825
+        throw new IOException(
 
1826
+            "Close somehow failed!"
 
1827
+                + "Don't try and use this stream again, though");
 
1828
+      }
 
1829
+      LOG.trace("CephOutputStream.close:exit");
 
1830
+    }
 
1831
+  }
 
1832
+}
 
1833
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
 
1834
new file mode 100644
 
1835
index 0000000..4c50f88
 
1836
--- /dev/null
 
1837
+++ b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
 
1838
@@ -0,0 +1,219 @@
 
1839
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
1840
+
 
1841
+/**
 
1842
+ *
 
1843
+ * Licensed under the Apache License, Version 2.0
 
1844
+ * (the "License"); you may not use this file except in compliance with
 
1845
+ * the License. You may obtain a copy of the License at
 
1846
+ *
 
1847
+ * http://www.apache.org/licenses/LICENSE-2.0
 
1848
+ *
 
1849
+ * Unless required by applicable law or agreed to in writing, software
 
1850
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
1851
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
1852
+ * implied. See the License for the specific language governing
 
1853
+ * permissions and limitations under the License.
 
1854
+ *
 
1855
+ * 
 
1856
+ * Implements the Hadoop FS interfaces to allow applications to store
 
1857
+ * files in Ceph.
 
1858
+ */
 
1859
+
 
1860
+package org.apache.hadoop.fs.ceph;
 
1861
+
 
1862
+
 
1863
+import java.io.IOException;
 
1864
+import java.io.OutputStream;
 
1865
+
 
1866
+import org.apache.commons.logging.Log;
 
1867
+import org.apache.commons.logging.LogFactory;
 
1868
+import org.apache.hadoop.conf.Configuration;
 
1869
+import org.apache.hadoop.util.Progressable;
 
1870
+
 
1871
+
 
1872
+/**
 
1873
+ * <p>
 
1874
+ * An {@link OutputStream} for a CephFileSystem and corresponding
 
1875
+ * Ceph instance.
 
1876
+ */
 
1877
+public class CephOutputStream extends OutputStream {
 
1878
+  private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
 
1879
+  private boolean closed;
 
1880
+
 
1881
+  private CephFS ceph;
 
1882
+
 
1883
+  private int fileHandle;
 
1884
+
 
1885
+  private byte[] buffer;
 
1886
+  private int bufUsed = 0;
 
1887
+
 
1888
+  /**
 
1889
+   * Construct the CephOutputStream.
 
1890
+   * @param conf The FileSystem configuration.
 
1891
+   * @param fh The Ceph filehandle to connect to.
 
1892
+   */
 
1893
+  public CephOutputStream(Configuration conf, CephFS cephfs,
 
1894
+      int fh, int bufferSize) {
 
1895
+    ceph = cephfs;
 
1896
+    fileHandle = fh;
 
1897
+    closed = false;
 
1898
+    buffer = new byte[bufferSize];
 
1899
+  }
 
1900
+
 
1901
+  /** Ceph likes things to be closed before it shuts down,
 
1902
+   *so closing the IOStream stuff voluntarily is good
 
1903
+   */
 
1904
+  protected void finalize() throws Throwable {
 
1905
+    try {
 
1906
+      if (!closed) {
 
1907
+        close();
 
1908
+      }
 
1909
+    } finally {
 
1910
+      super.finalize();
 
1911
+    }
 
1912
+  }
 
1913
+
 
1914
+  /**
 
1915
+   * Get the current position in the file.
 
1916
+   * @return The file offset in bytes.
 
1917
+   */
 
1918
+  public long getPos() throws IOException {
 
1919
+    return ceph.ceph_getpos(fileHandle);
 
1920
+  }
 
1921
+
 
1922
+  /**
 
1923
+   * Write a byte.
 
1924
+   * @param b The byte to write.
 
1925
+   * @throws IOException If you have closed the CephOutputStream or the
 
1926
+   * write fails.
 
1927
+   */
 
1928
+  @Override
 
1929
+  public synchronized void write(int b) throws IOException {
 
1930
+    LOG.trace(
 
1931
+        "CephOutputStream.write: writing a single byte to fd " + fileHandle);
 
1932
+
 
1933
+    if (closed) {
 
1934
+      throw new IOException(
 
1935
+          "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
 
1936
+          + ": stream closed");
 
1937
+    }
 
1938
+    // Stick the byte in a buffer and write it
 
1939
+    byte buf[] = new byte[1];
 
1940
+
 
1941
+    buf[0] = (byte) b;    
 
1942
+    write(buf, 0, 1);
 
1943
+    return;
 
1944
+  }
 
1945
+
 
1946
+  /**
 
1947
+   * Write a byte buffer into the Ceph file.
 
1948
+   * @param buf the byte array to write from
 
1949
+   * @param off the position in the file to start writing at.
 
1950
+   * @param len The number of bytes to actually write.
 
1951
+   * @throws IOException if you have closed the CephOutputStream, or
 
1952
+   * if buf is null or off + len > buf.length, or
 
1953
+   * if the write fails due to a Ceph error.
 
1954
+   */
 
1955
+  @Override
 
1956
+  public synchronized void write(byte buf[], int off, int len) throws IOException {
 
1957
+    LOG.trace(
 
1958
+        "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
 
1959
+    // make sure stream is open
 
1960
+    if (closed) {
 
1961
+      throw new IOException(
 
1962
+          "CephOutputStream.write: cannot write " + len + "bytes to fd "
 
1963
+          + fileHandle + ": stream closed");
 
1964
+    }
 
1965
+               
 
1966
+    int result;
 
1967
+    int write;
 
1968
+
 
1969
+    while (len > 0) {
 
1970
+      write = Math.min(len, buffer.length - bufUsed);
 
1971
+      try {
 
1972
+        System.arraycopy(buf, off, buffer, bufUsed, write);
 
1973
+      } catch (IndexOutOfBoundsException ie) {
 
1974
+        throw new IOException(
 
1975
+            "CephOutputStream.write: Indices out of bounds: "
 
1976
+                + "write length is " + len + ", buffer offset is " + off
 
1977
+                + ", and buffer size is " + buf.length);
 
1978
+      } catch (ArrayStoreException ae) {
 
1979
+        throw new IOException(
 
1980
+            "Uh-oh, CephOutputStream failed to do an array"
 
1981
+                + " copy due to type mismatch...");
 
1982
+      } catch (NullPointerException ne) {
 
1983
+        throw new IOException(
 
1984
+            "CephOutputStream.write: cannot write " + len + "bytes to fd "
 
1985
+            + fileHandle + ": buffer is null");
 
1986
+      }
 
1987
+      bufUsed += write;
 
1988
+      len -= write;
 
1989
+      off += write;
 
1990
+      if (bufUsed == buffer.length) {
 
1991
+        result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
 
1992
+        if (result < 0) {
 
1993
+          throw new IOException(
 
1994
+              "CephOutputStream.write: Buffered write of " + bufUsed
 
1995
+              + " bytes failed!");
 
1996
+        }
 
1997
+        if (result != bufUsed) {
 
1998
+          throw new IOException(
 
1999
+              "CephOutputStream.write: Wrote only " + result + " bytes of "
 
2000
+              + bufUsed + " in buffer! Data may be lost or written"
 
2001
+              + " twice to Ceph!");
 
2002
+        }
 
2003
+        bufUsed = 0;
 
2004
+      }
 
2005
+
 
2006
+    }
 
2007
+    return; 
 
2008
+  }
 
2009
+   
 
2010
+  /**
 
2011
+   * Flush the buffered data.
 
2012
+   * @throws IOException if you've closed the stream or the write fails.
 
2013
+   */
 
2014
+  @Override
 
2015
+  public synchronized void flush() throws IOException {
 
2016
+    if (!closed) {
 
2017
+      if (bufUsed == 0) {
 
2018
+        return;
 
2019
+      }
 
2020
+      int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
 
2021
+
 
2022
+      if (result < 0) {
 
2023
+        throw new IOException(
 
2024
+            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
 
2025
+            + fileHandle + " failed");
 
2026
+      }
 
2027
+      if (result != bufUsed) {
 
2028
+        throw new IOException(
 
2029
+            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
 
2030
+            + fileHandle + "was incomplete:  only " + result + " of " + bufUsed
 
2031
+            + " bytes were written.");
 
2032
+      }
 
2033
+      bufUsed = 0;
 
2034
+      return;
 
2035
+    }
 
2036
+  }
 
2037
+  
 
2038
+  /**
 
2039
+   * Close the CephOutputStream.
 
2040
+   * @throws IOException if Ceph somehow returns an error. In current code it can't.
 
2041
+   */
 
2042
+  @Override
 
2043
+  public synchronized void close() throws IOException {
 
2044
+    LOG.trace("CephOutputStream.close:enter");
 
2045
+    if (!closed) {
 
2046
+      flush();
 
2047
+      int result = ceph.ceph_close(fileHandle);
 
2048
+
 
2049
+      if (result != 0) {
 
2050
+        throw new IOException("Close failed!");
 
2051
+      }
 
2052
+                               
 
2053
+      closed = true;
 
2054
+      LOG.trace("CephOutputStream.close:exit");
 
2055
+    }
 
2056
+  }
 
2057
+}
 
2058
diff --git a/src/core/org/apache/hadoop/fs/ceph/CephTalker.java b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
 
2059
new file mode 100644
 
2060
index 0000000..569652f
 
2061
--- /dev/null
 
2062
+++ b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
 
2063
@@ -0,0 +1,91 @@
 
2064
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
2065
+
 
2066
+/**
 
2067
+ *
 
2068
+ * Licensed under the Apache License, Version 2.0
 
2069
+ * (the "License"); you may not use this file except in compliance with
 
2070
+ * the License. You may obtain a copy of the License at
 
2071
+ *
 
2072
+ * http://www.apache.org/licenses/LICENSE-2.0
 
2073
+ *
 
2074
+ * Unless required by applicable law or agreed to in writing, software
 
2075
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
2076
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
2077
+ * implied. See the License for the specific language governing
 
2078
+ * permissions and limitations under the License.
 
2079
+ *
 
2080
+ * 
 
2081
+ * Wraps a number of native function calls to communicate with the Ceph
 
2082
+ * filesystem.
 
2083
+ */
 
2084
+package org.apache.hadoop.fs.ceph;
 
2085
+
 
2086
+
 
2087
+import org.apache.hadoop.conf.Configuration;
 
2088
+import org.apache.commons.logging.Log;
 
2089
+
 
2090
+
 
2091
+class CephTalker extends CephFS {
 
2092
+  // JNI doesn't give us any way to store pointers, so use a long.
 
2093
+  // Here we're assuming pointers aren't longer than 8 bytes.
 
2094
+  long cluster;
 
2095
+
 
2096
+  // we write a constructor so we can load the libraries
 
2097
+  public CephTalker(Configuration conf, Log log) {
 
2098
+    System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
 
2099
+    System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
 
2100
+    cluster = 0;
 
2101
+  }
 
2102
+
 
2103
+  protected native boolean ceph_initializeClient(String arguments, int block_size);
 
2104
+
 
2105
+  protected native String ceph_getcwd();
 
2106
+
 
2107
+  protected native boolean ceph_setcwd(String path);
 
2108
+
 
2109
+  protected native boolean ceph_rmdir(String path);
 
2110
+
 
2111
+  protected native boolean ceph_unlink(String path);
 
2112
+
 
2113
+  protected native boolean ceph_rename(String old_path, String new_path);
 
2114
+
 
2115
+  protected native boolean ceph_exists(String path);
 
2116
+
 
2117
+  protected native long ceph_getblocksize(String path);
 
2118
+
 
2119
+  protected native boolean ceph_isdirectory(String path);
 
2120
+
 
2121
+  protected native boolean ceph_isfile(String path);
 
2122
+
 
2123
+  protected native String[] ceph_getdir(String path);
 
2124
+
 
2125
+  protected native int ceph_mkdirs(String path, int mode);
 
2126
+
 
2127
+  protected native int ceph_open_for_append(String path);
 
2128
+
 
2129
+  protected native int ceph_open_for_read(String path);
 
2130
+
 
2131
+  protected native int ceph_open_for_overwrite(String path, int mode);
 
2132
+
 
2133
+  protected native int ceph_close(int filehandle);
 
2134
+
 
2135
+  protected native boolean ceph_setPermission(String path, int mode);
 
2136
+
 
2137
+  protected native boolean ceph_kill_client();
 
2138
+
 
2139
+  protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
 
2140
+
 
2141
+  protected native int ceph_replication(String Path);
 
2142
+
 
2143
+  protected native String[] ceph_hosts(int fh, long offset);
 
2144
+
 
2145
+  protected native int ceph_setTimes(String path, long mtime, long atime);
 
2146
+
 
2147
+  protected native long ceph_getpos(int fh);
 
2148
+
 
2149
+  protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
 
2150
+
 
2151
+  protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
 
2152
+
 
2153
+  protected native long ceph_seek_from_start(int fh, long pos);
 
2154
+}
 
2155
diff --git a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 
2156
index 9e22f1f..cd55361 100644
 
2157
--- a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 
2158
+++ b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 
2159
@@ -386,10 +386,12 @@ public class TrackerDistributedCacheManager {
 
2160
     if (modifiedTime != desiredTimestamp) {
 
2161
       DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT, 
 
2162
                                                      DateFormat.SHORT);
 
2163
+      /*
 
2164
       throw new IOException("The distributed cache object " + source + 
 
2165
                             " changed during the job from " + 
 
2166
                             df.format(new Date(desiredTimestamp)) + " to " +
 
2167
                             df.format(new Date(modifiedTime)));
 
2168
+      */
 
2169
     }
 
2170
     
 
2171
     Path parchive = null;
 
2172
diff --git a/src/test/commit-tests b/src/test/commit-tests
 
2173
index 1148c8b..85fa53d 100644
 
2174
--- a/src/test/commit-tests
 
2175
+++ b/src/test/commit-tests
 
2176
@@ -53,6 +53,7 @@
 
2177
 **/TestRPC.java
 
2178
 **/TestS3Credentials.java
 
2179
 **/TestS3FileSystem.java
 
2180
+**/TestCeph.java
 
2181
 **/TestSaslRPC.java
 
2182
 **/TestScriptBasedMapping.java
 
2183
 **/TestSequenceFileSerialization.java
 
2184
diff --git a/src/test/org/apache/hadoop/fs/ceph/TestCeph.java b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
 
2185
new file mode 100644
 
2186
index 0000000..e46b0ee
 
2187
--- /dev/null
 
2188
+++ b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
 
2189
@@ -0,0 +1,45 @@
 
2190
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 
2191
+
 
2192
+/**
 
2193
+ * Licensed to the Apache Software Foundation (ASF) under one
 
2194
+ * or more contributor license agreements.  See the NOTICE file
 
2195
+ * distributed with this work for additional information
 
2196
+ * regarding copyright ownership.  The ASF licenses this file
 
2197
+ * to you under the Apache License, Version 2.0 (the
 
2198
+ * "License"); you may not use this file except in compliance
 
2199
+ * with the License.  You may obtain a copy of the License at
 
2200
+ *
 
2201
+ *     http://www.apache.org/licenses/LICENSE-2.0
 
2202
+ *
 
2203
+ * Unless required by applicable law or agreed to in writing, software
 
2204
+ * distributed under the License is distributed on an "AS IS" BASIS,
 
2205
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
2206
+ * See the License for the specific language governing permissions and
 
2207
+ * limitations under the License.
 
2208
+ *
 
2209
+ * Unit tests for the CephFileSystem API implementation.
 
2210
+ */
 
2211
+
 
2212
+package org.apache.hadoop.fs.ceph;
 
2213
+
 
2214
+
 
2215
+import java.io.IOException;
 
2216
+import java.net.URI;
 
2217
+import org.apache.hadoop.conf.Configuration;
 
2218
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
 
2219
+import org.apache.hadoop.fs.FileSystem;
 
2220
+import org.apache.hadoop.fs.Path;
 
2221
+
 
2222
+
 
2223
+public class TestCeph extends FileSystemContractBaseTest {
 
2224
+
 
2225
+  @Override
 
2226
+  protected void setUp() throws IOException {
 
2227
+    Configuration conf = new Configuration();
 
2228
+    CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
 
2229
+    CephFileSystem cephfs = new CephFileSystem(cephfaker);
 
2230
+
 
2231
+    cephfs.initialize(URI.create("ceph://null"), conf);
 
2232
+    fs = cephfs;
 
2233
+  }
 
2234
+}