2
Copyright (C) 2003-2004 Douglas Thain and the University of Wisconsin
3
Copyright (C) 2005- The University of Notre Dame
4
This software is distributed under the GNU General Public License.
5
See the file COPYING for details.
8
#include "pfs_dircache.h"
10
#include "pfs_service.h"
14
#include "hash_table.h"
15
#include "hdfs_library.h"
28
#include <sys/statfs.h>
30
// Flag defined outside this file; when non-zero, getdir() records per-entry stat data in hdfs_dircache.
extern int pfs_enable_small_file_optimizations;
32
// Port number returned by get_default_port().
#define HDFS_DEFAULT_PORT 9100
34
// Read, write and execute permission bits for user, group and other.
#define HDFS_STAT_MODE (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH)
36
// Returns (e) from the enclosing function when is_initialized is false and initialize() reports a negative result.
// NOTE(review): expands to a bare `if` that carries its own trailing semicolon; take care when using it next to an unbraced if/else.
#define HDFS_CHECK_INIT(e) if (!is_initialized && initialize() < 0) return (e);
37
// Returns (e) from the enclosing function when the local `fs` evaluates to false.
#define HDFS_CHECK_FS(e) if (!fs) return (e);
39
// Logs the local `result` (with strerror(errno) appended when it is negative) and returns it from the enclosing function.
#define HDFS_END debug(D_HDFS,"= %d %s",(int)result,((result>=0) ? "" : strerror(errno))); return result;
41
// Cache of stat results: filled by getdir(), consulted by _stat(), and dropped via invalidate() in several operations of this file.
static pfs_dircache hdfs_dircache;
43
// pfs_file subclass whose operations are forwarded to the HDFS library function table.
class pfs_file_hdfs : public pfs_file
46
// Function table of the HDFS library; every call in this class goes through it.
struct hdfs_library *hdfs;
50
// n is handed to the pfs_file base class; hs is the library table, f the filesystem handle (hdfsFS) and h the file handle (hdfsFile).
pfs_file_hdfs( pfs_name *n, struct hdfs_library *hs, hdfsFS f, hdfsFile h ) : pfs_file(n) {
59
debug(D_HDFS, "closing file %s", name.rest);
60
// The library's close() return value becomes the result of the operation.
result = hdfs->close(fs, handle);
67
// Drop cached stat entries before flushing.
hdfs_dircache.invalidate();
69
debug(D_HDFS, "flushing file %s ", name.rest);
70
result = hdfs->flush(fs, handle);
74
// Forwards to the library's pread() with the caller's offset, buffer and length.
virtual pfs_ssize_t read( void *data, pfs_size_t length, pfs_off_t offset ) {
77
debug(D_HDFS, "reading from file %s", name.rest);
78
result = hdfs->pread(fs, handle, offset, data, length);
82
// Forwards to the library's write(); the offset argument is not passed on.
virtual pfs_ssize_t write( const void *data, pfs_size_t length, pfs_off_t offset ) {
85
// Drop cached stat entries before writing.
hdfs_dircache.invalidate();
87
/* Ignore offset since HDFS does not support seekable writes. */
88
debug(D_HDFS, "writing to file %s ", name.rest);
89
result = hdfs->write(fs, handle, data, length);
94
// pfs_service subclass that implements the service operations on top of the HDFS library.
class pfs_service_hdfs : public pfs_service {
96
// Library function table; assigned from hdfs_library_open().
struct hdfs_library *hdfs;
97
// Name-to-id caches used by get_uid_from_name() and get_gid_from_name().
struct hash_table *uid_table;
98
struct hash_table *gid_table;
103
// Create the two name caches.
uid_table = hash_table_create(0, 0);
104
gid_table = hash_table_create(0, 0);
105
// Library loading is deferred: HDFS_CHECK_INIT calls initialize() while this flag is false.
is_initialized = false;
108
// Releases the two name caches.
~pfs_service_hdfs() {
109
hash_table_delete(uid_table);
110
hash_table_delete(gid_table);
116
debug(D_HDFS, "loading dynamically shared libraries");
117
hdfs = hdfs_library_open();
119
// NOTE(review): presumably the path taken when the library could not be opened — confirm.
is_initialized = false;
122
is_initialized = true;
129
// Maps a user name to a numeric uid using uid_table and getpwnam(); uids obtained from getpwnam() are stored in uid_table.
int get_uid_from_name( const char *name ) {
132
// The table stores the uid itself as the entry's pointer-sized value.
key = (PTRINT_T)hash_table_lookup(uid_table, name);
136
struct passwd *owner;
138
// Look the name up in the system password database.
owner = getpwnam(name);
140
// Remember the uid for later lookups.
hash_table_insert(uid_table, name, (void*)(PTRINT_T)owner->pw_uid);
141
return owner->pw_uid;
148
// Maps a group name to a numeric gid using gid_table and getgrnam(); gids obtained from getgrnam() are stored in gid_table.
int get_gid_from_name( const char *name ) {
151
// The table stores the gid itself as the entry's pointer-sized value.
key = (PTRINT_T)hash_table_lookup(gid_table, name);
157
// Look the name up in the system group database.
group = getgrnam(name);
159
// Remember the gid for later lookups.
hash_table_insert(gid_table, name, (void*)(PTRINT_T)group->gr_gid);
160
return group->gr_gid;
167
// Fills *buf from an hdfsFileInfo record: starts from pfs_service_emulate_stat(), then sets mode, size, times, block size, uid and gid.
void hdfs_copy_fileinfo(pfs_name *name, hdfsFileInfo *file_info, struct pfs_stat *buf) {
171
// Baseline values; the fields assigned below replace them.
pfs_service_emulate_stat(name, buf);
173
// File-type bit: S_IFDIR when mKind is kObjectKindDirectory; S_IFREG is the other value assigned here.
if (file_info->mKind == kObjectKindDirectory) {
174
buf->st_mode = S_IFDIR;
176
buf->st_mode = S_IFREG;
179
// Add the permission bits reported by HDFS to the type bit.
buf->st_mode |= file_info->mPermissions;
180
buf->st_size = file_info->mSize;
181
buf->st_mtime = file_info->mLastMod;
182
buf->st_atime = file_info->mLastAccess;
183
buf->st_blksize = file_info->mBlockSize;
185
// HDFS reports the owner as a name; translate it to a numeric uid.
file_uid = get_uid_from_name(file_info->mOwner);
187
buf->st_uid = file_uid;
190
// HDFS reports the group as a name; translate it to a numeric gid.
file_gid = get_gid_from_name(file_info->mGroup);
192
buf->st_gid = file_gid;
196
// Connects to the HDFS server at name->host:name->port through hdfs->connect and keeps the returned handle in fs.
virtual void * connect( pfs_name *name ) {
200
debug(D_HDFS, "connecting to %s:%d", name->host, name->port);
201
fs = hdfs->connect(name->host, name->port);
202
// errno is compared against the library's HDFS_EINTERNAL code after the connect attempt.
if (errno == HDFS_EINTERNAL) {
206
// NOTE(review): fs looks like a handle rather than a long; confirm that "%ld" matches its type.
debug(D_HDFS, "= %ld", fs);
210
// Closes a connection through hdfs->disconnect; fs arrives as void* and is cast back to hdfsFS.
virtual void disconnect( pfs_name *name, void *fs) {
211
debug(D_HDFS, "disconnecting from %s:%d", name->host, name->port);
212
hdfs->disconnect((hdfsFS)fs);
215
// Opens name->rest on a connection from the connect cache and wraps the resulting HDFS handle in a pfs_file_hdfs.
virtual pfs_file * open( pfs_name *name, int flags, mode_t mode ) {
218
// Obtain a connection from the connect cache; it is given back with pfs_service_disconnect_cache() below.
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
223
hdfs_dircache.invalidate();
225
// Only the access-mode bits of flags select the case.
switch (flags&O_ACCMODE) {
227
debug(D_HDFS, "opening file %s for reading", name->rest);
229
// A negative return from hdfs->exists is treated as "file is missing".
if (hdfs->exists(fs, name->rest) < 0) {
230
debug(D_HDFS, "file %s does not exist", name->rest);
236
debug(D_HDFS, "opening file %s for writing", name->rest);
240
debug(D_HDFS, "invalid file open flag %d", flags&O_ACCMODE);
246
// True when _stat() returns 0 and the path is a directory.
if (!this->_stat(fs, name, &buf) && S_ISDIR(buf.st_mode)) {
251
// mode is not forwarded; the last three arguments are passed as 0.
handle = hdfs->open(fs, name->rest, flags, 0, 0, 0);
252
if (handle != NULL) {
253
file = new pfs_file_hdfs(name, hdfs, fs, handle);
262
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, fs, (errno == HDFS_EINTERNAL));
264
// NOTE(review): file is assigned a pointer above; confirm that "%ld" matches its type.
debug(D_HDFS, "= %ld", file);
268
// Lists the directory name->rest through hdfs->listdir and appends every entry name to a new pfs_dir.
virtual pfs_dir * getdir( pfs_name *name ) {
269
pfs_dir *dir = new pfs_dir(name);
271
hdfsFileInfo *file_list = 0;
272
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
278
// With small-file optimizations on, start a dircache session for this path.
if (pfs_enable_small_file_optimizations) {
279
hdfs_dircache.begin(name->path);
282
debug(D_HDFS, "checking if directory %s exists", name->rest);
283
// A negative return from hdfs->exists is treated as "directory is missing".
if (hdfs->exists(fs, name->rest) < 0) {
289
debug(D_HDFS, "getting directory of %s", name->rest);
290
// listdir writes the number of entries into num_entries.
file_list = hdfs->listdir(fs, name->rest, &num_entries);
292
if (file_list != NULL) {
293
for (int i = 0; i < num_entries; i++) {
294
// Cache each entry's stat data; note that the directory's own pfs_name is what hdfs_copy_fileinfo receives.
if (pfs_enable_small_file_optimizations) {
295
hdfs_copy_fileinfo(name, &file_list[i], &buf);
296
hdfs_dircache.insert(file_list[i].mName, &buf, dir);
298
dir->append(file_list[i].mName);
302
// Give the listing back to the library.
hdfs->free_stat(file_list, num_entries);
305
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
309
// Fills *buf for name->rest via _stat() on a connection from the connect cache.
virtual int stat( pfs_name *name, struct pfs_stat *buf ) {
311
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
316
debug(D_HDFS, "stat %s", name->rest);
317
result = this->_stat(fs, name, buf);
318
// User and group execute bits are OR-ed in right after the _stat() call.
// NOTE(review): this runs without looking at result — confirm buf is meaningful when _stat() fails.
buf->st_mode |= (S_IXUSR | S_IXGRP);
319
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
324
// Fills *buf for name->rest via _stat(); apart from the log message, the statements match those of stat().
virtual int lstat( pfs_name *name, struct pfs_stat *buf ) {
326
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
331
debug(D_HDFS, "lstat %s", name->rest);
332
result = this->_stat(fs, name, buf);
333
// User and group execute bits are OR-ed in right after the _stat() call.
// NOTE(review): this runs without looking at result — confirm buf is meaningful when _stat() fails.
buf->st_mode |= (S_IXUSR | S_IXGRP);
334
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
339
// Stat helper working on an already-open connection: consults hdfs_dircache first and queries hdfs->stat for the file info.
virtual int _stat( hdfsFS fs, pfs_name *name, struct pfs_stat *buf ) {
341
hdfsFileInfo *file_info = 0;
343
// A cache hit fills *buf directly from hdfs_dircache.
if (hdfs_dircache.lookup(name->rest, buf)) {
346
file_info = hdfs->stat(fs, name->rest);
348
if (file_info != NULL) {
349
// Convert the HDFS record into *buf, then give the record back to the library.
hdfs_copy_fileinfo(name, file_info, buf);
350
hdfs->free_stat(file_info, 1);
361
// Access check for name->rest; result is taken from hdfs->exists.
virtual int access( pfs_name *name, mode_t mode ) {
363
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
368
debug(D_HDFS, "access %s", name->rest);
369
result = hdfs->exists(fs, name->rest);
371
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
375
// When name->rest stats successfully as a directory, writes its full path "/service/host:port/rest" into newname.
virtual int chdir( pfs_name *name, char *newname ) {
378
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
383
debug(D_HDFS, "chdir %s", name->rest);
384
if (this->_stat(fs, name, &buf) >= 0) {
385
if (S_ISDIR(buf.st_mode)) {
386
// NOTE(review): sprintf does no bounds checking; confirm the caller's newname buffer is large enough for this path.
sprintf(newname, "/%s/%s:%d%s", name->service_name, name->host, name->port, name->rest);
394
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
398
// Creates the directory name->rest through hdfs->mkdir; mode is not passed to that call.
virtual int mkdir( pfs_name *name, mode_t mode ) {
400
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
405
// Drop cached stat entries before changing the namespace.
hdfs_dircache.invalidate();
407
debug(D_HDFS, "mkdir %s", name->rest);
408
result = hdfs->mkdir(fs, name->rest);
410
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
414
// Removes the directory name->rest; this goes through hdfs->unlink, the same library call unlink() uses.
virtual int rmdir( pfs_name *name ) {
416
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
421
// Drop cached stat entries before changing the namespace.
hdfs_dircache.invalidate();
423
debug(D_HDFS, "rmdir %s", name->rest);
424
result = hdfs->unlink(fs, name->rest);
426
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
430
// Removes name->rest through hdfs->unlink.
virtual int unlink( pfs_name *name ) {
432
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
437
// Drop cached stat entries before changing the namespace.
hdfs_dircache.invalidate();
439
debug(D_HDFS, "unlink %s", name->rest);
440
result = hdfs->unlink(fs, name->rest);
442
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
446
// Renames name->rest to newname->rest through hdfs->rename, on the connection obtained for `name`.
virtual int rename( pfs_name *name, pfs_name *newname ) {
448
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
453
// Drop cached stat entries before changing the namespace.
hdfs_dircache.invalidate();
455
debug(D_HDFS, "rename %s to %s", name->rest, newname->rest);
456
result = hdfs->rename(fs, name->rest, newname->rest);
458
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
465
* Returns locations of first block of file. */
466
// Builds a pfs_location from the host names that hdfs->get_hosts reports for name->rest.
virtual pfs_location *locate( pfs_name *name ) {
468
hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
469
pfs_location *loc = NULL;
474
debug(D_HDFS, "locate %s", name->rest);
475
// Stat first: the result tells directories apart and supplies the block size used below.
if (this->_stat(fs, name, &buf) >= 0) {
476
if (S_ISDIR(buf.st_mode)) {
479
// The query passes 0 and the file's block size as the range arguments.
hosts = hdfs->get_hosts(fs, name->rest, 0, buf.st_blksize);
481
loc = new pfs_location();
482
// hosts is walked as a null-terminated array of null-terminated arrays; every element is appended.
for (int i = 0; hosts[i]; i++)
483
for (int j = 0; hosts[i][j]; j++)
484
loc->append(hosts[i][j]);
485
// Give the host list back to the library.
hdfs->free_hosts(hosts);
491
// The last argument is true when errno holds HDFS_EINTERNAL.
pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
495
// Returns the default port of this service, HDFS_DEFAULT_PORT.
virtual int get_default_port() {
496
return HDFS_DEFAULT_PORT;
499
virtual int is_seekable() {
504
// The single, file-local instance of the HDFS service.
static pfs_service_hdfs pfs_service_hdfs_instance;
505
// Externally visible pointer to that instance, typed as the pfs_service base class.
pfs_service *pfs_service_hdfs = &pfs_service_hdfs_instance;
507
// vim: sw=8 sts=8 ts=8 ft=cpp