34
34
#include "ncbi_ansi_ext.h"
35
35
#include "ncbi_comm.h"
36
36
#include "ncbi_dispd.h"
37
38
#include "ncbi_priv.h"
38
39
#include <connect/ncbi_connection.h>
39
40
#include <connect/ncbi_http_connector.h>
40
#include <connect/ncbi_service_misc.h>
43
43
#include <stdlib.h>
46
#if 0/*defined(_DEBUG) && !defined(NDEBUG)*/
47
# define SERV_DISPD_DEBUG 1
48
#endif /*_DEBUG && !NDEBUG*/
50
45
/* Lower bound of up-to-date/out-of-date ratio */
51
46
#define SERV_DISPD_STALE_RATIO_OK 0.8
52
/* Default rate increase if svc runs locally */
47
/* Default rate increase 20% if svc runs locally */
53
48
#define SERV_DISPD_LOCAL_SVC_BONUS 1.2
56
/* Dispatcher messaging support */
57
static int s_MessageIssued = 0;
58
static FDISP_MessageHook s_MessageHook = 0;
63
53
#endif /*__cplusplus*/
64
54
static void s_Reset (SERV_ITER);
65
55
static SSERV_Info* s_GetNextInfo(SERV_ITER, HOST_INFO*);
66
static int/*bool*/ s_Update (SERV_ITER, TNCBI_Time, const char*);
56
static int/*bool*/ s_Update (SERV_ITER, const char*, int);
67
57
static void s_Close (SERV_ITER);
69
59
static const SSERV_VTable s_op = {
74
64
#endif /*__cplusplus*/
77
int g_NCBIConnectRandomSeed = 0;
87
int/*bool*/ disp_fail;
88
SConnNetInfo* net_info;
96
static int/*bool*/ s_AddServerInfo(SDISPD_Data* data, SSERV_Info* info)
68
int/*bool*/ disp_fail;
69
SConnNetInfo* net_info;
76
static int/*bool*/ s_AddServerInfo(struct SDISPD_Data* data, SSERV_Info* info)
79
const char* name = SERV_NameOfInfo(info);
100
80
/* First check that the new server info updates an existing one */
101
for (i = 0; i < data->n_node; i++) {
102
if (SERV_EqualInfo(data->s_node[i].info, info)) {
81
for (i = 0; i < data->n_cand; i++) {
82
if (strcasecmp(name, SERV_NameOfInfo(data->cand[i].info))
83
&& SERV_EqualInfo(info, data->cand[i].info)) {
103
84
/* Replace older version */
104
free(data->s_node[i].info);
105
data->s_node[i].info = info;
85
free((void*) data->cand[i].info);
86
data->cand[i].info = info;
110
90
/* Next, add new service to the list */
111
if (data->n_node == data->a_node) {
112
size_t n = data->a_node + 10;
116
temp = (SDISPD_Node*) realloc(data->s_node, sizeof(*temp) * n);
118
temp = (SDISPD_Node*) malloc(sizeof(*temp) * n);
91
if (data->n_cand == data->a_cand) {
92
size_t n = data->a_cand + 10;
93
SLB_Candidate* temp = (SLB_Candidate*)
95
? realloc(data->cand, n * sizeof(*temp))
96
: malloc ( n * sizeof(*temp)));
126
data->s_node[data->n_node++].info = info;
102
data->cand[data->n_cand++].info = info;
158
SDISPD_Data* data = (SDISPD_Data*)((SERV_ITER) iter)->data;
134
struct SDISPD_Data* data = (struct SDISPD_Data*)((SERV_ITER) iter)->data;
159
135
return data->disp_fail ? 0/*failed*/ : 1/*try again*/;
163
139
static int/*bool*/ s_Resolve(SERV_ITER iter)
165
static const char service[] = "service";
166
static const char address[] = "address";
167
static const char platform[] = "platform";
168
SDISPD_Data* data = (SDISPD_Data*) iter->data;
169
SConnNetInfo *net_info = data->net_info;
141
struct SDISPD_Data* data = (struct SDISPD_Data*) iter->data;
142
SConnNetInfo* net_info = data->net_info;
170
143
CONNECTOR conn = 0;
177
/* Dispatcher CGI arguments (sacrifice some if they all do not fit) */
178
if ((arch = CORE_GetPlatform()) != 0 && *arch)
179
ConnNetInfo_PreOverrideArg(net_info, platform, arch);
180
if (*net_info->client_host && !strchr(net_info->client_host, '.') &&
181
(ip = SOCK_gethostbyname(net_info->client_host)) != 0 &&
182
SOCK_ntoa(ip, addr, sizeof(addr)) == 0) {
183
if ((s = (char*) malloc(strlen(net_info->client_host) +
184
strlen(addr) + 3)) != 0) {
185
sprintf(s, "%s(%s)", net_info->client_host, addr);
187
s = net_info->client_host;
189
s = net_info->client_host;
191
ConnNetInfo_PreOverrideArg(net_info, address, s);
192
if (s != net_info->client_host)
194
if (!ConnNetInfo_PreOverrideArg(net_info, service, iter->name)) {
195
ConnNetInfo_DeleteArg(net_info, platform);
196
if (!ConnNetInfo_PreOverrideArg(net_info, service, iter->name)) {
197
ConnNetInfo_DeleteArg(net_info, address);
198
if (!ConnNetInfo_PreOverrideArg(net_info, service, iter->name))
202
/* Reset request method to be GET ('cause no HTTP body will follow) */
203
net_info->req_method = eReqMethod_Get;
147
assert(!!net_info->stateless == !!iter->stateless);
204
148
/* Obtain additional header information */
205
149
if ((!(s = SERV_Print(iter, 0))
206
|| ConnNetInfo_OverrideUserHeader(net_info, s)) &&
207
ConnNetInfo_OverrideUserHeader(net_info, net_info->stateless
208
?"Client-Mode: STATELESS_ONLY\r\n"
209
:"Client-Mode: STATEFUL_CAPABLE\r\n") &&
210
ConnNetInfo_OverrideUserHeader(net_info,
211
"Dispatch-Mode: INFORMATION_ONLY\r\n")){
212
ConnNetInfo_OverrideUserHeader
213
(net_info, "User-Agent: NCBIServiceDispatcher/"
214
DISP_PROTOCOL_VERSION
215
#ifdef NCBI_CXX_TOOLKIT
219
#endif /*NCBI_CXX_TOOLKIT*/
150
|| ConnNetInfo_OverrideUserHeader(net_info, s)) &&
151
ConnNetInfo_OverrideUserHeader(net_info, !iter->promiscuous
152
? "Dispatch-Mode: INFORMATION_ONLY\r\n"
153
: "Dispatch-Mode: PROMISCUOUS\r\n") &&
154
ConnNetInfo_OverrideUserHeader(net_info, iter->reverse_dns
155
? "Client-Mode: REVERSE_DNS\r\n"
156
: !net_info->stateless
157
? "Client-Mode: STATEFUL_CAPABLE\r\n"
158
: "Client-Mode: STATELESS_ONLY\r\n")) {
159
/* all the rest in the net_info structure should be already fine */
221
160
data->disp_fail = 0;
222
/* All the rest in the net_info structure is fine with us */
223
161
conn = HTTP_CreateConnectorEx(net_info, fHCC_SureFlush, s_ParseHeader,
224
162
s_Adjust, iter/*data*/, 0/*cleanup*/);
236
174
/* This will also send all the HTTP data, and trigger header callback */
239
return ((SDISPD_Data*) iter->data)->n_node != 0;
177
return data->n_cand != 0 ||
178
(!data->disp_fail && net_info->stateless && net_info->firewall);
243
static int/*bool*/ s_Update(SERV_ITER iter, TNCBI_Time now, const char* text)
182
static int/*bool*/ s_Update(SERV_ITER iter, const char* text, int code)
245
static const char service_name[] = "Service: ";
246
184
static const char server_info[] = "Server-Info-";
247
SDISPD_Data* data = (SDISPD_Data*) iter->data;
248
size_t len = strlen(text);
185
struct SDISPD_Data* data = (struct SDISPD_Data*) iter->data;
250
if (len >= sizeof(service_name) &&
251
strncasecmp(text, service_name, sizeof(service_name) - 1) == 0) {
254
free((void*) data->name);
255
text += sizeof(service_name) - 1;
256
while (*text && isspace((unsigned char)(*text)))
258
data->name = strdup(text);
260
} else if (len >= sizeof(server_info) &&
261
strncasecmp(text, server_info, sizeof(server_info) - 1) == 0) {
262
const char* name = data->name ? data->name : "";
190
if (strncasecmp(text, server_info, sizeof(server_info) - 1) == 0) {
263
192
SSERV_Info* info;
267
197
text += sizeof(server_info) - 1;
268
if (sscanf(text, "%u: %n", &d1, &d2) < 1)
198
if (sscanf(text, "%u: %n", &d1, &d2) < 1 || d1 < 1)
269
199
return 0/*not updated*/;
270
info = SERV_ReadInfoEx(text + d2, strlen(name) + 1);
200
if (iter->ismask || iter->reverse_dns) {
202
if (!(s = strdup(text + d2)))
203
return 0/*not updated*/;
205
while (*name && isspace((unsigned char)(*name)))
209
return 0/*not updated*/;
211
for (c = s + (name - s); *c; c++) {
212
if (isspace((unsigned char)(*c)))
221
info = SERV_ReadInfoEx(text + d2, name);
272
assert(info->rate != 0.0);
273
info->time += now; /* expiration time now */
274
strcpy((char*) info + SERV_SizeOfInfo(info), name);
225
if (info->time != NCBI_TIME_INFINITE)
226
info->time += iter->time; /* expiration time now */
275
227
if (s_AddServerInfo(data, info))
276
228
return 1/*updated*/;
279
} else if (len >= sizeof(HTTP_DISP_FAILURES) &&
280
strncasecmp(text, HTTP_DISP_FAILURES,
281
sizeof(HTTP_DISP_FAILURES) - 1) == 0) {
231
} else if (((failure = strncasecmp(text, HTTP_DISP_FAILURES,
232
sizeof(HTTP_DISP_FAILURES) - 1) == 0)
233
|| strncasecmp(text, HTTP_DISP_MESSAGES,
234
sizeof(HTTP_DISP_MESSAGES) - 1) == 0)) {
235
assert(sizeof(HTTP_DISP_FAILURES) == sizeof(HTTP_DISP_MESSAGES));
282
236
#if defined(_DEBUG) && !defined(NDEBUG)
283
text += sizeof(HTTP_DISP_FAILURES) - 1;
284
while (*text && isspace((unsigned char)(*text)))
286
if (data->net_info->debug_printout)
287
CORE_LOGF(eLOG_Warning, ("[DISPATCHER] %s", text));
237
if (data->net_info->debug_printout) {
238
text += sizeof(HTTP_DISP_FAILURES) - 1;
239
while (*text && isspace((unsigned char)(*text)))
241
CORE_LOGF(eLOG_Warning, ("[DISPATCHER %s] %s",
242
failure ? "FAILURE" : "MESSAGE", text));
288
244
#endif /*_DEBUG && !NDEBUG*/
290
247
return 1/*updated*/;
291
} else if (len >= sizeof(HTTP_DISP_MESSAGE) &&
292
strncasecmp(text, HTTP_DISP_MESSAGE,
293
sizeof(HTTP_DISP_MESSAGE) - 1) == 0) {
294
text += sizeof(HTTP_DISP_MESSAGE) - 1;
295
while (*text && isspace((unsigned char)(*text)))
298
if (s_MessageIssued <= 0) {
303
s_MessageIssued = -1;
304
CORE_LOGF(eLOG_Warning, ("[DISPATCHER] %s", text));
308
250
return 0/*not updated*/;
312
static int/*bool*/ s_IsUpdateNeeded(SDISPD_Data *data)
254
static int/*bool*/ s_IsUpdateNeeded(TNCBI_Time now, struct SDISPD_Data *data)
314
256
double status = 0.0, total = 0.0;
317
TNCBI_Time t = (TNCBI_Time) time(0);
319
while (i < data->n_node) {
320
SSERV_Info* info = data->s_node[i].info;
260
while (i < data->n_cand) {
261
const SSERV_Info* info = data->cand[i].info;
322
263
total += info->rate;
323
if (info->time < t) {
324
if (i < --data->n_node)
325
memmove(data->s_node + i, data->s_node + i + 1,
326
(data->n_node - i)*sizeof(*data->s_node));
264
if (info->time < now) {
265
if (i < --data->n_cand) {
266
memmove(data->cand + i, data->cand + i + 1,
267
sizeof(*data->cand)*(data->n_cand - i));
329
271
status += info->rate;
335
return total == 0.0 ? 1 : (status/total < SERV_DISPD_STALE_RATIO_OK);
277
return total == 0.0 ? 1 : status/total < SERV_DISPD_STALE_RATIO_OK;
281
static SLB_Candidate* s_GetCandidate(void* user_data, size_t n)
283
struct SDISPD_Data* data = (struct SDISPD_Data*) user_data;
284
return n < data->n_cand ? &data->cand[n] : 0;
339
288
static SSERV_Info* s_GetNextInfo(SERV_ITER iter, HOST_INFO* host_info)
341
double total = 0.0, point = 0.0, access = 0.0, p = 0.0, status;
342
SDISPD_Data* data = (SDISPD_Data*) iter->data;
290
struct SDISPD_Data* data = (struct SDISPD_Data*) iter->data;
343
291
SSERV_Info* info;
349
if (s_IsUpdateNeeded(data) && !s_Resolve(iter))
351
assert(data->n_node != 0);
353
for (i = 0; i < data->n_node; i++) {
354
info = data->s_node[i].info;
356
assert(status != 0.0);
358
if (iter->host == info->host ||
360
info->locl && info->coef < 0.0)) {
361
if (iter->pref || info->coef <= 0.0) {
362
status *= SERV_DISPD_LOCAL_SVC_BONUS;
363
if (access < status &&
364
(iter->pref || info->coef < 0.0)) {
366
point = total + status; /* Latch this local server */
371
status *= info->coef;
374
data->s_node[i].status = total;
377
if (point > 0.0 && iter->pref) {
378
if (total != access) {
379
p = SERV_Preference(iter->pref, access/total, data->n_node);
380
#ifdef SERV_DISPD_DEBUG
381
CORE_LOGF(eLOG_Note, ("(P = %lf, A = %lf, T = %lf, N = %d)"
382
" -> Pref = %lf", iter->pref,
383
access, total, (int) data->n_node, p));
384
#endif /*SERV_DISPD_DEBUG*/
386
p = total*(1.0 - p)/(total - access);
387
for (i = 0; i < data->n_node; i++) {
388
data->s_node[i].status *= p;
389
if (p*point <= data->s_node[i].status)
390
data->s_node[i].status += status - p*access;
392
#ifdef SERV_DISPD_DEBUG
393
for (i = 0; i < data->n_node; i++) {
395
SOCK_ntoa(data->s_node[i].info->host, addr, sizeof(addr));
396
status = data->s_node[i].status -
397
(i ? data->s_node[i-1].status : 0.0);
398
CORE_LOGF(eLOG_Note, ("%s %lf %.2lf%%", addr,
399
status, status/total*100.0));
401
#endif /*SERV_DISPD_DEBUG*/
406
/* We take pre-chosen local server only if its status is not less than
407
p% of the average remaining status; otherwise, we ignore the server,
408
and apply the generic procedure by seeding a random point. */
409
if (point <= 0.0 || access*(data->n_node - 1) < p*0.01*(total - access))
410
point = (total * rand()) / (double) RAND_MAX;
411
for (i = 0; i < data->n_node; i++) {
412
if (point <= data->s_node[i].status)
415
assert(i < data->n_node);
417
info = data->s_node[i].info;
418
info->rate = data->s_node[i].status - (i ? data->s_node[i-1].status : 0.0);
419
if (i < --data->n_node) {
420
memmove(data->s_node + i, data->s_node + i + 1,
421
(data->n_node - i)*sizeof(*data->s_node));
295
if (s_IsUpdateNeeded(iter->time, data) &&
296
(!s_Resolve(iter) || !data->n_cand)) {
300
for (n = 0; n < data->n_cand; n++)
301
data->cand[n].status = data->cand[n].info->rate;
302
n = LB_Select(iter, data, s_GetCandidate, SERV_DISPD_LOCAL_SVC_BONUS);
303
info = (SSERV_Info*) data->cand[n].info;
304
info->rate = data->cand[n].status;
305
if (n < --data->n_cand) {
306
memmove(data->cand + n, data->cand + n + 1,
307
(data->n_cand - n) * sizeof(*data->cand));
430
317
static void s_Reset(SERV_ITER iter)
432
SDISPD_Data* data = (SDISPD_Data*) iter->data;
433
if (data && data->s_node) {
319
struct SDISPD_Data* data = (struct SDISPD_Data*) iter->data;
320
if (data && data->cand) {
435
assert(data->a_node);
436
for (i = 0; i < data->n_node; i++)
437
free(data->s_node[i].info);
440
free((void*) data->name);
322
assert(data->a_cand);
323
for (i = 0; i < data->n_cand; i++)
324
free((void*) data->cand[i].info);
447
330
static void s_Close(SERV_ITER iter)
449
SDISPD_Data* data = (SDISPD_Data*) iter->data;
450
assert(!data->n_node && !data->name);/*s_Reset() had to be called before */
332
struct SDISPD_Data* data = (struct SDISPD_Data*) iter->data;
333
assert(!data->n_cand); /* s_Reset() had to be called before */
453
336
ConnNetInfo_Destroy(data->net_info);
465
348
const SConnNetInfo* net_info,
466
349
SSERV_Info** info, HOST_INFO* u/*unused*/)
470
if (!(data = (SDISPD_Data*) calloc(1, sizeof(*data))))
472
if (g_NCBI_ConnectRandomSeed == 0) {
473
g_NCBI_ConnectRandomSeed = (int) time(0) ^ NCBI_CONNECT_SRAND_ADDEND;
474
srand(g_NCBI_ConnectRandomSeed);
351
struct SDISPD_Data* data;
353
if (!iter->ismask && strpbrk(iter->name, "?*"))
354
return 0/*failed to start unallowed wildcard search*/;
356
if (!(data = (struct SDISPD_Data*) calloc(1, sizeof(*data))))
359
assert(net_info); /*must be called with non-NULL*/
360
if ((data->net_info = ConnNetInfo_Clone(net_info)) != 0)
361
data->net_info->service = iter->name; /* SetupStandardArgs() expects */
362
if (!ConnNetInfo_SetupStandardArgs(data->net_info)) {
363
ConnNetInfo_Destroy(data->net_info);
476
data->net_info = ConnNetInfo_Clone(net_info); /*called with non-NULL*/
477
if (iter->type & fSERV_StatelessOnly)
367
/* Reset request method to be GET ('cause no HTTP body is ever used) */
368
data->net_info->req_method = eReqMethod_Get;
478
370
data->net_info->stateless = 1/*true*/;
479
371
if (iter->type & fSERV_Firewall)
480
372
data->net_info->firewall = 1/*true*/;
373
ConnNetInfo_ExtendUserHeader(data->net_info,
374
"User-Agent: NCBIServiceDispatcher/"
375
DISP_PROTOCOL_VERSION
376
#ifdef NCBI_CXX_TOOLKIT
380
#endif /*NCBI_CXX_TOOLKIT*/
481
382
iter->data = data;
483
383
iter->op = &s_op; /* SERV_Update() - from HTTP callback - expects this */
385
if (g_NCBI_ConnectRandomSeed == 0) {
386
g_NCBI_ConnectRandomSeed = iter->time ^ NCBI_CONNECT_SRAND_ADDEND;
387
srand(g_NCBI_ConnectRandomSeed);
484
390
if (!s_Resolve(iter)) {
491
/* call GetNextInfo if info is needed */
397
/* call GetNextInfo subsequently if info is actually needed */
498
extern void DISP_SetMessageHook(FDISP_MessageHook hook)
501
if (hook != s_MessageHook)
502
s_MessageIssued = s_MessageIssued ? -1 : -2;
503
} else if (s_MessageIssued < -1)
505
s_MessageHook = hook;
510
405
* --------------------------------------------------------------------------
511
406
* $Log: ncbi_dispd.c,v $
407
* Revision 6.79 2006/04/05 14:59:32 lavr
410
* Revision 6.78 2006/03/05 17:38:12 lavr
411
* Adjust for SERV_ITER to now carry time (s_Update)
413
* Revision 6.77 2006/02/21 14:57:59 lavr
414
* Dispatcher to not require server-infos for stateless firewalled client
416
* Revision 6.76 2006/02/01 17:13:30 lavr
417
* Remove spurious definition of g_NCBI_ConnectRandomSeed (ncbi_priv.c has it)
419
* Revision 6.75 2006/01/17 20:25:32 lavr
420
* Handling of NCBI messages moved to HTTP connector
421
* Dispatcher messages handled separately (FAILURES vs MESSAGES)
423
* Revision 6.74 2006/01/11 20:26:29 lavr
424
* Take advantage of ConnNetInfo_SetupStandardArgs(); other related changes
426
* Revision 6.73 2006/01/11 16:29:17 lavr
427
* Take advantage of UTIL_ClientAddress(), some other minor improvements
429
* Revision 6.72 2005/12/23 18:18:17 lavr
430
* Rework to use newer dispatcher protocol to closely match the functionality
431
* of local (LBSM shmem based) service locator. Factoring out invariant
432
* initializations/tuneups; better detection of sufficiency of client addr.
434
* Revision 6.71 2005/12/16 16:00:16 lavr
435
* Take advantage of new generic LB API
437
* Revision 6.70 2005/12/14 21:33:15 lavr
438
* Use new SERV_ReadInfoEx() prototype
440
* Revision 6.69 2005/12/08 03:52:28 lavr
441
* Do not override User-Agent but extend it
512
443
* Revision 6.68 2005/07/11 18:23:14 lavr
513
444
* Break grounds for receiving wildcard matches via HTTP header tags