3
A brief file description
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
25
/****************************************************************************
28
****************************************************************************/
30
#include "P_Cluster.h"
31
// File-scope cluster port number. Starts at DEFAULT_CLUSTER_PORT_NUMBER and is
// updated from the cluster port configuration variable.
32
int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
34
// Constructor. Records the requested socket send/receive buffer sizes, starts
// with current_cluster_port at -1, allocates a new ProxyMutex for `mutex` and
// installs ClusterAcceptEvent as the event handler.
// NOTE(review): some initializers (e.g. the one consuming `port`) are on lines
// not visible in this excerpt and are therefore not described here.
ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize)
37
socket_send_bufsize(send_bufsize),
38
socket_recv_bufsize(recv_bufsize),
40
// -1 is presumably a sentinel for "not accepting on any port yet" -- TODO confirm.
current_cluster_port(-1),
44
mutex = new_ProxyMutex();
45
SET_HANDLER(&ClusterAccept::ClusterAcceptEvent);
48
// Destructor. NOTE(review): its body is not visible in this excerpt.
ClusterAccept::~ClusterAccept()
56
// NOTE(review): the statements below appear to belong to a separate
// initialization routine whose header is not visible in this excerpt.
// Setup initial accept by simulating EVENT_INTERVAL
57
// where cluster accept port has changed.
59
// The bitwise complement can never equal *p_cluster_port, so the
// (cluster_port != current_cluster_port) test in ClusterAcceptEvent is
// guaranteed to see a change on this first call.
current_cluster_port = ~*p_cluster_port;
60
ClusterAcceptEvent(EVENT_INTERVAL, 0);
62
// Setup periodic event to handle changing cluster accept port.
63
// The returned event is kept so that ShutdownDelete can cancel it.
periodic_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(60));
67
// Shuts the acceptor down: attempts to take this object's mutex, then cancels
// the outstanding accept action and the periodic event.
// NOTE(review): the conditions guarding the statements below, and the final
// deletion mentioned in the comment, are on lines not visible in this excerpt.
ClusterAccept::ShutdownDelete()
69
MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
71
// Re-schedule this object immediately on ET_CALL -- presumably the retry path
// taken when the try-lock above fails; TODO confirm against the full source.
eventProcessor.schedule_imm(this, ET_CALL);
74
// Kill all events and delete.
76
accept_action->cancel();
80
periodic_event->cancel();
87
// Event handler for the cluster acceptor.
//   event - event code; NET_EVENT_ACCEPT is handled explicitly, and a warning
//           is logged for codes reported as unknown.
//   data  - for NET_EVENT_ACCEPT, the accepted NetVConnection.
// When the configured port differs from current_cluster_port, the existing
// accept is cancelled and a new one is started on the configured port.
// NOTE(review): the switch statement, the other case labels and the return
// statements are on lines not visible in this excerpt.
ClusterAccept::ClusterAcceptEvent(int event, void *data)
97
// Local snapshot of the configured port; shadows the file-scope cluster_port.
int cluster_port = *p_cluster_port;
99
if (cluster_port != current_cluster_port) {
100
// Configuration changed cluster port, redo accept on new port.
102
accept_action->cancel();
105
// Build the options for the replacement accept: configured buffer sizes,
// the ET_CLUSTER event type, the new port and the AF_INET address family.
NetProcessor::AcceptOptions opt;
106
opt.recv_bufsize = socket_recv_bufsize;
107
opt.send_bufsize = socket_send_bufsize;
108
opt.etype = ET_CLUSTER;
109
opt.port = cluster_port;
110
opt.domain = AF_INET;
111
accept_action = netProcessor.main_accept(this, NO_FD,
114
// A null action is reported as a failure to accept on this port.
if (!accept_action) {
115
Warning("Unable to accept cluster connections on port: %d", cluster_port);
117
// Remember the port so later calls only react to further changes.
current_cluster_port = cluster_port;
122
case NET_EVENT_ACCEPT:
124
// Hand the newly accepted connection to ClusterAcceptMachine.
ClusterAcceptMachine((NetVConnection *) data);
129
Warning("ClusterAcceptEvent: received unknown event %d", event);
136
// Handles one accepted cluster connection.
//   NetVC - the accepted connection.
// If a machine list is configured and the peer's IP is not in it, the
// connection is logged as illegal and closed. Otherwise a ClusterHandler with
// a new ClusterMachine for the peer IP is created and scheduled on ET_CLUSTER.
// NOTE(review): return statements and the lines attaching NetVC to the handler
// are not visible in this excerpt.
ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
138
// Validate remote IP address.
139
unsigned int remote_ip = NetVC->get_remote_ip();
140
MachineList *mc = the_cluster_machines_config();
142
// With no machine list (mc is null) every peer is accepted.
if (mc && !mc->find(remote_ip)) {
143
Note("Illegal cluster connection from %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
144
NetVC->do_io(VIO::CLOSE);
148
Debug(CL_NOTE, "Accepting machine %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
149
ClusterHandler *ch = NEW(new ClusterHandler);
150
ch->machine = NEW(new ClusterMachine(NULL, remote_ip));
153
// Start the handler immediately on a cluster thread.
eventProcessor.schedule_imm(ch, ET_CLUSTER);
158
// Initiates outgoing cluster connections to machines in list `l`.
//   l   - machine list to connect to.
//   old - previous machine list; not referenced in the lines visible here.
// Only machines whose IP is greater than this machine's IP are connected to.
// NOTE(review): presumably so that exactly one side of each pair initiates the
// connection -- confirm against the accept path.
make_cluster_connections(MachineList * l, MachineList * old)
162
// Connect to all new machines.
164
uint32_t ip = this_cluster_machine()->ip;
167
for (int i = 0; i < l->n; i++)
168
#ifdef LOCAL_CLUSTER_TEST_MODE
169
// Test mode: equal IPs are allowed, with the port number breaking the tie.
if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port)))
171
if (ip < l->machine[i].ip)
173
clusterProcessor.connect(l->machine[i].ip, l->machine[i].port);
178
// Configuration-change callback for the machine list files.
//   name      - not referenced in the lines visible here.
//   data_type - unused.
//   data      - data.rec_string holds the name of the file to read.
//   cookie    - selects which list changed when USE_SEPARATE_MACHINE_CONFIG
//               is defined.
// Reads the new machine list, makes cluster connections for it and frees the
// previous list.
// NOTE(review): the assignments that publish the new list and the case labels
// are on lines not visible in this excerpt.
machine_config_change(const char *name, RecDataT data_type, RecData data, void *cookie)
182
NOWARN_UNUSED(data_type);
183
// Handle changes to the cluster.config or machines.config
184
// file. cluster.config is the list of machines in the
185
// cluster proper ( in the cluster hash table ). machines.config
186
// is the list of machines which communicate with the cluster.
187
// This may include front-end load redirectors, machines going
188
// up or coming down etc.
190
char *filename = (char *) data.rec_string;
191
MachineList *l = read_MachineList(filename);
192
MachineList *old = NULL;
193
#ifdef USE_SEPARATE_MACHINE_CONFIG
194
// NOTE(review): casting a pointer to int truncates where pointers are wider
// than int; verify this is safe on all supported builds.
switch ((int) cookie) {
196
old = machines_config;
200
old = cluster_config;
202
make_cluster_connections(l, old);
207
old = cluster_config;
210
make_cluster_connections(l, old);
213
// Release whichever list was captured in `old` above.
free_MachineList(old);
218
// Convenience wrapper: looks up the configuration string named `s` and feeds
// it to machine_config_change() as a RECD_STRING value.
//   d - passed through unchanged as the callback cookie.
//   s - name of the configuration variable holding the file name.
do_machine_config_change(void *d, const char *s)
220
// Zero-initialized; the read below is limited to one byte less than the
// buffer size, so the final byte is never written by it.
char cluster_config_filename[PATH_NAME_MAX] = "";
221
IOCORE_ReadConfigString(cluster_config_filename, s, sizeof(cluster_config_filename) - 1);
223
data.rec_string = cluster_config_filename;
224
machine_config_change(s, RECD_STRING, data, d);
227
/*************************************************************************/
228
// ClusterConfiguration member functions (Public Class)
229
/*************************************************************************/
230
// Default constructor: an empty configuration with no machines, a zero
// `changed` timestamp, and zero-filled machine and hash tables.
ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
232
memset(machines, 0, sizeof(machines));
233
memset(hash_table, 0, sizeof(hash_table));
236
/*************************************************************************/
237
// ConfigurationContinuation member functions (Internal Class)
238
/*************************************************************************/
239
// Continuation used to retire a ClusterConfiguration in two stages:
// zombieEvent runs first and then hands over to dieEvent.
struct ConfigurationContinuation;
240
// Member-function pointer type used when installing the handlers below.
typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *);
241
struct ConfigurationContinuation: public Continuation
243
// Configuration passed in as `cc` at construction.
ClusterConfiguration *c;
244
// Configuration whose link.next pointer is cleared by zombieEvent.
ClusterConfiguration *prev;
247
// First stage: clears prev->link.next, switches the handler to dieEvent and
// re-schedules the event after CLUSTER_CONFIGURATION_ZOMBIE.
zombieEvent(int event, Event * e)
249
NOWARN_UNUSED(event);
250
prev->link.next = NULL; // remove that next pointer
251
SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent);
252
e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE);
257
// Second stage. NOTE(review): the body is not visible in this excerpt.
dieEvent(int event, Event * e)
266
// Stores both configurations, allocates a mutex and starts in the
// zombieEvent state.
ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev)
267
: Continuation(NULL), c(cc), prev(aprev) {
268
mutex = new_ProxyMutex();
269
SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent);
274
// Schedules deferred retirement of configuration `c`.
//   c    - configuration to retire.
//   prev - configuration passed along to the continuation as `prev`.
// Nothing happens immediately; a ConfigurationContinuation is scheduled to
// run on ET_CALL after CLUSTER_CONFIGURATION_TIMEOUT.
free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
277
// Delete the configuration after a time.
278
// The problem is that configurations change infrequently, and
279
// are used in different threads, so reference counts are
280
// relatively difficult and expensive. The solution I have
281
// chosen is to simply delete the object after some (very long)
282
// time after it has ceased to be accessible.
284
eventProcessor.schedule_in(NEW(new ConfigurationContinuation(c, prev)), CLUSTER_CONFIGURATION_TIMEOUT, ET_CALL);
287
// Builds a new configuration equal to `c` plus machine `m`.
//   c - existing configuration; copied, then handed to free_configuration().
//   m - machine to insert, positioned by ascending ip.
// The copy is stamped with the current time, its hash table is rebuilt, and
// the configuration-change statistic is incremented.
// NOTE(review): the insertion itself, the n_machines update and the return
// statement are on lines not visible in this excerpt.
ClusterConfiguration *
288
configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
290
// Build a new cluster configuration with the new machine.
291
// Machines are stored in ip sorted order.
293
EThread *thread = this_ethread();
294
// NOTE(review): `mutex` is presumably consumed by the stat macro below.
ProxyMutex *mutex = thread->mutex;
296
ClusterConfiguration *cc = NEW(new ClusterConfiguration(*c));
298
// Find the place to insert this new machine
300
for (i = 0; i < cc->n_machines; i++) {
301
if (cc->machines[i]->ip > m->ip)
305
// Move the other machines out of the way
307
// Shift from the top down so that no entry is overwritten before it moves.
for (int j = cc->n_machines - 1; j >= i; j--)
308
cc->machines[j + 1] = cc->machines[j];
316
cc->changed = ink_get_hrtime();
317
ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES);
319
build_cluster_hash_table(cc);
320
INK_MEMORY_BARRIER; // commit writes before freeing old hash table
321
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
323
// Schedule deferred retirement of the old configuration.
free_configuration(c, cc);
327
// Builds a new configuration equal to `c` without machine `m`.
//   c - existing configuration; copied, then handed to free_configuration().
//   m - machine to remove.
// The copy is stamped with the current time, its hash table is rebuilt, and
// the configuration-change statistic is incremented.
// NOTE(review): the n_machines update and the return statement are on lines
// not visible in this excerpt.
ClusterConfiguration *
328
configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
330
EThread *thread = this_ethread();
331
// NOTE(review): `mutex` is presumably consumed by the stat macro below.
ProxyMutex *mutex = thread->mutex;
334
// Build a new cluster configuration without a machine
336
ClusterConfiguration *cc = NEW(new ClusterConfiguration(*c));
338
// remove m and move others down
340
// Once slot i matches m, it takes the value of slot i + 1 and m is re-pointed
// at that same value, so the next slot matches too and every later entry
// shifts down by one. The last slot is never examined by this loop.
for (int i = 0; i < cc->n_machines - 1; i++)
341
if (m == cc->machines[i])
342
m = cc->machines[i] = cc->machines[i + 1];
345
ink_assert(cc->n_machines > 0);
348
cc->changed = ink_get_hrtime();
350
build_cluster_hash_table(cc);
351
INK_MEMORY_BARRIER; // commit writes before freeing old hash table
352
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
354
// Schedule deferred retirement of the old configuration.
free_configuration(c, cc);
358
/*************************************************************************/
359
// Exported functions
360
/*************************************************************************/
363
// cluster_machine_depth_list()
364
// Return a machine list based on the given hash which is ordered from
365
// now to the past. Returned list contains no duplicates and depth_list[0]
366
// entry (current configuration) is always valid. Entries following
367
// depth_list[0] are considered online.
368
// Returns 0 on Success else non-zero on error.
371
// Fills depth_list with the machine that `hash` maps to in each examined
// configuration, starting from the current one.
//   hash            - hash value to look up.
//   depth_list      - output array; slots for dead or already-listed machines
//                     are set to NULL.
//   depth_list_size - unused.
// NOTE(review): the declaration of `n`, the step that advances `cc` and the
// return statement are on lines not visible in this excerpt.
cluster_machine_depth_list(unsigned int hash, ClusterMachine ** depth_list, int depth_list_size)
373
NOWARN_UNUSED(depth_list_size);
375
ClusterConfiguration *cc = this_cluster()->current_configuration();
376
ClusterMachine *m = cc->machine_hash(hash);
381
// Continue only while a configuration is available: the body dereferences
// cc, so it must never run with a NULL cc (the previous `!cc` test was
// inverted and would either skip the loop or dereference NULL).
while (cc && n < (CONFIGURATION_HISTORY_PROBE_DEPTH + 1)) {
382
m = cc->machine_hash(hash);
384
if (m->dead || machine_in_vector(m, depth_list, n)) {
385
depth_list[n++] = (ClusterMachine *) 0; // duplicate or offline
387
depth_list[n++] = m; // unique valid online entry
395
// cluster_machine_at_depth()
396
// Find a machine at a particular depth into the past.
397
// We don't want to probe the current machine or machines
398
// we have probed before, so we store a list of "past_probes".
399
// If probe_depth and past_probes are NULL we only want the
400
// owner (machine now as opposed to in the past).
403
// Looks up the machine that `hash` maps to, optionally probing older
// configurations.
//   hash         - hash value to look up.
//   pprobe_depth - in/out probe depth; may be NULL, in which case a local
//                  depth of 0 is used.
//   past_probes  - optional array recording machines already probed.
// NOTE(review): the loop construct, several declarations and the return
// statements are on lines not visible in this excerpt.
cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes)
405
#ifdef CLUSTER_TOMCAT
406
if (!cache_clustering_enabled)
409
ClusterConfiguration *
410
cc = this_cluster()->current_configuration();
411
ClusterConfiguration *
413
ink_hrtime now = ink_get_hrtime();
414
// Stand-in depth used when the caller passes no pprobe_depth.
int fake_probe_depth = 0;
415
int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth;
416
int tprobe_depth = probe_depth;
419
// Returns the first machine in the current configuration that is not this
// machine, when more than one machine is configured.
if (cc->n_machines > 1) {
420
for (int i = 0; i < cc->n_machines; ++i) {
421
if (cc->machines[i] != this_cluster_machine()) {
422
return cc->machines[i];
426
#endif // CLUSTER_TEST
429
// If we are out of our depth, fail
431
if (probe_depth > CONFIGURATION_HISTORY_PROBE_DEPTH)
434
// If there is no configuration, fail
440
next_cc = next_cc->link.next;
442
// Find the correct configuration
445
if (cc->changed > (now + CLUSTER_CONFIGURATION_TIMEOUT))
452
m = cc->machine_hash(hash);
454
// If it is not this machine, or a machine we have done before
455
// and one that is still up, try again
458
// ok is false for this machine itself, an already-probed machine, or a dead one.
ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
460
// Store all but the last probe, so that we never return
463
if (past_probes && probe_depth < CONFIGURATION_HISTORY_PROBE_DEPTH)
464
past_probes[probe_depth] = m;
469
break; // don't go down if we don't have a depth
479
// initialize_thread_for_cluster()
480
// This is not required since we have a separate handler
481
// for each machine-machine pair, the pointers to which are
482
// stored in the ClusterMachine structures
485
//   e - the thread being initialized.
// NOTE(review): the function body is not visible in this excerpt.
initialize_thread_for_cluster(EThread * e)
490
/*************************************************************************/
491
// Cluster member functions (Public Class)
492
/*************************************************************************/
497
// End of ClusterConfig.cc