~ubuntu-branches/ubuntu/maverick/rrdtool/maverick

« back to all changes in this revision

Viewing changes to src/rrd_daemon.c

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum
  • Date: 2010-07-22 08:07:01 UTC
  • mfrom: (1.2.8 upstream) (3.1.6 sid)
  • Revision ID: james.westby@ubuntu.com-20100722080701-k46mgdfz6euxwqsm
Tags: 1.4.3-1ubuntu1
* Merge from debian unstable, Remaining changes:
  - debian/control: Don't build against ruby1.9 as we don't want
    it in main.
* require libdbi >= 0.8.3 to prevent aborts when using dbi datasources

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * RRDTool - src/rrd_daemon.c
 
3
 * Copyright (C) 2008,2009 Florian octo Forster
 
4
 * Copyright (C) 2008,2009 Kevin Brintnall
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify it
 
7
 * under the terms of the GNU General Public License as published by the
 
8
 * Free Software Foundation; only version 2 of the License is applicable.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful, but
 
11
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
13
 * General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License along
 
16
 * with this program; if not, write to the Free Software Foundation, Inc.,
 
17
 * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 
18
 *
 
19
 * Authors:
 
20
 *   Florian octo Forster <octo at verplant.org>
 
21
 *   kevin brintnall <kbrint@rufus.net>
 
22
 **/
 
23
 
 
24
#if 0
 
25
/*
 
26
 * First tell the compiler to stick to the C99 and POSIX standards as close as
 
27
 * possible.
 
28
 */
 
29
#ifndef __STRICT_ANSI__ /* {{{ */
 
30
# define __STRICT_ANSI__
 
31
#endif
 
32
 
 
33
#ifndef _ISOC99_SOURCE
 
34
# define _ISOC99_SOURCE
 
35
#endif
 
36
 
 
37
#ifdef _POSIX_C_SOURCE
 
38
# undef _POSIX_C_SOURCE
 
39
#endif
 
40
#define _POSIX_C_SOURCE 200112L
 
41
 
 
42
/* Single UNIX needed for strdup. */
 
43
#ifdef _XOPEN_SOURCE
 
44
# undef _XOPEN_SOURCE
 
45
#endif
 
46
#define _XOPEN_SOURCE 500
 
47
 
 
48
#ifndef _REENTRANT
 
49
# define _REENTRANT
 
50
#endif
 
51
 
 
52
#ifndef _THREAD_SAFE
 
53
# define _THREAD_SAFE
 
54
#endif
 
55
 
 
56
#ifdef _GNU_SOURCE
 
57
# undef _GNU_SOURCE
 
58
#endif
 
59
/* }}} */
 
60
#endif /* 0 */
 
61
 
 
62
/*
 
63
 * Now for some includes..
 
64
 */
 
65
/* {{{ */
 
66
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
 
67
#include "../win32/config.h"
 
68
#else
 
69
#ifdef HAVE_CONFIG_H
 
70
#include "../rrd_config.h"
 
71
#endif
 
72
#endif
 
73
 
 
74
#include "rrd.h"
 
75
#include "rrd_client.h"
 
76
 
 
77
#include <stdlib.h>
 
78
 
 
79
#ifndef WIN32
 
80
#ifdef HAVE_STDINT_H
 
81
#  include <stdint.h>
 
82
#endif
 
83
#include <unistd.h>
 
84
#include <strings.h>
 
85
#include <inttypes.h>
 
86
#include <sys/socket.h>
 
87
 
 
88
#else
 
89
 
 
90
#endif
 
91
#include <stdio.h>
 
92
#include <string.h>
 
93
 
 
94
#include <sys/types.h>
 
95
#include <sys/stat.h>
 
96
#include <dirent.h>
 
97
#include <fcntl.h>
 
98
#include <signal.h>
 
99
#include <sys/un.h>
 
100
#include <netdb.h>
 
101
#include <poll.h>
 
102
#include <syslog.h>
 
103
#include <pthread.h>
 
104
#include <errno.h>
 
105
#include <assert.h>
 
106
#include <sys/time.h>
 
107
#include <time.h>
 
108
#include <libgen.h>
 
109
#include <grp.h>
 
110
 
 
111
#include <glib-2.0/glib.h>
 
112
/* }}} */
 
113
 
 
114
#define RRDD_LOG(severity, ...) \
 
115
  do { \
 
116
    if (stay_foreground) \
 
117
      fprintf(stderr, __VA_ARGS__); \
 
118
    syslog ((severity), __VA_ARGS__); \
 
119
  } while (0)
 
120
 
 
121
#ifndef __GNUC__
 
122
# define __attribute__(x) /**/
 
123
#endif
 
124
 
 
125
/*
 
126
 * Types
 
127
 */
 
128
typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
 
129
 
 
130
struct listen_socket_s
 
131
{
 
132
  int fd;
 
133
  char addr[PATH_MAX + 1];
 
134
  int family;
 
135
 
 
136
  /* state for BATCH processing */
 
137
  time_t batch_start;
 
138
  int batch_cmd;
 
139
 
 
140
  /* buffered IO */
 
141
  char *rbuf;
 
142
  off_t next_cmd;
 
143
  off_t next_read;
 
144
 
 
145
  char *wbuf;
 
146
  ssize_t wbuf_len;
 
147
 
 
148
  uint32_t permissions;
 
149
 
 
150
  gid_t  socket_group;
 
151
  mode_t socket_permissions;
 
152
};
 
153
typedef struct listen_socket_s listen_socket_t;
 
154
 
 
155
struct command_s;
 
156
typedef struct command_s command_t;
 
157
/* note: guard against "unused" warnings in the handlers */
 
158
#define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
 
159
                        time_t now              __attribute__((unused)),\
 
160
                        char  *buffer           __attribute__((unused)),\
 
161
                        size_t buffer_size      __attribute__((unused))
 
162
 
 
163
#define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
 
164
                        DISPATCH_PROTO
 
165
 
 
166
struct command_s {
 
167
  char   *cmd;
 
168
  int (*handler)(HANDLER_PROTO);
 
169
 
 
170
  char  context;                /* where we expect to see it */
 
171
#define CMD_CONTEXT_CLIENT      (1<<0)
 
172
#define CMD_CONTEXT_BATCH       (1<<1)
 
173
#define CMD_CONTEXT_JOURNAL     (1<<2)
 
174
#define CMD_CONTEXT_ANY         (0x7f)
 
175
 
 
176
  char *syntax;
 
177
  char *help;
 
178
};
 
179
 
 
180
struct cache_item_s;
 
181
typedef struct cache_item_s cache_item_t;
 
182
struct cache_item_s
 
183
{
 
184
  char *file;
 
185
  char **values;
 
186
  size_t values_num;
 
187
  time_t last_flush_time;
 
188
  time_t last_update_stamp;
 
189
#define CI_FLAGS_IN_TREE  (1<<0)
 
190
#define CI_FLAGS_IN_QUEUE (1<<1)
 
191
  int flags;
 
192
  pthread_cond_t  flushed;
 
193
  cache_item_t *prev;
 
194
  cache_item_t *next;
 
195
};
 
196
 
 
197
struct callback_flush_data_s
 
198
{
 
199
  time_t now;
 
200
  time_t abs_timeout;
 
201
  char **keys;
 
202
  size_t keys_num;
 
203
};
 
204
typedef struct callback_flush_data_s callback_flush_data_t;
 
205
 
 
206
enum queue_side_e
 
207
{
 
208
  HEAD,
 
209
  TAIL
 
210
};
 
211
typedef enum queue_side_e queue_side_t;
 
212
 
 
213
/* describe a set of journal files */
 
214
typedef struct {
 
215
  char **files;
 
216
  size_t files_num;
 
217
} journal_set;
 
218
 
 
219
/* max length of socket command or response */
 
220
#define CMD_MAX 4096
 
221
#define RBUF_SIZE (CMD_MAX*2)
 
222
 
 
223
/*
 
224
 * Variables
 
225
 */
 
226
static int stay_foreground = 0;
 
227
static uid_t daemon_uid;
 
228
 
 
229
static listen_socket_t *listen_fds = NULL;
 
230
static size_t listen_fds_num = 0;
 
231
 
 
232
enum {
 
233
  RUNNING,              /* normal operation */
 
234
  FLUSHING,             /* flushing remaining values */
 
235
  SHUTDOWN              /* shutting down */
 
236
} state = RUNNING;
 
237
 
 
238
static pthread_t *queue_threads;
 
239
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
 
240
static int config_queue_threads = 4;
 
241
 
 
242
static pthread_t flush_thread;
 
243
static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
 
244
 
 
245
static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
 
246
static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
 
247
static int connection_threads_num = 0;
 
248
 
 
249
/* Cache stuff */
 
250
static GTree          *cache_tree = NULL;
 
251
static cache_item_t   *cache_queue_head = NULL;
 
252
static cache_item_t   *cache_queue_tail = NULL;
 
253
static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
 
254
 
 
255
static int config_write_interval = 300;
 
256
static int config_write_jitter   = 0;
 
257
static int config_flush_interval = 3600;
 
258
static int config_flush_at_shutdown = 0;
 
259
static char *config_pid_file = NULL;
 
260
static char *config_base_dir = NULL;
 
261
static size_t _config_base_dir_len = 0;
 
262
static int config_write_base_only = 0;
 
263
 
 
264
static listen_socket_t **config_listen_address_list = NULL;
 
265
static size_t config_listen_address_list_len = 0;
 
266
 
 
267
static uint64_t stats_queue_length = 0;
 
268
static uint64_t stats_updates_received = 0;
 
269
static uint64_t stats_flush_received = 0;
 
270
static uint64_t stats_updates_written = 0;
 
271
static uint64_t stats_data_sets_written = 0;
 
272
static uint64_t stats_journal_bytes = 0;
 
273
static uint64_t stats_journal_rotate = 0;
 
274
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
 
275
 
 
276
/* Journaled updates */
 
277
#define JOURNAL_REPLAY(s) ((s) == NULL)
 
278
#define JOURNAL_BASE "rrd.journal"
 
279
static journal_set *journal_cur = NULL;
 
280
static journal_set *journal_old = NULL;
 
281
static char *journal_dir = NULL;
 
282
static FILE *journal_fh = NULL;         /* current journal file handle */
 
283
static long  journal_size = 0;          /* current journal size */
 
284
#define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
 
285
static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
 
286
static int journal_write(char *cmd, char *args);
 
287
static void journal_done(void);
 
288
static void journal_rotate(void);
 
289
 
 
290
/* prototypes for forward refernces */
 
291
static int handle_request_help (HANDLER_PROTO);
 
292
 
 
293
/* 
 
294
 * Functions
 
295
 */
 
296
static void sig_common (const char *sig) /* {{{ */
 
297
{
 
298
  RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
 
299
  state = FLUSHING;
 
300
  pthread_cond_broadcast(&flush_cond);
 
301
  pthread_cond_broadcast(&queue_cond);
 
302
} /* }}} void sig_common */
 
303
 
 
304
static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
 
305
{
 
306
  sig_common("INT");
 
307
} /* }}} void sig_int_handler */
 
308
 
 
309
static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
 
310
{
 
311
  sig_common("TERM");
 
312
} /* }}} void sig_term_handler */
 
313
 
 
314
static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
 
315
{
 
316
  config_flush_at_shutdown = 1;
 
317
  sig_common("USR1");
 
318
} /* }}} void sig_usr1_handler */
 
319
 
 
320
static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
 
321
{
 
322
  config_flush_at_shutdown = 0;
 
323
  sig_common("USR2");
 
324
} /* }}} void sig_usr2_handler */
 
325
 
 
326
static void install_signal_handlers(void) /* {{{ */
 
327
{
 
328
  /* These structures are static, because `sigaction' behaves weird if the are
 
329
   * overwritten.. */
 
330
  static struct sigaction sa_int;
 
331
  static struct sigaction sa_term;
 
332
  static struct sigaction sa_pipe;
 
333
  static struct sigaction sa_usr1;
 
334
  static struct sigaction sa_usr2;
 
335
 
 
336
  /* Install signal handlers */
 
337
  memset (&sa_int, 0, sizeof (sa_int));
 
338
  sa_int.sa_handler = sig_int_handler;
 
339
  sigaction (SIGINT, &sa_int, NULL);
 
340
 
 
341
  memset (&sa_term, 0, sizeof (sa_term));
 
342
  sa_term.sa_handler = sig_term_handler;
 
343
  sigaction (SIGTERM, &sa_term, NULL);
 
344
 
 
345
  memset (&sa_pipe, 0, sizeof (sa_pipe));
 
346
  sa_pipe.sa_handler = SIG_IGN;
 
347
  sigaction (SIGPIPE, &sa_pipe, NULL);
 
348
 
 
349
  memset (&sa_pipe, 0, sizeof (sa_usr1));
 
350
  sa_usr1.sa_handler = sig_usr1_handler;
 
351
  sigaction (SIGUSR1, &sa_usr1, NULL);
 
352
 
 
353
  memset (&sa_usr2, 0, sizeof (sa_usr2));
 
354
  sa_usr2.sa_handler = sig_usr2_handler;
 
355
  sigaction (SIGUSR2, &sa_usr2, NULL);
 
356
 
 
357
} /* }}} void install_signal_handlers */
 
358
 
 
359
static int open_pidfile(char *action, int oflag) /* {{{ */
 
360
{
 
361
  int fd;
 
362
  const char *file;
 
363
  char *file_copy, *dir;
 
364
 
 
365
  file = (config_pid_file != NULL)
 
366
    ? config_pid_file
 
367
    : LOCALSTATEDIR "/run/rrdcached.pid";
 
368
 
 
369
  /* dirname may modify its argument */
 
370
  file_copy = strdup(file);
 
371
  if (file_copy == NULL)
 
372
  {
 
373
    fprintf(stderr, "rrdcached: strdup(): %s\n",
 
374
        rrd_strerror(errno));
 
375
    return -1;
 
376
  }
 
377
 
 
378
  dir = dirname(file_copy);
 
379
  if (rrd_mkdir_p(dir, 0777) != 0)
 
380
  {
 
381
    fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
 
382
        dir, rrd_strerror(errno));
 
383
    return -1;
 
384
  }
 
385
 
 
386
  free(file_copy);
 
387
 
 
388
  fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
 
389
  if (fd < 0)
 
390
    fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
 
391
            action, file, rrd_strerror(errno));
 
392
 
 
393
  return(fd);
 
394
} /* }}} static int open_pidfile */
 
395
 
 
396
/* check existing pid file to see whether a daemon is running */
 
397
static int check_pidfile(void)
 
398
{
 
399
  int pid_fd;
 
400
  pid_t pid;
 
401
  char pid_str[16];
 
402
 
 
403
  pid_fd = open_pidfile("open", O_RDWR);
 
404
  if (pid_fd < 0)
 
405
    return pid_fd;
 
406
 
 
407
  if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
 
408
    return -1;
 
409
 
 
410
  pid = atoi(pid_str);
 
411
  if (pid <= 0)
 
412
    return -1;
 
413
 
 
414
  /* another running process that we can signal COULD be
 
415
   * a competing rrdcached */
 
416
  if (pid != getpid() && kill(pid, 0) == 0)
 
417
  {
 
418
    fprintf(stderr,
 
419
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
 
420
    close(pid_fd);
 
421
    return -1;
 
422
  }
 
423
 
 
424
  lseek(pid_fd, 0, SEEK_SET);
 
425
  if (ftruncate(pid_fd, 0) == -1)
 
426
  {
 
427
    fprintf(stderr,
 
428
            "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
 
429
    close(pid_fd);
 
430
    return -1;
 
431
  }
 
432
 
 
433
  fprintf(stderr,
 
434
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
 
435
          "rrdcached: starting normally.\n", pid);
 
436
 
 
437
  return pid_fd;
 
438
} /* }}} static int check_pidfile */
 
439
 
 
440
static int write_pidfile (int fd) /* {{{ */
 
441
{
 
442
  pid_t pid;
 
443
  FILE *fh;
 
444
 
 
445
  pid = getpid ();
 
446
 
 
447
  fh = fdopen (fd, "w");
 
448
  if (fh == NULL)
 
449
  {
 
450
    RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
 
451
    close(fd);
 
452
    return (-1);
 
453
  }
 
454
 
 
455
  fprintf (fh, "%i\n", (int) pid);
 
456
  fclose (fh);
 
457
 
 
458
  return (0);
 
459
} /* }}} int write_pidfile */
 
460
 
 
461
static int remove_pidfile (void) /* {{{ */
 
462
{
 
463
  char *file;
 
464
  int status;
 
465
 
 
466
  file = (config_pid_file != NULL)
 
467
    ? config_pid_file
 
468
    : LOCALSTATEDIR "/run/rrdcached.pid";
 
469
 
 
470
  status = unlink (file);
 
471
  if (status == 0)
 
472
    return (0);
 
473
  return (errno);
 
474
} /* }}} int remove_pidfile */
 
475
 
 
476
static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
 
477
{
 
478
  char *eol;
 
479
 
 
480
  eol = memchr(sock->rbuf + sock->next_cmd, '\n',
 
481
               sock->next_read - sock->next_cmd);
 
482
 
 
483
  if (eol == NULL)
 
484
  {
 
485
    /* no commands left, move remainder back to front of rbuf */
 
486
    memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
 
487
            sock->next_read - sock->next_cmd);
 
488
    sock->next_read -= sock->next_cmd;
 
489
    sock->next_cmd = 0;
 
490
    *len = 0;
 
491
    return NULL;
 
492
  }
 
493
  else
 
494
  {
 
495
    char *cmd = sock->rbuf + sock->next_cmd;
 
496
    *eol = '\0';
 
497
 
 
498
    sock->next_cmd = eol - sock->rbuf + 1;
 
499
 
 
500
    if (eol > sock->rbuf && *(eol-1) == '\r')
 
501
      *(--eol) = '\0'; /* handle "\r\n" EOL */
 
502
 
 
503
    *len = eol - cmd;
 
504
 
 
505
    return cmd;
 
506
  }
 
507
 
 
508
  /* NOTREACHED */
 
509
  assert(1==0);
 
510
} /* }}} char *next_cmd */
 
511
 
 
512
/* add the characters directly to the write buffer */
 
513
static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
 
514
{
 
515
  char *new_buf;
 
516
 
 
517
  assert(sock != NULL);
 
518
 
 
519
  new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
 
520
  if (new_buf == NULL)
 
521
  {
 
522
    RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
 
523
    return -1;
 
524
  }
 
525
 
 
526
  strncpy(new_buf + sock->wbuf_len, str, len + 1);
 
527
 
 
528
  sock->wbuf = new_buf;
 
529
  sock->wbuf_len += len;
 
530
 
 
531
  return 0;
 
532
} /* }}} static int add_to_wbuf */
 
533
 
 
534
/* add the text to the "extra" info that's sent after the status line */
 
535
static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
 
536
{
 
537
  va_list argp;
 
538
  char buffer[CMD_MAX];
 
539
  int len;
 
540
 
 
541
  if (JOURNAL_REPLAY(sock)) return 0;
 
542
  if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
 
543
 
 
544
  va_start(argp, fmt);
 
545
#ifdef HAVE_VSNPRINTF
 
546
  len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
 
547
#else
 
548
  len = vsprintf(buffer, fmt, argp);
 
549
#endif
 
550
  va_end(argp);
 
551
  if (len < 0)
 
552
  {
 
553
    RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
 
554
    return -1;
 
555
  }
 
556
 
 
557
  return add_to_wbuf(sock, buffer, len);
 
558
} /* }}} static int add_response_info */
 
559
 
 
560
static int count_lines(char *str) /* {{{ */
 
561
{
 
562
  int lines = 0;
 
563
 
 
564
  if (str != NULL)
 
565
  {
 
566
    while ((str = strchr(str, '\n')) != NULL)
 
567
    {
 
568
      ++lines;
 
569
      ++str;
 
570
    }
 
571
  }
 
572
 
 
573
  return lines;
 
574
} /* }}} static int count_lines */
 
575
 
 
576
/* send the response back to the user.
 
577
 * returns 0 on success, -1 on error
 
578
 * write buffer is always zeroed after this call */
 
579
static int send_response (listen_socket_t *sock, response_code rc,
 
580
                          char *fmt, ...) /* {{{ */
 
581
{
 
582
  va_list argp;
 
583
  char buffer[CMD_MAX];
 
584
  int lines;
 
585
  ssize_t wrote;
 
586
  int rclen, len;
 
587
 
 
588
  if (JOURNAL_REPLAY(sock)) return rc;
 
589
 
 
590
  if (sock->batch_start)
 
591
  {
 
592
    if (rc == RESP_OK)
 
593
      return rc; /* no response on success during BATCH */
 
594
    lines = sock->batch_cmd;
 
595
  }
 
596
  else if (rc == RESP_OK)
 
597
    lines = count_lines(sock->wbuf);
 
598
  else
 
599
    lines = -1;
 
600
 
 
601
  rclen = sprintf(buffer, "%d ", lines);
 
602
  va_start(argp, fmt);
 
603
#ifdef HAVE_VSNPRINTF
 
604
  len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
 
605
#else
 
606
  len = vsprintf(buffer+rclen, fmt, argp);
 
607
#endif
 
608
  va_end(argp);
 
609
  if (len < 0)
 
610
    return -1;
 
611
 
 
612
  len += rclen;
 
613
 
 
614
  /* append the result to the wbuf, don't write to the user */
 
615
  if (sock->batch_start)
 
616
    return add_to_wbuf(sock, buffer, len);
 
617
 
 
618
  /* first write must be complete */
 
619
  if (len != write(sock->fd, buffer, len))
 
620
  {
 
621
    RRDD_LOG(LOG_INFO, "send_response: could not write status message");
 
622
    return -1;
 
623
  }
 
624
 
 
625
  if (sock->wbuf != NULL && rc == RESP_OK)
 
626
  {
 
627
    wrote = 0;
 
628
    while (wrote < sock->wbuf_len)
 
629
    {
 
630
      ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
 
631
      if (wb <= 0)
 
632
      {
 
633
        RRDD_LOG(LOG_INFO, "send_response: could not write results");
 
634
        return -1;
 
635
      }
 
636
      wrote += wb;
 
637
    }
 
638
  }
 
639
 
 
640
  free(sock->wbuf); sock->wbuf = NULL;
 
641
  sock->wbuf_len = 0;
 
642
 
 
643
  return 0;
 
644
} /* }}} */
 
645
 
 
646
static void wipe_ci_values(cache_item_t *ci, time_t when)
 
647
{
 
648
  ci->values = NULL;
 
649
  ci->values_num = 0;
 
650
 
 
651
  ci->last_flush_time = when;
 
652
  if (config_write_jitter > 0)
 
653
    ci->last_flush_time += (rrd_random() % config_write_jitter);
 
654
}
 
655
 
 
656
/* remove_from_queue
 
657
 * remove a "cache_item_t" item from the queue.
 
658
 * must hold 'cache_lock' when calling this
 
659
 */
 
660
static void remove_from_queue(cache_item_t *ci) /* {{{ */
 
661
{
 
662
  if (ci == NULL) return;
 
663
  if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
 
664
 
 
665
  if (ci->prev == NULL)
 
666
    cache_queue_head = ci->next; /* reset head */
 
667
  else
 
668
    ci->prev->next = ci->next;
 
669
 
 
670
  if (ci->next == NULL)
 
671
    cache_queue_tail = ci->prev; /* reset the tail */
 
672
  else
 
673
    ci->next->prev = ci->prev;
 
674
 
 
675
  ci->next = ci->prev = NULL;
 
676
  ci->flags &= ~CI_FLAGS_IN_QUEUE;
 
677
 
 
678
  pthread_mutex_lock (&stats_lock);
 
679
  assert (stats_queue_length > 0);
 
680
  stats_queue_length--;
 
681
  pthread_mutex_unlock (&stats_lock);
 
682
 
 
683
} /* }}} static void remove_from_queue */
 
684
 
 
685
/* free the resources associated with the cache_item_t
 
686
 * must hold cache_lock when calling this function
 
687
 */
 
688
static void *free_cache_item(cache_item_t *ci) /* {{{ */
 
689
{
 
690
  if (ci == NULL) return NULL;
 
691
 
 
692
  remove_from_queue(ci);
 
693
 
 
694
  for (size_t i=0; i < ci->values_num; i++)
 
695
    free(ci->values[i]);
 
696
 
 
697
  free (ci->values);
 
698
  free (ci->file);
 
699
 
 
700
  /* in case anyone is waiting */
 
701
  pthread_cond_broadcast(&ci->flushed);
 
702
  pthread_cond_destroy(&ci->flushed);
 
703
 
 
704
  free (ci);
 
705
 
 
706
  return NULL;
 
707
} /* }}} static void *free_cache_item */
 
708
 
 
709
/*
 
710
 * enqueue_cache_item:
 
711
 * `cache_lock' must be acquired before calling this function!
 
712
 */
 
713
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
 
714
    queue_side_t side)
 
715
{
 
716
  if (ci == NULL)
 
717
    return (-1);
 
718
 
 
719
  if (ci->values_num == 0)
 
720
    return (0);
 
721
 
 
722
  if (side == HEAD)
 
723
  {
 
724
    if (cache_queue_head == ci)
 
725
      return 0;
 
726
 
 
727
    /* remove if further down in queue */
 
728
    remove_from_queue(ci);
 
729
 
 
730
    ci->prev = NULL;
 
731
    ci->next = cache_queue_head;
 
732
    if (ci->next != NULL)
 
733
      ci->next->prev = ci;
 
734
    cache_queue_head = ci;
 
735
 
 
736
    if (cache_queue_tail == NULL)
 
737
      cache_queue_tail = cache_queue_head;
 
738
  }
 
739
  else /* (side == TAIL) */
 
740
  {
 
741
    /* We don't move values back in the list.. */
 
742
    if (ci->flags & CI_FLAGS_IN_QUEUE)
 
743
      return (0);
 
744
 
 
745
    assert (ci->next == NULL);
 
746
    assert (ci->prev == NULL);
 
747
 
 
748
    ci->prev = cache_queue_tail;
 
749
 
 
750
    if (cache_queue_tail == NULL)
 
751
      cache_queue_head = ci;
 
752
    else
 
753
      cache_queue_tail->next = ci;
 
754
 
 
755
    cache_queue_tail = ci;
 
756
  }
 
757
 
 
758
  ci->flags |= CI_FLAGS_IN_QUEUE;
 
759
 
 
760
  pthread_cond_signal(&queue_cond);
 
761
  pthread_mutex_lock (&stats_lock);
 
762
  stats_queue_length++;
 
763
  pthread_mutex_unlock (&stats_lock);
 
764
 
 
765
  return (0);
 
766
} /* }}} int enqueue_cache_item */
 
767
 
 
768
/*
 
769
 * tree_callback_flush:
 
770
 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
 
771
 * while this is in progress.
 
772
 */
 
773
static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
 
774
    gpointer data)
 
775
{
 
776
  cache_item_t *ci;
 
777
  callback_flush_data_t *cfd;
 
778
 
 
779
  ci = (cache_item_t *) value;
 
780
  cfd = (callback_flush_data_t *) data;
 
781
 
 
782
  if (ci->flags & CI_FLAGS_IN_QUEUE)
 
783
    return FALSE;
 
784
 
 
785
  if (ci->values_num > 0
 
786
      && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
 
787
  {
 
788
    enqueue_cache_item (ci, TAIL);
 
789
  }
 
790
  else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
 
791
      && (ci->values_num <= 0))
 
792
  {
 
793
    assert ((char *) key == ci->file);
 
794
    if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
 
795
    {
 
796
      RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
 
797
      return (FALSE);
 
798
    }
 
799
  }
 
800
 
 
801
  return (FALSE);
 
802
} /* }}} gboolean tree_callback_flush */
 
803
 
 
804
static int flush_old_values (int max_age)
 
805
{
 
806
  callback_flush_data_t cfd;
 
807
  size_t k;
 
808
 
 
809
  memset (&cfd, 0, sizeof (cfd));
 
810
  /* Pass the current time as user data so that we don't need to call
 
811
   * `time' for each node. */
 
812
  cfd.now = time (NULL);
 
813
  cfd.keys = NULL;
 
814
  cfd.keys_num = 0;
 
815
 
 
816
  if (max_age > 0)
 
817
    cfd.abs_timeout = cfd.now - max_age;
 
818
  else
 
819
    cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
 
820
 
 
821
  /* `tree_callback_flush' will return the keys of all values that haven't
 
822
   * been touched in the last `config_flush_interval' seconds in `cfd'.
 
823
   * The char*'s in this array point to the same memory as ci->file, so we
 
824
   * don't need to free them separately. */
 
825
  g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
 
826
 
 
827
  for (k = 0; k < cfd.keys_num; k++)
 
828
  {
 
829
    gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
 
830
    /* should never fail, since we have held the cache_lock
 
831
     * the entire time */
 
832
    assert(status == TRUE);
 
833
  }
 
834
 
 
835
  if (cfd.keys != NULL)
 
836
  {
 
837
    free (cfd.keys);
 
838
    cfd.keys = NULL;
 
839
  }
 
840
 
 
841
  return (0);
 
842
} /* int flush_old_values */
 
843
 
 
844
static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
 
845
{
 
846
  struct timeval now;
 
847
  struct timespec next_flush;
 
848
  int status;
 
849
 
 
850
  gettimeofday (&now, NULL);
 
851
  next_flush.tv_sec = now.tv_sec + config_flush_interval;
 
852
  next_flush.tv_nsec = 1000 * now.tv_usec;
 
853
 
 
854
  pthread_mutex_lock(&cache_lock);
 
855
 
 
856
  while (state == RUNNING)
 
857
  {
 
858
    gettimeofday (&now, NULL);
 
859
    if ((now.tv_sec > next_flush.tv_sec)
 
860
        || ((now.tv_sec == next_flush.tv_sec)
 
861
          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
 
862
    {
 
863
      RRDD_LOG(LOG_DEBUG, "flushing old values");
 
864
 
 
865
      /* Determine the time of the next cache flush. */
 
866
      next_flush.tv_sec = now.tv_sec + config_flush_interval;
 
867
 
 
868
      /* Flush all values that haven't been written in the last
 
869
       * `config_write_interval' seconds. */
 
870
      flush_old_values (config_write_interval);
 
871
 
 
872
      /* unlock the cache while we rotate so we don't block incoming
 
873
       * updates if the fsync() blocks on disk I/O */
 
874
      pthread_mutex_unlock(&cache_lock);
 
875
      journal_rotate();
 
876
      pthread_mutex_lock(&cache_lock);
 
877
    }
 
878
 
 
879
    status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
 
880
    if (status != 0 && status != ETIMEDOUT)
 
881
    {
 
882
      RRDD_LOG (LOG_ERR, "flush_thread_main: "
 
883
                "pthread_cond_timedwait returned %i.", status);
 
884
    }
 
885
  }
 
886
 
 
887
  if (config_flush_at_shutdown)
 
888
    flush_old_values (-1); /* flush everything */
 
889
 
 
890
  state = SHUTDOWN;
 
891
 
 
892
  pthread_mutex_unlock(&cache_lock);
 
893
 
 
894
  return NULL;
 
895
} /* void *flush_thread_main */
 
896
 
 
897
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
 
898
{
 
899
  pthread_mutex_lock (&cache_lock);
 
900
 
 
901
  while (state != SHUTDOWN
 
902
         || (cache_queue_head != NULL && config_flush_at_shutdown))
 
903
  {
 
904
    cache_item_t *ci;
 
905
    char *file;
 
906
    char **values;
 
907
    size_t values_num;
 
908
    int status;
 
909
 
 
910
    /* Now, check if there's something to store away. If not, wait until
 
911
     * something comes in. */
 
912
    if (cache_queue_head == NULL)
 
913
    {
 
914
      status = pthread_cond_wait (&queue_cond, &cache_lock);
 
915
      if ((status != 0) && (status != ETIMEDOUT))
 
916
      {
 
917
        RRDD_LOG (LOG_ERR, "queue_thread_main: "
 
918
            "pthread_cond_wait returned %i.", status);
 
919
      }
 
920
    }
 
921
 
 
922
    /* Check if a value has arrived. This may be NULL if we timed out or there
 
923
     * was an interrupt such as a signal. */
 
924
    if (cache_queue_head == NULL)
 
925
      continue;
 
926
 
 
927
    ci = cache_queue_head;
 
928
 
 
929
    /* copy the relevant parts */
 
930
    file = strdup (ci->file);
 
931
    if (file == NULL)
 
932
    {
 
933
      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
 
934
      continue;
 
935
    }
 
936
 
 
937
    assert(ci->values != NULL);
 
938
    assert(ci->values_num > 0);
 
939
 
 
940
    values = ci->values;
 
941
    values_num = ci->values_num;
 
942
 
 
943
    wipe_ci_values(ci, time(NULL));
 
944
    remove_from_queue(ci);
 
945
 
 
946
    pthread_mutex_unlock (&cache_lock);
 
947
 
 
948
    rrd_clear_error ();
 
949
    status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
 
950
    if (status != 0)
 
951
    {
 
952
      RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
 
953
          "rrd_update_r (%s) failed with status %i. (%s)",
 
954
          file, status, rrd_get_error());
 
955
    }
 
956
 
 
957
    journal_write("wrote", file);
 
958
 
 
959
    /* Search again in the tree.  It's possible someone issued a "FORGET"
 
960
     * while we were writing the update values. */
 
961
    pthread_mutex_lock(&cache_lock);
 
962
    ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
 
963
    if (ci)
 
964
      pthread_cond_broadcast(&ci->flushed);
 
965
    pthread_mutex_unlock(&cache_lock);
 
966
 
 
967
    if (status == 0)
 
968
    {
 
969
      pthread_mutex_lock (&stats_lock);
 
970
      stats_updates_written++;
 
971
      stats_data_sets_written += values_num;
 
972
      pthread_mutex_unlock (&stats_lock);
 
973
    }
 
974
 
 
975
    rrd_free_ptrs((void ***) &values, &values_num);
 
976
    free(file);
 
977
 
 
978
    pthread_mutex_lock (&cache_lock);
 
979
  }
 
980
  pthread_mutex_unlock (&cache_lock);
 
981
 
 
982
  return (NULL);
 
983
} /* }}} void *queue_thread_main */
 
984
 
 
985
static int buffer_get_field (char **buffer_ret, /* {{{ */
 
986
    size_t *buffer_size_ret, char **field_ret)
 
987
{
 
988
  char *buffer;
 
989
  size_t buffer_pos;
 
990
  size_t buffer_size;
 
991
  char *field;
 
992
  size_t field_size;
 
993
  int status;
 
994
 
 
995
  buffer = *buffer_ret;
 
996
  buffer_pos = 0;
 
997
  buffer_size = *buffer_size_ret;
 
998
  field = *buffer_ret;
 
999
  field_size = 0;
 
1000
 
 
1001
  if (buffer_size <= 0)
 
1002
    return (-1);
 
1003
 
 
1004
  /* This is ensured by `handle_request'. */
 
1005
  assert (buffer[buffer_size - 1] == '\0');
 
1006
 
 
1007
  status = -1;
 
1008
  while (buffer_pos < buffer_size)
 
1009
  {
 
1010
    /* Check for end-of-field or end-of-buffer */
 
1011
    if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
 
1012
    {
 
1013
      field[field_size] = 0;
 
1014
      field_size++;
 
1015
      buffer_pos++;
 
1016
      status = 0;
 
1017
      break;
 
1018
    }
 
1019
    /* Handle escaped characters. */
 
1020
    else if (buffer[buffer_pos] == '\\')
 
1021
    {
 
1022
      if (buffer_pos >= (buffer_size - 1))
 
1023
        break;
 
1024
      buffer_pos++;
 
1025
      field[field_size] = buffer[buffer_pos];
 
1026
      field_size++;
 
1027
      buffer_pos++;
 
1028
    }
 
1029
    /* Normal operation */ 
 
1030
    else
 
1031
    {
 
1032
      field[field_size] = buffer[buffer_pos];
 
1033
      field_size++;
 
1034
      buffer_pos++;
 
1035
    }
 
1036
  } /* while (buffer_pos < buffer_size) */
 
1037
 
 
1038
  if (status != 0)
 
1039
    return (status);
 
1040
 
 
1041
  *buffer_ret = buffer + buffer_pos;
 
1042
  *buffer_size_ret = buffer_size - buffer_pos;
 
1043
  *field_ret = field;
 
1044
 
 
1045
  return (0);
 
1046
} /* }}} int buffer_get_field */
 
1047
 
 
1048
/* if we're restricting writes to the base directory,
 
1049
 * check whether the file falls within the dir
 
1050
 * returns 1 if OK, otherwise 0
 
1051
 */
 
1052
static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
 
1053
{
 
1054
  assert(file != NULL);
 
1055
 
 
1056
  if (!config_write_base_only
 
1057
      || JOURNAL_REPLAY(sock)
 
1058
      || config_base_dir == NULL)
 
1059
    return 1;
 
1060
 
 
1061
  if (strstr(file, "../") != NULL) goto err;
 
1062
 
 
1063
  /* relative paths without "../" are ok */
 
1064
  if (*file != '/') return 1;
 
1065
 
 
1066
  /* file must be of the format base + "/" + <1+ char filename> */
 
1067
  if (strlen(file) < _config_base_dir_len + 2) goto err;
 
1068
  if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
 
1069
  if (*(file + _config_base_dir_len) != '/') goto err;
 
1070
 
 
1071
  return 1;
 
1072
 
 
1073
err:
 
1074
  if (sock != NULL && sock->fd >= 0)
 
1075
    send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
 
1076
 
 
1077
  return 0;
 
1078
} /* }}} static int check_file_access */
 
1079
 
 
1080
/* when using a base dir, convert relative paths to absolute paths.
 
1081
 * if necessary, modifies the "filename" pointer to point
 
1082
 * to the new path created in "tmp".  "tmp" is provided
 
1083
 * by the caller and sizeof(tmp) must be >= PATH_MAX.
 
1084
 *
 
1085
 * this allows us to optimize for the expected case (absolute path)
 
1086
 * with a no-op.
 
1087
 */
 
1088
static void get_abs_path(char **filename, char *tmp)
 
1089
{
 
1090
  assert(tmp != NULL);
 
1091
  assert(filename != NULL && *filename != NULL);
 
1092
 
 
1093
  if (config_base_dir == NULL || **filename == '/')
 
1094
    return;
 
1095
 
 
1096
  snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
 
1097
  *filename = tmp;
 
1098
} /* }}} static int get_abs_path */
 
1099
 
 
1100
static int flush_file (const char *filename) /* {{{ */
 
1101
{
 
1102
  cache_item_t *ci;
 
1103
 
 
1104
  pthread_mutex_lock (&cache_lock);
 
1105
 
 
1106
  ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
 
1107
  if (ci == NULL)
 
1108
  {
 
1109
    pthread_mutex_unlock (&cache_lock);
 
1110
    return (ENOENT);
 
1111
  }
 
1112
 
 
1113
  if (ci->values_num > 0)
 
1114
  {
 
1115
    /* Enqueue at head */
 
1116
    enqueue_cache_item (ci, HEAD);
 
1117
    pthread_cond_wait(&ci->flushed, &cache_lock);
 
1118
  }
 
1119
 
 
1120
  /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
 
1121
   * may have been purged during our cond_wait() */
 
1122
 
 
1123
  pthread_mutex_unlock(&cache_lock);
 
1124
 
 
1125
  return (0);
 
1126
} /* }}} int flush_file */
 
1127
 
 
1128
static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
 
1129
{
 
1130
  char *err = "Syntax error.\n";
 
1131
 
 
1132
  if (cmd && cmd->syntax)
 
1133
    err = cmd->syntax;
 
1134
 
 
1135
  return send_response(sock, RESP_ERR, "Usage: %s", err);
 
1136
} /* }}} static int syntax_error() */
 
1137
 
 
1138
static int handle_request_stats (HANDLER_PROTO) /* {{{ */
 
1139
{
 
1140
  uint64_t copy_queue_length;
 
1141
  uint64_t copy_updates_received;
 
1142
  uint64_t copy_flush_received;
 
1143
  uint64_t copy_updates_written;
 
1144
  uint64_t copy_data_sets_written;
 
1145
  uint64_t copy_journal_bytes;
 
1146
  uint64_t copy_journal_rotate;
 
1147
 
 
1148
  uint64_t tree_nodes_number;
 
1149
  uint64_t tree_depth;
 
1150
 
 
1151
  pthread_mutex_lock (&stats_lock);
 
1152
  copy_queue_length       = stats_queue_length;
 
1153
  copy_updates_received   = stats_updates_received;
 
1154
  copy_flush_received     = stats_flush_received;
 
1155
  copy_updates_written    = stats_updates_written;
 
1156
  copy_data_sets_written  = stats_data_sets_written;
 
1157
  copy_journal_bytes      = stats_journal_bytes;
 
1158
  copy_journal_rotate     = stats_journal_rotate;
 
1159
  pthread_mutex_unlock (&stats_lock);
 
1160
 
 
1161
  pthread_mutex_lock (&cache_lock);
 
1162
  tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
 
1163
  tree_depth        = (uint64_t) g_tree_height (cache_tree);
 
1164
  pthread_mutex_unlock (&cache_lock);
 
1165
 
 
1166
  add_response_info(sock,
 
1167
                    "QueueLength: %"PRIu64"\n", copy_queue_length);
 
1168
  add_response_info(sock,
 
1169
                    "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
 
1170
  add_response_info(sock,
 
1171
                    "FlushesReceived: %"PRIu64"\n", copy_flush_received);
 
1172
  add_response_info(sock,
 
1173
                    "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
 
1174
  add_response_info(sock,
 
1175
                    "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
 
1176
  add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
 
1177
  add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
 
1178
  add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
 
1179
  add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
 
1180
 
 
1181
  send_response(sock, RESP_OK, "Statistics follow\n");
 
1182
 
 
1183
  return (0);
 
1184
} /* }}} int handle_request_stats */
 
1185
 
 
1186
static int handle_request_flush (HANDLER_PROTO) /* {{{ */
 
1187
{
 
1188
  char *file, file_tmp[PATH_MAX];
 
1189
  int status;
 
1190
 
 
1191
  status = buffer_get_field (&buffer, &buffer_size, &file);
 
1192
  if (status != 0)
 
1193
  {
 
1194
    return syntax_error(sock,cmd);
 
1195
  }
 
1196
  else
 
1197
  {
 
1198
    pthread_mutex_lock(&stats_lock);
 
1199
    stats_flush_received++;
 
1200
    pthread_mutex_unlock(&stats_lock);
 
1201
 
 
1202
    get_abs_path(&file, file_tmp);
 
1203
    if (!check_file_access(file, sock)) return 0;
 
1204
 
 
1205
    status = flush_file (file);
 
1206
    if (status == 0)
 
1207
      return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
 
1208
    else if (status == ENOENT)
 
1209
    {
 
1210
      /* no file in our tree; see whether it exists at all */
 
1211
      struct stat statbuf;
 
1212
 
 
1213
      memset(&statbuf, 0, sizeof(statbuf));
 
1214
      if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
 
1215
        return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
 
1216
      else
 
1217
        return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
 
1218
    }
 
1219
    else if (status < 0)
 
1220
      return send_response(sock, RESP_ERR, "Internal error.\n");
 
1221
    else
 
1222
      return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
 
1223
  }
 
1224
 
 
1225
  /* NOTREACHED */
 
1226
  assert(1==0);
 
1227
} /* }}} int handle_request_flush */
 
1228
 
 
1229
static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
 
1230
{
 
1231
  RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
 
1232
 
 
1233
  pthread_mutex_lock(&cache_lock);
 
1234
  flush_old_values(-1);
 
1235
  pthread_mutex_unlock(&cache_lock);
 
1236
 
 
1237
  return send_response(sock, RESP_OK, "Started flush.\n");
 
1238
} /* }}} static int handle_request_flushall */
 
1239
 
 
1240
static int handle_request_pending(HANDLER_PROTO) /* {{{ */
 
1241
{
 
1242
  int status;
 
1243
  char *file, file_tmp[PATH_MAX];
 
1244
  cache_item_t *ci;
 
1245
 
 
1246
  status = buffer_get_field(&buffer, &buffer_size, &file);
 
1247
  if (status != 0)
 
1248
    return syntax_error(sock,cmd);
 
1249
 
 
1250
  get_abs_path(&file, file_tmp);
 
1251
 
 
1252
  pthread_mutex_lock(&cache_lock);
 
1253
  ci = g_tree_lookup(cache_tree, file);
 
1254
  if (ci == NULL)
 
1255
  {
 
1256
    pthread_mutex_unlock(&cache_lock);
 
1257
    return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
 
1258
  }
 
1259
 
 
1260
  for (size_t i=0; i < ci->values_num; i++)
 
1261
    add_response_info(sock, "%s\n", ci->values[i]);
 
1262
 
 
1263
  pthread_mutex_unlock(&cache_lock);
 
1264
  return send_response(sock, RESP_OK, "updates pending\n");
 
1265
} /* }}} static int handle_request_pending */
 
1266
 
 
1267
static int handle_request_forget(HANDLER_PROTO) /* {{{ */
 
1268
{
 
1269
  int status;
 
1270
  gboolean found;
 
1271
  char *file, file_tmp[PATH_MAX];
 
1272
 
 
1273
  status = buffer_get_field(&buffer, &buffer_size, &file);
 
1274
  if (status != 0)
 
1275
    return syntax_error(sock,cmd);
 
1276
 
 
1277
  get_abs_path(&file, file_tmp);
 
1278
  if (!check_file_access(file, sock)) return 0;
 
1279
 
 
1280
  pthread_mutex_lock(&cache_lock);
 
1281
  found = g_tree_remove(cache_tree, file);
 
1282
  pthread_mutex_unlock(&cache_lock);
 
1283
 
 
1284
  if (found == TRUE)
 
1285
  {
 
1286
    if (!JOURNAL_REPLAY(sock))
 
1287
      journal_write("forget", file);
 
1288
 
 
1289
    return send_response(sock, RESP_OK, "Gone!\n");
 
1290
  }
 
1291
  else
 
1292
    return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
 
1293
 
 
1294
  /* NOTREACHED */
 
1295
  assert(1==0);
 
1296
} /* }}} static int handle_request_forget */
 
1297
 
 
1298
static int handle_request_queue (HANDLER_PROTO) /* {{{ */
 
1299
{
 
1300
  cache_item_t *ci;
 
1301
 
 
1302
  pthread_mutex_lock(&cache_lock);
 
1303
 
 
1304
  ci = cache_queue_head;
 
1305
  while (ci != NULL)
 
1306
  {
 
1307
    add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
 
1308
    ci = ci->next;
 
1309
  }
 
1310
 
 
1311
  pthread_mutex_unlock(&cache_lock);
 
1312
 
 
1313
  return send_response(sock, RESP_OK, "in queue.\n");
 
1314
} /* }}} int handle_request_queue */
 
1315
 
 
1316
static int handle_request_update (HANDLER_PROTO) /* {{{ */
 
1317
{
 
1318
  char *file, file_tmp[PATH_MAX];
 
1319
  int values_num = 0;
 
1320
  int status;
 
1321
  char orig_buf[CMD_MAX];
 
1322
 
 
1323
  cache_item_t *ci;
 
1324
 
 
1325
  /* save it for the journal later */
 
1326
  if (!JOURNAL_REPLAY(sock))
 
1327
    strncpy(orig_buf, buffer, buffer_size);
 
1328
 
 
1329
  status = buffer_get_field (&buffer, &buffer_size, &file);
 
1330
  if (status != 0)
 
1331
    return syntax_error(sock,cmd);
 
1332
 
 
1333
  pthread_mutex_lock(&stats_lock);
 
1334
  stats_updates_received++;
 
1335
  pthread_mutex_unlock(&stats_lock);
 
1336
 
 
1337
  get_abs_path(&file, file_tmp);
 
1338
  if (!check_file_access(file, sock)) return 0;
 
1339
 
 
1340
  pthread_mutex_lock (&cache_lock);
 
1341
  ci = g_tree_lookup (cache_tree, file);
 
1342
 
 
1343
  if (ci == NULL) /* {{{ */
 
1344
  {
 
1345
    struct stat statbuf;
 
1346
    cache_item_t *tmp;
 
1347
 
 
1348
    /* don't hold the lock while we setup; stat(2) might block */
 
1349
    pthread_mutex_unlock(&cache_lock);
 
1350
 
 
1351
    memset (&statbuf, 0, sizeof (statbuf));
 
1352
    status = stat (file, &statbuf);
 
1353
    if (status != 0)
 
1354
    {
 
1355
      RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
 
1356
 
 
1357
      status = errno;
 
1358
      if (status == ENOENT)
 
1359
        return send_response(sock, RESP_ERR, "No such file: %s\n", file);
 
1360
      else
 
1361
        return send_response(sock, RESP_ERR,
 
1362
                             "stat failed with error %i.\n", status);
 
1363
    }
 
1364
    if (!S_ISREG (statbuf.st_mode))
 
1365
      return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
 
1366
 
 
1367
    if (access(file, R_OK|W_OK) != 0)
 
1368
      return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
 
1369
                           file, rrd_strerror(errno));
 
1370
 
 
1371
    ci = (cache_item_t *) malloc (sizeof (cache_item_t));
 
1372
    if (ci == NULL)
 
1373
    {
 
1374
      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
 
1375
 
 
1376
      return send_response(sock, RESP_ERR, "malloc failed.\n");
 
1377
    }
 
1378
    memset (ci, 0, sizeof (cache_item_t));
 
1379
 
 
1380
    ci->file = strdup (file);
 
1381
    if (ci->file == NULL)
 
1382
    {
 
1383
      free (ci);
 
1384
      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
 
1385
 
 
1386
      return send_response(sock, RESP_ERR, "strdup failed.\n");
 
1387
    }
 
1388
 
 
1389
    wipe_ci_values(ci, now);
 
1390
    ci->flags = CI_FLAGS_IN_TREE;
 
1391
    pthread_cond_init(&ci->flushed, NULL);
 
1392
 
 
1393
    pthread_mutex_lock(&cache_lock);
 
1394
 
 
1395
    /* another UPDATE might have added this entry in the meantime */
 
1396
    tmp = g_tree_lookup (cache_tree, file);
 
1397
    if (tmp == NULL)
 
1398
      g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
 
1399
    else
 
1400
    {
 
1401
      free_cache_item (ci);
 
1402
      ci = tmp;
 
1403
    }
 
1404
 
 
1405
    /* state may have changed while we were unlocked */
 
1406
    if (state == SHUTDOWN)
 
1407
      return -1;
 
1408
  } /* }}} */
 
1409
  assert (ci != NULL);
 
1410
 
 
1411
  /* don't re-write updates in replay mode */
 
1412
  if (!JOURNAL_REPLAY(sock))
 
1413
    journal_write("update", orig_buf);
 
1414
 
 
1415
  while (buffer_size > 0)
 
1416
  {
 
1417
    char *value;
 
1418
    time_t stamp;
 
1419
    char *eostamp;
 
1420
 
 
1421
    status = buffer_get_field (&buffer, &buffer_size, &value);
 
1422
    if (status != 0)
 
1423
    {
 
1424
      RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
 
1425
      break;
 
1426
    }
 
1427
 
 
1428
    /* make sure update time is always moving forward */
 
1429
    stamp = strtol(value, &eostamp, 10);
 
1430
    if (eostamp == value || eostamp == NULL || *eostamp != ':')
 
1431
    {
 
1432
      pthread_mutex_unlock(&cache_lock);
 
1433
      return send_response(sock, RESP_ERR,
 
1434
                           "Cannot find timestamp in '%s'!\n", value);
 
1435
    }
 
1436
    else if (stamp <= ci->last_update_stamp)
 
1437
    {
 
1438
      pthread_mutex_unlock(&cache_lock);
 
1439
      return send_response(sock, RESP_ERR,
 
1440
                           "illegal attempt to update using time %ld when last"
 
1441
                           " update time is %ld (minimum one second step)\n",
 
1442
                           stamp, ci->last_update_stamp);
 
1443
    }
 
1444
    else
 
1445
      ci->last_update_stamp = stamp;
 
1446
 
 
1447
    if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
 
1448
    {
 
1449
      RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
 
1450
      continue;
 
1451
    }
 
1452
 
 
1453
    values_num++;
 
1454
  }
 
1455
 
 
1456
  if (((now - ci->last_flush_time) >= config_write_interval)
 
1457
      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
 
1458
      && (ci->values_num > 0))
 
1459
  {
 
1460
    enqueue_cache_item (ci, TAIL);
 
1461
  }
 
1462
 
 
1463
  pthread_mutex_unlock (&cache_lock);
 
1464
 
 
1465
  if (values_num < 1)
 
1466
    return send_response(sock, RESP_ERR, "No values updated.\n");
 
1467
  else
 
1468
    return send_response(sock, RESP_OK,
 
1469
                         "errors, enqueued %i value(s).\n", values_num);
 
1470
 
 
1471
  /* NOTREACHED */
 
1472
  assert(1==0);
 
1473
 
 
1474
} /* }}} int handle_request_update */
 
1475
 
 
1476
/* we came across a "WROTE" entry during journal replay.
 
1477
 * throw away any values that we have accumulated for this file
 
1478
 */
 
1479
static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
 
1480
{
 
1481
  cache_item_t *ci;
 
1482
  const char *file = buffer;
 
1483
 
 
1484
  pthread_mutex_lock(&cache_lock);
 
1485
 
 
1486
  ci = g_tree_lookup(cache_tree, file);
 
1487
  if (ci == NULL)
 
1488
  {
 
1489
    pthread_mutex_unlock(&cache_lock);
 
1490
    return (0);
 
1491
  }
 
1492
 
 
1493
  if (ci->values)
 
1494
    rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
 
1495
 
 
1496
  wipe_ci_values(ci, now);
 
1497
  remove_from_queue(ci);
 
1498
 
 
1499
  pthread_mutex_unlock(&cache_lock);
 
1500
  return (0);
 
1501
} /* }}} int handle_request_wrote */
 
1502
 
 
1503
/* start "BATCH" processing */
 
1504
static int batch_start (HANDLER_PROTO) /* {{{ */
 
1505
{
 
1506
  int status;
 
1507
  if (sock->batch_start)
 
1508
    return send_response(sock, RESP_ERR, "Already in BATCH\n");
 
1509
 
 
1510
  status = send_response(sock, RESP_OK,
 
1511
                         "Go ahead.  End with dot '.' on its own line.\n");
 
1512
  sock->batch_start = time(NULL);
 
1513
  sock->batch_cmd = 0;
 
1514
 
 
1515
  return status;
 
1516
} /* }}} static int batch_start */
 
1517
 
 
1518
/* finish "BATCH" processing and return results to the client */
 
1519
static int batch_done (HANDLER_PROTO) /* {{{ */
 
1520
{
 
1521
  assert(sock->batch_start);
 
1522
  sock->batch_start = 0;
 
1523
  sock->batch_cmd  = 0;
 
1524
  return send_response(sock, RESP_OK, "errors\n");
 
1525
} /* }}} static int batch_done */
 
1526
 
 
1527
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
 
1528
{
 
1529
  return -1;
 
1530
} /* }}} static int handle_request_quit */
 
1531
 
 
1532
static command_t list_of_commands[] = { /* {{{ */
 
1533
  {
 
1534
    "UPDATE",
 
1535
    handle_request_update,
 
1536
    CMD_CONTEXT_ANY,
 
1537
    "UPDATE <filename> <values> [<values> ...]\n"
 
1538
    ,
 
1539
    "Adds the given file to the internal cache if it is not yet known and\n"
 
1540
    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
 
1541
    "for details.\n"
 
1542
    "\n"
 
1543
    "Each <values> has the following form:\n"
 
1544
    "  <values> = <time>:<value>[:<value>[...]]\n"
 
1545
    "See the rrdupdate(1) manpage for details.\n"
 
1546
  },
 
1547
  {
 
1548
    "WROTE",
 
1549
    handle_request_wrote,
 
1550
    CMD_CONTEXT_JOURNAL,
 
1551
    NULL,
 
1552
    NULL
 
1553
  },
 
1554
  {
 
1555
    "FLUSH",
 
1556
    handle_request_flush,
 
1557
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
 
1558
    "FLUSH <filename>\n"
 
1559
    ,
 
1560
    "Adds the given filename to the head of the update queue and returns\n"
 
1561
    "after it has been dequeued.\n"
 
1562
  },
 
1563
  {
 
1564
    "FLUSHALL",
 
1565
    handle_request_flushall,
 
1566
    CMD_CONTEXT_CLIENT,
 
1567
    "FLUSHALL\n"
 
1568
    ,
 
1569
    "Triggers writing of all pending updates.  Returns immediately.\n"
 
1570
  },
 
1571
  {
 
1572
    "PENDING",
 
1573
    handle_request_pending,
 
1574
    CMD_CONTEXT_CLIENT,
 
1575
    "PENDING <filename>\n"
 
1576
    ,
 
1577
    "Shows any 'pending' updates for a file, in order.\n"
 
1578
    "The updates shown have not yet been written to the underlying RRD file.\n"
 
1579
  },
 
1580
  {
 
1581
    "FORGET",
 
1582
    handle_request_forget,
 
1583
    CMD_CONTEXT_ANY,
 
1584
    "FORGET <filename>\n"
 
1585
    ,
 
1586
    "Removes the file completely from the cache.\n"
 
1587
    "Any pending updates for the file will be lost.\n"
 
1588
  },
 
1589
  {
 
1590
    "QUEUE",
 
1591
    handle_request_queue,
 
1592
    CMD_CONTEXT_CLIENT,
 
1593
    "QUEUE\n"
 
1594
    ,
 
1595
        "Shows all files in the output queue.\n"
 
1596
    "The output is zero or more lines in the following format:\n"
 
1597
    "(where <num_vals> is the number of values to be written)\n"
 
1598
    "\n"
 
1599
    "<num_vals> <filename>\n"
 
1600
  },
 
1601
  {
 
1602
    "STATS",
 
1603
    handle_request_stats,
 
1604
    CMD_CONTEXT_CLIENT,
 
1605
    "STATS\n"
 
1606
    ,
 
1607
    "Returns some performance counters, see the rrdcached(1) manpage for\n"
 
1608
    "a description of the values.\n"
 
1609
  },
 
1610
  {
 
1611
    "HELP",
 
1612
    handle_request_help,
 
1613
    CMD_CONTEXT_CLIENT,
 
1614
    "HELP [<command>]\n",
 
1615
    NULL, /* special! */
 
1616
  },
 
1617
  {
 
1618
    "BATCH",
 
1619
    batch_start,
 
1620
    CMD_CONTEXT_CLIENT,
 
1621
    "BATCH\n"
 
1622
    ,
 
1623
    "The 'BATCH' command permits the client to initiate a bulk load\n"
 
1624
    "   of commands to rrdcached.\n"
 
1625
    "\n"
 
1626
    "Usage:\n"
 
1627
    "\n"
 
1628
    "    client: BATCH\n"
 
1629
    "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
 
1630
    "    client: command #1\n"
 
1631
    "    client: command #2\n"
 
1632
    "    client: ... and so on\n"
 
1633
    "    client: .\n"
 
1634
    "    server: 2 errors\n"
 
1635
    "    server: 7 message for command #7\n"
 
1636
    "    server: 9 message for command #9\n"
 
1637
    "\n"
 
1638
    "For more information, consult the rrdcached(1) documentation.\n"
 
1639
  },
 
1640
  {
 
1641
    ".",   /* BATCH terminator */
 
1642
    batch_done,
 
1643
    CMD_CONTEXT_BATCH,
 
1644
    NULL,
 
1645
    NULL
 
1646
  },
 
1647
  {
 
1648
    "QUIT",
 
1649
    handle_request_quit,
 
1650
    CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
 
1651
    "QUIT\n"
 
1652
    ,
 
1653
    "Disconnect from rrdcached.\n"
 
1654
  }
 
1655
}; /* }}} command_t list_of_commands[] */
 
1656
static size_t list_of_commands_len = sizeof (list_of_commands)
 
1657
  / sizeof (list_of_commands[0]);
 
1658
 
 
1659
static command_t *find_command(char *cmd)
 
1660
{
 
1661
  size_t i;
 
1662
 
 
1663
  for (i = 0; i < list_of_commands_len; i++)
 
1664
    if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
 
1665
      return (&list_of_commands[i]);
 
1666
  return NULL;
 
1667
}
 
1668
 
 
1669
/* We currently use the index in the `list_of_commands' array as a bit position
 
1670
 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
 
1671
 * outside these functions so that switching to a more elegant storage method
 
1672
 * is easily possible. */
 
1673
static ssize_t find_command_index (const char *cmd) /* {{{ */
 
1674
{
 
1675
  size_t i;
 
1676
 
 
1677
  for (i = 0; i < list_of_commands_len; i++)
 
1678
    if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
 
1679
      return ((ssize_t) i);
 
1680
  return (-1);
 
1681
} /* }}} ssize_t find_command_index */
 
1682
 
 
1683
static int socket_permission_check (listen_socket_t *sock, /* {{{ */
 
1684
    const char *cmd)
 
1685
{
 
1686
  ssize_t i;
 
1687
 
 
1688
  if (JOURNAL_REPLAY(sock))
 
1689
    return (1);
 
1690
 
 
1691
  if (cmd == NULL)
 
1692
    return (-1);
 
1693
 
 
1694
  if ((strcasecmp ("QUIT", cmd) == 0)
 
1695
      || (strcasecmp ("HELP", cmd) == 0))
 
1696
    return (1);
 
1697
  else if (strcmp (".", cmd) == 0)
 
1698
    cmd = "BATCH";
 
1699
 
 
1700
  i = find_command_index (cmd);
 
1701
  if (i < 0)
 
1702
    return (-1);
 
1703
  assert (i < 32);
 
1704
 
 
1705
  if ((sock->permissions & (1 << i)) != 0)
 
1706
    return (1);
 
1707
  return (0);
 
1708
} /* }}} int socket_permission_check */
 
1709
 
 
1710
static int socket_permission_add (listen_socket_t *sock, /* {{{ */
 
1711
    const char *cmd)
 
1712
{
 
1713
  ssize_t i;
 
1714
 
 
1715
  i = find_command_index (cmd);
 
1716
  if (i < 0)
 
1717
    return (-1);
 
1718
  assert (i < 32);
 
1719
 
 
1720
  sock->permissions |= (1 << i);
 
1721
  return (0);
 
1722
} /* }}} int socket_permission_add */
 
1723
 
 
1724
/* check whether commands are received in the expected context */
 
1725
static int command_check_context(listen_socket_t *sock, command_t *cmd)
 
1726
{
 
1727
  if (JOURNAL_REPLAY(sock))
 
1728
    return (cmd->context & CMD_CONTEXT_JOURNAL);
 
1729
  else if (sock->batch_start)
 
1730
    return (cmd->context & CMD_CONTEXT_BATCH);
 
1731
  else
 
1732
    return (cmd->context & CMD_CONTEXT_CLIENT);
 
1733
 
 
1734
  /* NOTREACHED */
 
1735
  assert(1==0);
 
1736
}
 
1737
 
 
1738
static int handle_request_help (HANDLER_PROTO) /* {{{ */
 
1739
{
 
1740
  int status;
 
1741
  char *cmd_str;
 
1742
  char *resp_txt;
 
1743
  command_t *help = NULL;
 
1744
 
 
1745
  status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
 
1746
  if (status == 0)
 
1747
    help = find_command(cmd_str);
 
1748
 
 
1749
  if (help && (help->syntax || help->help))
 
1750
  {
 
1751
    char tmp[CMD_MAX];
 
1752
 
 
1753
    snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
 
1754
    resp_txt = tmp;
 
1755
 
 
1756
    if (help->syntax)
 
1757
      add_response_info(sock, "Usage: %s\n", help->syntax);
 
1758
 
 
1759
    if (help->help)
 
1760
      add_response_info(sock, "%s\n", help->help);
 
1761
  }
 
1762
  else
 
1763
  {
 
1764
    size_t i;
 
1765
 
 
1766
    resp_txt = "Command overview\n";
 
1767
 
 
1768
    for (i = 0; i < list_of_commands_len; i++)
 
1769
    {
 
1770
      if (list_of_commands[i].syntax == NULL)
 
1771
        continue;
 
1772
      add_response_info (sock, "%s", list_of_commands[i].syntax);
 
1773
    }
 
1774
  }
 
1775
 
 
1776
  return send_response(sock, RESP_OK, resp_txt);
 
1777
} /* }}} int handle_request_help */
 
1778
 
 
1779
static int handle_request (DISPATCH_PROTO) /* {{{ */
 
1780
{
 
1781
  char *buffer_ptr = buffer;
 
1782
  char *cmd_str = NULL;
 
1783
  command_t *cmd = NULL;
 
1784
  int status;
 
1785
 
 
1786
  assert (buffer[buffer_size - 1] == '\0');
 
1787
 
 
1788
  status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
 
1789
  if (status != 0)
 
1790
  {
 
1791
    RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
 
1792
    return (-1);
 
1793
  }
 
1794
 
 
1795
  if (sock != NULL && sock->batch_start)
 
1796
    sock->batch_cmd++;
 
1797
 
 
1798
  cmd = find_command(cmd_str);
 
1799
  if (!cmd)
 
1800
    return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
 
1801
 
 
1802
  if (!socket_permission_check (sock, cmd->cmd))
 
1803
    return send_response(sock, RESP_ERR, "Permission denied.\n");
 
1804
 
 
1805
  if (!command_check_context(sock, cmd))
 
1806
    return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
 
1807
 
 
1808
  return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
 
1809
} /* }}} int handle_request */
 
1810
 
 
1811
static void journal_set_free (journal_set *js) /* {{{ */
 
1812
{
 
1813
  if (js == NULL)
 
1814
    return;
 
1815
 
 
1816
  rrd_free_ptrs((void ***) &js->files, &js->files_num);
 
1817
 
 
1818
  free(js);
 
1819
} /* }}} journal_set_free */
 
1820
 
 
1821
static void journal_set_remove (journal_set *js) /* {{{ */
 
1822
{
 
1823
  if (js == NULL)
 
1824
    return;
 
1825
 
 
1826
  for (uint i=0; i < js->files_num; i++)
 
1827
  {
 
1828
    RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
 
1829
    unlink(js->files[i]);
 
1830
  }
 
1831
} /* }}} journal_set_remove */
 
1832
 
 
1833
/* close current journal file handle.
 
1834
 * MUST hold journal_lock before calling */
 
1835
static void journal_close(void) /* {{{ */
 
1836
{
 
1837
  if (journal_fh != NULL)
 
1838
  {
 
1839
    if (fclose(journal_fh) != 0)
 
1840
      RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
 
1841
  }
 
1842
 
 
1843
  journal_fh = NULL;
 
1844
  journal_size = 0;
 
1845
} /* }}} journal_close */
 
1846
 
 
1847
/* MUST hold journal_lock before calling */
 
1848
static void journal_new_file(void) /* {{{ */
 
1849
{
 
1850
  struct timeval now;
 
1851
  int  new_fd;
 
1852
  char new_file[PATH_MAX + 1];
 
1853
 
 
1854
  assert(journal_dir != NULL);
 
1855
  assert(journal_cur != NULL);
 
1856
 
 
1857
  journal_close();
 
1858
 
 
1859
  gettimeofday(&now, NULL);
 
1860
  /* this format assures that the files sort in strcmp() order */
 
1861
  snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
 
1862
           journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
 
1863
 
 
1864
  new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
 
1865
                S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
 
1866
  if (new_fd < 0)
 
1867
    goto error;
 
1868
 
 
1869
  journal_fh = fdopen(new_fd, "a");
 
1870
  if (journal_fh == NULL)
 
1871
    goto error;
 
1872
 
 
1873
  journal_size = ftell(journal_fh);
 
1874
  RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
 
1875
 
 
1876
  /* record the file in the journal set */
 
1877
  rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
 
1878
 
 
1879
  return;
 
1880
 
 
1881
error:
 
1882
  RRDD_LOG(LOG_CRIT,
 
1883
           "JOURNALING DISABLED: Error while trying to create %s : %s",
 
1884
           new_file, rrd_strerror(errno));
 
1885
  RRDD_LOG(LOG_CRIT,
 
1886
           "JOURNALING DISABLED: All values will be flushed at shutdown");
 
1887
 
 
1888
  close(new_fd);
 
1889
  config_flush_at_shutdown = 1;
 
1890
 
 
1891
} /* }}} journal_new_file */
 
1892
 
 
1893
/* MUST NOT hold journal_lock before calling this */
 
1894
static void journal_rotate(void) /* {{{ */
 
1895
{
 
1896
  journal_set *old_js = NULL;
 
1897
 
 
1898
  if (journal_dir == NULL)
 
1899
    return;
 
1900
 
 
1901
  RRDD_LOG(LOG_DEBUG, "rotating journals");
 
1902
 
 
1903
  pthread_mutex_lock(&stats_lock);
 
1904
  ++stats_journal_rotate;
 
1905
  pthread_mutex_unlock(&stats_lock);
 
1906
 
 
1907
  pthread_mutex_lock(&journal_lock);
 
1908
 
 
1909
  journal_close();
 
1910
 
 
1911
  /* rotate the journal sets */
 
1912
  old_js = journal_old;
 
1913
  journal_old = journal_cur;
 
1914
  journal_cur = calloc(1, sizeof(journal_set));
 
1915
 
 
1916
  if (journal_cur != NULL)
 
1917
    journal_new_file();
 
1918
  else
 
1919
    RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
 
1920
 
 
1921
  pthread_mutex_unlock(&journal_lock);
 
1922
 
 
1923
  journal_set_remove(old_js);
 
1924
  journal_set_free  (old_js);
 
1925
 
 
1926
} /* }}} static void journal_rotate */
 
1927
 
 
1928
/* MUST hold journal_lock when calling */
 
1929
static void journal_done(void) /* {{{ */
 
1930
{
 
1931
  if (journal_cur == NULL)
 
1932
    return;
 
1933
 
 
1934
  journal_close();
 
1935
 
 
1936
  if (config_flush_at_shutdown)
 
1937
  {
 
1938
    RRDD_LOG(LOG_INFO, "removing journals");
 
1939
    journal_set_remove(journal_old);
 
1940
    journal_set_remove(journal_cur);
 
1941
  }
 
1942
  else
 
1943
  {
 
1944
    RRDD_LOG(LOG_INFO, "expedited shutdown; "
 
1945
             "journals will be used at next startup");
 
1946
  }
 
1947
 
 
1948
  journal_set_free(journal_cur);
 
1949
  journal_set_free(journal_old);
 
1950
  free(journal_dir);
 
1951
 
 
1952
} /* }}} static void journal_done */
 
1953
 
 
1954
static int journal_write(char *cmd, char *args) /* {{{ */
 
1955
{
 
1956
  int chars;
 
1957
 
 
1958
  if (journal_fh == NULL)
 
1959
    return 0;
 
1960
 
 
1961
  pthread_mutex_lock(&journal_lock);
 
1962
  chars = fprintf(journal_fh, "%s %s\n", cmd, args);
 
1963
  journal_size += chars;
 
1964
 
 
1965
  if (journal_size > JOURNAL_MAX)
 
1966
    journal_new_file();
 
1967
 
 
1968
  pthread_mutex_unlock(&journal_lock);
 
1969
 
 
1970
  if (chars > 0)
 
1971
  {
 
1972
    pthread_mutex_lock(&stats_lock);
 
1973
    stats_journal_bytes += chars;
 
1974
    pthread_mutex_unlock(&stats_lock);
 
1975
  }
 
1976
 
 
1977
  return chars;
 
1978
} /* }}} static int journal_write */
 
1979
 
 
1980
static int journal_replay (const char *file) /* {{{ */
 
1981
{
 
1982
  FILE *fh;
 
1983
  int entry_cnt = 0;
 
1984
  int fail_cnt = 0;
 
1985
  uint64_t line = 0;
 
1986
  char entry[CMD_MAX];
 
1987
  time_t now;
 
1988
 
 
1989
  if (file == NULL) return 0;
 
1990
 
 
1991
  {
 
1992
    char *reason = "unknown error";
 
1993
    int status = 0;
 
1994
    struct stat statbuf;
 
1995
 
 
1996
    memset(&statbuf, 0, sizeof(statbuf));
 
1997
    if (stat(file, &statbuf) != 0)
 
1998
    {
 
1999
      reason = "stat error";
 
2000
      status = errno;
 
2001
    }
 
2002
    else if (!S_ISREG(statbuf.st_mode))
 
2003
    {
 
2004
      reason = "not a regular file";
 
2005
      status = EPERM;
 
2006
    }
 
2007
    if (statbuf.st_uid != daemon_uid)
 
2008
    {
 
2009
      reason = "not owned by daemon user";
 
2010
      status = EACCES;
 
2011
    }
 
2012
    if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
 
2013
    {
 
2014
      reason = "must not be user/group writable";
 
2015
      status = EACCES;
 
2016
    }
 
2017
 
 
2018
    if (status != 0)
 
2019
    {
 
2020
      RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
 
2021
               file, rrd_strerror(status), reason);
 
2022
      return 0;
 
2023
    }
 
2024
  }
 
2025
 
 
2026
  fh = fopen(file, "r");
 
2027
  if (fh == NULL)
 
2028
  {
 
2029
    if (errno != ENOENT)
 
2030
      RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
 
2031
               file, rrd_strerror(errno));
 
2032
    return 0;
 
2033
  }
 
2034
  else
 
2035
    RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
 
2036
 
 
2037
  now = time(NULL);
 
2038
 
 
2039
  while(!feof(fh))
 
2040
  {
 
2041
    size_t entry_len;
 
2042
 
 
2043
    ++line;
 
2044
    if (fgets(entry, sizeof(entry), fh) == NULL)
 
2045
      break;
 
2046
    entry_len = strlen(entry);
 
2047
 
 
2048
    /* check \n termination in case journal writing crashed mid-line */
 
2049
    if (entry_len == 0)
 
2050
      continue;
 
2051
    else if (entry[entry_len - 1] != '\n')
 
2052
    {
 
2053
      RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
 
2054
      ++fail_cnt;
 
2055
      continue;
 
2056
    }
 
2057
 
 
2058
    entry[entry_len - 1] = '\0';
 
2059
 
 
2060
    if (handle_request(NULL, now, entry, entry_len) == 0)
 
2061
      ++entry_cnt;
 
2062
    else
 
2063
      ++fail_cnt;
 
2064
  }
 
2065
 
 
2066
  fclose(fh);
 
2067
 
 
2068
  RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
 
2069
           entry_cnt, fail_cnt);
 
2070
 
 
2071
  return entry_cnt > 0 ? 1 : 0;
 
2072
} /* }}} static int journal_replay */
 
2073
 
 
2074
static int journal_sort(const void *v1, const void *v2)
 
2075
{
 
2076
  char **jn1 = (char **) v1;
 
2077
  char **jn2 = (char **) v2;
 
2078
 
 
2079
  return strcmp(*jn1,*jn2);
 
2080
}
 
2081
 
 
2082
static void journal_init(void) /* {{{ */
 
2083
{
 
2084
  int had_journal = 0;
 
2085
  DIR *dir;
 
2086
  struct dirent *dent;
 
2087
  char path[PATH_MAX+1];
 
2088
 
 
2089
  if (journal_dir == NULL) return;
 
2090
 
 
2091
  pthread_mutex_lock(&journal_lock);
 
2092
 
 
2093
  journal_cur = calloc(1, sizeof(journal_set));
 
2094
  if (journal_cur == NULL)
 
2095
  {
 
2096
    RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
 
2097
    return;
 
2098
  }
 
2099
 
 
2100
  RRDD_LOG(LOG_INFO, "checking for journal files");
 
2101
 
 
2102
  /* Handle old journal files during transition.  This gives them the
 
2103
   * correct sort order.  TODO: remove after first release
 
2104
   */
 
2105
  {
 
2106
    char old_path[PATH_MAX+1];
 
2107
    snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
 
2108
    snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
 
2109
    rename(old_path, path);
 
2110
 
 
2111
    snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
 
2112
    snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
 
2113
    rename(old_path, path);
 
2114
  }
 
2115
 
 
2116
  dir = opendir(journal_dir);
 
2117
  while ((dent = readdir(dir)) != NULL)
 
2118
  {
 
2119
    /* looks like a journal file? */
 
2120
    if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
 
2121
      continue;
 
2122
 
 
2123
    snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
 
2124
 
 
2125
    if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
 
2126
    {
 
2127
      RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
 
2128
               dent->d_name);
 
2129
      break;
 
2130
    }
 
2131
  }
 
2132
  closedir(dir);
 
2133
 
 
2134
  qsort(journal_cur->files, journal_cur->files_num,
 
2135
        sizeof(journal_cur->files[0]), journal_sort);
 
2136
 
 
2137
  for (uint i=0; i < journal_cur->files_num; i++)
 
2138
    had_journal += journal_replay(journal_cur->files[i]);
 
2139
 
 
2140
  journal_new_file();
 
2141
 
 
2142
  /* it must have been a crash.  start a flush */
 
2143
  if (had_journal && config_flush_at_shutdown)
 
2144
    flush_old_values(-1);
 
2145
 
 
2146
  pthread_mutex_unlock(&journal_lock);
 
2147
 
 
2148
  RRDD_LOG(LOG_INFO, "journal processing complete");
 
2149
 
 
2150
} /* }}} static void journal_init */
 
2151
 
 
2152
static void free_listen_socket(listen_socket_t *sock) /* {{{ */
 
2153
{
 
2154
  assert(sock != NULL);
 
2155
 
 
2156
  free(sock->rbuf);  sock->rbuf = NULL;
 
2157
  free(sock->wbuf);  sock->wbuf = NULL;
 
2158
  free(sock);
 
2159
} /* }}} void free_listen_socket */
 
2160
 
 
2161
static void close_connection(listen_socket_t *sock) /* {{{ */
 
2162
{
 
2163
  if (sock->fd >= 0)
 
2164
  {
 
2165
    close(sock->fd);
 
2166
    sock->fd = -1;
 
2167
  }
 
2168
 
 
2169
  free_listen_socket(sock);
 
2170
 
 
2171
} /* }}} void close_connection */
 
2172
 
 
2173
static void *connection_thread_main (void *args) /* {{{ */
 
2174
{
 
2175
  listen_socket_t *sock;
 
2176
  int fd;
 
2177
 
 
2178
  sock = (listen_socket_t *) args;
 
2179
  fd = sock->fd;
 
2180
 
 
2181
  /* init read buffers */
 
2182
  sock->next_read = sock->next_cmd = 0;
 
2183
  sock->rbuf = malloc(RBUF_SIZE);
 
2184
  if (sock->rbuf == NULL)
 
2185
  {
 
2186
    RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
 
2187
    close_connection(sock);
 
2188
    return NULL;
 
2189
  }
 
2190
 
 
2191
  pthread_mutex_lock (&connection_threads_lock);
 
2192
  connection_threads_num++;
 
2193
  pthread_mutex_unlock (&connection_threads_lock);
 
2194
 
 
2195
  while (state == RUNNING)
 
2196
  {
 
2197
    char *cmd;
 
2198
    ssize_t cmd_len;
 
2199
    ssize_t rbytes;
 
2200
    time_t now;
 
2201
 
 
2202
    struct pollfd pollfd;
 
2203
    int status;
 
2204
 
 
2205
    pollfd.fd = fd;
 
2206
    pollfd.events = POLLIN | POLLPRI;
 
2207
    pollfd.revents = 0;
 
2208
 
 
2209
    status = poll (&pollfd, 1, /* timeout = */ 500);
 
2210
    if (state != RUNNING)
 
2211
      break;
 
2212
    else if (status == 0) /* timeout */
 
2213
      continue;
 
2214
    else if (status < 0) /* error */
 
2215
    {
 
2216
      status = errno;
 
2217
      if (status != EINTR)
 
2218
        RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
 
2219
      continue;
 
2220
    }
 
2221
 
 
2222
    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
 
2223
      break;
 
2224
    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
 
2225
    {
 
2226
      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
 
2227
          "poll(2) returned something unexpected: %#04hx",
 
2228
          pollfd.revents);
 
2229
      break;
 
2230
    }
 
2231
 
 
2232
    rbytes = read(fd, sock->rbuf + sock->next_read,
 
2233
                  RBUF_SIZE - sock->next_read);
 
2234
    if (rbytes < 0)
 
2235
    {
 
2236
      RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
 
2237
      break;
 
2238
    }
 
2239
    else if (rbytes == 0)
 
2240
      break; /* eof */
 
2241
 
 
2242
    sock->next_read += rbytes;
 
2243
 
 
2244
    if (sock->batch_start)
 
2245
      now = sock->batch_start;
 
2246
    else
 
2247
      now = time(NULL);
 
2248
 
 
2249
    while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
 
2250
    {
 
2251
      status = handle_request (sock, now, cmd, cmd_len+1);
 
2252
      if (status != 0)
 
2253
        goto out_close;
 
2254
    }
 
2255
  }
 
2256
 
 
2257
out_close:
 
2258
  close_connection(sock);
 
2259
 
 
2260
  /* Remove this thread from the connection threads list */
 
2261
  pthread_mutex_lock (&connection_threads_lock);
 
2262
  connection_threads_num--;
 
2263
  if (connection_threads_num <= 0)
 
2264
    pthread_cond_broadcast(&connection_threads_done);
 
2265
  pthread_mutex_unlock (&connection_threads_lock);
 
2266
 
 
2267
  return (NULL);
 
2268
} /* }}} void *connection_thread_main */
 
2269
 
 
2270
static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
 
2271
{
 
2272
  int fd;
 
2273
  struct sockaddr_un sa;
 
2274
  listen_socket_t *temp;
 
2275
  int status;
 
2276
  const char *path;
 
2277
  char *path_copy, *dir;
 
2278
 
 
2279
  path = sock->addr;
 
2280
  if (strncmp(path, "unix:", strlen("unix:")) == 0)
 
2281
    path += strlen("unix:");
 
2282
 
 
2283
  /* dirname may modify its argument */
 
2284
  path_copy = strdup(path);
 
2285
  if (path_copy == NULL)
 
2286
  {
 
2287
    fprintf(stderr, "rrdcached: strdup(): %s\n",
 
2288
        rrd_strerror(errno));
 
2289
    return (-1);
 
2290
  }
 
2291
 
 
2292
  dir = dirname(path_copy);
 
2293
  if (rrd_mkdir_p(dir, 0777) != 0)
 
2294
  {
 
2295
    fprintf(stderr, "Failed to create socket directory '%s': %s\n",
 
2296
        dir, rrd_strerror(errno));
 
2297
    return (-1);
 
2298
  }
 
2299
 
 
2300
  free(path_copy);
 
2301
 
 
2302
  temp = (listen_socket_t *) rrd_realloc (listen_fds,
 
2303
      sizeof (listen_fds[0]) * (listen_fds_num + 1));
 
2304
  if (temp == NULL)
 
2305
  {
 
2306
    fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
 
2307
    return (-1);
 
2308
  }
 
2309
  listen_fds = temp;
 
2310
  memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
 
2311
 
 
2312
  fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
 
2313
  if (fd < 0)
 
2314
  {
 
2315
    fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
 
2316
             rrd_strerror(errno));
 
2317
    return (-1);
 
2318
  }
 
2319
 
 
2320
  memset (&sa, 0, sizeof (sa));
 
2321
  sa.sun_family = AF_UNIX;
 
2322
  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
 
2323
 
 
2324
  /* if we've gotten this far, we own the pid file.  any daemon started
 
2325
   * with the same args must not be alive.  therefore, ensure that we can
 
2326
   * create the socket...
 
2327
   */
 
2328
  unlink(path);
 
2329
 
 
2330
  status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
 
2331
  if (status != 0)
 
2332
  {
 
2333
    fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
 
2334
             path, rrd_strerror(errno));
 
2335
    close (fd);
 
2336
    return (-1);
 
2337
  }
 
2338
 
 
2339
  /* tweak the sockets group ownership */
 
2340
  if (sock->socket_group != (gid_t)-1)
 
2341
  {
 
2342
    if ( (chown(path, getuid(), sock->socket_group) != 0) ||
 
2343
         (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
 
2344
    {
 
2345
      fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
 
2346
    }
 
2347
  }
 
2348
 
 
2349
  if (sock->socket_permissions != (mode_t)-1)
 
2350
  {
 
2351
    if (chmod(path, sock->socket_permissions) != 0)
 
2352
      fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
 
2353
          (unsigned int)sock->socket_permissions, strerror(errno));
 
2354
  }
 
2355
 
 
2356
  status = listen (fd, /* backlog = */ 10);
 
2357
  if (status != 0)
 
2358
  {
 
2359
    fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
 
2360
             path, rrd_strerror(errno));
 
2361
    close (fd);
 
2362
    unlink (path);
 
2363
    return (-1);
 
2364
  }
 
2365
 
 
2366
  listen_fds[listen_fds_num].fd = fd;
 
2367
  listen_fds[listen_fds_num].family = PF_UNIX;
 
2368
  strncpy(listen_fds[listen_fds_num].addr, path,
 
2369
          sizeof (listen_fds[listen_fds_num].addr) - 1);
 
2370
  listen_fds_num++;
 
2371
 
 
2372
  return (0);
 
2373
} /* }}} int open_listen_socket_unix */
 
2374
 
 
2375
static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
 
2376
{
 
2377
  struct addrinfo ai_hints;
 
2378
  struct addrinfo *ai_res;
 
2379
  struct addrinfo *ai_ptr;
 
2380
  char addr_copy[NI_MAXHOST];
 
2381
  char *addr;
 
2382
  char *port;
 
2383
  int status;
 
2384
 
 
2385
  strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
 
2386
  addr_copy[sizeof (addr_copy) - 1] = 0;
 
2387
  addr = addr_copy;
 
2388
 
 
2389
  memset (&ai_hints, 0, sizeof (ai_hints));
 
2390
  ai_hints.ai_flags = 0;
 
2391
#ifdef AI_ADDRCONFIG
 
2392
  ai_hints.ai_flags |= AI_ADDRCONFIG;
 
2393
#endif
 
2394
  ai_hints.ai_family = AF_UNSPEC;
 
2395
  ai_hints.ai_socktype = SOCK_STREAM;
 
2396
 
 
2397
  port = NULL;
 
2398
  if (*addr == '[') /* IPv6+port format */
 
2399
  {
 
2400
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
 
2401
    addr++;
 
2402
 
 
2403
    port = strchr (addr, ']');
 
2404
    if (port == NULL)
 
2405
    {
 
2406
      fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
 
2407
      return (-1);
 
2408
    }
 
2409
    *port = 0;
 
2410
    port++;
 
2411
 
 
2412
    if (*port == ':')
 
2413
      port++;
 
2414
    else if (*port == 0)
 
2415
      port = NULL;
 
2416
    else
 
2417
    {
 
2418
      fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
 
2419
      return (-1);
 
2420
    }
 
2421
  } /* if (*addr == '[') */
 
2422
  else
 
2423
  {
 
2424
    port = rindex(addr, ':');
 
2425
    if (port != NULL)
 
2426
    {
 
2427
      *port = 0;
 
2428
      port++;
 
2429
    }
 
2430
  }
 
2431
  ai_res = NULL;
 
2432
  status = getaddrinfo (addr,
 
2433
                        port == NULL ? RRDCACHED_DEFAULT_PORT : port,
 
2434
                        &ai_hints, &ai_res);
 
2435
  if (status != 0)
 
2436
  {
 
2437
    fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
 
2438
             addr, gai_strerror (status));
 
2439
    return (-1);
 
2440
  }
 
2441
 
 
2442
  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
 
2443
  {
 
2444
    int fd;
 
2445
    listen_socket_t *temp;
 
2446
    int one = 1;
 
2447
 
 
2448
    temp = (listen_socket_t *) rrd_realloc (listen_fds,
 
2449
        sizeof (listen_fds[0]) * (listen_fds_num + 1));
 
2450
    if (temp == NULL)
 
2451
    {
 
2452
      fprintf (stderr,
 
2453
               "rrdcached: open_listen_socket_network: realloc failed.\n");
 
2454
      continue;
 
2455
    }
 
2456
    listen_fds = temp;
 
2457
    memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
 
2458
 
 
2459
    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
 
2460
    if (fd < 0)
 
2461
    {
 
2462
      fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
 
2463
               rrd_strerror(errno));
 
2464
      continue;
 
2465
    }
 
2466
 
 
2467
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
 
2468
 
 
2469
    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
 
2470
    if (status != 0)
 
2471
    {
 
2472
      fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
 
2473
               sock->addr, rrd_strerror(errno));
 
2474
      close (fd);
 
2475
      continue;
 
2476
    }
 
2477
 
 
2478
    status = listen (fd, /* backlog = */ 10);
 
2479
    if (status != 0)
 
2480
    {
 
2481
      fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
 
2482
               sock->addr, rrd_strerror(errno));
 
2483
      close (fd);
 
2484
      freeaddrinfo(ai_res);
 
2485
      return (-1);
 
2486
    }
 
2487
 
 
2488
    listen_fds[listen_fds_num].fd = fd;
 
2489
    listen_fds[listen_fds_num].family = ai_ptr->ai_family;
 
2490
    listen_fds_num++;
 
2491
  } /* for (ai_ptr) */
 
2492
 
 
2493
  freeaddrinfo(ai_res);
 
2494
  return (0);
 
2495
} /* }}} static int open_listen_socket_network */
 
2496
 
 
2497
static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
 
2498
{
 
2499
  assert(sock != NULL);
 
2500
  assert(sock->addr != NULL);
 
2501
 
 
2502
  if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
 
2503
      || sock->addr[0] == '/')
 
2504
    return (open_listen_socket_unix(sock));
 
2505
  else
 
2506
    return (open_listen_socket_network(sock));
 
2507
} /* }}} int open_listen_socket */
 
2508
 
 
2509
static int close_listen_sockets (void) /* {{{ */
 
2510
{
 
2511
  size_t i;
 
2512
 
 
2513
  for (i = 0; i < listen_fds_num; i++)
 
2514
  {
 
2515
    close (listen_fds[i].fd);
 
2516
 
 
2517
    if (listen_fds[i].family == PF_UNIX)
 
2518
      unlink(listen_fds[i].addr);
 
2519
  }
 
2520
 
 
2521
  free (listen_fds);
 
2522
  listen_fds = NULL;
 
2523
  listen_fds_num = 0;
 
2524
 
 
2525
  return (0);
 
2526
} /* }}} int close_listen_sockets */
 
2527
 
 
2528
static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
 
2529
{
 
2530
  struct pollfd *pollfds;
 
2531
  int pollfds_num;
 
2532
  int status;
 
2533
  int i;
 
2534
 
 
2535
  if (listen_fds_num < 1)
 
2536
  {
 
2537
    RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
 
2538
    return (NULL);
 
2539
  }
 
2540
 
 
2541
  pollfds_num = listen_fds_num;
 
2542
  pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
 
2543
  if (pollfds == NULL)
 
2544
  {
 
2545
    RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
 
2546
    return (NULL);
 
2547
  }
 
2548
  memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
 
2549
 
 
2550
  RRDD_LOG(LOG_INFO, "listening for connections");
 
2551
 
 
2552
  while (state == RUNNING)
 
2553
  {
 
2554
    for (i = 0; i < pollfds_num; i++)
 
2555
    {
 
2556
      pollfds[i].fd = listen_fds[i].fd;
 
2557
      pollfds[i].events = POLLIN | POLLPRI;
 
2558
      pollfds[i].revents = 0;
 
2559
    }
 
2560
 
 
2561
    status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
 
2562
    if (state != RUNNING)
 
2563
      break;
 
2564
    else if (status == 0) /* timeout */
 
2565
      continue;
 
2566
    else if (status < 0) /* error */
 
2567
    {
 
2568
      status = errno;
 
2569
      if (status != EINTR)
 
2570
      {
 
2571
        RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
 
2572
      }
 
2573
      continue;
 
2574
    }
 
2575
 
 
2576
    for (i = 0; i < pollfds_num; i++)
 
2577
    {
 
2578
      listen_socket_t *client_sock;
 
2579
      struct sockaddr_storage client_sa;
 
2580
      socklen_t client_sa_size;
 
2581
      pthread_t tid;
 
2582
      pthread_attr_t attr;
 
2583
 
 
2584
      if (pollfds[i].revents == 0)
 
2585
        continue;
 
2586
 
 
2587
      if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
 
2588
      {
 
2589
        RRDD_LOG (LOG_ERR, "listen_thread_main: "
 
2590
            "poll(2) returned something unexpected for listen FD #%i.",
 
2591
            pollfds[i].fd);
 
2592
        continue;
 
2593
      }
 
2594
 
 
2595
      client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
 
2596
      if (client_sock == NULL)
 
2597
      {
 
2598
        RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
 
2599
        continue;
 
2600
      }
 
2601
      memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
 
2602
 
 
2603
      client_sa_size = sizeof (client_sa);
 
2604
      client_sock->fd = accept (pollfds[i].fd,
 
2605
          (struct sockaddr *) &client_sa, &client_sa_size);
 
2606
      if (client_sock->fd < 0)
 
2607
      {
 
2608
        RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
 
2609
        free(client_sock);
 
2610
        continue;
 
2611
      }
 
2612
 
 
2613
      pthread_attr_init (&attr);
 
2614
      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
 
2615
 
 
2616
      status = pthread_create (&tid, &attr, connection_thread_main,
 
2617
                               client_sock);
 
2618
      if (status != 0)
 
2619
      {
 
2620
        RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
 
2621
        close_connection(client_sock);
 
2622
        continue;
 
2623
      }
 
2624
    } /* for (pollfds_num) */
 
2625
  } /* while (state == RUNNING) */
 
2626
 
 
2627
  RRDD_LOG(LOG_INFO, "starting shutdown");
 
2628
 
 
2629
  close_listen_sockets ();
 
2630
 
 
2631
  pthread_mutex_lock (&connection_threads_lock);
 
2632
  while (connection_threads_num > 0)
 
2633
    pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
 
2634
  pthread_mutex_unlock (&connection_threads_lock);
 
2635
 
 
2636
  free(pollfds);
 
2637
 
 
2638
  return (NULL);
 
2639
} /* }}} void *listen_thread_main */
 
2640
 
 
2641
static int daemonize (void) /* {{{ */
 
2642
{
 
2643
  int pid_fd;
 
2644
  char *base_dir;
 
2645
 
 
2646
  daemon_uid = geteuid();
 
2647
 
 
2648
  pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
 
2649
  if (pid_fd < 0)
 
2650
    pid_fd = check_pidfile();
 
2651
  if (pid_fd < 0)
 
2652
    return pid_fd;
 
2653
 
 
2654
  /* open all the listen sockets */
 
2655
  if (config_listen_address_list_len > 0)
 
2656
  {
 
2657
    for (size_t i = 0; i < config_listen_address_list_len; i++)
 
2658
      open_listen_socket (config_listen_address_list[i]);
 
2659
 
 
2660
    rrd_free_ptrs((void ***) &config_listen_address_list,
 
2661
                  &config_listen_address_list_len);
 
2662
  }
 
2663
  else
 
2664
  {
 
2665
    listen_socket_t sock;
 
2666
    memset(&sock, 0, sizeof(sock));
 
2667
    strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
 
2668
    open_listen_socket (&sock);
 
2669
  }
 
2670
 
 
2671
  if (listen_fds_num < 1)
 
2672
  {
 
2673
    fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
 
2674
    goto error;
 
2675
  }
 
2676
 
 
2677
  if (!stay_foreground)
 
2678
  {
 
2679
    pid_t child;
 
2680
 
 
2681
    child = fork ();
 
2682
    if (child < 0)
 
2683
    {
 
2684
      fprintf (stderr, "daemonize: fork(2) failed.\n");
 
2685
      goto error;
 
2686
    }
 
2687
    else if (child > 0)
 
2688
      exit(0);
 
2689
 
 
2690
    /* Become session leader */
 
2691
    setsid ();
 
2692
 
 
2693
    /* Open the first three file descriptors to /dev/null */
 
2694
    close (2);
 
2695
    close (1);
 
2696
    close (0);
 
2697
 
 
2698
    open ("/dev/null", O_RDWR);
 
2699
    if (dup(0) == -1 || dup(0) == -1){
 
2700
        RRDD_LOG (LOG_ERR, "faild to run dup.\n");
 
2701
    }
 
2702
  } /* if (!stay_foreground) */
 
2703
 
 
2704
  /* Change into the /tmp directory. */
 
2705
  base_dir = (config_base_dir != NULL)
 
2706
    ? config_base_dir
 
2707
    : "/tmp";
 
2708
 
 
2709
  if (chdir (base_dir) != 0)
 
2710
  {
 
2711
    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
 
2712
    goto error;
 
2713
  }
 
2714
 
 
2715
  install_signal_handlers();
 
2716
 
 
2717
  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
 
2718
  RRDD_LOG(LOG_INFO, "starting up");
 
2719
 
 
2720
  cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
 
2721
                                (GDestroyNotify) free_cache_item);
 
2722
  if (cache_tree == NULL)
 
2723
  {
 
2724
    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
 
2725
    goto error;
 
2726
  }
 
2727
 
 
2728
  return write_pidfile (pid_fd);
 
2729
 
 
2730
error:
 
2731
  remove_pidfile();
 
2732
  return -1;
 
2733
} /* }}} int daemonize */
 
2734
 
 
2735
static int cleanup (void) /* {{{ */
 
2736
{
 
2737
  pthread_cond_broadcast (&flush_cond);
 
2738
  pthread_join (flush_thread, NULL);
 
2739
 
 
2740
  pthread_cond_broadcast (&queue_cond);
 
2741
  for (int i = 0; i < config_queue_threads; i++)
 
2742
    pthread_join (queue_threads[i], NULL);
 
2743
 
 
2744
  if (config_flush_at_shutdown)
 
2745
  {
 
2746
    assert(cache_queue_head == NULL);
 
2747
    RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
 
2748
  }
 
2749
 
 
2750
  free(queue_threads);
 
2751
  free(config_base_dir);
 
2752
 
 
2753
  pthread_mutex_lock(&cache_lock);
 
2754
  g_tree_destroy(cache_tree);
 
2755
 
 
2756
  pthread_mutex_lock(&journal_lock);
 
2757
  journal_done();
 
2758
 
 
2759
  RRDD_LOG(LOG_INFO, "goodbye");
 
2760
  closelog ();
 
2761
 
 
2762
  remove_pidfile ();
 
2763
  free(config_pid_file);
 
2764
 
 
2765
  return (0);
 
2766
} /* }}} int cleanup */
 
2767
 
 
2768
static int read_options (int argc, char **argv) /* {{{ */
 
2769
{
 
2770
  int option;
 
2771
  int status = 0;
 
2772
 
 
2773
  char **permissions = NULL;
 
2774
  size_t permissions_len = 0;
 
2775
 
 
2776
  gid_t  socket_group = (gid_t)-1;
 
2777
  mode_t socket_permissions = (mode_t)-1;
 
2778
 
 
2779
  while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:h?")) != -1)
 
2780
  {
 
2781
    switch (option)
 
2782
    {
 
2783
      case 'g':
 
2784
        stay_foreground=1;
 
2785
        break;
 
2786
 
 
2787
      case 'l':
 
2788
      {
 
2789
        listen_socket_t *new;
 
2790
 
 
2791
        new = malloc(sizeof(listen_socket_t));
 
2792
        if (new == NULL)
 
2793
        {
 
2794
          fprintf(stderr, "read_options: malloc failed.\n");
 
2795
          return(2);
 
2796
        }
 
2797
        memset(new, 0, sizeof(listen_socket_t));
 
2798
 
 
2799
        strncpy(new->addr, optarg, sizeof(new->addr)-1);
 
2800
 
 
2801
        /* Add permissions to the socket {{{ */
 
2802
        if (permissions_len != 0)
 
2803
        {
 
2804
          size_t i;
 
2805
          for (i = 0; i < permissions_len; i++)
 
2806
          {
 
2807
            status = socket_permission_add (new, permissions[i]);
 
2808
            if (status != 0)
 
2809
            {
 
2810
              fprintf (stderr, "read_options: Adding permission \"%s\" to "
 
2811
                  "socket failed. Most likely, this permission doesn't "
 
2812
                  "exist. Check your command line.\n", permissions[i]);
 
2813
              status = 4;
 
2814
            }
 
2815
          }
 
2816
        }
 
2817
        else /* if (permissions_len == 0) */
 
2818
        {
 
2819
          /* Add permission for ALL commands to the socket. */
 
2820
          size_t i;
 
2821
          for (i = 0; i < list_of_commands_len; i++)
 
2822
          {
 
2823
            status = socket_permission_add (new, list_of_commands[i].cmd);
 
2824
            if (status != 0)
 
2825
            {
 
2826
              fprintf (stderr, "read_options: Adding permission \"%s\" to "
 
2827
                  "socket failed. This should never happen, ever! Sorry.\n",
 
2828
                  permissions[i]);
 
2829
              status = 4;
 
2830
            }
 
2831
          }
 
2832
        }
 
2833
        /* }}} Done adding permissions. */
 
2834
 
 
2835
        new->socket_group = socket_group;
 
2836
        new->socket_permissions = socket_permissions;
 
2837
 
 
2838
        if (!rrd_add_ptr((void ***)&config_listen_address_list,
 
2839
                         &config_listen_address_list_len, new))
 
2840
        {
 
2841
          fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
 
2842
          return (2);
 
2843
        }
 
2844
      }
 
2845
      break;
 
2846
 
 
2847
      /* set socket group permissions */
 
2848
      case 's':
 
2849
      {
 
2850
        gid_t group_gid;
 
2851
        struct group *grp;
 
2852
 
 
2853
        group_gid = strtoul(optarg, NULL, 10);
 
2854
        if (errno != EINVAL && group_gid>0)
 
2855
        {
 
2856
          /* we were passed a number */
 
2857
          grp = getgrgid(group_gid);
 
2858
        }
 
2859
        else
 
2860
        {
 
2861
          grp = getgrnam(optarg);
 
2862
        }
 
2863
 
 
2864
        if (grp)
 
2865
        {
 
2866
          socket_group = grp->gr_gid;
 
2867
        }
 
2868
        else
 
2869
        {
 
2870
          /* no idea what the user wanted... */
 
2871
          fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
 
2872
          return (5);
 
2873
        }
 
2874
      }
 
2875
      break;
 
2876
 
 
2877
      /* set socket file permissions */
 
2878
      case 'm':
 
2879
      {
 
2880
        long  tmp;
 
2881
        char *endptr = NULL;
 
2882
 
 
2883
        tmp = strtol (optarg, &endptr, 8);
 
2884
        if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
 
2885
            || (tmp > 07777) || (tmp < 0)) {
 
2886
          fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
 
2887
              optarg);
 
2888
          return (5);
 
2889
        }
 
2890
 
 
2891
        socket_permissions = (mode_t)tmp;
 
2892
      }
 
2893
      break;
 
2894
 
 
2895
      case 'P':
 
2896
      {
 
2897
        char *optcopy;
 
2898
        char *saveptr;
 
2899
        char *dummy;
 
2900
        char *ptr;
 
2901
 
 
2902
        rrd_free_ptrs ((void *) &permissions, &permissions_len);
 
2903
 
 
2904
        optcopy = strdup (optarg);
 
2905
        dummy = optcopy;
 
2906
        saveptr = NULL;
 
2907
        while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
 
2908
        {
 
2909
          dummy = NULL;
 
2910
          rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
 
2911
        }
 
2912
 
 
2913
        free (optcopy);
 
2914
      }
 
2915
      break;
 
2916
 
 
2917
      case 'f':
 
2918
      {
 
2919
        int temp;
 
2920
 
 
2921
        temp = atoi (optarg);
 
2922
        if (temp > 0)
 
2923
          config_flush_interval = temp;
 
2924
        else
 
2925
        {
 
2926
          fprintf (stderr, "Invalid flush interval: %s\n", optarg);
 
2927
          status = 3;
 
2928
        }
 
2929
      }
 
2930
      break;
 
2931
 
 
2932
      case 'w':
 
2933
      {
 
2934
        int temp;
 
2935
 
 
2936
        temp = atoi (optarg);
 
2937
        if (temp > 0)
 
2938
          config_write_interval = temp;
 
2939
        else
 
2940
        {
 
2941
          fprintf (stderr, "Invalid write interval: %s\n", optarg);
 
2942
          status = 2;
 
2943
        }
 
2944
      }
 
2945
      break;
 
2946
 
 
2947
      case 'z':
 
2948
      {
 
2949
        int temp;
 
2950
 
 
2951
        temp = atoi(optarg);
 
2952
        if (temp > 0)
 
2953
          config_write_jitter = temp;
 
2954
        else
 
2955
        {
 
2956
          fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
 
2957
          status = 2;
 
2958
        }
 
2959
 
 
2960
        break;
 
2961
      }
 
2962
 
 
2963
      case 't':
 
2964
      {
 
2965
        int threads;
 
2966
        threads = atoi(optarg);
 
2967
        if (threads >= 1)
 
2968
          config_queue_threads = threads;
 
2969
        else
 
2970
        {
 
2971
          fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
 
2972
          return 1;
 
2973
        }
 
2974
      }
 
2975
      break;
 
2976
 
 
2977
      case 'B':
 
2978
        config_write_base_only = 1;
 
2979
        break;
 
2980
 
 
2981
      case 'b':
 
2982
      {
 
2983
        size_t len;
 
2984
        char base_realpath[PATH_MAX];
 
2985
 
 
2986
        if (config_base_dir != NULL)
 
2987
          free (config_base_dir);
 
2988
        config_base_dir = strdup (optarg);
 
2989
        if (config_base_dir == NULL)
 
2990
        {
 
2991
          fprintf (stderr, "read_options: strdup failed.\n");
 
2992
          return (3);
 
2993
        }
 
2994
 
 
2995
        if (rrd_mkdir_p (config_base_dir, 0777) != 0)
 
2996
        {
 
2997
          fprintf (stderr, "Failed to create base directory '%s': %s\n",
 
2998
              config_base_dir, rrd_strerror (errno));
 
2999
          return (3);
 
3000
        }
 
3001
 
 
3002
        /* make sure that the base directory is not resolved via
 
3003
         * symbolic links.  this makes some performance-enhancing
 
3004
         * assumptions possible (we don't have to resolve paths
 
3005
         * that start with a "/")
 
3006
         */
 
3007
        if (realpath(config_base_dir, base_realpath) == NULL)
 
3008
        {
 
3009
          fprintf (stderr, "Failed to canonicalize the base directory '%s': "
 
3010
              "%s\n", config_base_dir, rrd_strerror(errno));
 
3011
          return 5;
 
3012
        }
 
3013
 
 
3014
        len = strlen (config_base_dir);
 
3015
        while ((len > 0) && (config_base_dir[len - 1] == '/'))
 
3016
        {
 
3017
          config_base_dir[len - 1] = 0;
 
3018
          len--;
 
3019
        }
 
3020
 
 
3021
        if (len < 1)
 
3022
        {
 
3023
          fprintf (stderr, "Invalid base directory: %s\n", optarg);
 
3024
          return (4);
 
3025
        }
 
3026
 
 
3027
        _config_base_dir_len = len;
 
3028
 
 
3029
        len = strlen (base_realpath);
 
3030
        while ((len > 0) && (base_realpath[len - 1] == '/'))
 
3031
        {
 
3032
          base_realpath[len - 1] = '\0';
 
3033
          len--;
 
3034
        }
 
3035
 
 
3036
        if (strncmp(config_base_dir,
 
3037
                         base_realpath, sizeof(base_realpath)) != 0)
 
3038
        {
 
3039
          fprintf(stderr,
 
3040
                  "Base directory (-b) resolved via file system links!\n"
 
3041
                  "Please consult rrdcached '-b' documentation!\n"
 
3042
                  "Consider specifying the real directory (%s)\n",
 
3043
                  base_realpath);
 
3044
          return 5;
 
3045
        }
 
3046
      }
 
3047
      break;
 
3048
 
 
3049
      case 'p':
 
3050
      {
 
3051
        if (config_pid_file != NULL)
 
3052
          free (config_pid_file);
 
3053
        config_pid_file = strdup (optarg);
 
3054
        if (config_pid_file == NULL)
 
3055
        {
 
3056
          fprintf (stderr, "read_options: strdup failed.\n");
 
3057
          return (3);
 
3058
        }
 
3059
      }
 
3060
      break;
 
3061
 
 
3062
      case 'F':
 
3063
        config_flush_at_shutdown = 1;
 
3064
        break;
 
3065
 
 
3066
      case 'j':
 
3067
      {
 
3068
        const char *dir = journal_dir = strdup(optarg);
 
3069
 
 
3070
        status = rrd_mkdir_p(dir, 0777);
 
3071
        if (status != 0)
 
3072
        {
 
3073
          fprintf(stderr, "Failed to create journal directory '%s': %s\n",
 
3074
              dir, rrd_strerror(errno));
 
3075
          return 6;
 
3076
        }
 
3077
 
 
3078
        if (access(dir, R_OK|W_OK|X_OK) != 0)
 
3079
        {
 
3080
          fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
 
3081
                  errno ? rrd_strerror(errno) : "");
 
3082
          return 6;
 
3083
        }
 
3084
      }
 
3085
      break;
 
3086
 
 
3087
      case 'h':
 
3088
      case '?':
 
3089
        printf ("RRDCacheD %s\n"
 
3090
            "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
 
3091
            "\n"
 
3092
            "Usage: rrdcached [options]\n"
 
3093
            "\n"
 
3094
            "Valid options are:\n"
 
3095
            "  -l <address>  Socket address to listen to.\n"
 
3096
            "  -P <perms>    Sets the permissions to assign to all following "
 
3097
                            "sockets\n"
 
3098
            "  -w <seconds>  Interval in which to write data.\n"
 
3099
            "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
 
3100
            "  -t <threads>  Number of write threads.\n"
 
3101
            "  -f <seconds>  Interval in which to flush dead data.\n"
 
3102
            "  -p <file>     Location of the PID-file.\n"
 
3103
            "  -b <dir>      Base directory to change to.\n"
 
3104
            "  -B            Restrict file access to paths within -b <dir>\n"
 
3105
            "  -g            Do not fork and run in the foreground.\n"
 
3106
            "  -j <dir>      Directory in which to create the journal files.\n"
 
3107
            "  -F            Always flush all updates at shutdown\n"
 
3108
            "  -s <id|name>  Make socket g+rw to named group\n"
 
3109
            "\n"
 
3110
            "For more information and a detailed description of all options "
 
3111
            "please refer\n"
 
3112
            "to the rrdcached(1) manual page.\n",
 
3113
            VERSION);
 
3114
        status = -1;
 
3115
        break;
 
3116
    } /* switch (option) */
 
3117
  } /* while (getopt) */
 
3118
 
 
3119
  /* advise the user when values are not sane */
 
3120
  if (config_flush_interval < 2 * config_write_interval)
 
3121
    fprintf(stderr, "WARNING: flush interval (-f) should be at least"
 
3122
            " 2x write interval (-w) !\n");
 
3123
  if (config_write_jitter > config_write_interval)
 
3124
    fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
 
3125
            " write interval (-w) !\n");
 
3126
 
 
3127
  if (config_write_base_only && config_base_dir == NULL)
 
3128
    fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
 
3129
            "  Consult the rrdcached documentation\n");
 
3130
 
 
3131
  if (journal_dir == NULL)
 
3132
    config_flush_at_shutdown = 1;
 
3133
 
 
3134
  rrd_free_ptrs ((void *) &permissions, &permissions_len);
 
3135
 
 
3136
  return (status);
 
3137
} /* }}} int read_options */
 
3138
 
 
3139
int main (int argc, char **argv)
 
3140
{
 
3141
  int status;
 
3142
 
 
3143
  status = read_options (argc, argv);
 
3144
  if (status != 0)
 
3145
  {
 
3146
    if (status < 0)
 
3147
      status = 0;
 
3148
    return (status);
 
3149
  }
 
3150
 
 
3151
  status = daemonize ();
 
3152
  if (status != 0)
 
3153
  {
 
3154
    fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
 
3155
    return (1);
 
3156
  }
 
3157
 
 
3158
  journal_init();
 
3159
 
 
3160
  /* start the queue threads */
 
3161
  queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
 
3162
  if (queue_threads == NULL)
 
3163
  {
 
3164
    RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
 
3165
    cleanup();
 
3166
    return (1);
 
3167
  }
 
3168
  for (int i = 0; i < config_queue_threads; i++)
 
3169
  {
 
3170
    memset (&queue_threads[i], 0, sizeof (*queue_threads));
 
3171
    status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
 
3172
    if (status != 0)
 
3173
    {
 
3174
      RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
 
3175
      cleanup();
 
3176
      return (1);
 
3177
    }
 
3178
  }
 
3179
 
 
3180
  /* start the flush thread */
 
3181
  memset(&flush_thread, 0, sizeof(flush_thread));
 
3182
  status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
 
3183
  if (status != 0)
 
3184
  {
 
3185
    RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
 
3186
    cleanup();
 
3187
    return (1);
 
3188
  }
 
3189
 
 
3190
  listen_thread_main (NULL);
 
3191
  cleanup ();
 
3192
 
 
3193
  return (0);
 
3194
} /* int main */
 
3195
 
 
3196
/*
 
3197
 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
 
3198
 */