~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/cluster/ClusterConfig.cc

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
 
 
25
/****************************************************************************
 
26
 
 
27
  ClusterConfig.cc
 
28
****************************************************************************/
 
29
 
 
30
#include "P_Cluster.h"
 
31
// updated from the cluster port configuration variable
 
32
int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
 
33
 
 
34
ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize)
 
35
  : Continuation(0),
 
36
    p_cluster_port(port),
 
37
    socket_send_bufsize(send_bufsize),
 
38
    socket_recv_bufsize(recv_bufsize),
 
39
    socket_opt_flags(0),
 
40
    current_cluster_port(-1),
 
41
    accept_action(0),
 
42
    periodic_event(0)
 
43
{
 
44
  mutex = new_ProxyMutex();
 
45
  SET_HANDLER(&ClusterAccept::ClusterAcceptEvent);
 
46
}
 
47
 
 
48
ClusterAccept::~ClusterAccept()
 
49
{
 
50
  mutex = 0;
 
51
}
 
52
 
 
53
void
 
54
ClusterAccept::Init()
 
55
{
 
56
  // Setup initial accept by simulating EVENT_INTERVAL
 
57
  // where cluster accept port has changed.
 
58
 
 
59
  current_cluster_port = ~*p_cluster_port;
 
60
  ClusterAcceptEvent(EVENT_INTERVAL, 0);
 
61
 
 
62
  // Setup periodic event to handle changing cluster accept port.
 
63
  periodic_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(60));
 
64
}
 
65
 
 
66
void
 
67
ClusterAccept::ShutdownDelete()
 
68
{
 
69
  MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
 
70
  if (!lock) {
 
71
    eventProcessor.schedule_imm(this, ET_CALL);
 
72
    return;
 
73
  }
 
74
  // Kill all events and delete.
 
75
  if (accept_action) {
 
76
    accept_action->cancel();
 
77
    accept_action = 0;
 
78
  }
 
79
  if (periodic_event) {
 
80
    periodic_event->cancel();
 
81
    periodic_event = 0;
 
82
  }
 
83
  delete this;
 
84
}
 
85
 
 
86
int
 
87
ClusterAccept::ClusterAcceptEvent(int event, void *data)
 
88
{
 
89
  switch (event) {
 
90
  case EVENT_IMMEDIATE:
 
91
    {
 
92
      ShutdownDelete();
 
93
      return EVENT_DONE;
 
94
    }
 
95
  case EVENT_INTERVAL:
 
96
    {
 
97
      int cluster_port = *p_cluster_port;
 
98
 
 
99
      if (cluster_port != current_cluster_port) {
 
100
        // Configuration changed cluster port, redo accept on new port.
 
101
        if (accept_action) {
 
102
          accept_action->cancel();
 
103
          accept_action = 0;
 
104
        }
 
105
        NetProcessor::AcceptOptions opt;
 
106
        opt.recv_bufsize = socket_recv_bufsize;
 
107
        opt.send_bufsize = socket_send_bufsize;
 
108
        opt.etype = ET_CLUSTER;
 
109
        opt.port = cluster_port;
 
110
        opt.domain = AF_INET;
 
111
        accept_action = netProcessor.main_accept(this, NO_FD,
 
112
                                                 NULL, NULL,
 
113
                                                 false, opt);
 
114
        if (!accept_action) {
 
115
          Warning("Unable to accept cluster connections on port: %d", cluster_port);
 
116
        } else {
 
117
          current_cluster_port = cluster_port;
 
118
        }
 
119
      }
 
120
      return EVENT_CONT;
 
121
    }
 
122
  case NET_EVENT_ACCEPT:
 
123
    {
 
124
      ClusterAcceptMachine((NetVConnection *) data);
 
125
      return EVENT_DONE;
 
126
    }
 
127
  default:
 
128
    {
 
129
      Warning("ClusterAcceptEvent: received unknown event %d", event);
 
130
      return EVENT_DONE;
 
131
    }
 
132
  }                             // End of switch
 
133
}
 
134
 
 
135
int
 
136
ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
 
137
{
 
138
  // Validate remote IP address.
 
139
  unsigned int remote_ip = NetVC->get_remote_ip();
 
140
  MachineList *mc = the_cluster_machines_config();
 
141
 
 
142
  if (mc && !mc->find(remote_ip)) {
 
143
    Note("Illegal cluster connection from %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
 
144
    NetVC->do_io(VIO::CLOSE);
 
145
    return 0;
 
146
  }
 
147
 
 
148
  Debug(CL_NOTE, "Accepting machine %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
 
149
  ClusterHandler *ch = NEW(new ClusterHandler);
 
150
  ch->machine = NEW(new ClusterMachine(NULL, remote_ip));
 
151
  ch->ip = remote_ip;
 
152
  ch->net_vc = NetVC;
 
153
  eventProcessor.schedule_imm(ch, ET_CLUSTER);
 
154
  return 1;
 
155
}
 
156
 
 
157
static void
 
158
make_cluster_connections(MachineList * l, MachineList * old)
 
159
{
 
160
  NOWARN_UNUSED(old);
 
161
  //
 
162
  // Connect to all new machines.
 
163
  //
 
164
  uint32_t ip = this_cluster_machine()->ip;
 
165
 
 
166
  if (l) {
 
167
    for (int i = 0; i < l->n; i++)
 
168
#ifdef LOCAL_CLUSTER_TEST_MODE
 
169
      if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port)))
 
170
#else
 
171
      if (ip < l->machine[i].ip)
 
172
#endif
 
173
        clusterProcessor.connect(l->machine[i].ip, l->machine[i].port);
 
174
  }
 
175
}
 
176
 
 
177
int
 
178
machine_config_change(const char *name, RecDataT data_type, RecData data, void *cookie)
 
179
{
 
180
  NOWARN_UNUSED(name);
 
181
  NOWARN_UNUSED(data);
 
182
  NOWARN_UNUSED(data_type);
 
183
  // Handle changes to the cluster.config or machines.config
 
184
  // file.  cluster.config is the list of machines in the
 
185
  // cluster proper ( in the cluster hash table ).  machines.config
 
186
  // is the list of machines which communicate with the cluster.
 
187
  // This may include front-end load redirectors, machines going
 
188
  // up or coming down etc.
 
189
  //
 
190
  char *filename = (char *) data.rec_string;
 
191
  MachineList *l = read_MachineList(filename);
 
192
  MachineList *old = NULL;
 
193
#ifdef USE_SEPARATE_MACHINE_CONFIG
 
194
  switch ((int) cookie) {
 
195
  case MACHINE_CONFIG:
 
196
    old = machines_config;
 
197
    machines_config = l;
 
198
    break;
 
199
  case CLUSTER_CONFIG:
 
200
    old = cluster_config;
 
201
    cluster_config = l;
 
202
    make_cluster_connections(l, old);
 
203
    break;
 
204
  }
 
205
#else
 
206
  (void) cookie;
 
207
  old = cluster_config;
 
208
  machines_config = l;
 
209
  cluster_config = l;
 
210
  make_cluster_connections(l, old);
 
211
#endif
 
212
  if (old)
 
213
    free_MachineList(old);
 
214
  return 0;
 
215
}
 
216
 
 
217
void
 
218
do_machine_config_change(void *d, const char *s)
 
219
{
 
220
  char cluster_config_filename[PATH_NAME_MAX] = "";
 
221
  IOCORE_ReadConfigString(cluster_config_filename, s, sizeof(cluster_config_filename) - 1);
 
222
  RecData data;
 
223
  data.rec_string = cluster_config_filename;
 
224
  machine_config_change(s, RECD_STRING, data, d);
 
225
}
 
226
 
 
227
/*************************************************************************/
 
228
// ClusterConfiguration member functions (Public Class)
 
229
/*************************************************************************/
 
230
ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
 
231
{
 
232
  memset(machines, 0, sizeof(machines));
 
233
  memset(hash_table, 0, sizeof(hash_table));
 
234
}
 
235
 
 
236
/*************************************************************************/
 
237
// ConfigurationContinuation member functions (Internal Class)
 
238
/*************************************************************************/
 
239
struct ConfigurationContinuation;
 
240
typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *);
 
241
struct ConfigurationContinuation: public Continuation
 
242
{
 
243
  ClusterConfiguration *c;
 
244
  ClusterConfiguration *prev;
 
245
 
 
246
  int
 
247
  zombieEvent(int event, Event * e)
 
248
  {
 
249
    NOWARN_UNUSED(event);
 
250
    prev->link.next = NULL;     // remove that next pointer
 
251
    SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent);
 
252
    e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE);
 
253
    return EVENT_CONT;
 
254
  }
 
255
 
 
256
  int
 
257
  dieEvent(int event, Event * e)
 
258
  {
 
259
    (void) event;
 
260
    (void) e;
 
261
    delete c;
 
262
    delete this;
 
263
    return EVENT_DONE;
 
264
  }
 
265
 
 
266
  ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev)
 
267
    : Continuation(NULL), c(cc), prev(aprev) {
 
268
    mutex = new_ProxyMutex();
 
269
    SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent);
 
270
  }
 
271
};
 
272
 
 
273
static void
 
274
free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
 
275
{
 
276
  //
 
277
  // Delete the configuration after a time.
 
278
  // The problem is that configurations change infrequently, and
 
279
  // are used in different threads, so reference counts are
 
280
  // relatively difficult and expensive.  The solution I have
 
281
  // chosen is to simply delete the object after some (very long)
 
282
  // time after it has ceased to be accessable.
 
283
  //
 
284
  eventProcessor.schedule_in(NEW(new ConfigurationContinuation(c, prev)), CLUSTER_CONFIGURATION_TIMEOUT, ET_CALL);
 
285
}
 
286
 
 
287
ClusterConfiguration *
 
288
configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
 
289
{
 
290
  // Build a new cluster configuration with the new machine.
 
291
  // Machines are stored in ip sorted order.
 
292
  //
 
293
  EThread *thread = this_ethread();
 
294
  ProxyMutex *mutex = thread->mutex;
 
295
  int i = 0;
 
296
  ClusterConfiguration *cc = NEW(new ClusterConfiguration(*c));
 
297
 
 
298
  // Find the place to insert this new machine
 
299
  //
 
300
  for (i = 0; i < cc->n_machines; i++) {
 
301
    if (cc->machines[i]->ip > m->ip)
 
302
      break;
 
303
  }
 
304
 
 
305
  // Move the other machines out of the way
 
306
  //
 
307
  for (int j = cc->n_machines - 1; j >= i; j--)
 
308
    cc->machines[j + 1] = cc->machines[j];
 
309
 
 
310
  // Insert it
 
311
  //
 
312
  cc->machines[i] = m;
 
313
  cc->n_machines++;
 
314
 
 
315
  cc->link.next = c;
 
316
  cc->changed = ink_get_hrtime();
 
317
  ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES);
 
318
 
 
319
  build_cluster_hash_table(cc);
 
320
  INK_MEMORY_BARRIER;           // commit writes before freeing old hash table
 
321
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
 
322
 
 
323
  free_configuration(c, cc);
 
324
  return cc;
 
325
}
 
326
 
 
327
ClusterConfiguration *
 
328
configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
 
329
{
 
330
  EThread *thread = this_ethread();
 
331
  ProxyMutex *mutex = thread->mutex;
 
332
 
 
333
  //
 
334
  // Build a new cluster configuration without a machine
 
335
  //
 
336
  ClusterConfiguration *cc = NEW(new ClusterConfiguration(*c));
 
337
  //
 
338
  // remove m and move others down
 
339
  //
 
340
  for (int i = 0; i < cc->n_machines - 1; i++)
 
341
    if (m == cc->machines[i])
 
342
      m = cc->machines[i] = cc->machines[i + 1];
 
343
  cc->n_machines--;
 
344
 
 
345
  ink_assert(cc->n_machines > 0);
 
346
 
 
347
  cc->link.next = c;
 
348
  cc->changed = ink_get_hrtime();
 
349
 
 
350
  build_cluster_hash_table(cc);
 
351
  INK_MEMORY_BARRIER;           // commit writes before freeing old hash table
 
352
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
 
353
 
 
354
  free_configuration(c, cc);
 
355
  return cc;
 
356
}
 
357
 
 
358
/*************************************************************************/
 
359
// Exported functions
 
360
/*************************************************************************/
 
361
 
 
362
//
 
363
// cluster_machine_depth_list()
 
364
//   Return a machine list based on the given hash which is ordered from
 
365
//   now to the past.  Returned list contains no duplicates and depth_list[0]
 
366
//   entry (current configuration) is always valid.  Entries following
 
367
//   depth_list[0] are considered online.
 
368
//   Returns 0 on Success else non-zero on error.
 
369
//
 
370
int
 
371
cluster_machine_depth_list(unsigned int hash, ClusterMachine ** depth_list, int depth_list_size)
 
372
{
 
373
  NOWARN_UNUSED(depth_list_size);
 
374
  int n = 0;
 
375
  ClusterConfiguration *cc = this_cluster()->current_configuration();
 
376
  ClusterMachine *m = cc->machine_hash(hash);
 
377
 
 
378
  depth_list[n++] = m;
 
379
  cc = cc->link.next;
 
380
 
 
381
  while (!cc && n < (CONFIGURATION_HISTORY_PROBE_DEPTH + 1)) {
 
382
    m = cc->machine_hash(hash);
 
383
 
 
384
    if (m->dead || machine_in_vector(m, depth_list, n)) {
 
385
      depth_list[n++] = (ClusterMachine *) 0;   // duplicate or offline
 
386
    } else {
 
387
      depth_list[n++] = m;      // unique valid online entry
 
388
    }
 
389
    cc = cc->link.next;
 
390
  }
 
391
  return 0;                     // Success
 
392
}
 
393
 
 
394
//
 
395
// cluster_machine_at_depth()
 
396
//   Find a machine at a particular depth into the past.
 
397
//   We don't want to probe the current machine or machines
 
398
//   we have probed before, so we store a list of "past_probes".
 
399
//   If probe_depth and past_probes are NULL we only want the
 
400
//   owner (machine now as opposed to in the past).
 
401
//
 
402
ClusterMachine *
 
403
cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes)
 
404
{
 
405
#ifdef CLUSTER_TOMCAT
 
406
  if (!cache_clustering_enabled)
 
407
    return NULL;
 
408
#endif
 
409
  ClusterConfiguration *
 
410
    cc = this_cluster()->current_configuration();
 
411
  ClusterConfiguration *
 
412
    next_cc = cc;
 
413
  ink_hrtime now = ink_get_hrtime();
 
414
  int fake_probe_depth = 0;
 
415
  int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth;
 
416
  int tprobe_depth = probe_depth;
 
417
 
 
418
#ifdef CLUSTER_TEST
 
419
  if (cc->n_machines > 1) {
 
420
    for (int i = 0; i < cc->n_machines; ++i) {
 
421
      if (cc->machines[i] != this_cluster_machine()) {
 
422
        return cc->machines[i];
 
423
      }
 
424
    }
 
425
  }
 
426
#endif // CLUSTER_TEST
 
427
 
 
428
  while (1) {
 
429
    // If we are out of our depth, fail
 
430
    //
 
431
    if (probe_depth > CONFIGURATION_HISTORY_PROBE_DEPTH)
 
432
      break;
 
433
 
 
434
    // If there is no configuration, fail
 
435
    //
 
436
    if (!cc || !next_cc)
 
437
      break;
 
438
 
 
439
    cc = next_cc;
 
440
    next_cc = next_cc->link.next;
 
441
 
 
442
    // Find the correct configuration
 
443
    //
 
444
    if (tprobe_depth) {
 
445
      if (cc->changed > (now + CLUSTER_CONFIGURATION_TIMEOUT))
 
446
        break;
 
447
      tprobe_depth--;
 
448
      continue;
 
449
    }
 
450
 
 
451
    ClusterMachine *
 
452
      m = cc->machine_hash(hash);
 
453
 
 
454
    // If it is not this machine, or a machine we have done before
 
455
    // and one that is still up, try again
 
456
    //
 
457
    bool
 
458
      ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
 
459
 
 
460
    // Store the all but the last probe, so that we never return
 
461
    // the same machine
 
462
    //
 
463
    if (past_probes && probe_depth < CONFIGURATION_HISTORY_PROBE_DEPTH)
 
464
      past_probes[probe_depth] = m;
 
465
    probe_depth++;
 
466
 
 
467
    if (!ok) {
 
468
      if (!pprobe_depth)
 
469
        break;                  // don't go down if we don't have a depth
 
470
      continue;
 
471
    }
 
472
 
 
473
    return m;
 
474
  }
 
475
  return NULL;
 
476
}
 
477
 
 
478
//
 
479
// initialize_thread_for_cluster()
 
480
//   This is not required since we have a separate handler
 
481
//   for each machine-machine pair, the pointers to which are
 
482
//   stored in the ClusterMachine structures
 
483
//
 
484
void
 
485
initialize_thread_for_cluster(EThread * e)
 
486
{
 
487
  (void) e;
 
488
}
 
489
 
 
490
/*************************************************************************/
 
491
// Cluster member functions (Public Class)
 
492
/*************************************************************************/
 
493
Cluster::Cluster()
 
494
{
 
495
}
 
496
 
 
497
// End of ClusterConfig.cc