2
Copyright (C) 2008- The University of Notre Dame
3
This software is distributed under the GNU General Public License.
4
See the file COPYING for details.
7
#include "work_queue.h"
9
#include "catalog_query.h"
10
#include "catalog_server.h"
12
#include "domain_name_cache.h"
14
#include "copy_stream.h"
15
#include "memory_info.h"
16
#include "disk_info.h"
17
#include "hash_cache.h"
22
#include "stringtools.h"
23
#include "load_average.h"
24
#include "domain_name_cache.h"
27
#include "create_dir.h"
40
#include <sys/types.h>
43
#include <sys/signal.h>
46
// Maximum time to wait before aborting if there is no connection to the master.
// In seconds; settable with -t (see show_help). main derives idle_stoptime from it.
47
static int idle_timeout = 900;
49
// Maximum time to wait before switching to another master.
// In seconds; main uses time(0) + master_timeout as the connect deadline for auto_link_connect.
50
static int master_timeout = 60;
52
// Maximum time to wait when actively communicating with the master.
// Every link read/write on the master connection uses time(0) + active_timeout as its deadline.
53
static int active_timeout = 3600;
55
// Flag gets set on receipt of a terminal signal.
// handle_abort is installed for SIGTERM, SIGQUIT and SIGINT in main.
// NOTE(review): the handler body that sets this flag is not visible in this view.
56
static int abort_flag = 0;
58
// Catalog mode control variables
59
// Nonzero when the worker asks a catalog server for masters (-a per show_help);
// the option-handling line that sets it is not visible here.
static int auto_worker = 0;
60
// Nonzero by default: the worker serves only preferred projects (-N); -s clears it (shared worker).
static int exclusive_worker = 1;
61
// Added to the priority of preferred masters, and used as the upper bound for the
// priority of non-preferred masters (see get_work_queue_masters).
static const int non_preference_priority_max = 100;
62
// Catalog server location, filled from "-C HOSTNAME:PORT" by parse_catalog_server_description.
static char *catalog_server_host = NULL;
63
static int catalog_server_port = 0;
64
// Copy of the master that auto_link_connect connected to; main later hands a
// duplicate of it to record_bad_master.
static struct wq_master *actual_master = NULL;
66
// Project-name patterns given with -N; matched with whole_string_match_regex.
static struct list *preferred_masters = NULL;
67
// Masters to skip for a while, keyed by "addr:port" (see make_hash_key).
static struct hash_cache *bad_masters = NULL;
70
// NOTE(review): these non-static globals are not referenced in the visible code.
// main declares its own local addr and auto_link_connect takes an addr parameter,
// and both shadow this one. Confirm whether addr/proj are still needed.
char addr[LINK_ADDRESS_MAX];
72
char proj[WORK_QUEUE_NAME_MAX];
76
// Log every master in ml at the D_WQ debug level (address, port, project, priority),
// prefixing each entry with a running index (++count).
// NOTE(review): iteration uses list_next_item, so the list cursor must start at the
// head. The rewind call, if any, is not visible in this view — confirm.
void debug_print_masters(struct list *ml)
81
debug(D_WQ, "All available Masters:\n");
83
while((m = (struct wq_master *) list_next_item(ml))) {
84
debug(D_WQ, "Master %d:\n", ++count);
85
debug(D_WQ, "addr:\t%s\n", m->addr);
86
debug(D_WQ, "port:\t%d\n", m->port);
87
debug(D_WQ, "project:\t%s\n", m->proj);
88
debug(D_WQ, "priority:\t%d\n", m->priority);
94
// Write the bad_masters lookup key "addr:port" into key.
// key is not size-checked. The callers in this file pass a buffer of
// LINK_ADDRESS_MAX + 10 bytes.
// NOTE(review): consider snprintf with an explicit size to rule out overflow.
static void make_hash_key(const char *addr, int port, char *key)
96
sprintf(key, "%s:%d", addr, port);
99
// Split a catalog server description of the form "HOSTNAME:PORT" at the first ':'.
// *host receives a newly allocated string (strdup; the caller owns it).
// *port receives the text after the colon, converted with atoi, which reports no
// errors on a malformed port.
// NOTE(review): the visible strdup copies the whole server_string. Presumably a
// line not visible here terminates it at the colon first — confirm. The meaning
// of the int return value is not visible, yet main assigns it to its port variable.
int parse_catalog_server_description(char *server_string, char **host, int *port)
103
colon = strchr(server_string, ':');
113
*host = strdup(server_string);
114
*port = atoi(colon + 1);
119
// Build a newly allocated wq_master (xxmalloc) from one catalog record.
// Copies the "address" and "project" strings and reads the "port" and
// "priority" integer fields.
// NOTE(review): strncpy does not NUL-terminate when the source fills the whole
// buffer. The nvpair_lookup_string results are also used without a NULL check —
// confirm the catalog always supplies both fields.
struct wq_master *parse_wq_master_nvpair(struct nvpair *nv)
123
m = xxmalloc(sizeof(struct wq_master));
125
strncpy(m->addr, nvpair_lookup_string(nv, "address"), LINK_ADDRESS_MAX);
126
strncpy(m->proj, nvpair_lookup_string(nv, "project"), WORK_QUEUE_NAME_MAX);
127
m->port = nvpair_lookup_integer(nv, "port");
128
m->priority = nvpair_lookup_integer(nv, "priority");
135
// Return a newly allocated (xxmalloc) copy of master: addr, proj, port and priority.
// In this file such copies are stored in actual_master or handed to
// record_bad_master, which passes ownership to the bad_masters cache.
struct wq_master *duplicate_wq_master(struct wq_master *master)
139
m = xxmalloc(sizeof(struct wq_master));
140
strncpy(m->addr, master->addr, LINK_ADDRESS_MAX);
141
strncpy(m->proj, master->proj, WORK_QUEUE_NAME_MAX);
142
m->port = master->port;
143
m->priority = master->priority;
151
* Reasons for a master being bad:
152
* 1. The master does not need more workers right now;
153
* 2. The master is already shut down but its record is still in the catalog server.
155
// Remember m as a master to avoid, keyed by "addr:port", for `lifetime` seconds
// (the value of lifetime is set on lines not visible here).
// Ownership of m passes to the bad_masters cache, which main creates with free
// as its cleanup function. Callers therefore pass a copy from duplicate_wq_master.
static void record_bad_master(struct wq_master *m)
157
char key[LINK_ADDRESS_MAX + 10]; // addr:port
163
make_hash_key(m->addr, m->port, key);
164
hash_cache_insert(bad_masters, key, m, lifetime);
165
debug(D_WQ, "Master at %s:%d is not receiving more workers.\nWon't connect to this master in %d seconds.", m->addr, m->port, lifetime);
168
// Query the catalog server at catalog_host:catalog_port with a 60-second deadline.
// Each work queue master record is added to the list ml with list_push_priority,
// using its adjusted priority. ml is presumably the return value; the return
// statement is not visible.
// Priority adjustment:
// - A master whose project fully matches one of the preferred_masters patterns
//   gets non_preference_priority_max added.
// - Other masters are capped at non_preference_priority_max. The exclusive_worker
//   branch for them is only partly visible; confirm whether they are dropped in
//   exclusive mode.
// Masters currently in bad_masters are not added.
// NOTE(review): the "type" lookup result goes straight into strcmp without a NULL
// check. Disposal of skipped masters and records is not visible here.
struct list *get_work_queue_masters(const char *catalog_host, int catalog_port)
170
struct catalog_query *q;
175
time_t timeout = 60, stoptime;
176
char key[LINK_ADDRESS_MAX + 10]; // addr:port
178
stoptime = time(0) + timeout;
180
q = catalog_query_create(catalog_host, catalog_port, stoptime);
182
fprintf(stderr, "Failed to query catalog server at %s:%d\n", catalog_host, catalog_port);
190
while((nv = catalog_query_read(q, stoptime))) {
191
if(strcmp(nvpair_lookup_string(nv, "type"), CATALOG_TYPE_WORK_QUEUE_MASTER) == 0) {
192
m = parse_wq_master_nvpair(nv);
194
// Scan the preferred project patterns from the head of the list.
list_first_item(preferred_masters);
195
while((pm = (char *) list_next_item(preferred_masters))) {
196
if(whole_string_match_regex(m->proj, pm)) {
197
// preferred master found
203
// This is a preferred master
204
m->priority += non_preference_priority_max;
206
// Master name does not match any preferred master names
207
if(exclusive_worker) {
210
// Cap the priority so that, for non-negative catalog priorities, a non-preferred
// master cannot outrank a preferred one (preferred masters are boosted above).
m->priority = non_preference_priority_max < m->priority ? non_preference_priority_max : m->priority;
214
// exclude 'bad' masters
215
make_hash_key(m->addr, m->port, key);
216
if(!hash_cache_lookup(bad_masters, key)) {
217
list_push_priority(ml, m, m->priority);
223
// Must delete the query otherwise it would occupy 1 tcp connection forever!
224
catalog_query_delete(q);
228
// Auto mode: fetch the masters from the catalog with get_work_queue_masters and
// try them in list order, connecting to each with deadline master_stoptime.
// On a successful connection:
// - the master's address is copied into addr (the caller's buffer, assumed to be
//   at least LINK_ADDRESS_MAX bytes);
// - a copy of the master is kept in actual_master.
// Masters handed to record_bad_master are presumably those whose connection
// failed; the branch lines are not visible here.
// master starts as 0 and is presumably the return value.
// NOTE(review): no *port assignment is visible, and the previous actual_master is
// not visibly freed before being overwritten — confirm.
struct link *auto_link_connect(char *addr, int *port, time_t master_stoptime)
230
struct link *master = 0;
234
ml = get_work_queue_masters(catalog_server_host, catalog_server_port);
237
debug_print_masters(ml);
240
while((m = (struct wq_master *) list_next_item(ml))) {
241
master = link_connect(m->addr, m->port, master_stoptime);
243
debug(D_WQ, "Talking to the Master at:\n");
244
debug(D_WQ, "addr:\t%s\n", m->addr);
245
debug(D_WQ, "port:\t%d\n", m->port);
246
debug(D_WQ, "project:\t%s\n", m->proj);
247
debug(D_WQ, "priority:\t%d\n", m->priority);
250
strncpy(addr, m->addr, LINK_ADDRESS_MAX);
255
actual_master = duplicate_wq_master(m);
259
// record_bad_master takes ownership, so hand it a copy rather than the list entry.
record_bad_master(duplicate_wq_master(m));
270
* Stream file/directory contents for the rget protocol.
272
* for a directory: a new line in the format of "dir $DIR_NAME 0"
273
* for a file: a new line in the format of "file $FILE_NAME $FILE_LENGTH"
274
* then file contents.
275
* string "end" at the end of the stream (on a new line).
278
* Assume we have the following directory structure:
287
* The stream contents would be:
290
* file 1.txt $file_len
291
* $$ FILE 1.txt's CONTENTS $$
292
* file 2.txt $file_len
293
* $$ FILE 2.txt's CONTENTS $$
295
* file mysubdir/a.txt $file_len
296
* $$ FILE mysubdir/a.txt's CONTENTS $$
297
* file mysubdir/b.txt $file_len
298
* $$ FILE mysubdir/b.txt's CONTENTS $$
299
* file z.jpg $file_len
300
* $$ FILE z.jpg's CONTENTS $$
304
// Stream filename to the master using the rget protocol.
// - Directories: send a "dir <name> 0" line, then recurse on every entry except
//   "." and "..", passing "<filename>/<entry>".
// - Regular files: send a "file <name> <length>" line followed by the raw file
//   contents.
// On the failure path (reached after a stat or transfer problem; the control flow
// is not fully visible) a "missing <name> <errno>" line is sent instead.
// NOTE(review): dentline is built with an unbounded sprintf into a
// WORK_QUEUE_LINE_MAX buffer, so long nested paths could overflow it. errno is read
// for the "missing" line after fprintf/strerror may have changed it; consider
// saving it first.
int stream_output_item(struct link *master, const char *filename)
308
char dentline[WORK_QUEUE_LINE_MAX];
310
INT64_T actual, length;
313
if(stat(filename, &info) != 0) {
317
if(S_ISDIR(info.st_mode)) {
318
// stream a directory
319
dir = opendir(filename);
323
link_putfstring(master, "dir %s %lld\n", time(0) + active_timeout, filename, (INT64_T) 0);
325
while((dent = readdir(dir))) {
326
if(!strcmp(dent->d_name, ".") || !strcmp(dent->d_name, ".."))
328
sprintf(dentline, "%s/%s", filename, dent->d_name);
329
stream_output_item(master, dentline);
335
// stream a single file: header line with its size, then the bytes themselves
fd = open(filename, O_RDONLY, 0);
337
length = (INT64_T) info.st_size;
338
link_putfstring(master, "file %s %lld\n", time(0) + active_timeout, filename, length);
339
actual = link_stream_from_fd(master, fd, length, time(0) + active_timeout);
341
// A short send is logged at debug level; what happens next is not visible here.
if(actual != length) {
342
debug(D_WQ, "Sending back output file - %s failed: bytes to send = %lld and bytes actually sent = %lld.", filename, length, actual);
353
fprintf(stderr, "Failed to transfer ouput item - %s. (%s)\n", filename, strerror(errno));
354
link_putfstring(master, "missing %s %d\n", time(0) + active_timeout, filename, errno);
358
// Signal handler that main installs for SIGTERM, SIGQUIT and SIGINT.
// NOTE(review): the body is not visible in this view; presumably it sets abort_flag.
static void handle_abort(int sig)
363
// Print the program name cmd with the CCTools major.minor.micro version, the build
// user and host, and the compile date and time.
static void show_version(const char *cmd)
365
printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
368
// Print usage for program name cmd and the command-line options it accepts.
// The -t default shown is the current value of idle_timeout.
// NOTE(review): main's getopt string ("aC:d:ihN:o:st:w:v") also accepts -i,
// which is not listed here.
static void show_help(const char *cmd)
370
printf("Use: %s <masterhost> <port>\n", cmd);
371
printf("where options are:\n");
372
printf(" -a Enable auto mode. In this mode the worker would ask a catalog server for available masters.\n");
373
printf(" -C <catalog> Set catalog server to <catalog>. Format: HOSTNAME:PORT \n");
374
printf(" -d <subsystem> Enable debugging for this subsystem.\n");
375
printf(" -N <project> Name of a preferred project. A worker can have multiple preferred projects.\n");
376
printf(" -s Run as a shared worker. By default the worker would only work on preferred projects.\n");
377
printf(" -t <time> Abort after this amount of idle time. (default=%ds)\n", idle_timeout);
378
printf(" -o <file> Send debugging to this file.\n");
379
printf(" -v Show version string\n");
380
printf(" -w <size> Set TCP window size.\n");
381
printf(" -h Show this help screen\n");
384
int main(int argc, char *argv[])
386
const char *host = NULL;
388
int port = WORK_QUEUE_DEFAULT_PORT;
389
char actual_addr[LINK_ADDRESS_MAX];
391
struct link *master = 0;
392
char addr[LINK_ADDRESS_MAX];
393
UINT64_T memory_avail, memory_total;
394
UINT64_T disk_avail, disk_total;
397
char hostname[DOMAIN_NAME_MAX];
399
char preferred_master_names[WORK_QUEUE_LINE_MAX];
401
ncpus = load_average_get_cpus();
402
memory_info_get(&memory_avail, &memory_total);
403
disk_info_get(".", &disk_avail, &disk_total);
405
preferred_masters = list_create();
406
if(!preferred_masters) {
407
fprintf(stderr, "Cannot allocate memory to store preferred work queue masters names.\n");
412
debug_config(argv[0]);
414
while((c = getopt(argc, argv, "aC:d:ihN:o:st:w:v")) != (char) -1) {
420
port = parse_catalog_server_description(optarg, &catalog_server_host, &catalog_server_port);
422
fprintf(stderr, "The provided catalog server is invalid. The format of the '-s' option should be '-s HOSTNAME:PORT'.\n");
429
// This is a shared worker
430
exclusive_worker = 0;
433
debug_flags_set(optarg);
436
idle_timeout = string_time_parse(optarg);
439
debug_config_file(optarg);
442
list_push_tail(preferred_masters, strdup(optarg));
445
show_version(argv[0]);
448
w = string_metric_parse(optarg);
449
link_window_set(w, w);
459
if((argc - optind) != 2) {
464
port = atoi(argv[optind + 1]);
466
if(!domain_name_cache_lookup(host, addr)) {
467
fprintf(stderr, "couldn't lookup address of host %s\n", host);
473
preferred_master_names[0] = 0;
474
list_first_item(preferred_masters);
475
while((pm = (char *) list_next_item(preferred_masters))) {
476
sprintf(&(preferred_master_names[strlen(preferred_master_names)]), "%s ", pm);
479
if(auto_worker && exclusive_worker && !list_size(preferred_masters)) {
480
fprintf(stderr, "Worker is running under exclusive mode. But no preferred master is specified.\n");
481
fprintf(stderr, "Please specify the preferred master names with -N option or add -s option to allow the worker to work for any available masters.\n");
486
signal(SIGTERM, handle_abort);
487
signal(SIGQUIT, handle_abort);
488
signal(SIGINT, handle_abort);
492
if(getenv("_CONDOR_SCRATCH_DIR")) {
493
workdir = getenv("_CONDOR_SCRATCH_DIR");
498
char tempdir[WORK_QUEUE_LINE_MAX];
499
sprintf(tempdir, "%s/worker-%d-%d", workdir, (int) getuid(), (int) getpid());
501
printf("worker: working in %s\n", tempdir);
502
mkdir(tempdir, 0700);
505
domain_name_cache_guess(hostname);
507
time_t idle_stoptime = time(0) + idle_timeout;
508
time_t switch_master_time = time(0) + master_timeout;
510
bad_masters = hash_cache_create(127, hash_string, (hash_cache_cleanup_t) free);
513
char line[WORK_QUEUE_LINE_MAX];
514
int result, mode, fd;
516
char filename[WORK_QUEUE_LINE_MAX];
517
char path[WORK_QUEUE_LINE_MAX];
521
if(time(0) > idle_stoptime) {
523
printf("worker: gave up after waiting %ds to receive a task.\n", idle_timeout);
526
printf("worker: gave up after waiting %ds to connect to all the available masters.\n", idle_timeout);
528
printf("worker: gave up after waiting %ds to connect to %s port %d.\n", idle_timeout, host, port);
534
switch_master_time = time(0) + master_timeout;
537
master = auto_link_connect(actual_addr, &actual_port, switch_master_time);
539
master = link_connect(addr, port, idle_stoptime);
546
link_tune(master, LINK_TUNE_INTERACTIVE);
549
if(exclusive_worker) {
550
link_putfstring(master, "ready %s %d %llu %llu %llu %llu \"%s\"\n", time(0) + active_timeout, hostname, ncpus, memory_avail, memory_total, disk_avail, disk_total, preferred_master_names);
552
link_putfstring(master, "ready %s %d %llu %llu %llu %llu\n", time(0) + active_timeout, hostname, ncpus, memory_avail, memory_total, disk_avail, disk_total);
556
if(link_readline(master, line, sizeof(line), time(0) + active_timeout)) {
557
debug(D_WQ, "%s", line);
558
if(sscanf(line, "work %lld", &length)) {
559
buffer = malloc(length + 10);
560
link_read(master, buffer, length, time(0) + active_timeout);
562
strcat(buffer, " 2>&1");
563
debug(D_WQ, "%s", buffer);
564
stream = popen(buffer, "r");
567
length = copy_stream_to_buffer(stream, &buffer);
570
result = pclose(stream);
576
debug(D_WQ, "result %d %lld", result, length);
577
link_putfstring(master, "result %d %lld\n", time(0) + active_timeout, result, length);
578
link_putlstring(master, buffer, length, time(0) + active_timeout);
581
} else if(sscanf(line, "stat %s", filename) == 1) {
583
if(!stat(filename, &st)) {
584
debug(D_WQ, "result 1 %lu %lu", (unsigned long int) st.st_size, (unsigned long int) st.st_mtime);
585
link_putfstring(master, "result 1 %lu %lu\n", time(0) + active_timeout, (unsigned long int) st.st_size, (unsigned long int) st.st_mtime);
587
debug(D_WQ, "result 0 0 0");
588
link_putliteral(master, "result 0 0 0\n", time(0) + active_timeout);
590
} else if(sscanf(line, "symlink %s %s", path, filename) == 2) {
591
char *cur_pos, *tmp_pos;
595
if(!strncmp(cur_pos, "./", 2)) {
599
tmp_pos = strrchr(cur_pos, '/');
602
if(!create_dir(cur_pos, mode | 0700)) {
603
debug(D_WQ, "Could not create directory - %s (%s)\n", cur_pos, strerror(errno));
608
symlink(path, filename);
609
} else if(sscanf(line, "put %s %lld %o", filename, &length, &mode) == 3) {
611
char *cur_pos, *tmp_pos;
615
if(!strncmp(cur_pos, "./", 2)) {
619
tmp_pos = strrchr(cur_pos, '/');
622
if(!create_dir(cur_pos, mode | 0700)) {
623
debug(D_WQ, "Could not create directory - %s (%s)\n", cur_pos, strerror(errno));
629
fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, mode);
632
INT64_T actual = link_stream_to_fd(master, fd, length, time(0) + active_timeout);
636
} else if(sscanf(line, "unlink %s", path) == 1) {
637
result = remove(path);
638
if(result != 0) { // 0 - succeeded; otherwise, failed
639
fprintf(stderr, "Could not remove file: %s.(%s)\n", path, strerror(errno));
642
} else if(sscanf(line, "mkdir %s %o", filename, &mode) == 2) {
643
if(!create_dir(filename, mode | 0700)) {
644
debug(D_WQ, "Could not create directory - %s (%s)\n", filename, strerror(errno));
647
} else if(sscanf(line, "rget %s", filename) == 1) {
648
stream_output_item(master, filename);
649
link_putliteral(master, "end\n", time(0) + active_timeout);
650
} else if(sscanf(line, "get %s", filename) == 1) { // for backward compatibility
652
if(stat(filename, &info) != 0) {
653
fprintf(stderr, "Output file %s was not created. (%s)\n", filename, strerror(errno));
656
// send back a single file
657
fd = open(filename, O_RDONLY, 0);
659
length = (INT64_T) info.st_size;
660
link_putfstring(master, "%lld\n", time(0) + active_timeout, length);
661
INT64_T actual = link_stream_from_fd(master, fd, length, time(0) + active_timeout);
663
if(actual != length) {
664
debug(D_WQ, "Sending back output file - %s failed: bytes to send = %lld and bytes actually sent = %lld.\nEntering recovery process now ...\n", filename, length, actual);
668
fprintf(stderr, "Could not open output file %s. (%s)\n", filename, strerror(errno));
671
} else if(!strcmp(line, "exit")) {
674
link_putliteral(master, "error\n", time(0) + active_timeout);
677
idle_stoptime = time(0) + idle_timeout;
684
record_bad_master(duplicate_wq_master(actual_master));
690
char deletecmd[WORK_QUEUE_LINE_MAX];
691
printf("worker: cleaning up %s\n", tempdir);
692
sprintf(deletecmd, "rm -rf %s", tempdir);