~ubuntu-branches/ubuntu/vivid/cctools/vivid

« back to all changes in this revision

Viewing changes to chirp/src/chirp_hdfs.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2011-05-07 09:05:00 UTC
  • Revision ID: james.westby@ubuntu.com-20110507090500-lqpmdtwndor6e7os
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
Copyright (C) 2008- The University of Notre Dame
 
3
This software is distributed under the GNU General Public License.
 
4
See the file COPYING for details.
 
5
*/
 
6
 
 
7
#include "chirp_filesystem.h"
 
8
#include "chirp_hdfs.h"
 
9
#include "chirp_protocol.h"
 
10
 
 
11
#include "macros.h"
 
12
#include "xmalloc.h"
 
13
#include "hash_table.h"
 
14
#include "debug.h"
 
15
#include "md5.h"
 
16
#include "username.h"
 
17
 
 
18
#include "hdfs_library.h"
 
19
 
 
20
#include <assert.h>
 
21
#include <string.h>
 
22
#include <unistd.h>
 
23
#include <stdio.h>
 
24
#include <stdlib.h>
 
25
#include <fcntl.h>
 
26
#include <errno.h>
 
27
#include <utime.h>
 
28
#include <dlfcn.h>
 
29
#include <pwd.h>
 
30
#include <grp.h>
 
31
#include <sys/stat.h>
 
32
 
 
33
// HDFS gets upset if a path begins with two slashes.
 
34
// This macro simply skips over the first slash if needed.
 
35
#define FIXPATH(p) ( (p[0]=='/' && p[1]=='/') ? &p[1] : p )
 
36
 
 
37
char *chirp_hdfs_hostname = NULL;
 
38
UINT16_T chirp_hdfs_port = 0;
 
39
 
 
40
extern char chirp_owner[USERNAME_MAX];
 
41
 
 
42
static struct hdfs_library *hdfs_services = 0;
 
43
static hdfsFS fs = NULL;
 
44
 
 
45
/* Array of open HDFS Files */
 
46
#define BASE_SIZE 1024
 
47
static struct chirp_hdfs_file {
 
48
        char *name;
 
49
        hdfsFile file;
 
50
} open_files[BASE_SIZE];        // = NULL;
 
51
 
 
52
INT64_T chirp_hdfs_init(const char *path)
 
53
{
 
54
        static const char *groups[] = { "supergroup" };
 
55
 
 
56
        (void) path;
 
57
 
 
58
        int i;
 
59
 
 
60
        if(chirp_hdfs_hostname == NULL)
 
61
                fatal("hostname and port must be specified, use -x option");
 
62
 
 
63
        debug(D_HDFS, "initializing", chirp_hdfs_hostname, chirp_hdfs_port);
 
64
 
 
65
        assert(fs == NULL);
 
66
 
 
67
        for(i = 0; i < BASE_SIZE; i++)
 
68
                open_files[i].name = NULL;
 
69
 
 
70
        if(!hdfs_services) {
 
71
                hdfs_services = hdfs_library_open();
 
72
                if(!hdfs_services)
 
73
                        return -1;
 
74
        }
 
75
 
 
76
        debug(D_HDFS, "connecting to %s:%u as '%s'\n", chirp_hdfs_hostname, chirp_hdfs_port, chirp_owner);
 
77
        fs = hdfs_services->connect_as_user(chirp_hdfs_hostname, chirp_hdfs_port, chirp_owner, groups, 1);
 
78
 
 
79
        if(fs == NULL)
 
80
                return (errno = ENOSYS, -1);
 
81
        else
 
82
                return 0;
 
83
}
 
84
 
 
85
INT64_T chirp_hdfs_destroy(void)
 
86
{
 
87
        int ret;
 
88
        if(fs == NULL)
 
89
                return 0;
 
90
        debug(D_HDFS, "destroying hdfs connection", chirp_hdfs_hostname, chirp_hdfs_port);
 
91
        ret = hdfs_services->disconnect(fs);
 
92
        if(ret == -1)
 
93
                return ret;
 
94
        fs = NULL;
 
95
        hdfs_library_close(hdfs_services);
 
96
        return 0;
 
97
}
 
98
 
 
99
static void copystat(struct chirp_stat *cs, hdfsFileInfo * hs, const char *path )
 
100
{
 
101
        memset(cs,0,sizeof(*cs));
 
102
        cs->cst_dev = -1;
 
103
        cs->cst_rdev = -2;
 
104
        cs->cst_ino = hash_string(path);
 
105
        cs->cst_mode = hs->mKind == kObjectKindDirectory ? S_IFDIR : S_IFREG;
 
106
 
 
107
        /* HDFS does not have execute bit, lie and set it for all files */
 
108
        cs->cst_mode |= hs->mPermissions | S_IXUSR | S_IXGRP;
 
109
        cs->cst_nlink = hs->mReplication;
 
110
        cs->cst_uid = 0;
 
111
        cs->cst_gid = 0;
 
112
        cs->cst_size = hs->mSize;
 
113
        cs->cst_blksize = hs->mBlockSize;
 
114
 
 
115
        /* If the blocksize is not set, assume 64MB chunksize */
 
116
        if(cs->cst_blksize<1) cs->cst_blksize = 64*1024*1024;
 
117
        cs->cst_blocks = MAX(1,cs->cst_size/cs->cst_blksize);
 
118
 
 
119
        /* Note that hs->mLastAccess is typically zero. */
 
120
        cs->cst_atime = cs->cst_mtime = cs->cst_ctime = hs->mLastMod;
 
121
}
 
122
 
 
123
INT64_T chirp_hdfs_fstat(int fd, struct chirp_stat *buf)
 
124
{
 
125
        return chirp_hdfs_stat(open_files[fd].name, buf);
 
126
}
 
127
 
 
128
INT64_T chirp_hdfs_stat(const char *path, struct chirp_stat * buf)
 
129
{
 
130
        hdfsFileInfo *file_info;
 
131
 
 
132
        path = FIXPATH(path);
 
133
 
 
134
        debug(D_HDFS, "stat %s", path);
 
135
 
 
136
        file_info = hdfs_services->stat(fs,path);
 
137
        if(file_info == NULL)
 
138
                return (errno = ENOENT, -1);
 
139
        copystat(buf, file_info, path);
 
140
        hdfs_services->free_stat(file_info, 1);
 
141
 
 
142
        return 0;
 
143
}
 
144
 
 
145
struct chirp_hdfs_dir {
 
146
        int i;
 
147
        int n;
 
148
        hdfsFileInfo *info;
 
149
        char *path;
 
150
};
 
151
 
 
152
void *chirp_hdfs_opendir(const char *path)
 
153
{
 
154
        struct chirp_hdfs_dir *d;
 
155
 
 
156
        path = FIXPATH(path);
 
157
 
 
158
        debug(D_HDFS, "opendir %s", path);
 
159
 
 
160
        d = xxmalloc(sizeof(struct chirp_hdfs_dir));
 
161
        d->info = hdfs_services->listdir(fs,path, &d->n);
 
162
        d->i = 0;
 
163
        d->path = xstrdup(path);
 
164
 
 
165
        if(d->info == NULL) {
 
166
                free(d);
 
167
                errno = ENOENT;
 
168
                return 0;
 
169
        }
 
170
 
 
171
        return d;
 
172
}
 
173
 
 
174
char *chirp_hdfs_readdir(void *dir)
 
175
{
 
176
        struct chirp_hdfs_dir *d = (struct chirp_hdfs_dir *) dir;
 
177
        debug(D_HDFS, "readdir %s", d->path);
 
178
        if(d->i < d->n) {
 
179
                /* mName is of the form hdfs:/hostname:port/path/to/file */
 
180
                char *entry = d->info[d->i++].mName;
 
181
                entry += strlen(entry); /* now points to nul byte */
 
182
                while(entry[-1] != '/')
 
183
                        entry--;
 
184
                return entry;
 
185
        } else
 
186
                return NULL;
 
187
}
 
188
 
 
189
void chirp_hdfs_closedir(void *dir)
 
190
{
 
191
        struct chirp_hdfs_dir *d = (struct chirp_hdfs_dir *) dir;
 
192
        debug(D_HDFS, "closedir", d->path);
 
193
        hdfs_services->free_stat(d->info, d->n);
 
194
        free(d->path);
 
195
        free(d);
 
196
}
 
197
 
 
198
INT64_T chirp_hdfs_file_size(const char *path)
 
199
{
 
200
        struct chirp_stat info;
 
201
        path = FIXPATH(path);
 
202
        if(chirp_hdfs_stat(path, &info) == 0) {
 
203
                return info.cst_size;
 
204
        } else {
 
205
                return -1;
 
206
        }
 
207
}
 
208
 
 
209
INT64_T chirp_hdfs_fd_size(int fd)
 
210
{
 
211
        struct chirp_stat info;
 
212
        debug(D_HDFS, "fstat on file descriptor %d, path = %s", fd, open_files[fd].name);
 
213
        if(chirp_hdfs_fstat(fd, &info) == 0) {
 
214
                return info.cst_size;
 
215
        } else {
 
216
                return -1;
 
217
        }
 
218
}
 
219
 
 
220
static INT64_T get_fd(void)
 
221
{
 
222
        INT64_T fd;
 
223
        /* find an unused file descriptor */
 
224
        for(fd = 0; fd < BASE_SIZE; fd++)
 
225
                if(open_files[fd].name == NULL)
 
226
                        return fd;
 
227
        debug(D_HDFS, "too many files open");
 
228
        errno = EMFILE;
 
229
        return -1;
 
230
}
 
231
 
 
232
static char *read_buffer(const char *path, int entire_file, INT64_T * size)
 
233
{
 
234
        hdfsFile file;
 
235
        char *buffer;
 
236
        INT64_T current = 0;
 
237
 
 
238
        if(entire_file) {       /* read entire file? */
 
239
                struct chirp_stat info;
 
240
                if(chirp_hdfs_stat(path, &info) == -1)
 
241
                        return NULL;
 
242
                *size = info.cst_size;
 
243
        }
 
244
 
 
245
        file = hdfs_services->open(fs,path, O_RDONLY, 0, 0, 0);
 
246
        if(file == NULL)
 
247
                return NULL;
 
248
 
 
249
        buffer = xxmalloc(sizeof(char) * (*size));
 
250
        memset(buffer, 0, sizeof(char) * (*size));
 
251
 
 
252
        while(current < *size) {
 
253
                INT64_T ractual = hdfs_services->read(fs, file, buffer + current, *size - current);
 
254
                if(ractual <= 0)
 
255
                        break;
 
256
                current += ractual;
 
257
        }
 
258
        hdfs_services->close(fs, file);
 
259
        return buffer;
 
260
}
 
261
 
 
262
static INT64_T write_buffer(const char *path, const char *buffer, size_t size)
 
263
{
 
264
        hdfsFile file;
 
265
        INT64_T current = 0;
 
266
        INT64_T fd;
 
267
 
 
268
        fd = get_fd();
 
269
        if(fd == -1)
 
270
                return -1;
 
271
 
 
272
        file = hdfs_services->open(fs,path, O_WRONLY, 0, 0, 0);
 
273
        if(file == NULL)
 
274
                return -1;      /* errno is set */
 
275
 
 
276
        while(current < size) {
 
277
                INT64_T wactual = hdfs_services->write(fs, file, buffer, size - current);
 
278
                if(wactual == -1)
 
279
                        return -1;
 
280
                current += wactual;
 
281
        }
 
282
        open_files[fd].file = file;
 
283
        open_files[fd].name = xstrdup(path);
 
284
        return fd;
 
285
 
 
286
}
 
287
 
 
288
INT64_T chirp_hdfs_open(const char *path, INT64_T flags, INT64_T mode)
 
289
{
 
290
        INT64_T fd, stat_result;
 
291
        struct chirp_stat info;
 
292
 
 
293
        path = FIXPATH(path);
 
294
 
 
295
        stat_result = chirp_hdfs_stat(path, &info);
 
296
 
 
297
        fd = get_fd();
 
298
        if(fd == -1)
 
299
                return -1;
 
300
 
 
301
        mode = 0600 | (mode & 0100);
 
302
        switch (flags & O_ACCMODE) {
 
303
        case O_RDONLY:
 
304
                debug(D_HDFS, "opening file %s (flags: %o) for reading; mode: %o", path, flags, mode);
 
305
                if(stat_result == -1)
 
306
                        return (errno = ENOENT, -1);    /* HDFS screws this up */
 
307
                break;
 
308
        case O_WRONLY:
 
309
                debug(D_HDFS, "opening file %s (flags: %o) for writing; mode: %o", path, flags, mode);
 
310
                /* Check if file exists already */
 
311
                if(stat_result < 0) {
 
312
                        flags = O_WRONLY;
 
313
                        break;  /* probably doesn't exist, continue.... */
 
314
                } else if(S_ISDIR(info.cst_mode))
 
315
                        return (errno = EISDIR, -1);
 
316
                else if(O_TRUNC & flags) {
 
317
                        /* delete file, then open again */
 
318
                        INT64_T result = hdfs_services->unlink(fs,path);
 
319
                        if(result == -1)
 
320
                                return (errno = EIO, -1);
 
321
                        flags ^= O_TRUNC;
 
322
                        break;
 
323
                } else if(!(O_APPEND & flags)) {
 
324
                        debug(D_HDFS, "file does not have append flag set, setting it anyway");
 
325
                        /* return (errno = ENOTSUP, -1); */
 
326
                        flags |= O_APPEND;
 
327
                }
 
328
                INT64_T size;
 
329
                char *buffer = read_buffer(path, 1, &size);
 
330
                if(buffer == NULL)
 
331
                        return -1;
 
332
                INT64_T fd = write_buffer(path, buffer, size);
 
333
                free(buffer);
 
334
                return fd;
 
335
        default:
 
336
                debug(D_HDFS, "invalid file open flag %o", flags & O_ACCMODE);
 
337
                return (errno = EINVAL, -1);
 
338
        }
 
339
 
 
340
        open_files[fd].file = hdfs_services->open(fs, path, flags, 0, 0, 0);
 
341
        if(open_files[fd].file == NULL) {
 
342
                debug(D_HDFS, "could not open file %s", path);
 
343
                return -1;
 
344
        } else {
 
345
                open_files[fd].name = xstrdup(path);
 
346
                return fd;
 
347
        }
 
348
}
 
349
 
 
350
INT64_T chirp_hdfs_close(int fd)
 
351
{
 
352
        debug(D_HDFS, "closing file %s", open_files[fd].name);
 
353
        free(open_files[fd].name);
 
354
        open_files[fd].name = NULL;
 
355
        return hdfs_services->close(fs, open_files[fd].file);
 
356
}
 
357
 
 
358
INT64_T chirp_hdfs_pread(int fd, void *buffer, INT64_T length, INT64_T offset)
 
359
{
 
360
        debug(D_HDFS, "pread %s", open_files[fd].name);
 
361
        return hdfs_services->pread(fs, open_files[fd].file, offset, buffer, length);
 
362
}
 
363
 
 
364
INT64_T chirp_hdfs_sread(int fd, void *vbuffer, INT64_T length, INT64_T stride_length, INT64_T stride_skip, INT64_T offset)
 
365
{
 
366
        INT64_T total = 0;
 
367
        INT64_T actual = 0;
 
368
        char *buffer = vbuffer;
 
369
 
 
370
        if(stride_length < 0 || stride_skip < 0 || offset < 0) {
 
371
                errno = EINVAL;
 
372
                return -1;
 
373
        }
 
374
 
 
375
        while(length >= stride_length) {
 
376
                actual = chirp_hdfs_pread(fd, &buffer[total], stride_length, offset);
 
377
                if(actual > 0) {
 
378
                        length -= actual;
 
379
                        total += actual;
 
380
                        offset += stride_skip;
 
381
                        if(actual == stride_length) {
 
382
                                continue;
 
383
                        } else {
 
384
                                break;
 
385
                        }
 
386
                } else {
 
387
                        break;
 
388
                }
 
389
        }
 
390
 
 
391
        if(total > 0) {
 
392
                return total;
 
393
        } else {
 
394
                if(actual < 0) {
 
395
                        return -1;
 
396
                } else {
 
397
                        return 0;
 
398
                }
 
399
        }
 
400
}
 
401
 
 
402
INT64_T chirp_hdfs_pwrite(int fd, const void *buffer, INT64_T length, INT64_T offset)
 
403
{
 
404
        /* FIXME deal with non-appends gracefully using an error if not costly */
 
405
        debug(D_HDFS, "pwrite %s", open_files[fd].name);
 
406
        return hdfs_services->write(fs, open_files[fd].file, buffer, length);
 
407
}
 
408
 
 
409
INT64_T chirp_hdfs_swrite(int fd, const void *vbuffer, INT64_T length, INT64_T stride_length, INT64_T stride_skip, INT64_T offset)
 
410
{
 
411
        INT64_T total = 0;
 
412
        INT64_T actual = 0;
 
413
        const char *buffer = vbuffer;
 
414
 
 
415
        if(stride_length < 0 || stride_skip < 0 || offset < 0) {
 
416
                errno = EINVAL;
 
417
                return -1;
 
418
        }
 
419
 
 
420
        while(length >= stride_length) {
 
421
                actual = chirp_hdfs_pwrite(fd, &buffer[total], stride_length, offset);
 
422
                if(actual > 0) {
 
423
                        length -= actual;
 
424
                        total += actual;
 
425
                        offset += stride_skip;
 
426
                        if(actual == stride_length) {
 
427
                                continue;
 
428
                        } else {
 
429
                                break;
 
430
                        }
 
431
                } else {
 
432
                        break;
 
433
                }
 
434
        }
 
435
 
 
436
        if(total > 0) {
 
437
                return total;
 
438
        } else {
 
439
                if(actual < 0) {
 
440
                        return -1;
 
441
                } else {
 
442
                        return 0;
 
443
                }
 
444
        }
 
445
}
 
446
 
 
447
INT64_T chirp_hdfs_fchown(int fd, INT64_T uid, INT64_T gid)
 
448
{
 
449
        // Changing file ownership is silently ignored,
 
450
        // because permissions are handled through the ACL model.
 
451
        debug(D_HDFS, "fchown %s %ld %ld", open_files[fd].name, (long) uid, (long) gid);
 
452
        return 0;
 
453
}
 
454
 
 
455
INT64_T chirp_hdfs_fchmod(int fd, INT64_T mode)
 
456
{
 
457
        // The owner may only add or remove the execute bit,
 
458
        // because permissions are handled through the ACL model.
 
459
        debug(D_HDFS, "fchmod %s %lo", open_files[fd].name, (long) mode);
 
460
        mode = 0600 | (mode & 0100);
 
461
        return hdfs_services->chmod(fs, open_files[fd].name, mode);
 
462
}
 
463
 
 
464
INT64_T chirp_hdfs_ftruncate(int fd, INT64_T length)
 
465
{
 
466
        debug(D_HDFS, "ftruncate %s %ld", open_files[fd].name, (long) length);
 
467
        INT64_T size = length;
 
468
        char *buffer = read_buffer(open_files[fd].name, 0, &size);
 
469
        if(buffer == NULL)
 
470
                return -1;
 
471
        /* simulate truncate */
 
472
        if(hdfs_services->close(fs, open_files[fd].file) == -1)
 
473
                return (free(buffer), -1);
 
474
        INT64_T fd2 = write_buffer(open_files[fd].name, buffer, size);
 
475
        open_files[fd].file = open_files[fd2].file;     /* copy over new file */
 
476
        free(open_files[fd2].name);     /* close new fd */
 
477
        open_files[fd2].name = NULL;
 
478
        return 0;
 
479
}
 
480
 
 
481
INT64_T chirp_hdfs_fsync(int fd)
 
482
{
 
483
        debug(D_HDFS, "fsync %s", open_files[fd].name);
 
484
        return hdfs_services->flush(fs, open_files[fd].file);
 
485
}
 
486
 
 
487
INT64_T chirp_hdfs_getfile(const char *path, struct link * link, time_t stoptime)
 
488
{
 
489
        int fd;
 
490
        INT64_T result;
 
491
        struct chirp_stat info;
 
492
 
 
493
        path = FIXPATH(path);
 
494
        debug(D_HDFS, "getfile %s", path);
 
495
 
 
496
        result = chirp_hdfs_stat(path, &info);
 
497
        if(result < 0)
 
498
                return result;
 
499
 
 
500
        if(S_ISDIR(info.cst_mode)) {
 
501
                errno = EISDIR;
 
502
                return -1;
 
503
        }
 
504
 
 
505
        fd = chirp_hdfs_open(path, O_RDONLY, 0);
 
506
        if(fd >= 0) {
 
507
                char buffer[65536];
 
508
                INT64_T total = 0;
 
509
                INT64_T ractual, wactual;
 
510
                INT64_T length = info.cst_size;
 
511
 
 
512
                link_putfstring(link, "%lld\n", stoptime, length);
 
513
 
 
514
                // Copy Pasta from link.c
 
515
 
 
516
                while(length > 0) {
 
517
                        INT64_T chunk = MIN(sizeof(buffer), length);
 
518
 
 
519
                        ractual = hdfs_services->read(fs, open_files[fd].file, buffer, chunk);
 
520
                        if(ractual <= 0)
 
521
                                break;
 
522
 
 
523
                        wactual = link_putlstring(link, buffer, ractual, stoptime);
 
524
                        if(wactual != ractual) {
 
525
                                total = -1;
 
526
                                break;
 
527
                        }
 
528
 
 
529
                        total += ractual;
 
530
                        length -= ractual;
 
531
                }
 
532
                result = total;
 
533
                chirp_hdfs_close(fd);
 
534
        } else {
 
535
                result = -1;
 
536
        }
 
537
 
 
538
        return result;
 
539
}
 
540
 
 
541
INT64_T chirp_hdfs_putfile(const char *path, struct link * link, INT64_T length, INT64_T mode, time_t stoptime)
 
542
{
 
543
        int fd;
 
544
        INT64_T result;
 
545
 
 
546
        path = FIXPATH(path);
 
547
 
 
548
        debug(D_HDFS, "putfile %s", path);
 
549
 
 
550
        mode = 0600 | (mode & 0100);
 
551
 
 
552
        fd = chirp_hdfs_open(path, O_WRONLY | O_CREAT | O_TRUNC, (int) mode);
 
553
        if(fd >= 0) {
 
554
                char buffer[65536];
 
555
                INT64_T total = 0;
 
556
 
 
557
                link_putliteral(link, "0\n", stoptime);
 
558
 
 
559
                // Copy Pasta from link.c
 
560
 
 
561
                while(length > 0) {
 
562
                        INT64_T ractual, wactual;
 
563
                        INT64_T chunk = MIN(sizeof(buffer), length);
 
564
 
 
565
                        ractual = link_read(link, buffer, chunk, stoptime);
 
566
                        if(ractual <= 0)
 
567
                                break;
 
568
 
 
569
                        wactual = hdfs_services->write(fs, open_files[fd].file, buffer, ractual);
 
570
                        if(wactual != ractual) {
 
571
                                total = -1;
 
572
                                break;
 
573
                        }
 
574
 
 
575
                        total += ractual;
 
576
                        length -= ractual;
 
577
                }
 
578
 
 
579
                result = total;
 
580
 
 
581
                if(length != 0) {
 
582
                        if(result >= 0)
 
583
                                link_soak(link, length - result, stoptime);
 
584
                        result = -1;
 
585
                }
 
586
                chirp_hdfs_close(fd);
 
587
        } else {
 
588
                result = -1;
 
589
        }
 
590
        return result;
 
591
}
 
592
 
 
593
INT64_T chirp_hdfs_mkfifo(const char *path)
 
594
{
 
595
        path = FIXPATH(path);
 
596
        debug(D_HDFS, "mkfifo %s", path);
 
597
        return (errno = ENOTSUP, -1);
 
598
}
 
599
 
 
600
INT64_T chirp_hdfs_unlink(const char *path)
 
601
{
 
602
        path = FIXPATH(path);
 
603
        debug(D_HDFS, "unlink %s", path);
 
604
        /* FIXME unlink does not set errno properly on failure! */
 
605
        int ret = hdfs_services->unlink(fs, path);
 
606
        if(ret == -1)
 
607
                errno = EEXIST; /* FIXME bad fix to above problem */
 
608
        return 0;
 
609
}
 
610
 
 
611
INT64_T chirp_hdfs_rename(const char *path, const char *newpath)
 
612
{
 
613
        path = FIXPATH(path);
 
614
        newpath = FIXPATH(path);
 
615
        debug(D_HDFS, "rename %s -> %s", path, newpath);
 
616
        hdfs_services->unlink(fs, newpath);
 
617
        return hdfs_services->rename(fs, path, newpath);
 
618
}
 
619
 
 
620
INT64_T chirp_hdfs_link(const char *path, const char *newpath)
 
621
{
 
622
        path = FIXPATH(path);
 
623
        newpath = FIXPATH(path);
 
624
        debug(D_HDFS, "link %s -> %s", path, newpath);
 
625
        return (errno = ENOTSUP, -1);
 
626
}
 
627
 
 
628
INT64_T chirp_hdfs_symlink(const char *path, const char *newpath)
 
629
{
 
630
        path = FIXPATH(path);
 
631
        newpath = FIXPATH(path);
 
632
        debug(D_HDFS, "symlink %s -> %s", path, newpath);
 
633
        return (errno = ENOTSUP, -1);
 
634
}
 
635
 
 
636
INT64_T chirp_hdfs_readlink(const char *path, char *buf, INT64_T length)
 
637
{
 
638
        path = FIXPATH(path);
 
639
        debug(D_HDFS, "readlink %s", path);
 
640
        return (errno = EINVAL, -1);
 
641
}
 
642
 
 
643
INT64_T chirp_hdfs_mkdir(const char *path, INT64_T mode)
 
644
{
 
645
        path = FIXPATH(path);
 
646
        debug(D_HDFS, "mkdir %s", path);
 
647
        return hdfs_services->mkdir(fs, path);
 
648
}
 
649
 
 
650
/*
 
651
rmdir is a little unusual.
 
652
An 'empty' directory may contain some administrative
 
653
files such as an ACL and an allocation state.
 
654
Only delete the directory if it contains only those files.
 
655
*/
 
656
 
 
657
INT64_T chirp_hdfs_rmdir(const char *path)
 
658
{
 
659
        void *dir;
 
660
        char *d;
 
661
        int empty = 1;
 
662
 
 
663
        path = FIXPATH(path);
 
664
        debug(D_HDFS, "rmdir %s", path);
 
665
 
 
666
        dir = chirp_hdfs_opendir(path);
 
667
        if(dir) {
 
668
                while((d = chirp_hdfs_readdir(dir))) {
 
669
                        if(!strcmp(d, "."))
 
670
                                continue;
 
671
                        if(!strcmp(d, ".."))
 
672
                                continue;
 
673
                        if(!strncmp(d, ".__", 3))
 
674
                                continue;
 
675
                        empty = 0;
 
676
                        break;
 
677
                }
 
678
                chirp_hdfs_closedir(dir);
 
679
 
 
680
                if(empty) {
 
681
                        return hdfs_services->unlink(fs, path);
 
682
                } else {
 
683
                        errno = ENOTEMPTY;
 
684
                        return -1;
 
685
                }
 
686
        } else {
 
687
                return -1;
 
688
        }
 
689
}
 
690
 
 
691
INT64_T chirp_hdfs_lstat(const char *path, struct chirp_stat * buf)
 
692
{
 
693
        path = FIXPATH(path);
 
694
        debug(D_HDFS, "lstat %s", path);
 
695
        return chirp_hdfs_stat(path, buf);
 
696
}
 
697
 
 
698
INT64_T chirp_hdfs_statfs(const char *path, struct chirp_statfs * buf)
 
699
{
 
700
        path = FIXPATH(path);
 
701
        debug(D_HDFS, "statfs %s", path);
 
702
 
 
703
        INT64_T capacity = hdfs_services->get_capacity(fs);
 
704
        INT64_T used = hdfs_services->get_used(fs);
 
705
        INT64_T blocksize = hdfs_services->get_default_block_size(fs);
 
706
 
 
707
        if(capacity == -1 || used == -1 || blocksize == -1)
 
708
                return (errno = EIO, -1);
 
709
 
 
710
        buf->f_type = 0;        /* FIXME */
 
711
        buf->f_bsize = blocksize;
 
712
        buf->f_blocks = capacity / blocksize;
 
713
        buf->f_bavail = buf->f_bfree = used / blocksize;
 
714
        buf->f_files = buf->f_ffree = 0;
 
715
 
 
716
        return 0;
 
717
}
 
718
 
 
719
INT64_T chirp_hdfs_fstatfs(int fd, struct chirp_statfs * buf)
 
720
{
 
721
        debug(D_HDFS, "fstatfs %d", fd);
 
722
 
 
723
        return chirp_hdfs_statfs("/", buf);
 
724
}
 
725
 
 
726
INT64_T chirp_hdfs_access(const char *path, INT64_T mode)
 
727
{
 
728
        /* W_OK is ok to delete, not to write, but we can't distinguish intent */
 
729
        /* Chirp ACL will check that we can access the file the way we want, so
 
730
           we just do a redundant "exists" check */
 
731
        path = FIXPATH(path);
 
732
        debug(D_HDFS, "access %s %ld", path, (long) mode);
 
733
        return hdfs_services->exists(fs, path);
 
734
}
 
735
 
 
736
INT64_T chirp_hdfs_chmod(const char *path, INT64_T mode)
 
737
{
 
738
        // The owner may only add or remove the execute bit,
 
739
        // because permissions are handled through the ACL model.
 
740
        path = FIXPATH(path);
 
741
        debug(D_HDFS, "chmod %s %ld", path, (long) mode);
 
742
        mode = 0600 | (mode & 0100);
 
743
        return hdfs_services->chmod(fs, path, mode);
 
744
}
 
745
 
 
746
INT64_T chirp_hdfs_chown(const char *path, INT64_T uid, INT64_T gid)
 
747
{
 
748
        // Changing file ownership is silently ignored,
 
749
        // because permissions are handled through the ACL model.
 
750
        path = FIXPATH(path);
 
751
        debug(D_HDFS, "chown (ignored) %s %ld %ld", path, (long) uid, (long) gid);
 
752
        return 0;
 
753
}
 
754
 
 
755
INT64_T chirp_hdfs_lchown(const char *path, INT64_T uid, INT64_T gid)
 
756
{
 
757
        // Changing file ownership is silently ignored,
 
758
        // because permissions are handled through the ACL model.
 
759
        path = FIXPATH(path);
 
760
        debug(D_HDFS, "lchown (ignored) %s %ld %ld", path, (long) uid, (long) gid);
 
761
        return 0;
 
762
}
 
763
 
 
764
INT64_T chirp_hdfs_truncate(const char *path, INT64_T length)
 
765
{
 
766
        path = FIXPATH(path);
 
767
        debug(D_HDFS, "truncate %s %ld", path, (long) length);
 
768
        /* simulate truncate */
 
769
        INT64_T size = length;
 
770
        char *buffer = read_buffer(path, 0, &size);
 
771
        if(buffer == NULL)
 
772
                return -1;
 
773
        INT64_T fd = write_buffer(path, buffer, size);
 
774
        free(open_files[fd].name);
 
775
        free(buffer);
 
776
        open_files[fd].name = NULL;
 
777
        return 0;
 
778
}
 
779
 
 
780
INT64_T chirp_hdfs_utime(const char *path, time_t actime, time_t modtime)
 
781
{
 
782
        path = FIXPATH(path);
 
783
        debug(D_HDFS, "utime %s %ld %ld", path, (long) actime, (long) modtime);
 
784
        return hdfs_services->utime(fs, path, modtime, actime);
 
785
}
 
786
 
 
787
INT64_T chirp_hdfs_md5(const char *path, unsigned char digest[16])
 
788
{
 
789
        int fd;
 
790
        INT64_T result;
 
791
        struct chirp_stat info;
 
792
 
 
793
        path = FIXPATH(path);
 
794
 
 
795
        debug(D_HDFS, "md5sum %s", path);
 
796
 
 
797
        result = chirp_hdfs_stat(path, &info);
 
798
        if(result < 0)
 
799
                return result;
 
800
 
 
801
        if(S_ISDIR(info.cst_mode)) {
 
802
                errno = EISDIR;
 
803
                return -1;
 
804
        }
 
805
 
 
806
        fd = chirp_hdfs_open(path, O_RDONLY, 0);
 
807
        if(fd >= 0) {
 
808
                char buffer[65536];
 
809
                //INT64_T total=0;
 
810
                INT64_T ractual;
 
811
                INT64_T length = info.cst_size;
 
812
                md5_context_t ctx;
 
813
 
 
814
                md5_init(&ctx);
 
815
 
 
816
                while(length > 0) {
 
817
                        INT64_T chunk = MIN(sizeof(buffer), length);
 
818
 
 
819
                        ractual = hdfs_services->read(fs, open_files[fd].file, buffer, chunk);
 
820
                        if(ractual <= 0)
 
821
                                break;
 
822
 
 
823
                        md5_update(&ctx, (unsigned char *) buffer, ractual);
 
824
 
 
825
                        //total += ractual;
 
826
                        length -= ractual;
 
827
                }
 
828
                result = 0;
 
829
                chirp_hdfs_close(fd);
 
830
                md5_final(digest, &ctx);
 
831
        } else {
 
832
                result = -1;
 
833
        }
 
834
 
 
835
        return result;
 
836
}
 
837
 
 
838
INT64_T chirp_hdfs_chdir(const char *path)
 
839
{
 
840
        debug(D_HDFS, "chdir %s", path);
 
841
        return hdfs_services->chdir(fs, path);
 
842
}
 
843
 
 
844
struct chirp_filesystem chirp_hdfs_fs = {
 
845
        chirp_hdfs_init,
 
846
        chirp_hdfs_destroy,
 
847
 
 
848
        chirp_hdfs_open,
 
849
        chirp_hdfs_close,
 
850
        chirp_hdfs_pread,
 
851
        chirp_hdfs_pwrite,
 
852
        chirp_hdfs_sread,
 
853
        chirp_hdfs_swrite,
 
854
        chirp_hdfs_fstat,
 
855
        chirp_hdfs_fstatfs,
 
856
        chirp_hdfs_fchown,
 
857
        chirp_hdfs_fchmod,
 
858
        chirp_hdfs_ftruncate,
 
859
        chirp_hdfs_fsync,
 
860
 
 
861
        chirp_hdfs_opendir,
 
862
        chirp_hdfs_readdir,
 
863
        chirp_hdfs_closedir,
 
864
 
 
865
        chirp_hdfs_getfile,
 
866
        chirp_hdfs_putfile,
 
867
 
 
868
        chirp_hdfs_mkfifo,
 
869
        chirp_hdfs_unlink,
 
870
        chirp_hdfs_rename,
 
871
        chirp_hdfs_link,
 
872
        chirp_hdfs_symlink,
 
873
        chirp_hdfs_readlink,
 
874
        chirp_hdfs_chdir,
 
875
        chirp_hdfs_mkdir,
 
876
        chirp_hdfs_rmdir,
 
877
        chirp_hdfs_stat,
 
878
        chirp_hdfs_lstat,
 
879
        chirp_hdfs_statfs,
 
880
        chirp_hdfs_access,
 
881
        chirp_hdfs_chmod,
 
882
        chirp_hdfs_chown,
 
883
        chirp_hdfs_lchown,
 
884
        chirp_hdfs_truncate,
 
885
        chirp_hdfs_utime,
 
886
        chirp_hdfs_md5,
 
887
 
 
888
        chirp_hdfs_file_size,
 
889
        chirp_hdfs_fd_size,
 
890
};