~ubuntu-branches/ubuntu/vivid/cctools/vivid

« back to all changes in this revision

Viewing changes to dttools/src/catalog_server.c

  • Committer: Bazaar Package Importer
  • Author(s): Michael Hanke
  • Date: 2011-05-07 09:05:00 UTC
  • Revision ID: james.westby@ubuntu.com-20110507090500-lqpmdtwndor6e7os
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
Copyright (C) 2003-2004 Douglas Thain and the University of Wisconsin
 
3
Copyright (C) 2005- The University of Notre Dame
 
4
This software is distributed under the GNU General Public License.
 
5
See the file COPYING for details.
 
6
*/
 
7
 
 
8
#include "catalog_server.h"
 
9
#include "datagram.h"
 
10
#include "link.h"
 
11
#include "hash_cache.h"
 
12
#include "debug.h"
 
13
#include "getopt.h"
 
14
#include "nvpair.h"
 
15
#include "stringtools.h"
 
16
#include "domain_name_cache.h"
 
17
#include "username.h"
 
18
#include "list.h"
 
19
#include "xmalloc.h"
 
20
#include "macros.h"
 
21
 
 
22
#include <stdlib.h>
 
23
#include <stdio.h>
 
24
#include <string.h>
 
25
#ifdef HAS_ALLOCA_H
 
26
#include <alloca.h>
 
27
#endif
 
28
#include <errno.h>
 
29
#include <signal.h>
 
30
#include <unistd.h>
 
31
#include <sys/wait.h>
 
32
#include <sys/select.h>
 
33
 
 
34
#ifndef LINE_MAX
 
35
#define LINE_MAX 1024
 
36
#endif
 
37
 
 
38
#define MAX_TABLE_SIZE 10000
 
39
 
 
40
/* The table of nvpairs, hashed on address:port */
 
41
static struct hash_cache *table = 0;
 
42
 
 
43
/* An array of nvpais used to sort for display */
 
44
static struct nvpair *array[MAX_TABLE_SIZE];
 
45
 
 
46
/* The time for which updated data lives before automatic deletion */
 
47
static int lifetime = 1800;
 
48
 
 
49
/* The port upon which to listen. */
 
50
static int port = CATALOG_PORT_DEFAULT;
 
51
 
 
52
/* This machine's canonical name. */
 
53
static char hostname[DOMAIN_NAME_MAX];
 
54
 
 
55
/* This process's owner */
 
56
static char owner[USERNAME_MAX];
 
57
 
 
58
/* Time when the process was started. */
 
59
static time_t starttime;
 
60
 
 
61
/* If true, for for every query */
 
62
static int fork_mode = 1;
 
63
 
 
64
/* The maximum size of a server that will actually be believed. */
 
65
static INT64_T max_server_size=0;
 
66
 
 
67
/* Settings for the master catalog that we will report *to* */
 
68
static int outgoing_alarm = 0;
 
69
static int outgoing_timeout = 300;
 
70
static struct list *outgoing_host_list;
 
71
 
 
72
struct datagram *update_dgram = 0;
 
73
struct datagram *outgoing_dgram = 0;
 
74
 
 
75
void shutdown_clean(int sig)
 
76
{
 
77
        exit(0);
 
78
}
 
79
 
 
80
void ignore_signal(int sig)
 
81
{
 
82
}
 
83
 
 
84
void reap_child(int sig)
 
85
{
 
86
        pid_t pid;
 
87
        int status;
 
88
 
 
89
        do {
 
90
                pid = waitpid(-1, &status, WNOHANG);
 
91
        } while(pid > 0);
 
92
}
 
93
 
 
94
static void install_handler(int sig, void (*handler) (int sig))
 
95
{
 
96
        struct sigaction s;
 
97
        s.sa_handler = handler;
 
98
        sigfillset(&s.sa_mask);
 
99
        s.sa_flags = 0;
 
100
        sigaction(sig, &s, 0);
 
101
}
 
102
 
 
103
int compare_nvpair( const void *a, const void *b )
 
104
{
 
105
        struct nvpair **pa = (struct nvpair **)a;
 
106
        struct nvpair **pb = (struct nvpair **)b;
 
107
 
 
108
        const char *sa = nvpair_lookup_string(*pa,"name");
 
109
        const char *sb = nvpair_lookup_string(*pb,"name");
 
110
 
 
111
        if(!sa) sa = "unknown";
 
112
        if(!sb) sb = "unknown";
 
113
 
 
114
        return strcasecmp(sa,sb);
 
115
}
 
116
 
 
117
int update_one_catalog(void *outgoing_host, const void *text)
 
118
{
 
119
        char addr[DATAGRAM_ADDRESS_MAX];
 
120
        if(domain_name_cache_lookup(outgoing_host, addr)) {
 
121
                debug(D_DEBUG, "sending update to %s:%d", outgoing_host, CATALOG_PORT);
 
122
                datagram_send(outgoing_dgram, text, strlen(text), addr, CATALOG_PORT);
 
123
        }
 
124
        return 1;
 
125
}
 
126
 
 
127
static void update_all_catalogs(struct datagram *outgoing_dgram)
 
128
{
 
129
        char text[DATAGRAM_PAYLOAD_MAX];
 
130
        unsigned uptime;
 
131
        int length;
 
132
 
 
133
        uptime = time(0) - starttime;
 
134
 
 
135
        length = sprintf(text, "type catalog\nversion %d.%d.%d\nurl http://%s:%d\nname %s\nowner %s\nuptime %u\nport %d\n", CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, hostname, port, hostname, owner, uptime, port);
 
136
 
 
137
        list_iterate(outgoing_host_list, update_one_catalog, text);
 
138
}
 
139
 
 
140
static void make_hash_key(struct nvpair *nv, char *key)
 
141
{
 
142
        const char *name, *addr;
 
143
        int port;
 
144
 
 
145
        addr = nvpair_lookup_string(nv, "address");
 
146
        if(!addr)
 
147
                addr = "unknown";
 
148
 
 
149
        port = nvpair_lookup_integer(nv, "port");
 
150
 
 
151
        name = nvpair_lookup_string(nv, "name");
 
152
        if(!name)
 
153
                name = "unknown";
 
154
 
 
155
        sprintf(key, "%s:%d:%s", addr, port, name);
 
156
}
 
157
 
 
158
static void handle_updates(struct datagram *update_port)
 
159
{
 
160
        char data[DATAGRAM_PAYLOAD_MAX * 2];
 
161
        char addr[DATAGRAM_ADDRESS_MAX];
 
162
        char key[LINE_MAX];
 
163
        int port;
 
164
        int result;
 
165
        int timeout;
 
166
        struct nvpair *nv;
 
167
 
 
168
        while(1) {
 
169
                result = datagram_recv(update_port, data, DATAGRAM_PAYLOAD_MAX, addr, &port, 0);
 
170
                if(result <= 0)
 
171
                        return;
 
172
 
 
173
                data[result] = 0;
 
174
 
 
175
                nv = nvpair_create();
 
176
                nvpair_parse(nv, data);
 
177
 
 
178
                nvpair_insert_string(nv, "address", addr);
 
179
                nvpair_insert_integer(nv, "lastheardfrom", time(0));
 
180
 
 
181
                /* If the server reports unbelievable numbers, simply reset them */
 
182
 
 
183
                if(max_server_size>0) {
 
184
                        INT64_T total = nvpair_lookup_integer(nv,"total");
 
185
                        INT64_T avail = nvpair_lookup_integer(nv,"avail");
 
186
 
 
187
                        if( total>max_server_size || avail>max_server_size ) {
 
188
                                nvpair_insert_integer(nv,"total",0);
 
189
                                nvpair_insert_integer(nv,"avail",0);
 
190
                        }
 
191
                }
 
192
 
 
193
                /* Do not believe the server's reported name, just resolve it backwards. */
 
194
 
 
195
                char name[DOMAIN_NAME_MAX];
 
196
                if(domain_name_cache_lookup_reverse(addr,name)) {
 
197
                        nvpair_insert_string(nv,"name",name);
 
198
                } else {
 
199
                        nvpair_insert_string(nv,"name",addr);
 
200
                }
 
201
 
 
202
                timeout = nvpair_lookup_integer(nv,"lifetime");
 
203
                if (!timeout)
 
204
                    timeout = lifetime;
 
205
                timeout = MIN(timeout, lifetime);
 
206
 
 
207
                make_hash_key(nv, key);
 
208
                hash_cache_insert(table, key, nv, timeout);
 
209
 
 
210
                debug(D_DEBUG, "received udp update from %s", key);
 
211
        }
 
212
}
 
213
 
 
214
static struct nvpair_header html_headers[] = {
 
215
        {"type", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
 
216
        {"name", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
 
217
        {"port", NVPAIR_MODE_INTEGER, NVPAIR_ALIGN_LEFT, 0},
 
218
        {"owner", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
 
219
        {"total", NVPAIR_MODE_METRIC, NVPAIR_ALIGN_RIGHT, 0},
 
220
        {"avail", NVPAIR_MODE_METRIC, NVPAIR_ALIGN_RIGHT, 0},
 
221
        {"load5", NVPAIR_MODE_STRING, NVPAIR_ALIGN_RIGHT, 0},
 
222
        {"version", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
 
223
        {0,}
 
224
};
 
225
 
 
226
static void handle_query(struct link *query_link)
 
227
{
 
228
        FILE *stream;
 
229
        char line[LINE_MAX];
 
230
        char url[LINE_MAX];
 
231
        char path[LINE_MAX];
 
232
        char action[LINE_MAX];
 
233
        char version[LINE_MAX];
 
234
        char hostport[LINE_MAX];
 
235
        char addr[LINK_ADDRESS_MAX];
 
236
        char key[LINE_MAX];
 
237
        int port;
 
238
        time_t current;
 
239
 
 
240
        char *hkey;
 
241
        struct nvpair *nv;
 
242
        int i,n;
 
243
 
 
244
        link_address_remote(query_link, addr, &port);
 
245
 
 
246
        debug(D_DEBUG, "www query from %s:%d", addr, port);
 
247
 
 
248
        link_nonblocking(query_link, 0);
 
249
        stream = fdopen(link_fd(query_link), "r+");
 
250
        if(!stream) return;
 
251
 
 
252
        if(!fgets(line, sizeof(line), stream)) return;
 
253
        string_chomp(line);
 
254
        if(sscanf(line, "%s %s %s", action, url, version) != 3) return;
 
255
 
 
256
        while(1) {
 
257
                if(!fgets(line, sizeof(line), stream))
 
258
                        return;
 
259
                if(line[0] == '\n' || line[0] == '\r')
 
260
                        break;
 
261
        }
 
262
 
 
263
        current = time(0);
 
264
        fprintf(stream, "HTTP/1.1 200 OK\n");
 
265
        fprintf(stream, "Date: %s", ctime(&current));
 
266
        fprintf(stream, "Server: catalog_server\n");
 
267
        fprintf(stream, "Connection: close\n");
 
268
 
 
269
        if(sscanf(url, "http://%[^/]%s", hostport, path) == 2) {
 
270
                // continue on
 
271
        } else {
 
272
                strcpy(path, url);
 
273
        }
 
274
 
 
275
        /* load the hash table entries into one big array */
 
276
 
 
277
        n=0;
 
278
        hash_cache_firstkey(table);
 
279
        while(hash_cache_nextkey(table, &hkey, (void **) &nv)) {
 
280
                array[n] = nv;
 
281
                n++;
 
282
        }
 
283
 
 
284
        /* sort the array by name before displaying */
 
285
 
 
286
        qsort(array,n,sizeof(struct nvpair *),compare_nvpair);
 
287
 
 
288
        if(!strcmp(path, "/query.text")) {
 
289
                fprintf(stream,"Content-type: text/plain\n\n");
 
290
                for(i=0;i<n;i++) nvpair_print_text(array[i], stream);
 
291
        } else if(!strcmp(path, "/query.oldclassads")) {
 
292
                fprintf(stream,"Content-type: text/plain\n\n");
 
293
                for(i=0;i<n;i++) nvpair_print_old_classads(array[i], stream);
 
294
        } else if(!strcmp(path, "/query.newclassads")) {
 
295
                fprintf(stream,"Content-type: text/plain\n\n");
 
296
                for(i=0;i<n;i++) nvpair_print_new_classads(array[i], stream);
 
297
        } else if(!strcmp(path, "/query.xml")) {
 
298
                fprintf(stream,"Content-type: text/xml\n\n");
 
299
                fprintf(stream, "<?xml version=\"1.0\" standalone=\"yes\"?>\n");
 
300
                fprintf(stream, "<catalog>\n");
 
301
                for(i=0;i<n;i++) nvpair_print_xml(array[i], stream);
 
302
                fprintf(stream, "</catalog>\n");
 
303
        } else if(sscanf(path, "/detail/%s", key) == 1) {
 
304
                struct nvpair *nv;
 
305
                fprintf(stream,"Content-type: text/html\n\n");
 
306
                nv = hash_cache_lookup(table, key);
 
307
                if(nv) {
 
308
                        const char *name = nvpair_lookup_string(nv, "name");
 
309
                        if(!name)
 
310
                                name = "unknown";
 
311
                        fprintf(stream, "<title>%s storage catalog: %s</title>\n", hostname, name);
 
312
                        fprintf(stream, "<center>\n");
 
313
                        fprintf(stream, "<h1>%s storage catalog</h1>\n", hostname);
 
314
                        fprintf(stream, "<h2>%s</h2>\n", name);
 
315
                        fprintf(stream, "<p><a href=/>return to catalog view</a><p>\n");
 
316
                        nvpair_print_html_solo(nv, stream);
 
317
                        fprintf(stream, "</center>\n");
 
318
                } else {
 
319
                        fprintf(stream, "<title>%s storage catalog</title>\n", hostname);
 
320
                        fprintf(stream, "<center>\n");
 
321
                        fprintf(stream, "<h1>%s storage catalog</h1>\n", hostname);
 
322
                        fprintf(stream, "<h2>Unknown Item!</h2>\n");
 
323
                        fprintf(stream, "</center>\n");
 
324
                }
 
325
        } else {
 
326
                char avail_line[LINE_MAX];
 
327
                char total_line[LINE_MAX];
 
328
                INT64_T sum_total = 0;
 
329
                INT64_T sum_avail = 0;
 
330
                INT64_T sum_devices = 0;
 
331
 
 
332
                fprintf(stream, "Content-type: text/html\n\n");
 
333
                fprintf(stream, "<title>%s storage catalog</title>\n", hostname);
 
334
                fprintf(stream, "<center>\n");
 
335
                fprintf(stream, "<h1>%s storage catalog</h1>\n", hostname);
 
336
                fprintf(stream, "<a href=/query.text>text</a> - ");
 
337
                fprintf(stream, "<a href=/query.html>html</a> - ");
 
338
                fprintf(stream, "<a href=/query.xml>xml</a> - ");
 
339
                fprintf(stream, "<a href=/query.oldclassads>oldclassads</a> - ");
 
340
                fprintf(stream, "<a href=/query.newclassads>newclassads</a>");
 
341
                fprintf(stream, "<p>\n");
 
342
 
 
343
                for(i=0;i<n;i++) {
 
344
                        nv = array[i];
 
345
                        sum_total += nvpair_lookup_integer(nv, "total");
 
346
                        sum_avail += nvpair_lookup_integer(nv, "avail");
 
347
                        sum_devices++;
 
348
                }
 
349
 
 
350
                string_metric(sum_avail, -1, avail_line);
 
351
                string_metric(sum_total, -1, total_line);
 
352
                fprintf(stream, "<b>%sB available out of %sB on %d devices</b><p>\n", avail_line, total_line, (int) sum_devices);
 
353
 
 
354
                nvpair_print_html_header(stream, html_headers);
 
355
                for(i=0;i<n;i++) {
 
356
                        nv = array[i];
 
357
                        make_hash_key(nv, key);
 
358
                        sprintf(url, "/detail/%s", key);
 
359
                        nvpair_print_html_with_link(nv, stream, html_headers, "name", url);
 
360
                }
 
361
                nvpair_print_html_footer(stream, html_headers);
 
362
                fprintf(stream, "</center>\n");
 
363
        }
 
364
        fclose(stream);
 
365
}
 
366
 
 
367
static void show_version(const char *cmd)
 
368
{
 
369
        printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
 
370
}
 
371
 
 
372
static void show_help(const char *cmd)
 
373
{
 
374
        printf("Use: %s [options]\n", cmd);
 
375
        printf("where options are:\n");
 
376
        printf(" -p <port>      Port number to listen on (default is %d)\n", port);
 
377
        printf(" -l <secs>      Lifetime of data, in seconds (default is %d)\n", lifetime);
 
378
        printf(" -d <subsystem> Enable debugging for this subsystem\n");
 
379
        printf(" -o <file>      Send debugging to this file.\n");
 
380
        printf(" -O <bytes>     Rotate debug file once it reaches this size.\n");
 
381
        printf(" -u <host>      Send status updates to this host. (default is %s)\n", CATALOG_HOST_DEFAULT);
 
382
        printf(" -M <size>      Maximum size of a server to be believed.  (default is any)\n");
 
383
        printf(" -U <time>      Send status updates at this interval. (default is 5m)\n");
 
384
        printf(" -S             Single process mode; do not work on queries.\n");
 
385
        printf(" -v             Show version string\n");
 
386
        printf(" -h             Show this help screen\n");
 
387
}
 
388
 
 
389
int main(int argc, char *argv[])
 
390
{
 
391
        struct link *link, *list_port = 0;
 
392
        char ch;
 
393
        time_t current;
 
394
 
 
395
        outgoing_host_list = list_create();
 
396
 
 
397
        debug_config(argv[0]);
 
398
 
 
399
        while((ch = getopt(argc, argv, "p:l:M:d:o:O:u:U:Shv")) != (char) -1) {
 
400
                switch (ch) {
 
401
                case 'd':
 
402
                        debug_flags_set(optarg);
 
403
                        break;
 
404
                case 'M':
 
405
                        max_server_size = string_metric_parse(optarg);
 
406
                        break;
 
407
                case 'p':
 
408
                        port = atoi(optarg);
 
409
                        break;
 
410
                case 'o':
 
411
                        debug_config_file(optarg);
 
412
                        break;
 
413
                case 'O':
 
414
                        debug_config_file_size(string_metric_parse(optarg));
 
415
                        break;
 
416
                case 'u':
 
417
                        list_push_head(outgoing_host_list, xstrdup(optarg));
 
418
                        break;
 
419
                case 'U':
 
420
                        outgoing_timeout = string_time_parse(optarg);
 
421
                        break;
 
422
                case 'l':
 
423
                        lifetime = string_time_parse(optarg);
 
424
                        break;
 
425
                case 'S':
 
426
                        fork_mode = 0;
 
427
                        break;
 
428
                case 'v':
 
429
                        show_version(argv[0]);
 
430
                        return 0;
 
431
                case 'h':
 
432
                default:
 
433
                        show_help(argv[0]);
 
434
                        return 1;
 
435
                }
 
436
        }
 
437
 
 
438
        current = time(0);
 
439
        debug(D_ALL, "*** %s starting at %s", argv[0], ctime(&current));
 
440
 
 
441
        if(!list_size(outgoing_host_list)) {
 
442
                list_push_head(outgoing_host_list, CATALOG_HOST_DEFAULT);
 
443
        }
 
444
 
 
445
        install_handler(SIGPIPE, ignore_signal);
 
446
        install_handler(SIGHUP, ignore_signal);
 
447
        install_handler(SIGCHLD, reap_child);
 
448
        install_handler(SIGINT, shutdown_clean);
 
449
        install_handler(SIGTERM, shutdown_clean);
 
450
        install_handler(SIGQUIT, shutdown_clean);
 
451
 
 
452
        domain_name_cache_guess(hostname);
 
453
        username_get(owner);
 
454
        starttime = time(0);
 
455
 
 
456
        table = hash_cache_create(127, hash_string, (hash_cache_cleanup_t) nvpair_delete);
 
457
        if(!table)
 
458
                fatal("couldn't make hash table");
 
459
 
 
460
        update_dgram = datagram_create(port);
 
461
        if(!update_dgram)
 
462
                fatal("couldn't listen on udp port %d", port);
 
463
 
 
464
        outgoing_dgram = datagram_create(0);
 
465
        if(!outgoing_dgram)
 
466
                fatal("couldn't create outgoing udp port");
 
467
 
 
468
        list_port = link_serve(port);
 
469
        if(!list_port)
 
470
                fatal("couldn't listen on tcp port %d", port);
 
471
 
 
472
        while(1) {
 
473
                fd_set rfds;
 
474
                int ufd = datagram_fd(update_dgram);
 
475
                int lfd = link_fd(list_port);
 
476
                int result, maxfd;
 
477
                struct timeval timeout;
 
478
 
 
479
                if(time(0) > outgoing_alarm) {
 
480
                        update_all_catalogs(outgoing_dgram);
 
481
                        outgoing_alarm = time(0) + outgoing_timeout;
 
482
                }
 
483
 
 
484
                FD_ZERO(&rfds);
 
485
                FD_SET(ufd,&rfds);
 
486
                FD_SET(lfd,&rfds);
 
487
                maxfd = MAX(ufd,lfd)+1;
 
488
 
 
489
                timeout.tv_sec = 5;
 
490
                timeout.tv_usec = 0;
 
491
 
 
492
                result = select(maxfd,&rfds,0,0,&timeout);
 
493
                if(result<=0) continue;
 
494
 
 
495
                if(FD_ISSET(ufd,&rfds)) {
 
496
                        handle_updates(update_dgram);
 
497
                }
 
498
 
 
499
                if(FD_ISSET(lfd,&rfds)) {
 
500
                        link = link_accept(list_port, time(0) + 5);
 
501
                        if(link) {
 
502
                                if(fork_mode) {
 
503
                                        pid_t pid = fork();
 
504
                                        if(pid == 0) {
 
505
                                                handle_query(link);
 
506
                                                exit(0);
 
507
                                        }
 
508
                                } else {
 
509
                                        handle_query(link);
 
510
                                }
 
511
                                link_close(link);
 
512
                        }
 
513
                }
 
514
        }
 
515
 
 
516
        return 1;
 
517
}