3
A brief file description
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
25
/****************************************************************************
28
****************************************************************************/
29
#include "P_ClusterCache.h"
31
#ifndef _P_ClusterInternal_h
32
#define _P_ClusterInternal_h
34
/*************************************************************************/
35
// Compilation Options
36
/*************************************************************************/
37
#define CLUSTER_THREAD_STEALING 1
38
#define CLUSTER_TOMCAT 1
39
#define CLUSTER_STATS 1
42
// Rounds the integer value of _p up to the next multiple of 8; a value that
// is already a multiple of 8 is returned unchanged. The result is an integer
// (uintptr_t), not a pointer.
#define ALIGN_DOUBLE(_p) (((uintptr_t)(_p) + 7) & ~7)
43
// Reserves (_sz) + 8 bytes with alloca() and rounds the returned address up
// to a multiple of 8 via ALIGN_DOUBLE; the 8 extra bytes leave room for that
// rounding. The expansion has the integer type produced by ALIGN_DOUBLE, so
// callers presumably cast it back to a pointer -- verify at call sites.
#define ALLOCA_DOUBLE(_sz) ALIGN_DOUBLE(alloca((_sz) + 8))
45
/*************************************************************************/
46
// Configuration Parameters
47
/*************************************************************************/
48
// Note: MAX_TCOUNT must be power of 2
49
#define MAX_TCOUNT 128
50
#define CONTROL_DATA (128*1024)
51
#define READ_BANK_BUF_SIZE DEFAULT_MAX_BUFFER_SIZE
52
#define READ_BANK_BUF_INDEX (DEFAULT_BUFFER_SIZES-1)
53
#define ALLOC_DATA_MAGIC 0xA5 // 8 bits in size
54
#define READ_LOCK_SPIN_COUNT 1
55
#define WRITE_LOCK_SPIN_COUNT 1
57
// Unix specific optimizations
58
// #define CLUSTER_IMMEDIATE_NETIO 1
60
// (see ClusterHandler::mainClusterEvent)
61
// this is equivalent to a max of 0.7 seconds
62
#define CLUSTER_BUCKETS 64
63
#define CLUSTER_PERIOD HRTIME_MSECONDS(10)
65
// Per instance maximum time allotted to cluster thread
66
#define CLUSTER_MAX_RUN_TIME HRTIME_MSECONDS(100)
67
// Per instance maximum time allotted to thread stealing
68
#define CLUSTER_MAX_THREAD_STEAL_TIME HRTIME_MSECONDS(10)
70
// minimum number of channels to allocate
71
#define MIN_CHANNELS 4096
72
#define MAX_CHANNELS ((32*1024) - 1) // 15 bits in Descriptor
74
#define CLUSTER_CONTROL_CHANNEL 0
75
#define LAST_DEDICATED_CHANNEL 0
77
#define CLUSTER_PHASES 1
79
#define CLUSTER_INITIAL_PRIORITY CLUSTER_PHASES
80
// How often to retry connecting to machines that are supposed to be in the cluster
82
#define CLUSTER_BUMP_LENGTH 1
83
#define CLUSTER_MEMBER_DELAY HRTIME_SECONDS(1)
84
// How long to leave an unconnected ClusterVConnection waiting
85
// Note: assumes (CLUSTER_CONNECT_TIMEOUT == 2 * CACHE_CLUSTER_TIMEOUT)
86
#ifdef CLUSTER_TEST_DEBUG
87
#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(65536)
89
#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(10)
91
#define CLUSTER_CONNECT_RETRY HRTIME_MSECONDS(20)
92
#define CLUSTER_RETRY HRTIME_MSECONDS(10)
93
#define CLUSTER_DELAY_BETWEEN_WRITES HRTIME_MSECONDS(10)
95
// Force close on cluster channel if no activity detected in this interval
96
#ifdef CLUSTER_TEST_DEBUG
97
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (65536 * HRTIME_SECONDS(60))
99
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (10 * HRTIME_SECONDS(60))
102
// Defines for work deferred to ET_NET threads
103
#define COMPLETION_CALLBACK_PERIOD HRTIME_MSECONDS(10)
104
#define MAX_COMPLETION_CALLBACK_EVENTS 16
106
// ClusterHandler::mainClusterEvent() thread active state
107
#define CLUSTER_ACTIVE 1
108
#define CLUSTER_NOT_ACTIVE 0
110
// defines for ClusterHandler::remote_closed
111
// Negative sentinel value; parenthesized so the unary minus cannot combine
// with neighbouring tokens at the expansion site.
#define FORCE_CLOSE_ON_OPEN_CHANNEL (-2)
113
// defines for machine_config_change()
114
#define MACHINE_CONFIG 0
115
#define CLUSTER_CONFIG 1
117
// Debug interface category definitions
118
#define CL_NOTE "cluster_note"
119
#define CL_WARN "cluster_warn"
120
#define CL_PROTO "cluster_proto"
121
#define CL_TRACE "cluster_trace"
123
/*************************************************************************/
125
/*************************************************************************/
126
#define MAX_FAST_CONTROL_MESSAGE 504 // 512 - 4 (cluster func #) - 4 align
127
#define SMALL_CONTROL_MESSAGE MAX_FAST_CONTROL_MESSAGE // copied instead
129
// Negative sentinel value; parenthesized so the unary minus cannot combine
// with neighbouring tokens at the expansion site.
#define WRITE_MESSAGE_ALREADY_BUILT (-1)
131
// Folds the header fields of _x into a single value: 0xBADBAD XORed with the
// bitwise complement of msg.count, msg.descriptor_cksum,
// msg.control_bytes_cksum, msg.unused and (msg.control_bytes << 16), and
// finally XORed with sequence_number.
//
// _x must be an object (not a pointer) exposing `.msg` and
// `.sequence_number`. It is parenthesized at every use so that expressions
// such as `*ptr` work as the argument; note that it is evaluated several
// times, so it must not have side effects.
#define MAGIC_COUNT(_x)                                  \
  (0xBADBAD ^ ~(uint32_t)(_x).msg.count                  \
   ^ ~(uint32_t)(_x).msg.descriptor_cksum                \
   ^ ~(uint32_t)(_x).msg.control_bytes_cksum             \
   ^ ~(uint32_t)(_x).msg.unused                          \
   ^ ~((uint32_t)(_x).msg.control_bytes << 16) ^ (_x).sequence_number)
138
// Rounds the integer value of _x up to the next multiple of 8; a value that
// is already a multiple of 8 is returned unchanged. The argument is
// parenthesized before the cast so that compound expressions (for example
// pointer arithmetic such as `p + n`) are evaluated in full and then
// converted, instead of only their first operand being cast.
#define DOUBLE_ALIGN(_x) ((((uintptr_t)(_x)) + 7) & ~(uintptr_t)7)
140
/*************************************************************************/
142
/*************************************************************************/
144
#define TEST_PARTIAL_WRITES 0
145
#define TEST_PARTIAL_READS 0
146
#define TEST_TIMING 0
147
#define TEST_READ_LOCKS_MISSED 0
148
#define TEST_WRITE_LOCKS_MISSED 0
149
#define TEST_ENTER_EXIT 0
150
// NOTE(review): this repeats the identical TEST_ENTER_EXIT definition that
// immediately precedes it; the redefinition is harmless but redundant.
#define TEST_ENTER_EXIT 0
157
fprintf(stderr, _x " at: %u\n", \
158
((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
160
fprintf(stderr, _x " for: %d at: %u\n", vc->channel, \
161
((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
162
#define TIMEOUT_TESTS(_s,_d) \
163
if (*(int*)_d == 8) \
164
fprintf(stderr,_s" lookup %d\n", *(int*)(_d+20)); \
165
else if (*(int*)_d == 10) \
166
fprintf(stderr,_s" op %d %d\n", *(int*)(_d+36), \
168
else if (*(int*)_d == 11) \
169
fprintf(stderr,_s" rop %d %d\n", *(int*)(_d+4), \
174
#define TIMEOUT_TESTS(_x,_y)
177
#if (TEST_READ_LOCKS_MISSED || TEST_WRITE_LOCKS_MISSED)
178
static unsigned int test_cluster_locks_missed = 0;
180
test_cluster_lock_might_fail()
182
return (!(rand_r(&test_cluster_locks_missed) % 13));
185
#if TEST_READ_LOCKS_MISSED
186
#define TEST_READ_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
188
#define TEST_READ_LOCK_MIGHT_FAIL false
190
#if TEST_WRITE_LOCKS_MISSED
191
#define TEST_WRITE_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
193
#define TEST_WRITE_LOCK_MIGHT_FAIL false
197
struct enter_exit_class
200
enter_exit_class(int *in, int *out):outv(out)
210
#define enter_exit(_x,_y) enter_exit_class a(_x,_y)
212
#define enter_exit(_x,_y)
215
// Expands to four comma-separated expressions: bytes 0..3, in memory order,
// of the object _x read through an unsigned char pointer. _x must be an
// lvalue (its address is taken) and is evaluated four times.
// NOTE(review): presumably meant to feed a "%u.%u.%u.%u"-style format when
// printing an IPv4 address -- confirm against callers.
#define DOT_SEPARATED(_x) \
216
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \
217
((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
220
// RPC message for CLOSE_CHANNEL_CLUSTER_FUNCTION
222
struct CloseMessage:public ClusterMessageHeader
227
uint32_t sequence_number;
233
CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION
236
CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION)
237
: ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) {
239
////////////////////////////////////////////////////////////////////////////
240
static int protoToVersion(int protoMajor)
243
return CLOSE_CHAN_MESSAGE_VERSION;
245
static int sizeof_fixedlen_msg()
247
return sizeof(CloseMessage);
249
void init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) {
252
inline void SwapBytes()
254
if (NeedByteSwap()) {
256
swap32((uint32_t *) & status);
257
swap32((uint32_t *) & lerrno);
258
swap32(&sequence_number);
261
////////////////////////////////////////////////////////////////////////////
265
// RPC message for MACHINE_LIST_CLUSTER_FUNCTION
267
struct MachineListMessage:public ClusterMessageHeader
269
uint32_t n_ip; // Valid entries in ip[]
270
uint32_t ip[CLUSTER_MAX_MACHINES]; // variable length data
276
MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION
279
MachineListMessage():ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0)
281
memset(ip, 0, sizeof(ip));
283
////////////////////////////////////////////////////////////////////////////
284
static int protoToVersion(int protoMajor)
287
return MACHINE_LIST_MESSAGE_VERSION;
289
static int sizeof_fixedlen_msg()
291
return sizeof(ClusterMessageHeader);
293
void init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) {
296
inline void SwapBytes()
300
////////////////////////////////////////////////////////////////////////////
304
// RPC message for SET_CHANNEL_DATA_CLUSTER_FUNCTION
306
struct SetChanDataMessage:public ClusterMessageHeader
309
uint32_t sequence_number;
310
uint32_t data_type; // enum CacheDataType
317
SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION
320
SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION)
321
: ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) {
322
memset(data, 0, sizeof(data));
324
////////////////////////////////////////////////////////////////////////////
325
static int protoToVersion(int protoMajor)
328
return SET_CHANNEL_DATA_MESSAGE_VERSION;
330
static int sizeof_fixedlen_msg()
332
SetChanDataMessage *p = 0;
333
return (int) DOUBLE_ALIGN((int64_t) ((char *) &p->data[0] - (char *) p));
335
void init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) {
338
inline void SwapBytes()
340
if (NeedByteSwap()) {
342
swap32(&sequence_number);
346
////////////////////////////////////////////////////////////////////////////
350
// RPC message for SET_CHANNEL_PIN_CLUSTER_FUNCTION
352
struct SetChanPinMessage:public ClusterMessageHeader
355
uint32_t sequence_number;
362
SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION
365
SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION)
366
: ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) {
368
////////////////////////////////////////////////////////////////////////////
369
static int protoToVersion(int protoMajor)
372
return SET_CHANNEL_PIN_MESSAGE_VERSION;
374
static int sizeof_fixedlen_msg()
376
return (int) sizeof(SetChanPinMessage);
378
void init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) {
381
inline void SwapBytes()
383
if (NeedByteSwap()) {
385
swap32(&sequence_number);
389
////////////////////////////////////////////////////////////////////////////
393
// RPC message for SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION
395
struct SetChanPriorityMessage:public ClusterMessageHeader
398
uint32_t sequence_number;
399
uint32_t disk_priority;
405
SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION
408
SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION)
409
: ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) {
411
////////////////////////////////////////////////////////////////////////////
412
static int protoToVersion(int protoMajor)
415
return SET_CHANNEL_PRIORITY_MESSAGE_VERSION;
417
static int sizeof_fixedlen_msg()
419
return (int) sizeof(SetChanPriorityMessage);
421
void init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
424
inline void SwapBytes()
426
if (NeedByteSwap()) {
428
swap32(&sequence_number);
429
swap32(&disk_priority);
432
////////////////////////////////////////////////////////////////////////////
438
*val |= (1 << ((sizeof(int) * 8) - 1));
442
ClearHighBit(int *val)
444
*val &= ~(1 << ((sizeof(int) * 8) - 1));
448
IsHighBitSet(int *val)
450
return (*val & (1 << ((sizeof(int) * 8) - 1)));
453
/////////////////////////////////////////////////////////////////
454
// ClusterAccept -- Handle cluster connect events from peer
456
/////////////////////////////////////////////////////////////////
457
class ClusterAccept:public Continuation
460
ClusterAccept(int *, int, int);
462
void ShutdownDelete();
463
int ClusterAcceptEvent(int, void *);
464
int ClusterAcceptMachine(NetVConnection *);
470
int socket_send_bufsize;
471
int socket_recv_bufsize;
472
unsigned long socket_opt_flags;
473
int current_cluster_port;
474
Action *accept_action;
475
Event *periodic_event;
479
struct ClusterHandler;
480
typedef int (ClusterHandler::*ClusterContHandler) (int, void *);
482
struct OutgoingControl;
483
typedef int (OutgoingControl::*OutgoingCtrlHandler) (int, void *);
485
struct ClusterVConnection;
486
typedef int (ClusterVConnection::*ClusterVConnHandler) (int, void *);
488
typedef struct iovec IOVec;
490
// Library declarations
491
extern void cluster_set_priority(ClusterHandler *, ClusterVConnState *, int);
492
extern void cluster_lower_priority(ClusterHandler *, ClusterVConnState *);
493
extern void cluster_raise_priority(ClusterHandler *, ClusterVConnState *);
494
extern void cluster_schedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
495
extern void cluster_reschedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
496
extern void cluster_disable(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
497
extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int64_t, int64_t);
498
// Negative sentinel value; parenthesized so the unary minus cannot combine
// with neighbouring tokens at the expansion site.
// NOTE(review): presumably passed as the final int argument of
// cluster_bump() -- confirm against callers.
#define CLUSTER_BUMP_NO_REMOVE (-1)
499
extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int);
500
extern int iov_memcpy(IOVec *, int, int, char *);
502
extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **);
503
extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t);
504
extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t);
506
// ClusterVConnection declarations
507
extern void clusterVCAllocator_free(ClusterVConnection * vc);
508
extern ClassAllocator<ClusterVConnection> clusterVCAllocator;
509
extern ClassAllocator<ByteBankDescriptor> byteBankAllocator;
511
// Cluster configuration declarations
512
extern int cluster_port;
513
// extern void * machine_config_change(void *, void *);
514
int machine_config_change(const char *, RecDataT, RecData, void *);
515
extern void do_machine_config_change(void *, const char *);
518
// Cluster API support functions
519
extern void clusterAPI_init();
520
extern void machine_online_APIcallout(int);
521
extern void machine_offline_APIcallout(int);
523
#endif /* _P_ClusterInternal_h */