~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/cluster/P_ClusterInternal.h

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
 
 
25
/****************************************************************************
 
26
 
 
27
  ClusterInternal.h
 
28
****************************************************************************/
 
29
#include "P_ClusterCache.h"
 
30
 
 
31
#ifndef _P_ClusterInternal_h
 
32
#define _P_ClusterInternal_h
 
33
 
 
34
/*************************************************************************/
 
35
// Compilation Options
 
36
/*************************************************************************/
 
37
#define CLUSTER_THREAD_STEALING         1
 
38
#define CLUSTER_TOMCAT                  1
 
39
#define CLUSTER_STATS                   1
 
40
 
 
41
 
 
42
#define ALIGN_DOUBLE(_p)   ((((uintptr_t) (_p)) + 7) & ~7)
 
43
#define ALLOCA_DOUBLE(_sz) ALIGN_DOUBLE(alloca((_sz) + 8))
 
44
 
 
45
/*************************************************************************/
 
46
// Configuration Parameters
 
47
/*************************************************************************/
 
48
// Note: MAX_TCOUNT must be power of 2
 
49
#define MAX_TCOUNT               128
 
50
#define CONTROL_DATA             (128*1024)
 
51
#define READ_BANK_BUF_SIZE       DEFAULT_MAX_BUFFER_SIZE
 
52
#define READ_BANK_BUF_INDEX      (DEFAULT_BUFFER_SIZES-1)
 
53
#define ALLOC_DATA_MAGIC         0xA5   // 8 bits in size
 
54
#define READ_LOCK_SPIN_COUNT     1
 
55
#define WRITE_LOCK_SPIN_COUNT    1
 
56
 
 
57
// Unix specific optimizations
 
58
// #define CLUSTER_IMMEDIATE_NETIO       1
 
59
 
 
60
  // (see ClusterHandler::mainClusterEvent)
 
61
  // this is equivalent to a max of 0.7 seconds
 
62
#define CLUSTER_BUCKETS          64
 
63
#define CLUSTER_PERIOD           HRTIME_MSECONDS(10)
 
64
 
 
65
  // Per instance maximum time allotted to cluster thread
 
66
#define CLUSTER_MAX_RUN_TIME    HRTIME_MSECONDS(100)
 
67
  // Per instance maximum time allotted to thread stealing
 
68
#define CLUSTER_MAX_THREAD_STEAL_TIME   HRTIME_MSECONDS(10)
 
69
 
 
70
  // minimum number of channels to allocate
 
71
#define MIN_CHANNELS             4096
 
72
#define MAX_CHANNELS             ((32*1024) - 1)        // 15 bits in Descriptor
 
73
 
 
74
#define CLUSTER_CONTROL_CHANNEL  0
 
75
#define LAST_DEDICATED_CHANNEL   0
 
76
 
 
77
#define CLUSTER_PHASES           1
 
78
 
 
79
#define CLUSTER_INITIAL_PRIORITY CLUSTER_PHASES
 
80
  // how often to retry connect to machines which are supposed to be in the
 
81
  // cluster
 
82
#define CLUSTER_BUMP_LENGTH      1
 
83
#define CLUSTER_MEMBER_DELAY     HRTIME_SECONDS(1)
 
84
  // How long to leave an unconnected ClusterVConnection waiting
 
85
  // Note: assumes (CLUSTER_CONNECT_TIMEOUT == 2 * CACHE_CLUSTER_TIMEOUT)
 
86
#ifdef CLUSTER_TEST_DEBUG
 
87
#define CLUSTER_CONNECT_TIMEOUT  HRTIME_SECONDS(65536)
 
88
#else
 
89
#define CLUSTER_CONNECT_TIMEOUT  HRTIME_SECONDS(10)
 
90
#endif
 
91
#define CLUSTER_CONNECT_RETRY    HRTIME_MSECONDS(20)
 
92
#define CLUSTER_RETRY            HRTIME_MSECONDS(10)
 
93
#define CLUSTER_DELAY_BETWEEN_WRITES HRTIME_MSECONDS(10)
 
94
 
 
95
  // Force close on cluster channel if no activity detected in this interval
 
96
#ifdef CLUSTER_TEST_DEBUG
 
97
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (65536 * HRTIME_SECONDS(60))
 
98
#else
 
99
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (10 * HRTIME_SECONDS(60))
 
100
#endif
 
101
 
 
102
  // Defines for work deferred to ET_NET threads
 
103
#define COMPLETION_CALLBACK_PERIOD      HRTIME_MSECONDS(10)
 
104
#define MAX_COMPLETION_CALLBACK_EVENTS  16
 
105
 
 
106
  // ClusterHandler::mainClusterEvent() thread active state
 
107
#define CLUSTER_ACTIVE           1
 
108
#define CLUSTER_NOT_ACTIVE       0
 
109
 
 
110
  // defines for ClusterHandler::remote_closed
 
111
#define FORCE_CLOSE_ON_OPEN_CHANNEL     -2
 
112
 
 
113
  // defines for machine_config_change()
 
114
#define MACHINE_CONFIG          0
 
115
#define CLUSTER_CONFIG          1
 
116
 
 
117
// Debug interface category definitions
 
118
#define CL_NOTE         "cluster_note"
 
119
#define CL_WARN         "cluster_warn"
 
120
#define CL_PROTO        "cluster_proto"
 
121
#define CL_TRACE        "cluster_trace"
 
122
 
 
123
/*************************************************************************/
 
124
// Constants
 
125
/*************************************************************************/
 
126
#define MAX_FAST_CONTROL_MESSAGE 504    // 512 - 4 (cluster func #) - 4 align
 
127
#define SMALL_CONTROL_MESSAGE    MAX_FAST_CONTROL_MESSAGE       // copied instead
 
128
                                                           //  of vectored
 
129
#define WRITE_MESSAGE_ALREADY_BUILT -1
 
130
 
 
131
#define MAGIC_COUNT(_x) \
 
132
(0xBADBAD ^ ~(uint32_t)_x.msg.count \
 
133
 ^ ~(uint32_t)_x.msg.descriptor_cksum \
 
134
 ^ ~(uint32_t)_x.msg.control_bytes_cksum \
 
135
 ^ ~(uint32_t)_x.msg.unused \
 
136
 ^ ~((uint32_t)_x.msg.control_bytes << 16) ^_x.sequence_number)
 
137
 
 
138
#define DOUBLE_ALIGN(_x)    ((((uintptr_t)_x)+7)&~7)
 
139
 
 
140
/*************************************************************************/
 
141
// Testing Defines
 
142
/*************************************************************************/
 
143
#define MISS_TEST                0
 
144
#define TEST_PARTIAL_WRITES      0
 
145
#define TEST_PARTIAL_READS       0
 
146
#define TEST_TIMING              0
 
147
#define TEST_READ_LOCKS_MISSED   0
 
148
#define TEST_WRITE_LOCKS_MISSED  0
 
149
#define TEST_ENTER_EXIT          0
 
150
#define TEST_ENTER_EXIT          0
 
151
 
 
152
//
 
153
// Timing testing
 
154
//
 
155
#if TEST_TIMING
 
156
#define TTTEST(_x) \
 
157
fprintf(stderr, _x " at: %u\n", \
 
158
        ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
 
159
#define TTEST(_x) \
 
160
fprintf(stderr, _x " for: %d at: %u\n", vc->channel, \
 
161
        ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
 
162
#define TIMEOUT_TESTS(_s,_d) \
 
163
    if (*(int*)_d == 8)  \
 
164
      fprintf(stderr,_s" lookup  %d\n", *(int*)(_d+20)); \
 
165
    else if (*(int*)_d == 10) \
 
166
      fprintf(stderr,_s" op %d %d\n", *(int*)(_d+36), \
 
167
              *(int*)(_d+40)); \
 
168
    else if (*(int*)_d == 11) \
 
169
      fprintf(stderr,_s" rop %d %d\n", *(int*)(_d+4), \
 
170
              *(int*)(_d+8))
 
171
#else
 
172
#define TTTEST(_x)
 
173
#define TTEST(_x)
 
174
#define TIMEOUT_TESTS(_x,_y)
 
175
#endif
 
176
 
 
177
#if (TEST_READ_LOCKS_MISSED || TEST_WRITE_LOCKS_MISSED)
 
178
static unsigned int test_cluster_locks_missed = 0;
 
179
static
 
180
test_cluster_lock_might_fail()
 
181
{
 
182
  return (!(rand_r(&test_cluster_locks_missed) % 13));
 
183
}
 
184
#endif
 
185
#if TEST_READ_LOCKS_MISSED
 
186
#define TEST_READ_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
 
187
#else
 
188
#define TEST_READ_LOCK_MIGHT_FAIL false
 
189
#endif
 
190
#if TEST_WRITE_LOCKS_MISSED
 
191
#define TEST_WRITE_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
 
192
#else
 
193
#define TEST_WRITE_LOCK_MIGHT_FAIL false
 
194
#endif
 
195
 
 
196
#if TEST_ENTER_EXIT
 
197
struct enter_exit_class
 
198
{
 
199
  int *outv;
 
200
    enter_exit_class(int *in, int *out):outv(out)
 
201
  {
 
202
    (*in)++;
 
203
  }
 
204
   ~enter_exit_class()
 
205
  {
 
206
    (*outv)++;
 
207
  }
 
208
};
 
209
 
 
210
#define enter_exit(_x,_y) enter_exit_class a(_x,_y)
 
211
#else
 
212
#define enter_exit(_x,_y)
 
213
#endif
 
214
 
 
215
#define DOT_SEPARATED(_x)                             \
 
216
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1],   \
 
217
  ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
 
218
 
 
219
//
 
220
// RPC message for CLOSE_CHANNEL_CLUSTER_FUNCTION
 
221
//
 
222
struct CloseMessage:public ClusterMessageHeader
 
223
{
 
224
  uint32_t channel;
 
225
  int32_t status;
 
226
  int32_t lerrno;
 
227
  uint32_t sequence_number;
 
228
 
 
229
  enum
 
230
  {
 
231
    MIN_VERSION = 1,
 
232
    MAX_VERSION = 1,
 
233
    CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION
 
234
  };
 
235
 
 
236
    CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION)
 
237
:  ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) {
 
238
  }
 
239
  ////////////////////////////////////////////////////////////////////////////
 
240
  static int protoToVersion(int protoMajor)
 
241
  {
 
242
    (void) protoMajor;
 
243
    return CLOSE_CHAN_MESSAGE_VERSION;
 
244
  }
 
245
  static int sizeof_fixedlen_msg()
 
246
  {
 
247
    return sizeof(CloseMessage);
 
248
  }
 
249
  void init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) {
 
250
    _init(vers);
 
251
  }
 
252
  inline void SwapBytes()
 
253
  {
 
254
    if (NeedByteSwap()) {
 
255
      swap32(&channel);
 
256
      swap32((uint32_t *) & status);
 
257
      swap32((uint32_t *) & lerrno);
 
258
      swap32(&sequence_number);
 
259
    }
 
260
  }
 
261
  ////////////////////////////////////////////////////////////////////////////
 
262
};
 
263
 
 
264
//
 
265
// RPC message for MACHINE_LIST_CLUSTER_FUNCTION
 
266
//
 
267
struct MachineListMessage:public ClusterMessageHeader
 
268
{
 
269
  uint32_t n_ip;                  // Valid entries in ip[]
 
270
  uint32_t ip[CLUSTER_MAX_MACHINES];      // variable length data
 
271
 
 
272
  enum
 
273
  {
 
274
    MIN_VERSION = 1,
 
275
    MAX_VERSION = 1,
 
276
    MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION
 
277
  };
 
278
 
 
279
    MachineListMessage():ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0)
 
280
  {
 
281
    memset(ip, 0, sizeof(ip));
 
282
  }
 
283
  ////////////////////////////////////////////////////////////////////////////
 
284
  static int protoToVersion(int protoMajor)
 
285
  {
 
286
    (void) protoMajor;
 
287
    return MACHINE_LIST_MESSAGE_VERSION;
 
288
  }
 
289
  static int sizeof_fixedlen_msg()
 
290
  {
 
291
    return sizeof(ClusterMessageHeader);
 
292
  }
 
293
  void init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) {
 
294
    _init(vers);
 
295
  }
 
296
  inline void SwapBytes()
 
297
  {
 
298
    swap32(&n_ip);
 
299
  }
 
300
  ////////////////////////////////////////////////////////////////////////////
 
301
};
 
302
 
 
303
//
 
304
// RPC message for SET_CHANNEL_DATA_CLUSTER_FUNCTION
 
305
//
 
306
struct SetChanDataMessage:public ClusterMessageHeader
 
307
{
 
308
  uint32_t channel;
 
309
  uint32_t sequence_number;
 
310
  uint32_t data_type;             // enum CacheDataType
 
311
  char data[4];
 
312
 
 
313
  enum
 
314
  {
 
315
    MIN_VERSION = 1,
 
316
    MAX_VERSION = 1,
 
317
    SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION
 
318
  };
 
319
 
 
320
    SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION)
 
321
:  ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) {
 
322
    memset(data, 0, sizeof(data));
 
323
  }
 
324
  ////////////////////////////////////////////////////////////////////////////
 
325
  static int protoToVersion(int protoMajor)
 
326
  {
 
327
    (void) protoMajor;
 
328
    return SET_CHANNEL_DATA_MESSAGE_VERSION;
 
329
  }
 
330
  static int sizeof_fixedlen_msg()
 
331
  {
 
332
    SetChanDataMessage *p = 0;
 
333
    return (int) DOUBLE_ALIGN((int64_t) ((char *) &p->data[0] - (char *) p));
 
334
  }
 
335
  void init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) {
 
336
    _init(vers);
 
337
  }
 
338
  inline void SwapBytes()
 
339
  {
 
340
    if (NeedByteSwap()) {
 
341
      swap32(&channel);
 
342
      swap32(&sequence_number);
 
343
      swap32(&data_type);
 
344
    }
 
345
  }
 
346
  ////////////////////////////////////////////////////////////////////////////
 
347
};
 
348
 
 
349
//
 
350
// RPC message for SET_CHANNEL_PIN_CLUSTER_FUNCTION
 
351
//
 
352
struct SetChanPinMessage:public ClusterMessageHeader
 
353
{
 
354
  uint32_t channel;
 
355
  uint32_t sequence_number;
 
356
  uint32_t pin_time;
 
357
 
 
358
  enum
 
359
  {
 
360
    MIN_VERSION = 1,
 
361
    MAX_VERSION = 1,
 
362
    SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION
 
363
  };
 
364
 
 
365
    SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION)
 
366
:  ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) {
 
367
  }
 
368
  ////////////////////////////////////////////////////////////////////////////
 
369
  static int protoToVersion(int protoMajor)
 
370
  {
 
371
    (void) protoMajor;
 
372
    return SET_CHANNEL_PIN_MESSAGE_VERSION;
 
373
  }
 
374
  static int sizeof_fixedlen_msg()
 
375
  {
 
376
    return (int) sizeof(SetChanPinMessage);
 
377
  }
 
378
  void init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) {
 
379
    _init(vers);
 
380
  }
 
381
  inline void SwapBytes()
 
382
  {
 
383
    if (NeedByteSwap()) {
 
384
      swap32(&channel);
 
385
      swap32(&sequence_number);
 
386
      swap32(&pin_time);
 
387
    }
 
388
  }
 
389
  ////////////////////////////////////////////////////////////////////////////
 
390
};
 
391
 
 
392
//
 
393
// RPC message for SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION
 
394
//
 
395
struct SetChanPriorityMessage:public ClusterMessageHeader
 
396
{
 
397
  uint32_t channel;
 
398
  uint32_t sequence_number;
 
399
  uint32_t disk_priority;
 
400
 
 
401
  enum
 
402
  {
 
403
    MIN_VERSION = 1,
 
404
    MAX_VERSION = 1,
 
405
    SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION
 
406
  };
 
407
 
 
408
    SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION)
 
409
:  ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) {
 
410
  }
 
411
  ////////////////////////////////////////////////////////////////////////////
 
412
  static int protoToVersion(int protoMajor)
 
413
  {
 
414
    (void) protoMajor;
 
415
    return SET_CHANNEL_PRIORITY_MESSAGE_VERSION;
 
416
  }
 
417
  static int sizeof_fixedlen_msg()
 
418
  {
 
419
    return (int) sizeof(SetChanPriorityMessage);
 
420
  }
 
421
  void init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
 
422
    _init(vers);
 
423
  }
 
424
  inline void SwapBytes()
 
425
  {
 
426
    if (NeedByteSwap()) {
 
427
      swap32(&channel);
 
428
      swap32(&sequence_number);
 
429
      swap32(&disk_priority);
 
430
    }
 
431
  }
 
432
  ////////////////////////////////////////////////////////////////////////////
 
433
};
 
434
 
 
435
inline void
 
436
SetHighBit(int *val)
 
437
{
 
438
  *val |= (1 << ((sizeof(int) * 8) - 1));
 
439
}
 
440
 
 
441
inline void
 
442
ClearHighBit(int *val)
 
443
{
 
444
  *val &= ~(1 << ((sizeof(int) * 8) - 1));
 
445
}
 
446
 
 
447
inline int
 
448
IsHighBitSet(int *val)
 
449
{
 
450
  return (*val & (1 << ((sizeof(int) * 8) - 1)));
 
451
}
 
452
 
 
453
/////////////////////////////////////////////////////////////////
 
454
// ClusterAccept -- Handle cluster connect events from peer
 
455
//                  cluster nodes.
 
456
/////////////////////////////////////////////////////////////////
 
457
class ClusterAccept:public Continuation
 
458
{
 
459
public:
 
460
  ClusterAccept(int *, int, int);
 
461
  void Init();
 
462
  void ShutdownDelete();
 
463
  int ClusterAcceptEvent(int, void *);
 
464
  int ClusterAcceptMachine(NetVConnection *);
 
465
 
 
466
   ~ClusterAccept();
 
467
private:
 
468
 
 
469
  int *p_cluster_port;
 
470
  int socket_send_bufsize;
 
471
  int socket_recv_bufsize;
 
472
  unsigned long socket_opt_flags;
 
473
  int current_cluster_port;
 
474
  Action *accept_action;
 
475
  Event *periodic_event;
 
476
};
 
477
 
 
478
// VC++ 5.0 special
 
479
struct ClusterHandler;
 
480
typedef int (ClusterHandler::*ClusterContHandler) (int, void *);
 
481
 
 
482
struct OutgoingControl;
 
483
typedef int (OutgoingControl::*OutgoingCtrlHandler) (int, void *);
 
484
 
 
485
struct ClusterVConnection;
 
486
typedef int (ClusterVConnection::*ClusterVConnHandler) (int, void *);
 
487
 
 
488
typedef struct iovec IOVec;
 
489
 
 
490
// Library  declarations
 
491
extern void cluster_set_priority(ClusterHandler *, ClusterVConnState *, int);
 
492
extern void cluster_lower_priority(ClusterHandler *, ClusterVConnState *);
 
493
extern void cluster_raise_priority(ClusterHandler *, ClusterVConnState *);
 
494
extern void cluster_schedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
 
495
extern void cluster_reschedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
 
496
extern void cluster_disable(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
 
497
extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int64_t, int64_t);
 
498
#define CLUSTER_BUMP_NO_REMOVE    -1
 
499
extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int);
 
500
extern int iov_memcpy(IOVec *, int, int, char *);
 
501
 
 
502
extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **);
 
503
extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t);
 
504
extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t);
 
505
 
 
506
// ClusterVConnection declarations
 
507
extern void clusterVCAllocator_free(ClusterVConnection * vc);
 
508
extern ClassAllocator<ClusterVConnection> clusterVCAllocator;
 
509
extern ClassAllocator<ByteBankDescriptor> byteBankAllocator;
 
510
 
 
511
// Cluster configuration declarations
 
512
extern int cluster_port;
 
513
// extern void * machine_config_change(void *, void *);
 
514
int machine_config_change(const char *, RecDataT, RecData, void *);
 
515
extern void do_machine_config_change(void *, const char *);
 
516
 
 
517
#ifdef NON_MODULAR
 
518
// Cluster API support functions
 
519
extern void clusterAPI_init();
 
520
extern void machine_online_APIcallout(int);
 
521
extern void machine_offline_APIcallout(int);
 
522
#endif
 
523
#endif /* _ClusterInternal_h */