~ubuntu-branches/ubuntu/vivid/cctools/vivid

« back to all changes in this revision

Viewing changes to parrot/src/pfs_service_hdfs.cc

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2011-05-07 09:05:00 UTC
  • Revision ID: james.westby@ubuntu.com-20110507090500-lqpmdtwndor6e7os
Tags: upstream-3.3.2
Import upstream version 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
Copyright (C) 2003-2004 Douglas Thain and the University of Wisconsin
 
3
Copyright (C) 2005- The University of Notre Dame
 
4
This software is distributed under the GNU General Public License.
 
5
See the file COPYING for details.
 
6
*/
 
7
 
 
8
#include "pfs_dircache.h"
 
9
#include "pfs_table.h"
 
10
#include "pfs_service.h"
 
11
 
 
12
extern "C" {
 
13
#include "debug.h"
 
14
#include "hash_table.h"
 
15
#include "hdfs_library.h"
 
16
}
 
17
 
 
18
#include <string.h>
 
19
#include <unistd.h>
 
20
#include <stdio.h>
 
21
#include <stdlib.h>
 
22
#include <fcntl.h>
 
23
#include <errno.h>
 
24
#include <utime.h>
 
25
#include <dlfcn.h>
 
26
#include <pwd.h>
 
27
#include <grp.h>
 
28
#include <sys/statfs.h>
 
29
 
 
30
extern int pfs_enable_small_file_optimizations;
 
31
 
 
32
#define HDFS_DEFAULT_PORT 9100
 
33
 
 
34
#define HDFS_STAT_MODE (S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IWGRP|S_IXGRP|S_IROTH|S_IWOTH|S_IXOTH)
 
35
 
 
36
/* If the service has not been initialized yet, call initialize(); when that fails, return (e) from the enclosing method. */
#define HDFS_CHECK_INIT(e) if (!is_initialized && initialize() < 0) return (e);
 
37
/* Return (e) from the enclosing method when its local variable `fs` is null. */
#define HDFS_CHECK_FS(e)   if (!fs) return (e);
 
38
 
 
39
/* Log the enclosing method's local `result` (plus strerror(errno) when it is negative), then return it. */
#define HDFS_END debug(D_HDFS,"= %d %s",(int)result,((result>=0) ? "" : strerror(errno))); return result;
 
40
 
 
41
/* Directory cache used by both classes in this file; operations that modify the filesystem call invalidate() on it. */
static pfs_dircache hdfs_dircache;
 
42
 
 
43
class pfs_file_hdfs : public pfs_file
 
44
{
 
45
private:
 
46
        struct hdfs_library *hdfs;
 
47
        hdfsFS fs;
 
48
        hdfsFile handle;
 
49
public:
 
50
        pfs_file_hdfs( pfs_name *n, struct hdfs_library *hs, hdfsFS f, hdfsFile h ) : pfs_file(n) {
 
51
                hdfs = hs;
 
52
                fs = f;
 
53
                handle = h;
 
54
        }
 
55
 
 
56
        virtual int close() {
 
57
                int result;
 
58
 
 
59
                debug(D_HDFS, "closing file %s", name.rest);
 
60
                result = hdfs->close(fs, handle);
 
61
                HDFS_END
 
62
        }
 
63
 
 
64
        virtual int fsync() {
 
65
                int result; 
 
66
 
 
67
                hdfs_dircache.invalidate();
 
68
 
 
69
                debug(D_HDFS, "flushing file %s ", name.rest);
 
70
                result = hdfs->flush(fs, handle);
 
71
                HDFS_END
 
72
        }
 
73
 
 
74
        virtual pfs_ssize_t read( void *data, pfs_size_t length, pfs_off_t offset ) {
 
75
                pfs_ssize_t result;
 
76
 
 
77
                debug(D_HDFS, "reading from file %s", name.rest);
 
78
                result = hdfs->pread(fs, handle, offset, data, length);
 
79
                HDFS_END
 
80
        }
 
81
        
 
82
        virtual pfs_ssize_t write( const void *data, pfs_size_t length, pfs_off_t offset ) {
 
83
                pfs_ssize_t result;
 
84
 
 
85
                hdfs_dircache.invalidate();
 
86
 
 
87
                /* Ignore offset since HDFS does not support seekable writes. */
 
88
                debug(D_HDFS, "writing to file %s ", name.rest);
 
89
                result = hdfs->write(fs, handle, data, length);
 
90
                HDFS_END
 
91
        }
 
92
};
 
93
 
 
94
class pfs_service_hdfs : public pfs_service {
 
95
private:
 
96
        struct hdfs_library *hdfs;
 
97
        struct hash_table *uid_table;
 
98
        struct hash_table *gid_table;
 
99
 
 
100
        bool is_initialized;
 
101
public:
 
102
        pfs_service_hdfs() {
 
103
                uid_table = hash_table_create(0, 0);
 
104
                gid_table = hash_table_create(0, 0);
 
105
                is_initialized = false;
 
106
        }
 
107
 
 
108
        ~pfs_service_hdfs() {
 
109
                hash_table_delete(uid_table);
 
110
                hash_table_delete(gid_table);
 
111
        }
 
112
 
 
113
        int initialize() {
 
114
                int result;
 
115
 
 
116
                debug(D_HDFS, "loading dynamically shared libraries");
 
117
                hdfs = hdfs_library_open();
 
118
                if(!hdfs){
 
119
                        is_initialized = false;
 
120
                        result = -1;
 
121
                } else {
 
122
                        is_initialized = true;
 
123
                        result = 0;
 
124
                }
 
125
 
 
126
                HDFS_END
 
127
        }
 
128
 
 
129
        int get_uid_from_name( const char *name ) {
 
130
                int key;
 
131
                
 
132
                key = (PTRINT_T)hash_table_lookup(uid_table, name);
 
133
                if (key) {
 
134
                        return key;
 
135
                } else {
 
136
                        struct passwd *owner;
 
137
 
 
138
                        owner = getpwnam(name);
 
139
                        if (owner) {
 
140
                                hash_table_insert(uid_table, name, (void*)(PTRINT_T)owner->pw_uid);
 
141
                                return owner->pw_uid;
 
142
                        } else {
 
143
                                return -1;
 
144
                        }
 
145
                }
 
146
        }
 
147
        
 
148
        int get_gid_from_name( const char *name ) {
 
149
                int key;
 
150
                
 
151
                key = (PTRINT_T)hash_table_lookup(gid_table, name);
 
152
                if (key) {
 
153
                        return key;
 
154
                } else {
 
155
                        struct group *group;
 
156
 
 
157
                        group = getgrnam(name);
 
158
                        if (group) {
 
159
                                hash_table_insert(gid_table, name, (void*)(PTRINT_T)group->gr_gid);
 
160
                                return group->gr_gid;
 
161
                        } else {
 
162
                                return -1;
 
163
                        }
 
164
                }
 
165
        }
 
166
 
 
167
        void hdfs_copy_fileinfo(pfs_name *name, hdfsFileInfo *file_info, struct pfs_stat *buf) {
 
168
                int file_uid;
 
169
                int file_gid;
 
170
 
 
171
                pfs_service_emulate_stat(name, buf);
 
172
 
 
173
                if (file_info->mKind == kObjectKindDirectory) {
 
174
                        buf->st_mode  = S_IFDIR;
 
175
                } else {
 
176
                        buf->st_mode = S_IFREG;
 
177
                }
 
178
 
 
179
                buf->st_mode   |= file_info->mPermissions;
 
180
                buf->st_size    = file_info->mSize;
 
181
                buf->st_mtime   = file_info->mLastMod;
 
182
                buf->st_atime   = file_info->mLastAccess;
 
183
                buf->st_blksize = file_info->mBlockSize;
 
184
 
 
185
                file_uid = get_uid_from_name(file_info->mOwner);
 
186
                if (file_uid >= 0) {
 
187
                        buf->st_uid = file_uid;
 
188
                }
 
189
 
 
190
                file_gid = get_gid_from_name(file_info->mGroup);
 
191
                if (file_gid >= 0) {
 
192
                        buf->st_gid = file_gid;
 
193
                }
 
194
        }
 
195
 
 
196
        virtual void * connect( pfs_name *name ) {
 
197
                hdfsFS fs;
 
198
                HDFS_CHECK_INIT(0)
 
199
 
 
200
                debug(D_HDFS, "connecting to %s:%d", name->host, name->port);
 
201
                fs = hdfs->connect(name->host, name->port);
 
202
                if (errno == HDFS_EINTERNAL) {
 
203
                        errno = ECONNRESET;
 
204
                }
 
205
 
 
206
                debug(D_HDFS, "= %ld", fs);
 
207
                return fs;
 
208
        }
 
209
 
 
210
        virtual void disconnect( pfs_name *name, void *fs) {
 
211
                debug(D_HDFS, "disconnecting from %s:%d", name->host, name->port);
 
212
                hdfs->disconnect((hdfsFS)fs);
 
213
        }
 
214
 
 
215
        virtual pfs_file * open( pfs_name *name, int flags, mode_t mode ) {
 
216
                pfs_file *file = 0;
 
217
                hdfsFile handle;
 
218
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
219
 
 
220
                HDFS_CHECK_INIT(0)
 
221
                HDFS_CHECK_FS(0)
 
222
 
 
223
                hdfs_dircache.invalidate();
 
224
 
 
225
                switch (flags&O_ACCMODE) {
 
226
                        case O_RDONLY:
 
227
                                debug(D_HDFS, "opening file %s for reading", name->rest);
 
228
                                flags = O_RDONLY;
 
229
                                if (hdfs->exists(fs, name->rest) < 0) {
 
230
                                        debug(D_HDFS, "file %s does not exist", name->rest);
 
231
                                        errno = ENOENT;
 
232
                                        return 0;
 
233
                                }
 
234
                                break;
 
235
                        case O_WRONLY:
 
236
                                debug(D_HDFS, "opening file %s for writing", name->rest);
 
237
                                flags = O_WRONLY;
 
238
                                break;
 
239
                        default:
 
240
                                debug(D_HDFS, "invalid file open flag %d", flags&O_ACCMODE);
 
241
                                errno = ENOTSUP;
 
242
                                return 0;
 
243
                }
 
244
                        
 
245
                struct pfs_stat buf;
 
246
                if (!this->_stat(fs, name, &buf) && S_ISDIR(buf.st_mode)) {
 
247
                        errno = EISDIR;
 
248
                        return 0;
 
249
                }
 
250
 
 
251
                handle = hdfs->open(fs, name->rest, flags, 0, 0, 0);
 
252
                if (handle != NULL) {
 
253
                        file = new pfs_file_hdfs(name, hdfs, fs, handle);
 
254
                        if (!file) {
 
255
                                errno = ENOENT;
 
256
                        }
 
257
                } else {
 
258
                        errno = EINVAL;
 
259
                        file = 0;
 
260
                }
 
261
 
 
262
                pfs_service_disconnect_cache(name, fs, (errno == HDFS_EINTERNAL));
 
263
 
 
264
                debug(D_HDFS, "= %ld", file);
 
265
                return file;
 
266
        }
 
267
 
 
268
        virtual pfs_dir * getdir( pfs_name *name ) {
 
269
                pfs_dir *dir = new pfs_dir(name);
 
270
 
 
271
                hdfsFileInfo *file_list = 0;
 
272
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
273
                int num_entries = 0;
 
274
 
 
275
                HDFS_CHECK_INIT(0)
 
276
                HDFS_CHECK_FS(0)
 
277
 
 
278
                if (pfs_enable_small_file_optimizations) {
 
279
                        hdfs_dircache.begin(name->path);
 
280
                }
 
281
 
 
282
                debug(D_HDFS, "checking if directory %s exists", name->rest);
 
283
                if (hdfs->exists(fs, name->rest) < 0) {
 
284
                        errno = EINVAL;
 
285
                        delete dir;
 
286
                        return 0;
 
287
                }
 
288
 
 
289
                debug(D_HDFS, "getting directory of %s", name->rest);
 
290
                file_list = hdfs->listdir(fs, name->rest, &num_entries);
 
291
                struct pfs_stat buf;
 
292
                if (file_list != NULL) {
 
293
                        for (int i = 0; i < num_entries; i++) {
 
294
                                if (pfs_enable_small_file_optimizations) {
 
295
                                        hdfs_copy_fileinfo(name, &file_list[i], &buf);
 
296
                                        hdfs_dircache.insert(file_list[i].mName, &buf, dir);
 
297
                                } else {
 
298
                                        dir->append(file_list[i].mName);
 
299
                                }
 
300
                        }
 
301
                        
 
302
                        hdfs->free_stat(file_list, num_entries);
 
303
                }
 
304
                
 
305
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
306
                return dir;
 
307
        }
 
308
        
 
309
        virtual int stat( pfs_name *name, struct pfs_stat *buf ) {
 
310
                int result;
 
311
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
312
                
 
313
                HDFS_CHECK_INIT(-1)
 
314
                HDFS_CHECK_FS(-1)
 
315
                
 
316
                debug(D_HDFS, "stat %s", name->rest);
 
317
                result = this->_stat(fs, name, buf);
 
318
                buf->st_mode |= (S_IXUSR | S_IXGRP);
 
319
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
320
 
 
321
                HDFS_END
 
322
        }
 
323
        
 
324
        virtual int lstat( pfs_name *name, struct pfs_stat *buf ) {
 
325
                int result;
 
326
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
327
                
 
328
                HDFS_CHECK_INIT(-1)
 
329
                HDFS_CHECK_FS(-1)
 
330
                
 
331
                debug(D_HDFS, "lstat %s", name->rest);
 
332
                result = this->_stat(fs, name, buf);
 
333
                buf->st_mode |= (S_IXUSR | S_IXGRP);
 
334
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
335
 
 
336
                HDFS_END
 
337
        }
 
338
 
 
339
        virtual int _stat( hdfsFS fs, pfs_name *name, struct pfs_stat *buf ) {
 
340
                int result;
 
341
                hdfsFileInfo *file_info = 0;
 
342
 
 
343
                if (hdfs_dircache.lookup(name->rest, buf)) {
 
344
                        result = 0;
 
345
                } else {
 
346
                        file_info = hdfs->stat(fs, name->rest);
 
347
 
 
348
                        if (file_info != NULL) {
 
349
                                hdfs_copy_fileinfo(name, file_info, buf);
 
350
                                hdfs->free_stat(file_info, 1);
 
351
                                result = 0;
 
352
                        } else {
 
353
                                errno = ENOENT;
 
354
                                result = -1;
 
355
                        }
 
356
                }
 
357
                
 
358
                HDFS_END
 
359
        }
 
360
 
 
361
        virtual int access( pfs_name *name, mode_t mode ) {
 
362
                int result = -1;
 
363
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
364
                
 
365
                HDFS_CHECK_INIT(-1)
 
366
                HDFS_CHECK_FS(-1)
 
367
 
 
368
                debug(D_HDFS, "access %s", name->rest);
 
369
                result = hdfs->exists(fs, name->rest);
 
370
                
 
371
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
372
                HDFS_END
 
373
        }
 
374
 
 
375
        virtual int chdir( pfs_name *name, char *newname ) {
 
376
                int result = -1;
 
377
                struct pfs_stat buf;
 
378
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
379
                
 
380
                HDFS_CHECK_INIT(-1)
 
381
                HDFS_CHECK_FS(-1)
 
382
 
 
383
                debug(D_HDFS, "chdir %s", name->rest);
 
384
                if (this->_stat(fs, name, &buf) >= 0) {
 
385
                        if (S_ISDIR(buf.st_mode)) {
 
386
                                sprintf(newname, "/%s/%s:%d%s", name->service_name, name->host, name->port, name->rest);
 
387
                                result = 0;
 
388
                        } else {
 
389
                                errno = ENOTDIR;
 
390
                                result = -1;
 
391
                        }
 
392
                }
 
393
 
 
394
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
395
                HDFS_END
 
396
        }
 
397
 
 
398
        virtual int mkdir( pfs_name *name, mode_t mode ) {
 
399
                int result;
 
400
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
401
                
 
402
                HDFS_CHECK_INIT(-1)
 
403
                HDFS_CHECK_FS(-1)
 
404
                
 
405
                hdfs_dircache.invalidate();
 
406
                
 
407
                debug(D_HDFS, "mkdir %s", name->rest);
 
408
                result = hdfs->mkdir(fs, name->rest);
 
409
 
 
410
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
411
                HDFS_END
 
412
        }
 
413
        
 
414
        virtual int rmdir( pfs_name *name ) {
 
415
                int result;
 
416
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
417
                
 
418
                HDFS_CHECK_INIT(-1)
 
419
                HDFS_CHECK_FS(-1)
 
420
                
 
421
                hdfs_dircache.invalidate();
 
422
                
 
423
                debug(D_HDFS, "rmdir %s", name->rest);
 
424
                result = hdfs->unlink(fs, name->rest);
 
425
 
 
426
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
427
                HDFS_END
 
428
        }
 
429
        
 
430
        virtual int unlink( pfs_name *name ) {
 
431
                int result;
 
432
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
433
                
 
434
                HDFS_CHECK_INIT(-1)
 
435
                HDFS_CHECK_FS(-1)
 
436
 
 
437
                hdfs_dircache.invalidate();
 
438
                
 
439
                debug(D_HDFS, "unlink %s", name->rest);
 
440
                result = hdfs->unlink(fs, name->rest);
 
441
 
 
442
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
443
                HDFS_END
 
444
        }
 
445
 
 
446
        virtual int rename( pfs_name *name, pfs_name *newname ) {
 
447
                int result;
 
448
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
449
                
 
450
                HDFS_CHECK_INIT(-1)
 
451
                HDFS_CHECK_FS(-1)
 
452
 
 
453
                hdfs_dircache.invalidate();
 
454
                
 
455
                debug(D_HDFS, "rename %s to %s", name->rest, newname->rest);
 
456
                result = hdfs->rename(fs, name->rest, newname->rest);
 
457
 
 
458
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
459
                HDFS_END
 
460
        }
 
461
 
 
462
        /**
 
463
         * locate file
 
464
         *
 
465
         * Returns locations of first block of file. */
 
466
        virtual pfs_location *locate( pfs_name *name ) {
 
467
                struct pfs_stat buf;
 
468
                hdfsFS fs = (hdfsFS)pfs_service_connect_cache(name);
 
469
                pfs_location *loc = NULL;
 
470
                char ***hosts;
 
471
 
 
472
                HDFS_CHECK_FS(NULL)
 
473
                
 
474
                debug(D_HDFS, "locate %s", name->rest);
 
475
                if (this->_stat(fs, name, &buf) >= 0) {
 
476
                        if (S_ISDIR(buf.st_mode)) {
 
477
                                errno = ENOTSUP;
 
478
                        } else {
 
479
                                hosts = hdfs->get_hosts(fs, name->rest, 0, buf.st_blksize);
 
480
                                if (hosts) {
 
481
                                        loc = new pfs_location();
 
482
                                        for (int i = 0; hosts[i]; i++)
 
483
                                                for (int j = 0; hosts[i][j]; j++)
 
484
                                                        loc->append(hosts[i][j]);
 
485
                                        hdfs->free_hosts(hosts);
 
486
                                }
 
487
                        }
 
488
 
 
489
                }
 
490
 
 
491
                pfs_service_disconnect_cache(name, (void*)fs, (errno == HDFS_EINTERNAL));
 
492
                return loc;
 
493
        }
 
494
 
 
495
        virtual int get_default_port() {
 
496
                return HDFS_DEFAULT_PORT;
 
497
        }
 
498
 
 
499
        virtual int is_seekable() {
 
500
                return 1;
 
501
        }
 
502
};
 
503
 
 
504
static pfs_service_hdfs pfs_service_hdfs_instance;
 
505
pfs_service *pfs_service_hdfs = &pfs_service_hdfs_instance;
 
506
 
 
507
// vim: sw=8 sts=8 ts=8 ft=cpp