2
Copyright (C) 2003-2004 Douglas Thain and the University of Wisconsin
3
Copyright (C) 2005- The University of Notre Dame
4
This software is distributed under the GNU General Public License.
5
See the file COPYING for details.
8
#include "catalog_server.h"
11
#include "hash_cache.h"
15
#include "stringtools.h"
16
#include "domain_name_cache.h"
32
#include <sys/select.h>
38
#define MAX_TABLE_SIZE 10000
40
/* The table of nvpairs, hashed on address:port */
41
static struct hash_cache *table = 0;
43
/* An array of nvpais used to sort for display */
44
static struct nvpair *array[MAX_TABLE_SIZE];
46
/* The time for which updated data lives before automatic deletion */
47
static int lifetime = 1800;
49
/* The port upon which to listen. */
50
static int port = CATALOG_PORT_DEFAULT;
52
/* This machine's canonical name. */
53
static char hostname[DOMAIN_NAME_MAX];
55
/* This process's owner */
56
static char owner[USERNAME_MAX];
58
/* Time when the process was started. */
59
static time_t starttime;
61
/* If true, for for every query */
62
static int fork_mode = 1;
64
/* The maximum size of a server that will actually be believed. */
65
static INT64_T max_server_size=0;
67
/* Settings for the master catalog that we will report *to* */
68
static int outgoing_alarm = 0;
69
static int outgoing_timeout = 300;
70
static struct list *outgoing_host_list;
72
struct datagram *update_dgram = 0;
73
struct datagram *outgoing_dgram = 0;
75
void shutdown_clean(int sig)
80
void ignore_signal(int sig)
84
void reap_child(int sig)
90
pid = waitpid(-1, &status, WNOHANG);
94
static void install_handler(int sig, void (*handler) (int sig))
97
s.sa_handler = handler;
98
sigfillset(&s.sa_mask);
100
sigaction(sig, &s, 0);
103
int compare_nvpair( const void *a, const void *b )
105
struct nvpair **pa = (struct nvpair **)a;
106
struct nvpair **pb = (struct nvpair **)b;
108
const char *sa = nvpair_lookup_string(*pa,"name");
109
const char *sb = nvpair_lookup_string(*pb,"name");
111
if(!sa) sa = "unknown";
112
if(!sb) sb = "unknown";
114
return strcasecmp(sa,sb);
117
int update_one_catalog(void *outgoing_host, const void *text)
119
char addr[DATAGRAM_ADDRESS_MAX];
120
if(domain_name_cache_lookup(outgoing_host, addr)) {
121
debug(D_DEBUG, "sending update to %s:%d", outgoing_host, CATALOG_PORT);
122
datagram_send(outgoing_dgram, text, strlen(text), addr, CATALOG_PORT);
127
static void update_all_catalogs(struct datagram *outgoing_dgram)
129
char text[DATAGRAM_PAYLOAD_MAX];
133
uptime = time(0) - starttime;
135
length = sprintf(text, "type catalog\nversion %d.%d.%d\nurl http://%s:%d\nname %s\nowner %s\nuptime %u\nport %d\n", CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, hostname, port, hostname, owner, uptime, port);
137
list_iterate(outgoing_host_list, update_one_catalog, text);
140
static void make_hash_key(struct nvpair *nv, char *key)
142
const char *name, *addr;
145
addr = nvpair_lookup_string(nv, "address");
149
port = nvpair_lookup_integer(nv, "port");
151
name = nvpair_lookup_string(nv, "name");
155
sprintf(key, "%s:%d:%s", addr, port, name);
158
static void handle_updates(struct datagram *update_port)
160
char data[DATAGRAM_PAYLOAD_MAX * 2];
161
char addr[DATAGRAM_ADDRESS_MAX];
169
result = datagram_recv(update_port, data, DATAGRAM_PAYLOAD_MAX, addr, &port, 0);
175
nv = nvpair_create();
176
nvpair_parse(nv, data);
178
nvpair_insert_string(nv, "address", addr);
179
nvpair_insert_integer(nv, "lastheardfrom", time(0));
181
/* If the server reports unbelievable numbers, simply reset them */
183
if(max_server_size>0) {
184
INT64_T total = nvpair_lookup_integer(nv,"total");
185
INT64_T avail = nvpair_lookup_integer(nv,"avail");
187
if( total>max_server_size || avail>max_server_size ) {
188
nvpair_insert_integer(nv,"total",0);
189
nvpair_insert_integer(nv,"avail",0);
193
/* Do not believe the server's reported name, just resolve it backwards. */
195
char name[DOMAIN_NAME_MAX];
196
if(domain_name_cache_lookup_reverse(addr,name)) {
197
nvpair_insert_string(nv,"name",name);
199
nvpair_insert_string(nv,"name",addr);
202
timeout = nvpair_lookup_integer(nv,"lifetime");
205
timeout = MIN(timeout, lifetime);
207
make_hash_key(nv, key);
208
hash_cache_insert(table, key, nv, timeout);
210
debug(D_DEBUG, "received udp update from %s", key);
214
static struct nvpair_header html_headers[] = {
215
{"type", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
216
{"name", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
217
{"port", NVPAIR_MODE_INTEGER, NVPAIR_ALIGN_LEFT, 0},
218
{"owner", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
219
{"total", NVPAIR_MODE_METRIC, NVPAIR_ALIGN_RIGHT, 0},
220
{"avail", NVPAIR_MODE_METRIC, NVPAIR_ALIGN_RIGHT, 0},
221
{"load5", NVPAIR_MODE_STRING, NVPAIR_ALIGN_RIGHT, 0},
222
{"version", NVPAIR_MODE_STRING, NVPAIR_ALIGN_LEFT, 0},
226
static void handle_query(struct link *query_link)
232
char action[LINE_MAX];
233
char version[LINE_MAX];
234
char hostport[LINE_MAX];
235
char addr[LINK_ADDRESS_MAX];
244
link_address_remote(query_link, addr, &port);
246
debug(D_DEBUG, "www query from %s:%d", addr, port);
248
link_nonblocking(query_link, 0);
249
stream = fdopen(link_fd(query_link), "r+");
252
if(!fgets(line, sizeof(line), stream)) return;
254
if(sscanf(line, "%s %s %s", action, url, version) != 3) return;
257
if(!fgets(line, sizeof(line), stream))
259
if(line[0] == '\n' || line[0] == '\r')
264
fprintf(stream, "HTTP/1.1 200 OK\n");
265
fprintf(stream, "Date: %s", ctime(¤t));
266
fprintf(stream, "Server: catalog_server\n");
267
fprintf(stream, "Connection: close\n");
269
if(sscanf(url, "http://%[^/]%s", hostport, path) == 2) {
275
/* load the hash table entries into one big array */
278
hash_cache_firstkey(table);
279
while(hash_cache_nextkey(table, &hkey, (void **) &nv)) {
284
/* sort the array by name before displaying */
286
qsort(array,n,sizeof(struct nvpair *),compare_nvpair);
288
if(!strcmp(path, "/query.text")) {
289
fprintf(stream,"Content-type: text/plain\n\n");
290
for(i=0;i<n;i++) nvpair_print_text(array[i], stream);
291
} else if(!strcmp(path, "/query.oldclassads")) {
292
fprintf(stream,"Content-type: text/plain\n\n");
293
for(i=0;i<n;i++) nvpair_print_old_classads(array[i], stream);
294
} else if(!strcmp(path, "/query.newclassads")) {
295
fprintf(stream,"Content-type: text/plain\n\n");
296
for(i=0;i<n;i++) nvpair_print_new_classads(array[i], stream);
297
} else if(!strcmp(path, "/query.xml")) {
298
fprintf(stream,"Content-type: text/xml\n\n");
299
fprintf(stream, "<?xml version=\"1.0\" standalone=\"yes\"?>\n");
300
fprintf(stream, "<catalog>\n");
301
for(i=0;i<n;i++) nvpair_print_xml(array[i], stream);
302
fprintf(stream, "</catalog>\n");
303
} else if(sscanf(path, "/detail/%s", key) == 1) {
305
fprintf(stream,"Content-type: text/html\n\n");
306
nv = hash_cache_lookup(table, key);
308
const char *name = nvpair_lookup_string(nv, "name");
311
fprintf(stream, "<title>%s storage catalog: %s</title>\n", hostname, name);
312
fprintf(stream, "<center>\n");
313
fprintf(stream, "<h1>%s storage catalog</h1>\n", hostname);
314
fprintf(stream, "<h2>%s</h2>\n", name);
315
fprintf(stream, "<p><a href=/>return to catalog view</a><p>\n");
316
nvpair_print_html_solo(nv, stream);
317
fprintf(stream, "</center>\n");
319
fprintf(stream, "<title>%s storage catalog</title>\n", hostname);
320
fprintf(stream, "<center>\n");
321
fprintf(stream, "<h1>%s storage catalog</h1>\n", hostname);
322
fprintf(stream, "<h2>Unknown Item!</h2>\n");
323
fprintf(stream, "</center>\n");
326
char avail_line[LINE_MAX];
327
char total_line[LINE_MAX];
328
INT64_T sum_total = 0;
329
INT64_T sum_avail = 0;
330
INT64_T sum_devices = 0;
332
fprintf(stream, "Content-type: text/html\n\n");
333
fprintf(stream, "<title>%s storage catalog</title>\n", hostname);
334
fprintf(stream, "<center>\n");
335
fprintf(stream, "<h1>%s storage catalog</h1>\n", hostname);
336
fprintf(stream, "<a href=/query.text>text</a> - ");
337
fprintf(stream, "<a href=/query.html>html</a> - ");
338
fprintf(stream, "<a href=/query.xml>xml</a> - ");
339
fprintf(stream, "<a href=/query.oldclassads>oldclassads</a> - ");
340
fprintf(stream, "<a href=/query.newclassads>newclassads</a>");
341
fprintf(stream, "<p>\n");
345
sum_total += nvpair_lookup_integer(nv, "total");
346
sum_avail += nvpair_lookup_integer(nv, "avail");
350
string_metric(sum_avail, -1, avail_line);
351
string_metric(sum_total, -1, total_line);
352
fprintf(stream, "<b>%sB available out of %sB on %d devices</b><p>\n", avail_line, total_line, (int) sum_devices);
354
nvpair_print_html_header(stream, html_headers);
357
make_hash_key(nv, key);
358
sprintf(url, "/detail/%s", key);
359
nvpair_print_html_with_link(nv, stream, html_headers, "name", url);
361
nvpair_print_html_footer(stream, html_headers);
362
fprintf(stream, "</center>\n");
367
static void show_version(const char *cmd)
369
printf("%s version %d.%d.%d built by %s@%s on %s at %s\n", cmd, CCTOOLS_VERSION_MAJOR, CCTOOLS_VERSION_MINOR, CCTOOLS_VERSION_MICRO, BUILD_USER, BUILD_HOST, __DATE__, __TIME__);
372
static void show_help(const char *cmd)
374
printf("Use: %s [options]\n", cmd);
375
printf("where options are:\n");
376
printf(" -p <port> Port number to listen on (default is %d)\n", port);
377
printf(" -l <secs> Lifetime of data, in seconds (default is %d)\n", lifetime);
378
printf(" -d <subsystem> Enable debugging for this subsystem\n");
379
printf(" -o <file> Send debugging to this file.\n");
380
printf(" -O <bytes> Rotate debug file once it reaches this size.\n");
381
printf(" -u <host> Send status updates to this host. (default is %s)\n", CATALOG_HOST_DEFAULT);
382
printf(" -M <size> Maximum size of a server to be believed. (default is any)\n");
383
printf(" -U <time> Send status updates at this interval. (default is 5m)\n");
384
printf(" -S Single process mode; do not work on queries.\n");
385
printf(" -v Show version string\n");
386
printf(" -h Show this help screen\n");
389
int main(int argc, char *argv[])
391
struct link *link, *list_port = 0;
395
outgoing_host_list = list_create();
397
debug_config(argv[0]);
399
while((ch = getopt(argc, argv, "p:l:M:d:o:O:u:U:Shv")) != (char) -1) {
402
debug_flags_set(optarg);
405
max_server_size = string_metric_parse(optarg);
411
debug_config_file(optarg);
414
debug_config_file_size(string_metric_parse(optarg));
417
list_push_head(outgoing_host_list, xstrdup(optarg));
420
outgoing_timeout = string_time_parse(optarg);
423
lifetime = string_time_parse(optarg);
429
show_version(argv[0]);
439
debug(D_ALL, "*** %s starting at %s", argv[0], ctime(¤t));
441
if(!list_size(outgoing_host_list)) {
442
list_push_head(outgoing_host_list, CATALOG_HOST_DEFAULT);
445
install_handler(SIGPIPE, ignore_signal);
446
install_handler(SIGHUP, ignore_signal);
447
install_handler(SIGCHLD, reap_child);
448
install_handler(SIGINT, shutdown_clean);
449
install_handler(SIGTERM, shutdown_clean);
450
install_handler(SIGQUIT, shutdown_clean);
452
domain_name_cache_guess(hostname);
456
table = hash_cache_create(127, hash_string, (hash_cache_cleanup_t) nvpair_delete);
458
fatal("couldn't make hash table");
460
update_dgram = datagram_create(port);
462
fatal("couldn't listen on udp port %d", port);
464
outgoing_dgram = datagram_create(0);
466
fatal("couldn't create outgoing udp port");
468
list_port = link_serve(port);
470
fatal("couldn't listen on tcp port %d", port);
474
int ufd = datagram_fd(update_dgram);
475
int lfd = link_fd(list_port);
477
struct timeval timeout;
479
if(time(0) > outgoing_alarm) {
480
update_all_catalogs(outgoing_dgram);
481
outgoing_alarm = time(0) + outgoing_timeout;
487
maxfd = MAX(ufd,lfd)+1;
492
result = select(maxfd,&rfds,0,0,&timeout);
493
if(result<=0) continue;
495
if(FD_ISSET(ufd,&rfds)) {
496
handle_updates(update_dgram);
499
if(FD_ISSET(lfd,&rfds)) {
500
link = link_accept(list_port, time(0) + 5);