43
46
#include <import/ip_tproxy.h>
49
/* sends a log message when a backend goes down, and also sets last
52
static void set_backend_down(struct proxy *be)
54
be->last_change = now.tv_sec;
57
Alert("%s '%s' has no server available!\n", proxy_type_str(be), be->id);
58
send_log(be, LOG_EMERG, "%s %s has no server available!\n", proxy_type_str(be), be->id);
61
/* Redistribute pending connections when a server goes down. The number of
62
* connections redistributed is returned.
64
static int redistribute_pending(struct server *s)
66
struct pendconn *pc, *pc_bck, *pc_end;
69
FOREACH_ITEM_SAFE(pc, pc_bck, &s->pendconns, pc_end, struct pendconn *, list) {
70
struct session *sess = pc->sess;
71
if (sess->be->options & PR_O_REDISP) {
72
/* The REDISP option was specified. We will ignore
73
* cookie and force to balance or use the dispatcher.
75
sess->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
76
sess->srv = NULL; /* it's left to the dispatcher to choose a server */
77
http_flush_cookie_flags(&sess->txn);
79
task_wakeup(sess->task);
86
/* Check for pending connections at the backend, and assign some of them to
87
* the server coming up. The server's weight is checked before being assigned
88
* connections it may not be able to handle. The total number of transferred
89
* connections is returned.
91
static int check_for_pending(struct server *s)
98
for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
102
p = pendconn_from_px(s->proxy);
108
task_wakeup(sess->task);
47
113
/* Sets server <s> down, notifies by all available means, recounts the
48
114
* remaining servers on the proxy and transfers queued sessions whenever
49
* possible to other servers.
115
* possible to other servers. It automatically recomputes the number of
116
* servers, but not the map.
51
118
static void set_server_down(struct server *s)
53
struct pendconn *pc, *pc_bck, *pc_end;
57
s->state &= ~SRV_RUNNING;
59
122
if (s->health == s->rise) {
60
recount_servers(s->proxy);
61
recalc_server_map(s->proxy);
123
int srv_was_paused = s->state & SRV_GOINGDOWN;
125
s->last_change = now.tv_sec;
126
s->state &= ~(SRV_RUNNING | SRV_GOINGDOWN);
127
s->proxy->lbprm.set_server_status_down(s);
63
129
/* we might have sessions queued on this server and waiting for
64
130
* a connection. Those which are redispatchable will be queued
65
131
* to another server or to the proxy itself.
68
FOREACH_ITEM_SAFE(pc, pc_bck, &s->pendconns, pc_end, struct pendconn *, list) {
70
if ((sess->be->options & PR_O_REDISP)) {
71
/* The REDISP option was specified. We will ignore
72
* cookie and force to balance or use the dispatcher.
74
sess->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
75
sess->srv = NULL; /* it's left to the dispatcher to choose a server */
76
http_flush_cookie_flags(&sess->txn);
78
task_wakeup(sess->task);
133
xferred = redistribute_pending(s);
83
134
sprintf(trash, "%sServer %s/%s is DOWN. %d active and %d backup servers left.%s"
84
135
" %d sessions active, %d requeued, %d remaining in queue.\n",
85
136
s->state & SRV_BACKUP ? "Backup " : "",
88
139
s->cur_sess, xferred, s->nbpend);
90
141
Warning("%s", trash);
91
send_log(s->proxy, LOG_ALERT, "%s", trash);
93
if (s->proxy->srv_bck == 0 && s->proxy->srv_act == 0) {
94
Alert("%s '%s' has no server available !\n", proxy_type_str(s->proxy), s->proxy->id);
95
send_log(s->proxy, LOG_EMERG, "%s %s has no server available !\n", proxy_type_str(s->proxy), s->proxy->id);
143
/* we don't send an alert if the server was previously paused */
145
send_log(s->proxy, LOG_NOTICE, "%s", trash);
147
send_log(s->proxy, LOG_ALERT, "%s", trash);
149
if (s->proxy->srv_bck == 0 && s->proxy->srv_act == 0)
150
set_backend_down(s->proxy);
99
154
s->health = 0; /* failure */
201
255
* This function is used only for server health-checks. It handles the server's
202
* reply to an HTTP request or SSL HELLO. It returns 1 in s->result if the
203
* server replies HTTP 2xx or 3xx (valid responses), or if it returns at least
204
* 5 bytes in response to SSL HELLO. The principle is that this is enough to
205
* distinguish between an SSL server and a pure TCP relay. All other cases will
206
* return -1. The function returns 0 if it needs to be called again after some
207
* polling, otherwise non-zero.
256
* reply to an HTTP request or SSL HELLO. It sets s->result to SRV_CHK_RUNNING
257
* if an HTTP server replies HTTP 2xx or 3xx (valid responses), if an SMTP
258
* server returns 2xx, or if an SSL server returns at least 5 bytes in response
259
* to an SSL HELLO (the principle is that this is enough to distinguish between
260
* an SSL server and a pure TCP relay). All other cases will set s->result to
261
* SRV_CHK_ERROR. The function returns 0 if it needs to be called again after
262
* some polling, otherwise non-zero.
209
264
static int event_srv_chk_r(int fd)
211
266
__label__ out_wakeup;
214
268
struct task *t = fdtab[fd].owner;
215
269
struct server *s = t->context;
217
271
socklen_t lskerr = sizeof(skerr);
221
if (unlikely(fdtab[fd].state == FD_STERROR ||
275
if (unlikely((s->result & SRV_CHK_ERROR) ||
276
(fdtab[fd].state == FD_STERROR) ||
222
277
(fdtab[fd].ev & FD_POLL_ERR) ||
223
278
(getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
225
280
/* in case of TCP only, this tells us if the connection failed */
227
fdtab[fd].state = FD_STERROR;
281
s->result |= SRV_CHK_ERROR;
231
285
#ifndef MSG_NOSIGNAL
232
len = recv(fd, reply, sizeof(reply), 0);
286
len = recv(fd, trash, sizeof(trash), 0);
234
288
/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
235
289
* but the connection was closed on the remote end. Fortunately, recv still
236
290
* works correctly and we don't need to do the getsockopt() on linux.
238
len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
292
len = recv(fd, trash, sizeof(trash), MSG_NOSIGNAL);
240
294
if (unlikely(len < 0 && errno == EAGAIN)) {
241
295
/* we want some polling to happen first */
242
fdtab[fd].ev &= ~FD_POLL_RD;
296
fdtab[fd].ev &= ~FD_POLL_IN;
246
if ((s->proxy->options & PR_O_HTTP_CHK) && (len >= sizeof("HTTP/1.0 000")) &&
247
(memcmp(reply, "HTTP/1.", 7) == 0) && (reply[9] == '2' || reply[9] == '3')) {
248
/* HTTP/1.X 2xx or 3xx */
251
else if ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
252
(reply[0] == 0x15 || reply[0] == 0x16)) {
253
/* SSLv3 alert or handshake */
256
else if ((s->proxy->options & PR_O_SMTP_CHK) && (len >= 3) &&
257
(reply[0] == '2')) /* 2xx (should be 250) */ {
300
/* Note: the response will only be accepted if read at once */
301
if (s->proxy->options & PR_O_HTTP_CHK) {
302
/* Check if the server speaks HTTP 1.X */
303
if ((len < strlen("HTTP/1.0 000\r")) ||
304
(memcmp(trash, "HTTP/1.", 7) != 0)) {
305
s->result |= SRV_CHK_ERROR;
309
/* check the reply : HTTP/1.X 2xx and 3xx are OK */
310
if (trash[9] == '2' || trash[9] == '3')
311
s->result |= SRV_CHK_RUNNING;
312
else if ((s->proxy->options & PR_O_DISABLE404) &&
313
(s->state & SRV_RUNNING) &&
314
(memcmp(&trash[9], "404", 3) == 0)) {
315
/* 404 may be accepted as "stopping" only if the server was up */
316
s->result |= SRV_CHK_RUNNING | SRV_CHK_DISABLE;
319
s->result |= SRV_CHK_ERROR;
321
else if (s->proxy->options & PR_O_SSL3_CHK) {
322
/* Check for SSLv3 alert or handshake */
323
if ((len >= 5) && (trash[0] == 0x15 || trash[0] == 0x16))
324
s->result |= SRV_CHK_RUNNING;
326
s->result |= SRV_CHK_ERROR;
328
else if (s->proxy->options & PR_O_SMTP_CHK) {
329
/* Check for SMTP code 2xx (should be 250) */
330
if ((len >= 3) && (trash[0] == '2'))
331
s->result |= SRV_CHK_RUNNING;
333
s->result |= SRV_CHK_ERROR;
336
/* other checks are valid if the connection succeeded anyway */
337
s->result |= SRV_CHK_RUNNING;
341
if (s->result & SRV_CHK_ERROR)
262
342
fdtab[fd].state = FD_STERROR;
268
344
EV_FD_CLR(fd, DIR_RD);
270
fdtab[fd].ev &= ~FD_POLL_RD;
346
fdtab[fd].ev &= ~FD_POLL_IN;
308
386
/* we'll initiate a new check */
309
s->result = 0; /* no result yet */
387
s->result = SRV_CHK_UNKNOWN; /* no result yet */
310
388
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) != -1) {
311
389
if ((fd < global.maxsock) &&
312
390
(fcntl(fd, F_SETFL, O_NONBLOCK) != -1) &&
313
391
(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) != -1)) {
314
392
//fprintf(stderr, "process_chk: 3\n");
394
if (s->proxy->options & PR_O_TCP_NOLING) {
395
/* We don't want to linger on useless data */
396
setsockopt(fd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));
317
399
if (s->check_addr.sin_addr.s_addr)
318
400
/* we'll connect to the check addr specified on the server */
445
528
//fprintf(stderr, "process_chk: 8\n");
446
529
/* there was a test running */
447
if (s->result > 0) { /* good server detected */
530
if ((s->result & (SRV_CHK_ERROR|SRV_CHK_RUNNING)) == SRV_CHK_RUNNING) { /* good server detected */
448
531
//fprintf(stderr, "process_chk: 9\n");
449
s->health++; /* was bad, stays for a while */
450
if (s->health >= s->rise) {
451
s->state |= SRV_RUNNING;
533
if (s->state & SRV_WARMINGUP) {
534
if (now.tv_sec < s->last_change || now.tv_sec >= s->last_change + s->slowstart) {
535
s->state &= ~SRV_WARMINGUP;
536
if (s->proxy->lbprm.algo & BE_LB_PROP_DYN)
537
s->eweight = s->uweight * BE_WEIGHT_SCALE;
538
if (s->proxy->lbprm.update_server_eweight)
539
s->proxy->lbprm.update_server_eweight(s);
541
else if (s->proxy->lbprm.algo & BE_LB_PROP_DYN) {
542
/* for dynamic algorithms, let's update the weight */
543
s->eweight = (BE_WEIGHT_SCALE * (now.tv_sec - s->last_change) +
544
s->slowstart - 1) / s->slowstart;
545
s->eweight *= s->uweight;
546
if (s->proxy->lbprm.update_server_eweight)
547
s->proxy->lbprm.update_server_eweight(s);
549
/* probably that we can refill this server with a bit more connections */
550
check_for_pending(s);
553
/* we may have to add/remove this server from the LB group */
554
if ((s->state & SRV_RUNNING) && (s->proxy->options & PR_O_DISABLE404)) {
555
if ((s->state & SRV_GOINGDOWN) &&
556
((s->result & (SRV_CHK_RUNNING|SRV_CHK_DISABLE)) == SRV_CHK_RUNNING)) {
557
/* server enabled again */
558
s->state &= ~SRV_GOINGDOWN;
559
s->proxy->lbprm.set_server_status_up(s);
561
/* check if we can handle some connections queued at the proxy. We
562
* will take as many as we can handle.
564
xferred = check_for_pending(s);
567
"Load-balancing on %sServer %s/%s is enabled again. %d active and %d backup servers online.%s"
568
" %d sessions requeued, %d total in queue.\n",
569
s->state & SRV_BACKUP ? "Backup " : "",
570
s->proxy->id, s->id, s->proxy->srv_act, s->proxy->srv_bck,
571
(s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "",
574
Warning("%s", trash);
575
send_log(s->proxy, LOG_NOTICE, "%s", trash);
577
else if (!(s->state & SRV_GOINGDOWN) &&
578
((s->result & (SRV_CHK_RUNNING | SRV_CHK_DISABLE)) ==
579
(SRV_CHK_RUNNING | SRV_CHK_DISABLE))) {
580
/* server disabled */
581
s->state |= SRV_GOINGDOWN;
582
s->proxy->lbprm.set_server_status_down(s);
584
/* we might have sessions queued on this server and waiting for
585
* a connection. Those which are redispatchable will be queued
586
* to another server or to the proxy itself.
588
xferred = redistribute_pending(s);
591
"Load-balancing on %sServer %s/%s is disabled. %d active and %d backup servers online.%s"
592
" %d sessions requeued, %d total in queue.\n",
593
s->state & SRV_BACKUP ? "Backup " : "",
594
s->proxy->id, s->id, s->proxy->srv_act, s->proxy->srv_bck,
595
(s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "",
598
Warning("%s", trash);
600
send_log(s->proxy, LOG_NOTICE, "%s", trash);
601
if (!s->proxy->srv_bck && !s->proxy->srv_act)
602
set_backend_down(s->proxy);
606
if (s->health < s->rise + s->fall - 1) {
607
s->health++; /* was bad, stays for a while */
453
609
if (s->health == s->rise) {
456
recount_servers(s->proxy);
457
recalc_server_map(s->proxy);
610
if (s->proxy->srv_bck == 0 && s->proxy->srv_act == 0) {
611
if (s->proxy->last_change < now.tv_sec) // ignore negative times
612
s->proxy->down_time += now.tv_sec - s->proxy->last_change;
613
s->proxy->last_change = now.tv_sec;
616
if (s->last_change < now.tv_sec) // ignore negative times
617
s->down_time += now.tv_sec - s->last_change;
619
s->last_change = now.tv_sec;
620
s->state |= SRV_RUNNING;
621
if (s->slowstart > 0) {
622
s->state |= SRV_WARMINGUP;
623
if (s->proxy->lbprm.algo & BE_LB_PROP_DYN) {
624
/* For dynamic algorithms, start at the first step of the weight,
625
* without multiplying by BE_WEIGHT_SCALE.
627
s->eweight = s->uweight;
628
if (s->proxy->lbprm.update_server_eweight)
629
s->proxy->lbprm.update_server_eweight(s);
632
s->proxy->lbprm.set_server_status_up(s);
459
634
/* check if we can handle some connections queued at the proxy. We
460
635
* will take as many as we can handle.
462
for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
463
struct session *sess;
466
p = pendconn_from_px(s->proxy);
472
task_wakeup(sess->task);
637
xferred = check_for_pending(s);
476
640
"%sServer %s/%s is UP. %d active and %d backup servers online.%s"
484
648
send_log(s->proxy, LOG_NOTICE, "%s", trash);
487
s->health = s->rise + s->fall - 1; /* OK now */
651
if (s->health >= s->rise)
652
s->health = s->rise + s->fall - 1; /* OK now */
489
654
s->curfd = -1; /* no check running anymore */
491
while (tv_isle(&t->expire, &now))
492
tv_ms_add(&t->expire, &t->expire, s->inter);
658
if (global.spread_checks > 0) {
659
rv = s->inter * global.spread_checks / 100;
660
rv -= (int) (2 * rv * (rand() / (RAND_MAX + 1.0)));
661
//fprintf(stderr, "process_chk(%p): (%d+/-%d%%) random=%d\n", s, s->inter, global.spread_checks, rv);
663
tv_ms_add(&t->expire, &now, s->inter + rv);
495
else if (s->result < 0 || tv_isle(&t->expire, &now)) {
666
else if ((s->result & SRV_CHK_ERROR) || tv_isle(&t->expire, &now)) {
496
667
//fprintf(stderr, "process_chk: 10\n");
497
668
/* failure or timeout detected */
498
669
if (s->health > s->rise) {
503
674
set_server_down(s);
506
while (tv_isle(&t->expire, &now))
507
tv_ms_add(&t->expire, &t->expire, s->inter);
679
if (global.spread_checks > 0) {
680
rv = s->inter * global.spread_checks / 100;
681
rv -= (int) (2 * rv * (rand() / (RAND_MAX + 1.0)));
682
//fprintf(stderr, "process_chk(%p): (%d+/-%d%%) random=%d\n", s, s->inter, global.spread_checks, rv);
684
tv_ms_add(&t->expire, &now, s->inter + rv);
510
/* if result is 0 and there's no timeout, we have to wait again */
687
/* if result is unknown and there's no timeout, we have to wait again */
512
689
//fprintf(stderr, "process_chk: 11\n");
690
s->result = SRV_CHK_UNKNOWN;
514
691
task_queue(t); /* restore t to its place in the task list */
515
692
*next = t->expire;
698
* Start health-check.
699
* Returns 0 if OK, -1 if error, and prints the error in this case.
706
int nbchk=0, mininter=0, srvpos=0;
708
/* 1- count the checkers to run simultaneously.
709
* We also determine the minimum interval among all of those which
710
* have an interval larger than SRV_CHK_INTER_THRES. This interval
711
* will be used to spread their start-up date. Those which have
712
* a shorter interval will start independently and will not dictate
713
* too short an interval for all others.
715
for (px = proxy; px; px = px->next) {
716
for (s = px->srv; s; s = s->next) {
717
if (!(s->state & SRV_CHECKED))
720
if ((s->inter >= SRV_CHK_INTER_THRES) &&
721
(!mininter || mininter > s->inter))
731
srand((unsigned)time(NULL));
734
* 2- start them as far as possible from each other. For this, we will
735
* start them after their interval set to the min interval divided by
736
* the number of servers, weighted by the server's position in the list.
738
for (px = proxy; px; px = px->next) {
739
for (s = px->srv; s; s = s->next) {
740
if (!(s->state & SRV_CHECKED))
743
if ((t = pool_alloc2(pool2_task)) == NULL) {
744
Alert("Starting [%s:%s] check: out of memory.\n", px->id, s->id);
750
t->state = TASK_IDLE;
751
t->process = process_chk;
754
/* check this every ms */
755
tv_ms_add(&t->expire, &now,
756
((mininter && mininter >= s->inter) ? mininter : s->inter) * srvpos / nbchk);
522
766
* Local variables: