3
A brief file description
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
25
/****************************************************************************
29
Support for Cluster RPC API.
30
****************************************************************************/
31
#include "P_Cluster.h"
34
#include "InkAPIInternal.h"
36
// NOTE(review): this excerpt is extraction-garbled — the bare numeric lines
// are leftover source line numbers, and code lines are missing between
// fragments (visible as jumps in those numbers). Code is left byte-identical.
// Forward declaration: periodic state machine that drains queued status events.
class ClusterAPIPeriodicSM;
37
// Queues a replay of the currently-online node list to one subscriber
// (implementation appears later in this file).
static void send_machine_online_list(TSClusterStatusHandle_t *);
39
// One subscriber slot in the node-status callout table (status_callouts[]).
// NOTE(review): the opening '{' line is missing from this excerpt.
typedef struct node_callout_entry
41
Ptr<ProxyMutex> mutex;            // per-subscriber lock, taken before invoking func
42
TSClusterStatusFunction func;     // plugin callback; null means the slot is free
43
int state; // See NE_STATE_XXX defines
44
} node_callout_entry_t;
46
// Slot lifecycle: FREE until registered+enabled, INITIALIZED once the
// initial online-node list has been delivered.
#define NE_STATE_FREE 0
47
#define NE_STATE_INITIALIZED 1
49
// Fixed capacity of the status-callout table.
#define MAX_CLUSTERSTATUS_CALLOUTS 32
51
// Serializes registration/removal of status callouts and RPC functions.
static ProxyMutex *ClusterAPI_mutex;
52
// Singleton periodic SM created at init; drives queued MachineStatusSMs.
static ClusterAPIPeriodicSM *periodicSM;
54
// Subscriber table; indexed by the integer hidden in TSClusterStatusHandle_t.
static node_callout_entry_t status_callouts[MAX_CLUSTERSTATUS_CALLOUTS];
55
// RPC dispatch table, indexed by cluster function number.
static TSClusterRPCFunction RPC_Functions[API_END_CLUSTER_FUNCTION];
57
// Opaque-handle conversions: a status handle is just a table index, a node
// handle is just an IPv4 address reinterpreted.
#define INDEX_TO_CLUSTER_STATUS_HANDLE(i) ((TSClusterStatusHandle_t)((i)))
58
#define CLUSTER_STATUS_HANDLE_TO_INDEX(h) ((int) ((h)))
59
#define NODE_HANDLE_TO_IP(h) (*((struct in_addr *) &((h))))
60
#define RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k) ((int)((k)))
61
#define IP_TO_NODE_HANDLE(ip) ((TSNodeHandle_t)((ip)))
62
// NOTE(review): as written this expression algebraically reduces to just
// sizeof(TSClusterRPCHandle_t). Presumably the intent was "size of the RPC
// message header excluding m_data[]" — confirm against TSClusterRPCMsg_t's
// layout before relying on it.
#define SIZEOF_RPC_MSG_LESS_DATA (sizeof(TSClusterRPCMsg_t) - \
63
(sizeof(TSClusterRPCMsg_t) - sizeof(TSClusterRPCHandle_t)))
65
// RPC handle: a union of the internal view (cluster_function number + magic)
// and the external opaque TSClusterRPCHandle_t handed to plugins.
// NOTE(review): the union's 'internal' member and the closing of the struct
// are missing from this excerpt.
typedef struct RPCHandle
68
{ // Note: All union elements are assumed to be the same size
69
// sizeof(u.internal) == sizeof(u.external)
70
TSClusterRPCHandle_t external;
79
// Magic value stamped into every handle and validated before use.
#define RPC_HANDLE_MAGIC 0x12345678
81
// State machine representing one node-status notification to deliver.
// Three flavors (selected by constructor): broadcast a node up/down to all
// subscribers, replay the online-node list to one subscriber, or deliver a
// directed status to one subscriber.
class MachineStatusSM;
82
typedef int (MachineStatusSM::*MachineStatusSMHandler) (int, void *);
83
// NOTE(review): class body is fragmentary here — access specifiers, closing
// braces and some members (_broadcast/_restart/_next_n declarations, link)
// are missing from this excerpt.
class MachineStatusSM:public Continuation
86
// Broadcast constructor
87
MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s):_node_handle(h), _node_status(s), _status_handle(0),
88
_broadcast(1), _restart(0), _next_n(0)
90
SET_HANDLER((MachineStatusSMHandler)
91
& MachineStatusSM::MachineStatusSMEvent);
93
// Unicast constructor
94
MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s,
95
TSClusterStatusHandle_t sh):_node_handle(h), _node_status(s), _status_handle(sh),
96
_broadcast(0), _restart(0), _next_n(0)
98
SET_HANDLER((MachineStatusSMHandler)
99
& MachineStatusSM::MachineStatusSMEvent);
101
// Send machine online list constructor
102
MachineStatusSM(TSClusterStatusHandle_t sh):
103
_node_handle(0), _node_status(NODE_ONLINE), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0) {
104
SET_HANDLER((MachineStatusSMHandler)
105
& MachineStatusSM::MachineStatusSMEvent);
109
// Event handler driven by ClusterAPIPeriodicSM; defined below.
int MachineStatusSMEvent(Event * e, void *d);
112
TSNodeHandle_t _node_handle;     // target node (0 for the list-replay flavor)
113
TSNodeStatus_t _node_status;     // NODE_ONLINE / NODE_OFFLINE being reported
114
TSClusterStatusHandle_t _status_handle; // Valid only if !_broadcast
121
// Delivers this SM's notification under each subscriber's own mutex.
// Three paths are visible: broadcast to all INITIALIZED subscribers, replay
// of the online-node list to one subscriber, and a directed single callout.
// NOTE(review): the return type, braces, and the branch structure selecting
// between the three paths are missing from this excerpt; _restart/_next_n
// presumably resume a broadcast after a failed MUTEX_TRY_LOCK — confirm
// against the complete source.
MachineStatusSM::MachineStatusSMEvent(Event * e, void *d)
126
EThread *et = this_ethread();
129
/////////////////////////////////////////////////////
130
// Broadcast node transition to all subscribers
131
/////////////////////////////////////////////////////
132
// Resume where a previous pass stopped if we were rescheduled mid-scan.
n = _restart ? _next_n : 0;
133
for (; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) {
134
if (status_callouts[n].func && (status_callouts[n].state == NE_STATE_INITIALIZED)) {
136
MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
138
status_callouts[n].func(&_node_handle, _node_status);
139
Debug("cluster_api", "callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status);
149
/////////////////////////////////////////////////////
150
// Send online node list to a specific subscriber
151
/////////////////////////////////////////////////////
152
n = CLUSTER_STATUS_HANDLE_TO_INDEX(_status_handle);
153
if (status_callouts[n].func) {
154
MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
157
unsigned int my_ipaddr = (this_cluster_machine())->ip;
158
ClusterConfiguration *cc;
162
cc = this_cluster()->current_configuration();
164
// Report every cluster machine except ourselves as ONLINE.
for (mi = 0; mi < cc->n_machines; ++mi) {
165
if (cc->machines[mi]->ip != my_ipaddr) {
166
nh = IP_TO_NODE_HANDLE(cc->machines[mi]->ip);
167
status_callouts[n].func(&nh, NODE_ONLINE);
170
"initial callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(cc->machines[mi]->ip), NODE_ONLINE);
174
// Initial snapshot delivered; slot now receives incremental updates.
status_callouts[n].state = NE_STATE_INITIALIZED;
183
/////////////////////////////////////////////////////
184
// Send node status to a specific subscriber
185
/////////////////////////////////////////////////////
186
n = CLUSTER_STATUS_HANDLE_TO_INDEX(_status_handle);
187
if (status_callouts[n].func) {
188
MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
190
status_callouts[n].func(&_node_handle, _node_status);
193
"directed callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status);
206
// Periodic continuation (scheduled every second at init) that serializes the
// processing of queued MachineStatusSM events, preserving status ordering.
class ClusterAPIPeriodicSM;
207
typedef int (ClusterAPIPeriodicSM::*ClusterAPIPeriodicSMHandler) (int, void *);
208
// NOTE(review): class body is fragmentary — access specifiers and closing
// braces are missing from this excerpt.
class ClusterAPIPeriodicSM:public Continuation
211
ClusterAPIPeriodicSM(ProxyMutex * m):Continuation(m), _active_msmp(0)
213
SET_HANDLER((ClusterAPIPeriodicSMHandler)
214
& ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent);
216
~ClusterAPIPeriodicSM()
219
int ClusterAPIPeriodicSMEvent(int, void *);
220
MachineStatusSM *GetNextSM();
223
// SM currently being driven to completion; 0 when idle.
MachineStatusSM * _active_msmp;
226
// Producers push here lock-free; the periodic SM drains into the local queue.
static InkAtomicList status_callout_atomic_q;
227
static Queue<MachineStatusSM> status_callout_q;
230
// Returns the next queued MachineStatusSM, refilling the local FIFO from the
// atomic list when the FIFO is empty.
// NOTE(review): return type, braces and loop structure are missing from this
// excerpt; ink_atomiclist_popall yields LIFO order, so the missing lines
// presumably walk msmp_next to re-push in FIFO order — confirm.
ClusterAPIPeriodicSM::GetNextSM()
232
MachineStatusSM *msmp;
233
MachineStatusSM *msmp_next;
236
msmp = status_callout_q.pop();
238
msmp = (MachineStatusSM *)
239
ink_atomiclist_popall(&status_callout_atomic_q);
242
msmp_next = (MachineStatusSM *) msmp->link.next;
244
status_callout_q.push(msmp);
259
// Periodic tick: drive the active MachineStatusSM until it reports
// EVENT_DONE before dequeuing the next one.
// NOTE(review): return type, braces and the surrounding loop/return logic
// are missing from this excerpt.
ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent(int e, void *d)
261
// Maintain node status event order by serializing the processing.
266
ret = _active_msmp->handleEvent(e, d);
267
if (ret != EVENT_DONE) {
271
_active_msmp = GetNextSM();
282
// NOTE(review): the function header is missing from this excerpt; judging by
// the body this is the module initializer (clusterAPI_init in Apache Traffic
// Server) — confirm. Sets up the atomic queue, the API mutex, and schedules
// the periodic SM.
MachineStatusSM *mssmp = 0;
283
// XXX: BIG RED WARNING!!! Direct null pointer dereference
284
// Either create MachineStatusSM before ose or axe this function.
285
// It is used only if NON_MODULAR is defined making that
286
// flag crashing ClusterProcessor::init()
288
// NOTE(review): '(char *) &mssmp->link.next - (char *) mssmp' with mssmp == 0
// is the classic offsetof idiom (no load actually occurs), though it is
// formally undefined behavior; offsetof() would be the safe replacement.
ink_atomiclist_init(&status_callout_atomic_q,
289
"cluster API status_callout_q", (char *) &mssmp->link.next - (char *) mssmp);
290
ClusterAPI_mutex = new_ProxyMutex();
291
// Freshly created mutex: the try-lock cannot contend, hence the assert.
MUTEX_TRY_LOCK(lock, ClusterAPI_mutex, this_ethread());
292
ink_release_assert(lock); // Should never fail
293
periodicSM = NEW(new ClusterAPIPeriodicSM(ClusterAPI_mutex));
295
// TODO: Should we do something with this return value?
296
eventProcessor.schedule_every(periodicSM, HRTIME_SECONDS(1), ET_CALL);
300
// (The leading '*' lines are remnants of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Add the given function to the node status callout list which is
301
* invoked on each machine up/down transition.
303
* Note: Using blocking mutex since interface is synchronous and is only
304
* called at plugin load time.
307
// Registers Status_Function in the first free status_callouts[] slot and
// returns its slot index through *h. NOTE(review): return type, the loop
// variable declaration, and the success/failure return statements are
// missing from this excerpt.
TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, TSClusterStatusHandle_t * h)
309
Debug("cluster_api", "TSAddClusterStatusFunction func 0x%x", Status_Function);
311
EThread *e = this_ethread();
313
ink_release_assert(Status_Function);
314
MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
315
for (n = 0; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) {
316
if (!status_callouts[n].func) {
317
status_callouts[n].mutex = (ProxyMutex *) m;
318
status_callouts[n].func = Status_Function;
319
MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
320
*h = INDEX_TO_CLUSTER_STATUS_HANDLE(n);
322
Debug("cluster_api", "TSAddClusterStatusFunction: func 0x%x n %d", Status_Function, n);
326
// Reached only when every slot is occupied.
MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
331
// (The leading '*' lines are remnants of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Remove the given function from the node status callout list
332
* established via TSAddClusterStatusFunction().
334
* Note: Using blocking mutex since interface is synchronous and is only
335
* called at plugin unload time (unload currently not supported).
338
// Frees the subscriber slot named by *h. NOTE(review): return type and
// return statement are missing from this excerpt.
TSDeleteClusterStatusFunction(TSClusterStatusHandle_t * h)
340
int n = CLUSTER_STATUS_HANDLE_TO_INDEX(*h);
341
EThread *e = this_ethread();
343
ink_release_assert((n >= 0) && (n < MAX_CLUSTERSTATUS_CALLOUTS));
344
Debug("cluster_api", "TSDeleteClusterStatusFunction: n %d", n);
346
MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
347
status_callouts[n].mutex = 0;
348
status_callouts[n].func = (TSClusterStatusFunction) 0;
349
status_callouts[n].state = NE_STATE_FREE;
350
MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
356
// Converts an opaque node handle back to its in_addr (the handle IS the
// IPv4 address — see NODE_HANDLE_TO_IP). NOTE(review): return types, braces
// and return statements for both functions are missing from this excerpt.
TSNodeHandleToIPAddr(TSNodeHandle_t * h, struct in_addr *in)
358
*in = NODE_HANDLE_TO_IP(*h);
363
// Returns this machine's own node handle (its cluster interface IP).
TSGetMyNodeHandle(TSNodeHandle_t * h)
365
*h = IP_TO_NODE_HANDLE((this_cluster_machine())->ip);
369
// (The leading '*' lines are remnants of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Enable node status callouts for the added callout entry.
370
* Issued once after the call to TSAddClusterStatusFunction()
371
* to get the current node configuration. All subsequent
372
* callouts are updates to the state obtained at this point.
375
// NOTE(review): return type, braces and return statements are missing from
// this excerpt; presumably the INITIALIZED check below makes a second enable
// call a no-op — confirm.
TSEnableClusterStatusCallout(TSClusterStatusHandle_t * h)
377
int ci = CLUSTER_STATUS_HANDLE_TO_INDEX(*h);
379
// int my_ipaddr = (this_cluster_machine())->ip;
380
ink_release_assert((ci >= 0) && (ci < MAX_CLUSTERSTATUS_CALLOUTS));
382
if (status_callouts[ci].state == NE_STATE_INITIALIZED) {
386
Debug("cluster_api", "TSEnableClusterStatusCallout: n %d", ci);
387
send_machine_online_list(h);
391
// Queues a list-replay MachineStatusSM for the periodic SM to process
// asynchronously (constructor flavor taking only a status handle).
send_machine_online_list(TSClusterStatusHandle_t * h)
393
MachineStatusSM *msm = NEW(new MachineStatusSM(*h));
395
ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
399
// (The leading '*' lines are remnants of /* ... */ doc comments whose
// delimiters are missing from this excerpt; return types and braces of the
// three functions are likewise missing.)
* Send node online to a specific cluster status entry.
401
// This doesn't seem to be used...
404
// Queues a unicast NODE_ONLINE notification for one subscriber.
directed_machine_online(int Ipaddr, TSClusterStatusHandle_t * h)
406
MachineStatusSM *msm = NEW(new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE, *h));
408
ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
413
* Called directly by the Cluster upon detection of node online.
416
// Queues a broadcast NODE_ONLINE notification (broadcast constructor flavor).
machine_online_APIcallout(int Ipaddr)
418
MachineStatusSM *msm = NEW(new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE));
420
ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
424
* Called directly by the Cluster upon detection of node offline.
427
// Queues a broadcast NODE_OFFLINE notification.
machine_offline_APIcallout(int Ipaddr)
429
MachineStatusSM *msm = NEW(new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_OFFLINE));
431
ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
435
// (The leading '*' lines are remnants of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Associate the given RPC function with the given key.
437
* Note: Using blocking mutex since interface is synchronous and is only
438
* called at plugin load time.
441
// Registers func under cluster function number k and hands back an opaque
// handle embedding that number plus a magic. NOTE(review): return type, the
// 'handle' declaration, braces and return statement are missing from this
// excerpt.
TSAddClusterRPCFunction(TSClusterRPCKey_t k, TSClusterRPCFunction func, TSClusterRPCHandle_t * h)
444
int n = RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k);
445
EThread *e = this_ethread();
447
ink_release_assert(func);
448
// NOTE(review): the assert admits n == API_END_CLUSTER_FUNCTION, but
// RPC_Functions[] has exactly API_END_CLUSTER_FUNCTION slots; the guard
// below silently skips registration for that value while *h is still
// handed out — confirm whether '<=' should be '<'.
ink_release_assert((n >= API_STARECT_CLUSTER_FUNCTION)
449
&& (n <= API_END_CLUSTER_FUNCTION));
450
Debug("cluster_api", "TSAddClusterRPCFunction: key %d func 0x%x", k, func);
452
handle.u.internal.cluster_function = n;
453
handle.u.internal.magic = RPC_HANDLE_MAGIC;
455
MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
456
if (n < API_END_CLUSTER_FUNCTION)
457
RPC_Functions[n] = func;
458
MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
460
*h = handle.u.external;
465
// (The leading '*' lines are remnants of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Remove the given RPC function added via TSAddClusterRPCFunction().
467
* Note: Using blocking mutex since interface is synchronous and is only
468
* called at plugin unload time (unload currently not supported).
471
// Clears the dispatch slot named by the handle. NOTE(review): return type
// and return statement are missing from this excerpt. Unlike the Add path,
// there is no 'n < API_END_CLUSTER_FUNCTION' guard here, yet the assert
// admits '== API_END_CLUSTER_FUNCTION' — that value would write one past the
// end of RPC_Functions[]; confirm the intended bound.
TSDeleteClusterRPCFunction(TSClusterRPCHandle_t * rpch)
473
RPCHandle_t *h = (RPCHandle_t *) rpch;
474
EThread *e = this_ethread();
476
ink_release_assert(((h->u.internal.cluster_function >= API_STARECT_CLUSTER_FUNCTION)
477
&& (h->u.internal.cluster_function <= API_END_CLUSTER_FUNCTION)));
478
Debug("cluster_api", "TSDeleteClusterRPCFunction: n %d", h->u.internal.cluster_function);
480
MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
481
RPC_Functions[h->u.internal.cluster_function] = 0;
482
MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
487
// (The leading '*' line is a remnant of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Cluster calls us here for each RPC API function.
490
// Dispatches an incoming RPC message to the registered plugin function, or
// frees the data when no function is registered. NOTE(review): return type,
// braces and the else branch are missing from this excerpt.
default_api_ClusterFunction(ClusterMachine * m, void *data, int len)
492
Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data 0x%x len %d", DOT_SEPARATED(m->ip), data, len);
494
TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *) data;
495
RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
496
int cluster_function = rpch->u.internal.cluster_function;
498
ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
499
ink_release_assert(((cluster_function >= API_STARECT_CLUSTER_FUNCTION)
500
&& (cluster_function <= API_END_CLUSTER_FUNCTION)));
502
// Registered function takes ownership of msg (freed via TSFreeRPCMsg).
if (cluster_function < API_END_CLUSTER_FUNCTION && RPC_Functions[cluster_function]) {
503
int msg_data_len = len - SIZEOF_RPC_MSG_LESS_DATA;
504
TSNodeHandle_t nh = IP_TO_NODE_HANDLE(m->ip);
505
(*RPC_Functions[cluster_function]) (&nh, msg, msg_data_len);
507
// No handler registered: drop the message.
clusterProcessor.free_remote_data((char *) data, len);
512
// (The leading '*' line is a remnant of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Free TSClusterRPCMsg_t received via the RPC function.
515
// Validates the handle magic then returns the buffer to the cluster
// processor; the original length is reconstructed from msg_data_len plus the
// header size. NOTE(review): return type and braces are missing from this
// excerpt.
TSFreeRPCMsg(TSClusterRPCMsg_t * msg, int msg_data_len)
517
RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
518
ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC);
519
Debug("cluster_api", "TSFreeRPCMsg: msg 0x%x msg_data_len %d", msg, msg_data_len);
521
clusterProcessor.free_remote_data((char *) msg, msg_data_len + SIZEOF_RPC_MSG_LESS_DATA);
525
// (The leading '*' lines are remnants of /* ... */ comments whose delimiters
// are missing from this excerpt.)
* Allocate a message structure for use in the call to TSSendClusterRPC().
528
// Allocates an OutgoingControl sized for header + data_size and stashes the
// OutgoingControl pointer immediately before the returned message, so
// TSSendClusterRPC can recover it. NOTE(review): return type, braces, the
// handle-copy into the message, and the final 'return rpcm' are missing from
// this excerpt; the 'return 0' below presumably sits in a data_size < 4
// guard branch — confirm.
TSAllocClusterRPCMsg(TSClusterRPCHandle_t * h, int data_size)
530
ink_debug_assert(data_size >= 4);
532
/* Message must be at least 4 bytes in length */
533
return (TSClusterRPCMsg_t *) 0;
536
TSClusterRPCMsg_t *rpcm;
537
OutgoingControl *c = OutgoingControl::alloc();
539
c->len = sizeof(OutgoingControl *) + SIZEOF_RPC_MSG_LESS_DATA + data_size;
541
// Self-pointer stored at the front of the buffer; the user-visible message
// starts right after it.
*((OutgoingControl **) c->data) = c;
543
rpcm = (TSClusterRPCMsg_t *) (c->data + sizeof(OutgoingControl *));
547
* Note: We have carefully constructed TSClusterRPCMsg_t so
548
* m_data[] is 8 byte aligned. This allows the user to
549
* cast m_data[] to any type without any consideration
550
* for alignment issues.
556
// (The leading '*' line is a remnant of a /* ... */ doc comment whose
// delimiters are missing from this excerpt.)
* Send the given message to the specified node.
559
// Recovers the OutgoingControl stashed just before msg by
// TSAllocClusterRPCMsg, validates the handle magic, and hands the buffer to
// the cluster processor (CLUSTER_OPT_STEAL: ownership transfers on send).
// NOTE(review): return type, braces, the 'ClusterMachine *m' declaration and
// the else/cleanup path for the 'dropped' case are missing from this excerpt.
TSSendClusterRPC(TSNodeHandle_t * nh, TSClusterRPCMsg_t * msg)
561
struct in_addr ipaddr = NODE_HANDLE_TO_IP(*nh);
562
RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
564
OutgoingControl *c = *((OutgoingControl **)
565
((char *) msg - sizeof(OutgoingControl *)));
566
ClusterConfiguration * cc = this_cluster()->current_configuration();
569
ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC);
571
// Only deliver if the target is in the current cluster configuration.
if ((m = cc->find(ipaddr.s_addr))) {
572
int len = c->len - sizeof(OutgoingControl *);
573
ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
575
clusterProcessor.invoke_remote(m, rpch->u.internal.cluster_function,
576
msg, len, (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL));
577
Debug("cluster_api", "TSSendClusterRPC: msg 0x%x dlen %d [%u.%u.%u.%u] sent", msg, len, DOT_SEPARATED(ipaddr.s_addr));
579
Debug("cluster_api", "TSSendClusterRPC: msg 0x%x to [%u.%u.%u.%u] dropped", msg, DOT_SEPARATED(ipaddr.s_addr));
585
#endif /* NON_MODULAR */
588
* End of ClusterAPI.cc