2
Copyright (C) 2008- The University of Notre Dame
3
This software is distributed under the GNU General Public License.
4
See the file COPYING for details.
7
#include "chirp_filesystem.h"
8
#include "chirp_hdfs.h"
9
#include "chirp_protocol.h"
13
#include "hash_table.h"
18
#include "hdfs_library.h"
33
// HDFS gets upset if a path begins with two slashes.
// This macro simply skips over the first slash if needed.
// The argument is fully parenthesized so that pointer expressions such as
// FIXPATH(base + 1) expand correctly. Note that the argument is still
// evaluated more than once, so it must not have side effects.
#define FIXPATH(p) ( ((p)[0]=='/' && (p)[1]=='/') ? &(p)[1] : (p) )
37
char *chirp_hdfs_hostname = NULL;
38
UINT16_T chirp_hdfs_port = 0;
40
extern char chirp_owner[USERNAME_MAX];
42
static struct hdfs_library *hdfs_services = 0;
43
static hdfsFS fs = NULL;
45
/* Array of open HDFS Files */
46
#define BASE_SIZE 1024
47
static struct chirp_hdfs_file {
50
} open_files[BASE_SIZE]; // = NULL;
52
INT64_T chirp_hdfs_init(const char *path)
54
static const char *groups[] = { "supergroup" };
60
if(chirp_hdfs_hostname == NULL)
61
fatal("hostname and port must be specified, use -x option");
63
debug(D_HDFS, "initializing", chirp_hdfs_hostname, chirp_hdfs_port);
67
for(i = 0; i < BASE_SIZE; i++)
68
open_files[i].name = NULL;
71
hdfs_services = hdfs_library_open();
76
debug(D_HDFS, "connecting to %s:%u as '%s'\n", chirp_hdfs_hostname, chirp_hdfs_port, chirp_owner);
77
fs = hdfs_services->connect_as_user(chirp_hdfs_hostname, chirp_hdfs_port, chirp_owner, groups, 1);
80
return (errno = ENOSYS, -1);
85
INT64_T chirp_hdfs_destroy(void)
90
debug(D_HDFS, "destroying hdfs connection", chirp_hdfs_hostname, chirp_hdfs_port);
91
ret = hdfs_services->disconnect(fs);
95
hdfs_library_close(hdfs_services);
99
static void copystat(struct chirp_stat *cs, hdfsFileInfo * hs, const char *path )
101
memset(cs,0,sizeof(*cs));
104
cs->cst_ino = hash_string(path);
105
cs->cst_mode = hs->mKind == kObjectKindDirectory ? S_IFDIR : S_IFREG;
107
/* HDFS does not have execute bit, lie and set it for all files */
108
cs->cst_mode |= hs->mPermissions | S_IXUSR | S_IXGRP;
109
cs->cst_nlink = hs->mReplication;
112
cs->cst_size = hs->mSize;
113
cs->cst_blksize = hs->mBlockSize;
115
/* If the blocksize is not set, assume 64MB chunksize */
116
if(cs->cst_blksize<1) cs->cst_blksize = 64*1024*1024;
117
cs->cst_blocks = MAX(1,cs->cst_size/cs->cst_blksize);
119
/* Note that hs->mLastAccess is typically zero. */
120
cs->cst_atime = cs->cst_mtime = cs->cst_ctime = hs->mLastMod;
123
INT64_T chirp_hdfs_fstat(int fd, struct chirp_stat *buf)
125
return chirp_hdfs_stat(open_files[fd].name, buf);
128
INT64_T chirp_hdfs_stat(const char *path, struct chirp_stat * buf)
130
hdfsFileInfo *file_info;
132
path = FIXPATH(path);
134
debug(D_HDFS, "stat %s", path);
136
file_info = hdfs_services->stat(fs,path);
137
if(file_info == NULL)
138
return (errno = ENOENT, -1);
139
copystat(buf, file_info, path);
140
hdfs_services->free_stat(file_info, 1);
145
struct chirp_hdfs_dir {
152
void *chirp_hdfs_opendir(const char *path)
154
struct chirp_hdfs_dir *d;
156
path = FIXPATH(path);
158
debug(D_HDFS, "opendir %s", path);
160
d = xxmalloc(sizeof(struct chirp_hdfs_dir));
161
d->info = hdfs_services->listdir(fs,path, &d->n);
163
d->path = xstrdup(path);
165
if(d->info == NULL) {
174
char *chirp_hdfs_readdir(void *dir)
176
struct chirp_hdfs_dir *d = (struct chirp_hdfs_dir *) dir;
177
debug(D_HDFS, "readdir %s", d->path);
179
/* mName is of the form hdfs:/hostname:port/path/to/file */
180
char *entry = d->info[d->i++].mName;
181
entry += strlen(entry); /* now points to nul byte */
182
while(entry[-1] != '/')
189
void chirp_hdfs_closedir(void *dir)
191
struct chirp_hdfs_dir *d = (struct chirp_hdfs_dir *) dir;
192
debug(D_HDFS, "closedir", d->path);
193
hdfs_services->free_stat(d->info, d->n);
198
INT64_T chirp_hdfs_file_size(const char *path)
200
struct chirp_stat info;
201
path = FIXPATH(path);
202
if(chirp_hdfs_stat(path, &info) == 0) {
203
return info.cst_size;
209
INT64_T chirp_hdfs_fd_size(int fd)
211
struct chirp_stat info;
212
debug(D_HDFS, "fstat on file descriptor %d, path = %s", fd, open_files[fd].name);
213
if(chirp_hdfs_fstat(fd, &info) == 0) {
214
return info.cst_size;
220
static INT64_T get_fd(void)
223
/* find an unused file descriptor */
224
for(fd = 0; fd < BASE_SIZE; fd++)
225
if(open_files[fd].name == NULL)
227
debug(D_HDFS, "too many files open");
232
static char *read_buffer(const char *path, int entire_file, INT64_T * size)
238
if(entire_file) { /* read entire file? */
239
struct chirp_stat info;
240
if(chirp_hdfs_stat(path, &info) == -1)
242
*size = info.cst_size;
245
file = hdfs_services->open(fs,path, O_RDONLY, 0, 0, 0);
249
buffer = xxmalloc(sizeof(char) * (*size));
250
memset(buffer, 0, sizeof(char) * (*size));
252
while(current < *size) {
253
INT64_T ractual = hdfs_services->read(fs, file, buffer + current, *size - current);
258
hdfs_services->close(fs, file);
262
static INT64_T write_buffer(const char *path, const char *buffer, size_t size)
272
file = hdfs_services->open(fs,path, O_WRONLY, 0, 0, 0);
274
return -1; /* errno is set */
276
while(current < size) {
277
INT64_T wactual = hdfs_services->write(fs, file, buffer, size - current);
282
open_files[fd].file = file;
283
open_files[fd].name = xstrdup(path);
288
INT64_T chirp_hdfs_open(const char *path, INT64_T flags, INT64_T mode)
290
INT64_T fd, stat_result;
291
struct chirp_stat info;
293
path = FIXPATH(path);
295
stat_result = chirp_hdfs_stat(path, &info);
301
mode = 0600 | (mode & 0100);
302
switch (flags & O_ACCMODE) {
304
debug(D_HDFS, "opening file %s (flags: %o) for reading; mode: %o", path, flags, mode);
305
if(stat_result == -1)
306
return (errno = ENOENT, -1); /* HDFS screws this up */
309
debug(D_HDFS, "opening file %s (flags: %o) for writing; mode: %o", path, flags, mode);
310
/* Check if file exists already */
311
if(stat_result < 0) {
313
break; /* probably doesn't exist, continue.... */
314
} else if(S_ISDIR(info.cst_mode))
315
return (errno = EISDIR, -1);
316
else if(O_TRUNC & flags) {
317
/* delete file, then open again */
318
INT64_T result = hdfs_services->unlink(fs,path);
320
return (errno = EIO, -1);
323
} else if(!(O_APPEND & flags)) {
324
debug(D_HDFS, "file does not have append flag set, setting it anyway");
325
/* return (errno = ENOTSUP, -1); */
329
char *buffer = read_buffer(path, 1, &size);
332
INT64_T fd = write_buffer(path, buffer, size);
336
debug(D_HDFS, "invalid file open flag %o", flags & O_ACCMODE);
337
return (errno = EINVAL, -1);
340
open_files[fd].file = hdfs_services->open(fs, path, flags, 0, 0, 0);
341
if(open_files[fd].file == NULL) {
342
debug(D_HDFS, "could not open file %s", path);
345
open_files[fd].name = xstrdup(path);
350
INT64_T chirp_hdfs_close(int fd)
352
debug(D_HDFS, "closing file %s", open_files[fd].name);
353
free(open_files[fd].name);
354
open_files[fd].name = NULL;
355
return hdfs_services->close(fs, open_files[fd].file);
358
INT64_T chirp_hdfs_pread(int fd, void *buffer, INT64_T length, INT64_T offset)
{
	/* Positional read of up to length bytes at offset from the HDFS file
	   behind fd; the library's pread result is returned unchanged. */
	struct chirp_hdfs_file *slot = &open_files[fd];

	debug(D_HDFS, "pread %s", slot->name);

	return hdfs_services->pread(fs, slot->file, offset, buffer, length);
}
364
INT64_T chirp_hdfs_sread(int fd, void *vbuffer, INT64_T length, INT64_T stride_length, INT64_T stride_skip, INT64_T offset)
368
char *buffer = vbuffer;
370
if(stride_length < 0 || stride_skip < 0 || offset < 0) {
375
while(length >= stride_length) {
376
actual = chirp_hdfs_pread(fd, &buffer[total], stride_length, offset);
380
offset += stride_skip;
381
if(actual == stride_length) {
402
INT64_T chirp_hdfs_pwrite(int fd, const void *buffer, INT64_T length, INT64_T offset)
{
	/* Write length bytes from buffer to the HDFS file behind fd.
	   Note that offset is not used: the library write call takes no
	   position argument. */
	/* FIXME deal with non-appends gracefully using an error if not costly */
	struct chirp_hdfs_file *slot = &open_files[fd];

	debug(D_HDFS, "pwrite %s", slot->name);

	return hdfs_services->write(fs, slot->file, buffer, length);
}
409
INT64_T chirp_hdfs_swrite(int fd, const void *vbuffer, INT64_T length, INT64_T stride_length, INT64_T stride_skip, INT64_T offset)
413
const char *buffer = vbuffer;
415
if(stride_length < 0 || stride_skip < 0 || offset < 0) {
420
while(length >= stride_length) {
421
actual = chirp_hdfs_pwrite(fd, &buffer[total], stride_length, offset);
425
offset += stride_skip;
426
if(actual == stride_length) {
447
INT64_T chirp_hdfs_fchown(int fd, INT64_T uid, INT64_T gid)
449
// Changing file ownership is silently ignored,
450
// because permissions are handled through the ACL model.
451
debug(D_HDFS, "fchown %s %ld %ld", open_files[fd].name, (long) uid, (long) gid);
455
INT64_T chirp_hdfs_fchmod(int fd, INT64_T mode)
{
	// The owner may only add or remove the execute bit,
	// because permissions are handled through the ACL model.
	// The requested mode is therefore reduced to 0600 plus the
	// owner-execute bit before it is applied to the path recorded for fd.
	INT64_T effective_mode;

	debug(D_HDFS, "fchmod %s %lo", open_files[fd].name, (long) mode);

	effective_mode = (mode & 0100) | 0600;

	return hdfs_services->chmod(fs, open_files[fd].name, effective_mode);
}
464
INT64_T chirp_hdfs_ftruncate(int fd, INT64_T length)
466
debug(D_HDFS, "ftruncate %s %ld", open_files[fd].name, (long) length);
467
INT64_T size = length;
468
char *buffer = read_buffer(open_files[fd].name, 0, &size);
471
/* simulate truncate */
472
if(hdfs_services->close(fs, open_files[fd].file) == -1)
473
return (free(buffer), -1);
474
INT64_T fd2 = write_buffer(open_files[fd].name, buffer, size);
475
open_files[fd].file = open_files[fd2].file; /* copy over new file */
476
free(open_files[fd2].name); /* close new fd */
477
open_files[fd2].name = NULL;
481
INT64_T chirp_hdfs_fsync(int fd)
483
debug(D_HDFS, "fsync %s", open_files[fd].name);
484
return hdfs_services->flush(fs, open_files[fd].file);
487
INT64_T chirp_hdfs_getfile(const char *path, struct link * link, time_t stoptime)
491
struct chirp_stat info;
493
path = FIXPATH(path);
494
debug(D_HDFS, "getfile %s", path);
496
result = chirp_hdfs_stat(path, &info);
500
if(S_ISDIR(info.cst_mode)) {
505
fd = chirp_hdfs_open(path, O_RDONLY, 0);
509
INT64_T ractual, wactual;
510
INT64_T length = info.cst_size;
512
link_putfstring(link, "%lld\n", stoptime, length);
514
// Copy Pasta from link.c
517
INT64_T chunk = MIN(sizeof(buffer), length);
519
ractual = hdfs_services->read(fs, open_files[fd].file, buffer, chunk);
523
wactual = link_putlstring(link, buffer, ractual, stoptime);
524
if(wactual != ractual) {
533
chirp_hdfs_close(fd);
541
INT64_T chirp_hdfs_putfile(const char *path, struct link * link, INT64_T length, INT64_T mode, time_t stoptime)
546
path = FIXPATH(path);
548
debug(D_HDFS, "putfile %s", path);
550
mode = 0600 | (mode & 0100);
552
fd = chirp_hdfs_open(path, O_WRONLY | O_CREAT | O_TRUNC, (int) mode);
557
link_putliteral(link, "0\n", stoptime);
559
// Copy Pasta from link.c
562
INT64_T ractual, wactual;
563
INT64_T chunk = MIN(sizeof(buffer), length);
565
ractual = link_read(link, buffer, chunk, stoptime);
569
wactual = hdfs_services->write(fs, open_files[fd].file, buffer, ractual);
570
if(wactual != ractual) {
583
link_soak(link, length - result, stoptime);
586
chirp_hdfs_close(fd);
593
INT64_T chirp_hdfs_mkfifo(const char *path)
595
path = FIXPATH(path);
596
debug(D_HDFS, "mkfifo %s", path);
597
return (errno = ENOTSUP, -1);
600
INT64_T chirp_hdfs_unlink(const char *path)
602
path = FIXPATH(path);
603
debug(D_HDFS, "unlink %s", path);
604
/* FIXME unlink does not set errno properly on failure! */
605
int ret = hdfs_services->unlink(fs, path);
607
errno = EEXIST; /* FIXME bad fix to above problem */
611
INT64_T chirp_hdfs_rename(const char *path, const char *newpath)
613
path = FIXPATH(path);
614
newpath = FIXPATH(path);
615
debug(D_HDFS, "rename %s -> %s", path, newpath);
616
hdfs_services->unlink(fs, newpath);
617
return hdfs_services->rename(fs, path, newpath);
620
INT64_T chirp_hdfs_link(const char *path, const char *newpath)
622
path = FIXPATH(path);
623
newpath = FIXPATH(path);
624
debug(D_HDFS, "link %s -> %s", path, newpath);
625
return (errno = ENOTSUP, -1);
628
INT64_T chirp_hdfs_symlink(const char *path, const char *newpath)
630
path = FIXPATH(path);
631
newpath = FIXPATH(path);
632
debug(D_HDFS, "symlink %s -> %s", path, newpath);
633
return (errno = ENOTSUP, -1);
636
INT64_T chirp_hdfs_readlink(const char *path, char *buf, INT64_T length)
{
	/* The request is logged and always fails with errno set to EINVAL;
	   buf and length are never touched. */
	const char *target = FIXPATH(path);

	debug(D_HDFS, "readlink %s", target);

	errno = EINVAL;
	return -1;
}
643
INT64_T chirp_hdfs_mkdir(const char *path, INT64_T mode)
{
	/* Create a directory at path via the library mkdir call and return
	   its result. The mode argument is not passed on. */
	const char *target = FIXPATH(path);

	debug(D_HDFS, "mkdir %s", target);

	return hdfs_services->mkdir(fs, target);
}
651
rmdir is a little unusual.
652
An 'empty' directory may contain some administrative
653
files such as an ACL and an allocation state.
654
Only delete the directory if it contains only those files.
657
INT64_T chirp_hdfs_rmdir(const char *path)
663
path = FIXPATH(path);
664
debug(D_HDFS, "rmdir %s", path);
666
dir = chirp_hdfs_opendir(path);
668
while((d = chirp_hdfs_readdir(dir))) {
673
if(!strncmp(d, ".__", 3))
678
chirp_hdfs_closedir(dir);
681
return hdfs_services->unlink(fs, path);
691
INT64_T chirp_hdfs_lstat(const char *path, struct chirp_stat * buf)
693
path = FIXPATH(path);
694
debug(D_HDFS, "lstat %s", path);
695
return chirp_hdfs_stat(path, buf);
698
INT64_T chirp_hdfs_statfs(const char *path, struct chirp_statfs * buf)
700
path = FIXPATH(path);
701
debug(D_HDFS, "statfs %s", path);
703
INT64_T capacity = hdfs_services->get_capacity(fs);
704
INT64_T used = hdfs_services->get_used(fs);
705
INT64_T blocksize = hdfs_services->get_default_block_size(fs);
707
if(capacity == -1 || used == -1 || blocksize == -1)
708
return (errno = EIO, -1);
710
buf->f_type = 0; /* FIXME */
711
buf->f_bsize = blocksize;
712
buf->f_blocks = capacity / blocksize;
713
buf->f_bavail = buf->f_bfree = used / blocksize;
714
buf->f_files = buf->f_ffree = 0;
719
INT64_T chirp_hdfs_fstatfs(int fd, struct chirp_statfs * buf)
721
debug(D_HDFS, "fstatfs %d", fd);
723
return chirp_hdfs_statfs("/", buf);
726
INT64_T chirp_hdfs_access(const char *path, INT64_T mode)
{
	/* W_OK is ok to delete, not to write, but we can't distinguish intent */
	/* Chirp ACL will check that we can access the file the way we want, so
	   we just do a redundant "exists" check */
	/* mode is only logged; the result of the library's exists call is
	   returned unchanged. */
	const char *target = FIXPATH(path);

	debug(D_HDFS, "access %s %ld", target, (long) mode);

	return hdfs_services->exists(fs, target);
}
736
INT64_T chirp_hdfs_chmod(const char *path, INT64_T mode)
{
	// The owner may only add or remove the execute bit,
	// because permissions are handled through the ACL model.
	// The requested mode is therefore reduced to 0600 plus the
	// owner-execute bit before it is handed to the library chmod call.
	const char *target = FIXPATH(path);
	INT64_T effective_mode;

	debug(D_HDFS, "chmod %s %ld", target, (long) mode);

	effective_mode = (mode & 0100) | 0600;

	return hdfs_services->chmod(fs, target, effective_mode);
}
746
INT64_T chirp_hdfs_chown(const char *path, INT64_T uid, INT64_T gid)
748
// Changing file ownership is silently ignored,
749
// because permissions are handled through the ACL model.
750
path = FIXPATH(path);
751
debug(D_HDFS, "chown (ignored) %s %ld %ld", path, (long) uid, (long) gid);
755
INT64_T chirp_hdfs_lchown(const char *path, INT64_T uid, INT64_T gid)
757
// Changing file ownership is silently ignored,
758
// because permissions are handled through the ACL model.
759
path = FIXPATH(path);
760
debug(D_HDFS, "lchown (ignored) %s %ld %ld", path, (long) uid, (long) gid);
764
INT64_T chirp_hdfs_truncate(const char *path, INT64_T length)
766
path = FIXPATH(path);
767
debug(D_HDFS, "truncate %s %ld", path, (long) length);
768
/* simulate truncate */
769
INT64_T size = length;
770
char *buffer = read_buffer(path, 0, &size);
773
INT64_T fd = write_buffer(path, buffer, size);
774
free(open_files[fd].name);
776
open_files[fd].name = NULL;
780
INT64_T chirp_hdfs_utime(const char *path, time_t actime, time_t modtime)
782
path = FIXPATH(path);
783
debug(D_HDFS, "utime %s %ld %ld", path, (long) actime, (long) modtime);
784
return hdfs_services->utime(fs, path, modtime, actime);
787
INT64_T chirp_hdfs_md5(const char *path, unsigned char digest[16])
791
struct chirp_stat info;
793
path = FIXPATH(path);
795
debug(D_HDFS, "md5sum %s", path);
797
result = chirp_hdfs_stat(path, &info);
801
if(S_ISDIR(info.cst_mode)) {
806
fd = chirp_hdfs_open(path, O_RDONLY, 0);
811
INT64_T length = info.cst_size;
817
INT64_T chunk = MIN(sizeof(buffer), length);
819
ractual = hdfs_services->read(fs, open_files[fd].file, buffer, chunk);
823
md5_update(&ctx, (unsigned char *) buffer, ractual);
829
chirp_hdfs_close(fd);
830
md5_final(digest, &ctx);
838
INT64_T chirp_hdfs_chdir(const char *path)
840
debug(D_HDFS, "chdir %s", path);
841
return hdfs_services->chdir(fs, path);
844
struct chirp_filesystem chirp_hdfs_fs = {
858
chirp_hdfs_ftruncate,
888
chirp_hdfs_file_size,