~ubuntu-branches/ubuntu/vivid/cctools/vivid

« back to all changes in this revision

Viewing changes to dttools/src/work_queue_worker.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2011-05-07 09:05:00 UTC
  • Revision ID: james.westby@ubuntu.com-20110507090500-lqpmdtwndor6e7os
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
Copyright (C) 2008- The University of Notre Dame
 
3
This software is distributed under the GNU General Public License.
 
4
See the file COPYING for details.
 
5
*/
 
6
 
 
7
#include "work_queue.h"
 
8
 
 
9
#include "catalog_query.h"
 
10
#include "catalog_server.h"
 
11
#include "datagram.h"
 
12
#include "domain_name_cache.h"
 
13
#include "nvpair.h"
 
14
#include "copy_stream.h"
 
15
#include "memory_info.h"
 
16
#include "disk_info.h"
 
17
#include "hash_cache.h"
 
18
#include "link.h"
 
19
#include "list.h"
 
20
#include "xmalloc.h"
 
21
#include "debug.h"
 
22
#include "stringtools.h"
 
23
#include "load_average.h"
 
24
#include "domain_name_cache.h"
 
25
#include "getopt.h"
 
26
#include "full_io.h"
 
27
#include "create_dir.h"
 
28
 
 
29
#include <stdio.h>
 
30
#include <time.h>
 
31
#include <unistd.h>
 
32
#include <stdlib.h>
 
33
#include <string.h>
 
34
#include <fcntl.h>
 
35
#include <errno.h>
 
36
#include <math.h>
 
37
#include <signal.h>
 
38
#include <dirent.h>
 
39
 
 
40
#include <sys/types.h>
 
41
#include <sys/stat.h>
 
42
#include <sys/mman.h>
 
43
#include <sys/signal.h>
 
44
 
 
45
 
 
46
// Maximum time to wait before aborting if there is no connection to the master.
 
47
static int idle_timeout = 900;
 
48
 
 
49
// Maxium time to wait before switching to another master.
 
50
static int master_timeout = 60;
 
51
 
 
52
// Maximum time to wait when actively communicating with the master.
 
53
static int active_timeout = 3600;
 
54
 
 
55
// Flag gets set on receipt of a terminal signal.
 
56
static int abort_flag = 0;
 
57
 
 
58
// Catalog mode control variables
 
59
static int auto_worker = 0;
 
60
static int exclusive_worker = 1;
 
61
static const int non_preference_priority_max = 100;
 
62
static char *catalog_server_host = NULL;
 
63
static int catalog_server_port = 0;
 
64
static struct wq_master *actual_master = NULL;
 
65
 
 
66
static struct list *preferred_masters = NULL;
 
67
static struct hash_cache *bad_masters = NULL;
 
68
 
 
69
struct wq_master {
 
70
        char addr[LINK_ADDRESS_MAX];
 
71
        int port;
 
72
        char proj[WORK_QUEUE_NAME_MAX];
 
73
        int priority;
 
74
};
 
75
 
 
76
void debug_print_masters(struct list *ml)
 
77
{
 
78
        struct wq_master *m;
 
79
        int count = 0;
 
80
 
 
81
        debug(D_WQ, "All available Masters:\n");
 
82
        list_first_item(ml);
 
83
        while((m = (struct wq_master *) list_next_item(ml))) {
 
84
                debug(D_WQ, "Master %d:\n", ++count);
 
85
                debug(D_WQ, "addr:\t%s\n", m->addr);
 
86
                debug(D_WQ, "port:\t%d\n", m->port);
 
87
                debug(D_WQ, "project:\t%s\n", m->proj);
 
88
                debug(D_WQ, "priority:\t%d\n", m->priority);
 
89
                debug(D_WQ, "\n");
 
90
        }
 
91
}
 
92
 
 
93
 
 
94
static void make_hash_key(const char *addr, int port, char *key)
 
95
{
 
96
        sprintf(key, "%s:%d", addr, port);
 
97
}
 
98
 
 
99
int parse_catalog_server_description(char *server_string, char **host, int *port)
 
100
{
 
101
        char *colon;
 
102
 
 
103
        colon = strchr(server_string, ':');
 
104
 
 
105
        if(!colon) {
 
106
                *host = NULL;
 
107
                *port = 0;
 
108
                return 0;
 
109
        }
 
110
 
 
111
        *colon = '\0';
 
112
 
 
113
        *host = strdup(server_string);
 
114
        *port = atoi(colon + 1);
 
115
 
 
116
        return *port;
 
117
}
 
118
 
 
119
struct wq_master *parse_wq_master_nvpair(struct nvpair *nv)
 
120
{
 
121
        struct wq_master *m;
 
122
 
 
123
        m = xxmalloc(sizeof(struct wq_master));
 
124
 
 
125
        strncpy(m->addr, nvpair_lookup_string(nv, "address"), LINK_ADDRESS_MAX);
 
126
        strncpy(m->proj, nvpair_lookup_string(nv, "project"), WORK_QUEUE_NAME_MAX);
 
127
        m->port = nvpair_lookup_integer(nv, "port");
 
128
        m->priority = nvpair_lookup_integer(nv, "priority");
 
129
        if(m->priority < 0)
 
130
                m->priority = 0;
 
131
 
 
132
        return m;
 
133
}
 
134
 
 
135
struct wq_master *duplicate_wq_master(struct wq_master *master)
 
136
{
 
137
        struct wq_master *m;
 
138
 
 
139
        m = xxmalloc(sizeof(struct wq_master));
 
140
        strncpy(m->addr, master->addr, LINK_ADDRESS_MAX);
 
141
        strncpy(m->proj, master->proj, WORK_QUEUE_NAME_MAX);
 
142
        m->port = master->port;
 
143
        m->priority = master->priority;
 
144
        if(m->priority < 0)
 
145
                m->priority = 0;
 
146
 
 
147
        return m;
 
148
}
 
149
 
 
150
/**
 
151
 * Reasons for a master being bad:
 
152
 * 1. The master does not need more workers right now;
 
153
 * 2. The master is already shut down but its record is still in the catalog server.
 
154
 */
 
155
static void record_bad_master(struct wq_master *m)
 
156
{
 
157
        char key[LINK_ADDRESS_MAX + 10];        // addr:port
 
158
        int lifetime = 10;
 
159
 
 
160
        if(!m)
 
161
                return;
 
162
 
 
163
        make_hash_key(m->addr, m->port, key);
 
164
        hash_cache_insert(bad_masters, key, m, lifetime);
 
165
        debug(D_WQ, "Master at %s:%d is not receiving more workers.\nWon't connect to this master in %d seconds.", m->addr, m->port, lifetime);
 
166
}
 
167
 
 
168
struct list *get_work_queue_masters(const char *catalog_host, int catalog_port)
 
169
{
 
170
        struct catalog_query *q;
 
171
        struct nvpair *nv;
 
172
        struct list *ml;
 
173
        struct wq_master *m;
 
174
        char *pm;
 
175
        time_t timeout = 60, stoptime;
 
176
        char key[LINK_ADDRESS_MAX + 10];        // addr:port
 
177
 
 
178
        stoptime = time(0) + timeout;
 
179
 
 
180
        q = catalog_query_create(catalog_host, catalog_port, stoptime);
 
181
        if(!q) {
 
182
                fprintf(stderr, "Failed to query catalog server at %s:%d\n", catalog_host, catalog_port);
 
183
                return NULL;
 
184
        }
 
185
 
 
186
        ml = list_create();
 
187
        if(!ml)
 
188
                return NULL;
 
189
 
 
190
        while((nv = catalog_query_read(q, stoptime))) {
 
191
                if(strcmp(nvpair_lookup_string(nv, "type"), CATALOG_TYPE_WORK_QUEUE_MASTER) == 0) {
 
192
                        m = parse_wq_master_nvpair(nv);
 
193
 
 
194
                        list_first_item(preferred_masters);
 
195
                        while((pm = (char *) list_next_item(preferred_masters))) {
 
196
                                if(whole_string_match_regex(m->proj, pm)) {
 
197
                                        // preferred master found
 
198
                                        break;
 
199
                                }
 
200
                        }
 
201
 
 
202
                        if(pm) {
 
203
                                // This is a preferred master
 
204
                                m->priority += non_preference_priority_max;
 
205
                        } else {
 
206
                                // Master name does not match any preferred master names
 
207
                                if(exclusive_worker) {
 
208
                                        continue;
 
209
                                } else {
 
210
                                        m->priority = non_preference_priority_max < m->priority ? non_preference_priority_max : m->priority;
 
211
                                }
 
212
                        }
 
213
 
 
214
                        // exclude 'bad' masters
 
215
                        make_hash_key(m->addr, m->port, key);
 
216
                        if(!hash_cache_lookup(bad_masters, key)) {
 
217
                                list_push_priority(ml, m, m->priority);
 
218
                        }
 
219
                }
 
220
                nvpair_delete(nv);
 
221
        }
 
222
 
 
223
        // Must delete the query otherwise it would occupy 1 tcp connection forever!
 
224
        catalog_query_delete(q);
 
225
        return ml;
 
226
}
 
227
 
 
228
struct link *auto_link_connect(char *addr, int *port, time_t master_stoptime)
 
229
{
 
230
        struct link *master = 0;
 
231
        struct list *ml;
 
232
        struct wq_master *m;
 
233
 
 
234
        ml = get_work_queue_masters(catalog_server_host, catalog_server_port);
 
235
        if(!ml)
 
236
                return NULL;
 
237
        debug_print_masters(ml);
 
238
 
 
239
        list_first_item(ml);
 
240
        while((m = (struct wq_master *) list_next_item(ml))) {
 
241
                master = link_connect(m->addr, m->port, master_stoptime);
 
242
                if(master) {
 
243
                        debug(D_WQ, "Talking to the Master at:\n");
 
244
                        debug(D_WQ, "addr:\t%s\n", m->addr);
 
245
                        debug(D_WQ, "port:\t%d\n", m->port);
 
246
                        debug(D_WQ, "project:\t%s\n", m->proj);
 
247
                        debug(D_WQ, "priority:\t%d\n", m->priority);
 
248
                        debug(D_WQ, "\n");
 
249
 
 
250
                        strncpy(addr, m->addr, LINK_ADDRESS_MAX);
 
251
                        (*port) = m->port;
 
252
 
 
253
                        if(actual_master)
 
254
                                free(actual_master);
 
255
                        actual_master = duplicate_wq_master(m);
 
256
 
 
257
                        break;
 
258
                } else {
 
259
                        record_bad_master(duplicate_wq_master(m));
 
260
                }
 
261
        }
 
262
 
 
263
        list_free(ml);
 
264
        list_delete(ml);
 
265
 
 
266
        return master;
 
267
}
 
268
 
 
269
/**
 
270
 * Stream file/directory contents for the rget protocol.
 
271
 * Format:
 
272
 *              for a directory: a new line in the format of "dir $DIR_NAME 0"
 
273
 *              for a file: a new line in the format of "file $FILE_NAME $FILE_LENGTH"
 
274
 *                                      then file contents.
 
275
 *              string "end" at the end of the stream (on a new line).
 
276
 *
 
277
 * Example:
 
278
 * Assume we have the following directory structure:
 
279
 * mydir
 
280
 *              -- 1.txt
 
281
 *              -- 2.txt
 
282
 *              -- mysubdir
 
283
 *                      -- a.txt
 
284
 *                      -- b.txt
 
285
 *              -- z.jpg
 
286
 *
 
287
 * The stream contents would be:
 
288
 *
 
289
 * dir mydir 0
 
290
 * file 1.txt $file_len
 
291
 * $$ FILE 1.txt's CONTENTS $$
 
292
 * file 2.txt $file_len
 
293
 * $$ FILE 2.txt's CONTENTS $$
 
294
 * dir mysubdir 0
 
295
 * file mysubdir/a.txt $file_len
 
296
 * $$ FILE mysubdir/a.txt's CONTENTS $$
 
297
 * file mysubdir/b.txt $file_len
 
298
 * $$ FILE mysubdir/b.txt's CONTENTS $$
 
299
 * file z.jpg $file_len
 
300
 * $$ FILE z.jpg's CONTENTS $$
 
301
 * end
 
302
 *
 
303
 */
 
304
int stream_output_item(struct link *master, const char *filename)
 
305
{
 
306
        DIR *dir;
 
307
        struct dirent *dent;
 
308
        char dentline[WORK_QUEUE_LINE_MAX];
 
309
        struct stat info;
 
310
        INT64_T actual, length;
 
311
        int fd;
 
312
 
 
313
        if(stat(filename, &info) != 0) {
 
314
                goto failure;
 
315
        }
 
316
 
 
317
        if(S_ISDIR(info.st_mode)) {
 
318
                // stream a directory
 
319
                dir = opendir(filename);
 
320
                if(!dir) {
 
321
                        goto failure;
 
322
                }
 
323
                link_putfstring(master, "dir %s %lld\n", time(0) + active_timeout, filename, (INT64_T) 0);
 
324
 
 
325
                while((dent = readdir(dir))) {
 
326
                        if(!strcmp(dent->d_name, ".") || !strcmp(dent->d_name, ".."))
 
327
                                continue;
 
328
                        sprintf(dentline, "%s/%s", filename, dent->d_name);
 
329
                        stream_output_item(master, dentline);
 
330
                }
 
331
 
 
332
                closedir(dir);
 
333
        } else {
 
334
                // stream a file
 
335
                fd = open(filename, O_RDONLY, 0);
 
336
                if(fd >= 0) {
 
337
                        length = (INT64_T) info.st_size;
 
338
                        link_putfstring(master, "file %s %lld\n", time(0) + active_timeout, filename, length);
 
339
                        actual = link_stream_from_fd(master, fd, length, time(0) + active_timeout);
 
340
                        close(fd);
 
341
                        if(actual != length) {
 
342
                                debug(D_WQ, "Sending back output file - %s failed: bytes to send = %lld and bytes actually sent = %lld.", filename, length, actual);
 
343
                                return 0;
 
344
                        }
 
345
                } else {
 
346
                        goto failure;
 
347
                }
 
348
        }
 
349
 
 
350
        return 1;
 
351
 
 
352
      failure:
 
353
        fprintf(stderr, "Failed to transfer ouput item - %s. (%s)\n", filename, strerror(errno));
 
354
        link_putfstring(master, "missing %s %d\n", time(0) + active_timeout, filename, errno);
 
355
        return 0;
 
356
}
 
357
 
 
358
static void handle_abort(int sig)
 
359
{
 
360
        abort_flag = 1;
 
361
}
 
362
 
 
363
static void show_version(const char *cmd)
 
364
{
 
365
        printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
 
366
}
 
367
 
 
368
static void show_help(const char *cmd)
 
369
{
 
370
        printf("Use: %s <masterhost> <port>\n", cmd);
 
371
        printf("where options are:\n");
 
372
        printf(" -a             Enable auto mode. In this mode the worker would ask a catalog server for available masters.\n");
 
373
        printf(" -C <catalog>   Set catalog server to <catalog>. Format: HOSTNAME:PORT \n");
 
374
        printf(" -d <subsystem> Enable debugging for this subsystem.\n");
 
375
        printf(" -N <project>   Name of a preferred project. A worker can have multiple preferred projects.\n");
 
376
        printf(" -s             Run as a shared worker. By default the worker would only work on preferred projects.\n");
 
377
        printf(" -t <time>      Abort after this amount of idle time. (default=%ds)\n", idle_timeout);
 
378
        printf(" -o <file>      Send debugging to this file.\n");
 
379
        printf(" -v             Show version string\n");
 
380
        printf(" -w <size>      Set TCP window size.\n");
 
381
        printf(" -h             Show this help screen\n");
 
382
}
 
383
 
 
384
int main(int argc, char *argv[])
 
385
{
 
386
        const char *host = NULL;
 
387
        char *pm;
 
388
        int port = WORK_QUEUE_DEFAULT_PORT;
 
389
        char actual_addr[LINK_ADDRESS_MAX];
 
390
        int actual_port;
 
391
        struct link *master = 0;
 
392
        char addr[LINK_ADDRESS_MAX];
 
393
        UINT64_T memory_avail, memory_total;
 
394
        UINT64_T disk_avail, disk_total;
 
395
        int ncpus;
 
396
        char c;
 
397
        char hostname[DOMAIN_NAME_MAX];
 
398
        int w;
 
399
        char preferred_master_names[WORK_QUEUE_LINE_MAX];
 
400
 
 
401
        ncpus = load_average_get_cpus();
 
402
        memory_info_get(&memory_avail, &memory_total);
 
403
        disk_info_get(".", &disk_avail, &disk_total);
 
404
 
 
405
        preferred_masters = list_create();
 
406
        if(!preferred_masters) {
 
407
                fprintf(stderr, "Cannot allocate memory to store preferred work queue masters names.\n");
 
408
                exit(1);
 
409
        }
 
410
 
 
411
 
 
412
        debug_config(argv[0]);
 
413
 
 
414
        while((c = getopt(argc, argv, "aC:d:ihN:o:st:w:v")) != (char) -1) {
 
415
                switch (c) {
 
416
                case 'a':
 
417
                        auto_worker = 1;
 
418
                        break;
 
419
                case 'C':
 
420
                        port = parse_catalog_server_description(optarg, &catalog_server_host, &catalog_server_port);
 
421
                        if(!port) {
 
422
                                fprintf(stderr, "The provided catalog server is invalid. The format of the '-s' option should be '-s HOSTNAME:PORT'.\n");
 
423
                                exit(1);
 
424
                        }
 
425
                        auto_worker = 1;
 
426
                        break;
 
427
                case 's':
 
428
                        auto_worker = 1;
 
429
                        // This is a shared worker
 
430
                        exclusive_worker = 0;
 
431
                        break;
 
432
                case 'd':
 
433
                        debug_flags_set(optarg);
 
434
                        break;
 
435
                case 't':
 
436
                        idle_timeout = string_time_parse(optarg);
 
437
                        break;
 
438
                case 'o':
 
439
                        debug_config_file(optarg);
 
440
                        break;
 
441
                case 'N':
 
442
                        list_push_tail(preferred_masters, strdup(optarg));
 
443
                        break;
 
444
                case 'v':
 
445
                        show_version(argv[0]);
 
446
                        return 0;
 
447
                case 'w':
 
448
                        w = string_metric_parse(optarg);
 
449
                        link_window_set(w, w);
 
450
                        break;
 
451
                case 'h':
 
452
                default:
 
453
                        show_help(argv[0]);
 
454
                        return 1;
 
455
                }
 
456
        }
 
457
 
 
458
        if(!auto_worker) {
 
459
                if((argc - optind) != 2) {
 
460
                        show_help(argv[0]);
 
461
                        exit(1);
 
462
                }
 
463
                host = argv[optind];
 
464
                port = atoi(argv[optind + 1]);
 
465
 
 
466
                if(!domain_name_cache_lookup(host, addr)) {
 
467
                        fprintf(stderr, "couldn't lookup address of host %s\n", host);
 
468
                        exit(1);
 
469
                }
 
470
        }
 
471
 
 
472
 
 
473
        preferred_master_names[0] = 0;
 
474
        list_first_item(preferred_masters);
 
475
        while((pm = (char *) list_next_item(preferred_masters))) {
 
476
                sprintf(&(preferred_master_names[strlen(preferred_master_names)]), "%s ", pm);
 
477
        }
 
478
 
 
479
        if(auto_worker && exclusive_worker && !list_size(preferred_masters)) {
 
480
                fprintf(stderr, "Worker is running under exclusive mode. But no preferred master is specified.\n");
 
481
                fprintf(stderr, "Please specify the preferred master names with -N option or add -s option to allow the worker to work for any available masters.\n");
 
482
                exit(1);
 
483
        }
 
484
 
 
485
 
 
486
        signal(SIGTERM, handle_abort);
 
487
        signal(SIGQUIT, handle_abort);
 
488
        signal(SIGINT, handle_abort);
 
489
 
 
490
        const char *workdir;
 
491
 
 
492
        if(getenv("_CONDOR_SCRATCH_DIR")) {
 
493
                workdir = getenv("_CONDOR_SCRATCH_DIR");
 
494
        } else {
 
495
                workdir = "/tmp";
 
496
        }
 
497
 
 
498
        char tempdir[WORK_QUEUE_LINE_MAX];
 
499
        sprintf(tempdir, "%s/worker-%d-%d", workdir, (int) getuid(), (int) getpid());
 
500
 
 
501
        printf("worker: working in %s\n", tempdir);
 
502
        mkdir(tempdir, 0700);
 
503
        chdir(tempdir);
 
504
 
 
505
        domain_name_cache_guess(hostname);
 
506
 
 
507
        time_t idle_stoptime = time(0) + idle_timeout;
 
508
        time_t switch_master_time = time(0) + master_timeout;
 
509
 
 
510
        bad_masters = hash_cache_create(127, hash_string, (hash_cache_cleanup_t) free);
 
511
 
 
512
        while(!abort_flag) {
 
513
                char line[WORK_QUEUE_LINE_MAX];
 
514
                int result, mode, fd;
 
515
                INT64_T length;
 
516
                char filename[WORK_QUEUE_LINE_MAX];
 
517
                char path[WORK_QUEUE_LINE_MAX];
 
518
                char *buffer;
 
519
                FILE *stream;
 
520
 
 
521
                if(time(0) > idle_stoptime) {
 
522
                        if(master) {
 
523
                                printf("worker: gave up after waiting %ds to receive a task.\n", idle_timeout);
 
524
                        } else {
 
525
                                if(auto_worker) {
 
526
                                        printf("worker: gave up after waiting %ds to connect to all the available masters.\n", idle_timeout);
 
527
                                } else {
 
528
                                        printf("worker: gave up after waiting %ds to connect to %s port %d.\n", idle_timeout, host, port);
 
529
                                }
 
530
                        }
 
531
                        break;
 
532
                }
 
533
 
 
534
                switch_master_time = time(0) + master_timeout;
 
535
                if(!master) {
 
536
                        if(auto_worker) {
 
537
                                master = auto_link_connect(actual_addr, &actual_port, switch_master_time);
 
538
                        } else {
 
539
                                master = link_connect(addr, port, idle_stoptime);
 
540
                        }
 
541
                        if(!master) {
 
542
                                sleep(5);
 
543
                                continue;
 
544
                        }
 
545
 
 
546
                        link_tune(master, LINK_TUNE_INTERACTIVE);
 
547
 
 
548
 
 
549
                        if(exclusive_worker) {
 
550
                                link_putfstring(master, "ready %s %d %llu %llu %llu %llu \"%s\"\n", time(0) + active_timeout, hostname, ncpus, memory_avail, memory_total, disk_avail, disk_total, preferred_master_names);
 
551
                        } else {
 
552
                                link_putfstring(master, "ready %s %d %llu %llu %llu %llu\n", time(0) + active_timeout, hostname, ncpus, memory_avail, memory_total, disk_avail, disk_total);
 
553
                        }
 
554
                }
 
555
 
 
556
                if(link_readline(master, line, sizeof(line), time(0) + active_timeout)) {
 
557
                        debug(D_WQ, "%s", line);
 
558
                        if(sscanf(line, "work %lld", &length)) {
 
559
                                buffer = malloc(length + 10);
 
560
                                link_read(master, buffer, length, time(0) + active_timeout);
 
561
                                buffer[length] = 0;
 
562
                                strcat(buffer, " 2>&1");
 
563
                                debug(D_WQ, "%s", buffer);
 
564
                                stream = popen(buffer, "r");
 
565
                                free(buffer);
 
566
                                if(stream) {
 
567
                                        length = copy_stream_to_buffer(stream, &buffer);
 
568
                                        if(length < 0)
 
569
                                                length = 0;
 
570
                                        result = pclose(stream);
 
571
                                } else {
 
572
                                        length = 0;
 
573
                                        result = -1;
 
574
                                        buffer = 0;
 
575
                                }
 
576
                                debug(D_WQ, "result %d %lld", result, length);
 
577
                                link_putfstring(master, "result %d %lld\n", time(0) + active_timeout, result, length);
 
578
                                link_putlstring(master, buffer, length, time(0) + active_timeout);
 
579
                                if(buffer)
 
580
                                        free(buffer);
 
581
                        } else if(sscanf(line, "stat %s", filename) == 1) {
 
582
                                struct stat st;
 
583
                                if(!stat(filename, &st)) {
 
584
                                        debug(D_WQ, "result 1 %lu %lu", (unsigned long int) st.st_size, (unsigned long int) st.st_mtime);
 
585
                                        link_putfstring(master, "result 1 %lu %lu\n", time(0) + active_timeout, (unsigned long int) st.st_size, (unsigned long int) st.st_mtime);
 
586
                                } else {
 
587
                                        debug(D_WQ, "result 0 0 0");
 
588
                                        link_putliteral(master, "result 0 0 0\n", time(0) + active_timeout);
 
589
                                }
 
590
                        } else if(sscanf(line, "symlink %s %s", path, filename) == 2) {
 
591
                                char *cur_pos, *tmp_pos;
 
592
 
 
593
                                cur_pos = filename;
 
594
 
 
595
                                if(!strncmp(cur_pos, "./", 2)) {
 
596
                                        cur_pos += 2;
 
597
                                }
 
598
 
 
599
                                tmp_pos = strrchr(cur_pos, '/');
 
600
                                if(tmp_pos) {
 
601
                                        *tmp_pos = '\0';
 
602
                                        if(!create_dir(cur_pos, mode | 0700)) {
 
603
                                                debug(D_WQ, "Could not create directory - %s (%s)\n", cur_pos, strerror(errno));
 
604
                                                goto recover;
 
605
                                        }
 
606
                                        *tmp_pos = '/';
 
607
                                }
 
608
                                symlink(path, filename);
 
609
                        } else if(sscanf(line, "put %s %lld %o", filename, &length, &mode) == 3) {
 
610
                                mode = mode | 0600;
 
611
                                char *cur_pos, *tmp_pos;
 
612
 
 
613
                                cur_pos = filename;
 
614
 
 
615
                                if(!strncmp(cur_pos, "./", 2)) {
 
616
                                        cur_pos += 2;
 
617
                                }
 
618
 
 
619
                                tmp_pos = strrchr(cur_pos, '/');
 
620
                                if(tmp_pos) {
 
621
                                        *tmp_pos = '\0';
 
622
                                        if(!create_dir(cur_pos, mode | 0700)) {
 
623
                                                debug(D_WQ, "Could not create directory - %s (%s)\n", cur_pos, strerror(errno));
 
624
                                                goto recover;
 
625
                                        }
 
626
                                        *tmp_pos = '/';
 
627
                                }
 
628
 
 
629
                                fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, mode);
 
630
                                if(fd < 0)
 
631
                                        goto recover;
 
632
                                INT64_T actual = link_stream_to_fd(master, fd, length, time(0) + active_timeout);
 
633
                                close(fd);
 
634
                                if(actual != length)
 
635
                                        goto recover;
 
636
                        } else if(sscanf(line, "unlink %s", path) == 1) {
 
637
                                result = remove(path);
 
638
                                if(result != 0) {       // 0 - succeeded; otherwise, failed
 
639
                                        fprintf(stderr, "Could not remove file: %s.(%s)\n", path, strerror(errno));
 
640
                                        goto recover;
 
641
                                }
 
642
                        } else if(sscanf(line, "mkdir %s %o", filename, &mode) == 2) {
 
643
                                if(!create_dir(filename, mode | 0700)) {
 
644
                                        debug(D_WQ, "Could not create directory - %s (%s)\n", filename, strerror(errno));
 
645
                                        goto recover;
 
646
                                }
 
647
                        } else if(sscanf(line, "rget %s", filename) == 1) {
 
648
                                stream_output_item(master, filename);
 
649
                                link_putliteral(master, "end\n", time(0) + active_timeout);
 
650
                        } else if(sscanf(line, "get %s", filename) == 1) {      // for backward compatibility
 
651
                                struct stat info;
 
652
                                if(stat(filename, &info) != 0) {
 
653
                                        fprintf(stderr, "Output file %s was not created. (%s)\n", filename, strerror(errno));
 
654
                                        goto recover;
 
655
                                }
 
656
                                // send back a single file
 
657
                                fd = open(filename, O_RDONLY, 0);
 
658
                                if(fd >= 0) {
 
659
                                        length = (INT64_T) info.st_size;
 
660
                                        link_putfstring(master, "%lld\n", time(0) + active_timeout, length);
 
661
                                        INT64_T actual = link_stream_from_fd(master, fd, length, time(0) + active_timeout);
 
662
                                        close(fd);
 
663
                                        if(actual != length) {
 
664
                                                debug(D_WQ, "Sending back output file - %s failed: bytes to send = %lld and bytes actually sent = %lld.\nEntering recovery process now ...\n", filename, length, actual);
 
665
                                                goto recover;
 
666
                                        }
 
667
                                } else {
 
668
                                        fprintf(stderr, "Could not open output file %s. (%s)\n", filename, strerror(errno));
 
669
                                        goto recover;
 
670
                                }
 
671
                        } else if(!strcmp(line, "exit")) {
 
672
                                break;
 
673
                        } else {
 
674
                                link_putliteral(master, "error\n", time(0) + active_timeout);
 
675
                        }
 
676
 
 
677
                        idle_stoptime = time(0) + idle_timeout;
 
678
 
 
679
                } else {
 
680
                      recover:
 
681
                        link_close(master);
 
682
                        master = 0;
 
683
                        if(auto_worker) {
 
684
                                record_bad_master(duplicate_wq_master(actual_master));
 
685
                        }
 
686
                        sleep(5);
 
687
                }
 
688
        }
 
689
 
 
690
        char deletecmd[WORK_QUEUE_LINE_MAX];
 
691
        printf("worker: cleaning up %s\n", tempdir);
 
692
        sprintf(deletecmd, "rm -rf %s", tempdir);
 
693
        system(deletecmd);
 
694
 
 
695
        return 0;
 
696
}