~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/cluster/ClusterAPI.cc

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
 
 
25
/****************************************************************************
 
26
 
 
27
  ClusterAPI.cc
 
28
 
 
29
        Support for Cluster RPC API.
 
30
****************************************************************************/
 
31
#include "P_Cluster.h"
 
32
 
 
33
#ifdef NON_MODULAR
 
34
#include "InkAPIInternal.h"
 
35
 
 
36
class ClusterAPIPeriodicSM;
 
37
static void send_machine_online_list(TSClusterStatusHandle_t *);
 
38
 
 
39
typedef struct node_callout_entry
 
40
{
 
41
  Ptr<ProxyMutex> mutex;
 
42
  TSClusterStatusFunction func;
 
43
  int state;                    // See NE_STATE_XXX defines
 
44
} node_callout_entry_t;
 
45
 
 
46
#define NE_STATE_FREE                   0
 
47
#define NE_STATE_INITIALIZED            1
 
48
 
 
49
#define MAX_CLUSTERSTATUS_CALLOUTS      32
 
50
 
 
51
static ProxyMutex *ClusterAPI_mutex;
 
52
static ClusterAPIPeriodicSM *periodicSM;
 
53
 
 
54
static node_callout_entry_t status_callouts[MAX_CLUSTERSTATUS_CALLOUTS];
 
55
static TSClusterRPCFunction RPC_Functions[API_END_CLUSTER_FUNCTION];
 
56
 
 
57
#define INDEX_TO_CLUSTER_STATUS_HANDLE(i) ((TSClusterStatusHandle_t)((i)))
 
58
#define CLUSTER_STATUS_HANDLE_TO_INDEX(h) ((int) ((h)))
 
59
#define NODE_HANDLE_TO_IP(h) (*((struct in_addr *) &((h))))
 
60
#define RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k) ((int)((k)))
 
61
#define IP_TO_NODE_HANDLE(ip) ((TSNodeHandle_t)((ip)))
 
62
#define SIZEOF_RPC_MSG_LESS_DATA (sizeof(TSClusterRPCMsg_t) - \
 
63
         (sizeof(TSClusterRPCMsg_t) - sizeof(TSClusterRPCHandle_t)))
 
64
 
 
65
typedef struct RPCHandle
 
66
{
 
67
  union
 
68
  {                             // Note: All union elements are assumed to be the same size
 
69
    //       sizeof(u.internal) == sizeof(u.external)
 
70
    TSClusterRPCHandle_t external;
 
71
    struct real_format
 
72
    {
 
73
      int cluster_function;
 
74
      int magic;
 
75
    } internal;
 
76
  } u;
 
77
} RPCHandle_t;
 
78
 
 
79
#define RPC_HANDLE_MAGIC 0x12345678
 
80
 
 
81
class MachineStatusSM;
 
82
typedef int (MachineStatusSM::*MachineStatusSMHandler) (int, void *);
 
83
class MachineStatusSM:public Continuation
 
84
{
 
85
public:
 
86
  // Broadcast constructor
 
87
  MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s):_node_handle(h), _node_status(s), _status_handle(0),
 
88
    _broadcast(1), _restart(0), _next_n(0)
 
89
  {
 
90
    SET_HANDLER((MachineStatusSMHandler)
 
91
                & MachineStatusSM::MachineStatusSMEvent);
 
92
  }
 
93
  // Unicast constructor
 
94
  MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s,
 
95
                  TSClusterStatusHandle_t sh):_node_handle(h), _node_status(s), _status_handle(sh),
 
96
    _broadcast(0), _restart(0), _next_n(0)
 
97
  {
 
98
    SET_HANDLER((MachineStatusSMHandler)
 
99
                & MachineStatusSM::MachineStatusSMEvent);
 
100
  }
 
101
  // Send machine online list constructor
 
102
MachineStatusSM(TSClusterStatusHandle_t sh):
 
103
  _node_handle(0), _node_status(NODE_ONLINE), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0) {
 
104
    SET_HANDLER((MachineStatusSMHandler)
 
105
                & MachineStatusSM::MachineStatusSMEvent);
 
106
  }
 
107
  ~MachineStatusSM() {
 
108
  }
 
109
  int MachineStatusSMEvent(Event * e, void *d);
 
110
 
 
111
private:
 
112
  TSNodeHandle_t _node_handle;
 
113
  TSNodeStatus_t _node_status;
 
114
  TSClusterStatusHandle_t _status_handle;      // Valid only if !_broadcast
 
115
  int _broadcast;
 
116
  int _restart;
 
117
  int _next_n;
 
118
};
 
119
 
 
120
int
 
121
MachineStatusSM::MachineStatusSMEvent(Event * e, void *d)
 
122
{
 
123
  NOWARN_UNUSED(e);
 
124
  NOWARN_UNUSED(d);
 
125
  int n;
 
126
  EThread *et = this_ethread();
 
127
 
 
128
  if (_broadcast) {
 
129
    /////////////////////////////////////////////////////
 
130
    // Broadcast node transition to all subscribers
 
131
    /////////////////////////////////////////////////////
 
132
    n = _restart ? _next_n : 0;
 
133
    for (; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) {
 
134
      if (status_callouts[n].func && (status_callouts[n].state == NE_STATE_INITIALIZED)) {
 
135
 
 
136
        MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
 
137
        if (lock) {
 
138
          status_callouts[n].func(&_node_handle, _node_status);
 
139
          Debug("cluster_api", "callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status);
 
140
        } else {
 
141
          _restart = 1;
 
142
          _next_n = n;
 
143
          return EVENT_CONT;
 
144
        }
 
145
      }
 
146
    }
 
147
  } else {
 
148
    if (!_node_handle) {
 
149
      /////////////////////////////////////////////////////
 
150
      // Send online node list to a specific subscriber
 
151
      /////////////////////////////////////////////////////
 
152
      n = CLUSTER_STATUS_HANDLE_TO_INDEX(_status_handle);
 
153
      if (status_callouts[n].func) {
 
154
        MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
 
155
        if (lock) {
 
156
          int mi;
 
157
          unsigned int my_ipaddr = (this_cluster_machine())->ip;
 
158
          ClusterConfiguration *cc;
 
159
 
 
160
          TSNodeHandle_t nh;
 
161
 
 
162
          cc = this_cluster()->current_configuration();
 
163
          if (cc) {
 
164
            for (mi = 0; mi < cc->n_machines; ++mi) {
 
165
              if (cc->machines[mi]->ip != my_ipaddr) {
 
166
                nh = IP_TO_NODE_HANDLE(cc->machines[mi]->ip);
 
167
                status_callouts[n].func(&nh, NODE_ONLINE);
 
168
 
 
169
                Debug("cluster_api",
 
170
                      "initial callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(cc->machines[mi]->ip), NODE_ONLINE);
 
171
              }
 
172
            }
 
173
          }
 
174
          status_callouts[n].state = NE_STATE_INITIALIZED;
 
175
 
 
176
        } else {
 
177
          _restart = 1;
 
178
          _next_n = n;
 
179
          return EVENT_CONT;
 
180
        }
 
181
      }
 
182
    } else {
 
183
      /////////////////////////////////////////////////////
 
184
      // Send node status to a specific subscriber
 
185
      /////////////////////////////////////////////////////
 
186
      n = CLUSTER_STATUS_HANDLE_TO_INDEX(_status_handle);
 
187
      if (status_callouts[n].func) {
 
188
        MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
 
189
        if (lock) {
 
190
          status_callouts[n].func(&_node_handle, _node_status);
 
191
 
 
192
          Debug("cluster_api",
 
193
                "directed callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status);
 
194
        } else {
 
195
          _restart = 1;
 
196
          _next_n = n;
 
197
          return EVENT_CONT;
 
198
        }
 
199
      }
 
200
    }
 
201
  }
 
202
  delete this;
 
203
  return EVENT_DONE;
 
204
}
 
205
 
 
206
class ClusterAPIPeriodicSM;
 
207
typedef int (ClusterAPIPeriodicSM::*ClusterAPIPeriodicSMHandler) (int, void *);
 
208
class ClusterAPIPeriodicSM:public Continuation
 
209
{
 
210
public:
 
211
  ClusterAPIPeriodicSM(ProxyMutex * m):Continuation(m), _active_msmp(0)
 
212
  {
 
213
    SET_HANDLER((ClusterAPIPeriodicSMHandler)
 
214
                & ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent);
 
215
  }
 
216
   ~ClusterAPIPeriodicSM()
 
217
  {
 
218
  }
 
219
  int ClusterAPIPeriodicSMEvent(int, void *);
 
220
  MachineStatusSM *GetNextSM();
 
221
 
 
222
private:
 
223
  MachineStatusSM * _active_msmp;
 
224
};
 
225
 
 
226
static InkAtomicList status_callout_atomic_q;
 
227
static Queue<MachineStatusSM> status_callout_q;
 
228
 
 
229
MachineStatusSM *
 
230
ClusterAPIPeriodicSM::GetNextSM()
 
231
{
 
232
  MachineStatusSM *msmp;
 
233
  MachineStatusSM *msmp_next;
 
234
 
 
235
  while (1) {
 
236
    msmp = status_callout_q.pop();
 
237
    if (!msmp) {
 
238
      msmp = (MachineStatusSM *)
 
239
        ink_atomiclist_popall(&status_callout_atomic_q);
 
240
      if (msmp) {
 
241
        while (msmp) {
 
242
          msmp_next = (MachineStatusSM *) msmp->link.next;
 
243
          msmp->link.next = 0;
 
244
          status_callout_q.push(msmp);
 
245
          msmp = msmp_next;
 
246
        }
 
247
        continue;
 
248
      } else {
 
249
        break;
 
250
      }
 
251
    } else {
 
252
      break;
 
253
    }
 
254
  }
 
255
  return msmp;
 
256
}
 
257
 
 
258
int
 
259
ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent(int e, void *d)
 
260
{
 
261
  // Maintain node status event order by serializing the processing.
 
262
  int ret;
 
263
 
 
264
  while (1) {
 
265
    if (_active_msmp) {
 
266
      ret = _active_msmp->handleEvent(e, d);
 
267
      if (ret != EVENT_DONE) {
 
268
        return EVENT_CONT;
 
269
      }
 
270
    }
 
271
    _active_msmp = GetNextSM();
 
272
    if (!_active_msmp) {
 
273
      break;
 
274
    }
 
275
  }
 
276
  return EVENT_CONT;
 
277
}
 
278
 
 
279
void
 
280
clusterAPI_init()
 
281
{
 
282
  MachineStatusSM *mssmp = 0;
 
283
  // XXX: BIG RED WARNING!!! Direct null pointer dereference
 
284
  //      Either create MachineStatusSM before ose or axe this function.
 
285
  //      It is used only if NON_MODULAR is defined making that
 
286
  //      flag crashing ClusterProcessor::init()
 
287
  //
 
288
  ink_atomiclist_init(&status_callout_atomic_q,
 
289
                      "cluster API status_callout_q", (char *) &mssmp->link.next - (char *) mssmp);
 
290
  ClusterAPI_mutex = new_ProxyMutex();
 
291
  MUTEX_TRY_LOCK(lock, ClusterAPI_mutex, this_ethread());
 
292
  ink_release_assert(lock);     // Should never fail
 
293
  periodicSM = NEW(new ClusterAPIPeriodicSM(ClusterAPI_mutex));
 
294
 
 
295
  // TODO: Should we do something with this return value?
 
296
  eventProcessor.schedule_every(periodicSM, HRTIME_SECONDS(1), ET_CALL);
 
297
}
 
298
 
 
299
/*
 
300
 *  Add the given function to the node status callout list which is
 
301
 *  invoked on each machine up/down transition.
 
302
 *
 
303
 *  Note: Using blocking mutex since interface is synchronous and is only
 
304
 *        called at plugin load time.
 
305
 */
 
306
int
 
307
TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, TSClusterStatusHandle_t * h)
 
308
{
 
309
  Debug("cluster_api", "TSAddClusterStatusFunction func 0x%x", Status_Function);
 
310
  int n;
 
311
  EThread *e = this_ethread();
 
312
 
 
313
  ink_release_assert(Status_Function);
 
314
  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
 
315
  for (n = 0; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) {
 
316
    if (!status_callouts[n].func) {
 
317
      status_callouts[n].mutex = (ProxyMutex *) m;
 
318
      status_callouts[n].func = Status_Function;
 
319
      MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
 
320
      *h = INDEX_TO_CLUSTER_STATUS_HANDLE(n);
 
321
 
 
322
      Debug("cluster_api", "TSAddClusterStatusFunction: func 0x%x n %d", Status_Function, n);
 
323
      return 0;
 
324
    }
 
325
  }
 
326
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
 
327
  return 1;
 
328
}
 
329
 
 
330
/*
 
331
 *  Remove the given function from the node status callout list
 
332
 *  established via TSAddClusterStatusFunction().
 
333
 *
 
334
 *  Note: Using blocking mutex since interface is synchronous and is only
 
335
 *        called at plugin unload time (unload currently not supported).
 
336
 */
 
337
int
 
338
TSDeleteClusterStatusFunction(TSClusterStatusHandle_t * h)
 
339
{
 
340
  int n = CLUSTER_STATUS_HANDLE_TO_INDEX(*h);
 
341
  EThread *e = this_ethread();
 
342
 
 
343
  ink_release_assert((n >= 0) && (n < MAX_CLUSTERSTATUS_CALLOUTS));
 
344
  Debug("cluster_api", "TSDeleteClusterStatusFunction: n %d", n);
 
345
 
 
346
  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
 
347
  status_callouts[n].mutex = 0;
 
348
  status_callouts[n].func = (TSClusterStatusFunction) 0;
 
349
  status_callouts[n].state = NE_STATE_FREE;
 
350
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
 
351
 
 
352
  return 0;
 
353
}
 
354
 
 
355
int
 
356
TSNodeHandleToIPAddr(TSNodeHandle_t * h, struct in_addr *in)
 
357
{
 
358
  *in = NODE_HANDLE_TO_IP(*h);
 
359
  return 0;
 
360
}
 
361
 
 
362
void
 
363
TSGetMyNodeHandle(TSNodeHandle_t * h)
 
364
{
 
365
  *h = IP_TO_NODE_HANDLE((this_cluster_machine())->ip);
 
366
}
 
367
 
 
368
/*
 
369
 *  Enable node status callouts for the added callout entry.
 
370
 *  Issued once after the call to TSAddClusterStatusFunction()
 
371
 *  to get the current node configuration.  All subsequent
 
372
 *  callouts are updates to the state obtained at this point.
 
373
 */
 
374
void
 
375
TSEnableClusterStatusCallout(TSClusterStatusHandle_t * h)
 
376
{
 
377
  int ci = CLUSTER_STATUS_HANDLE_TO_INDEX(*h);
 
378
  // This isn't used.
 
379
  // int my_ipaddr = (this_cluster_machine())->ip;
 
380
  ink_release_assert((ci >= 0) && (ci < MAX_CLUSTERSTATUS_CALLOUTS));
 
381
 
 
382
  if (status_callouts[ci].state == NE_STATE_INITIALIZED) {
 
383
    return;
 
384
  }
 
385
 
 
386
  Debug("cluster_api", "TSEnableClusterStatusCallout: n %d", ci);
 
387
  send_machine_online_list(h);
 
388
}
 
389
 
 
390
static void
 
391
send_machine_online_list(TSClusterStatusHandle_t * h)
 
392
{
 
393
  MachineStatusSM *msm = NEW(new MachineStatusSM(*h));
 
394
 
 
395
  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
 
396
}
 
397
 
 
398
/*
 
399
 *  Send node online to a specific cluster status entry.
 
400
 */
 
401
// This doesn't seem to be used...
 
402
#ifdef NOT_USED_HERE
 
403
static void
 
404
directed_machine_online(int Ipaddr, TSClusterStatusHandle_t * h)
 
405
{
 
406
  MachineStatusSM *msm = NEW(new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE, *h));
 
407
 
 
408
  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
 
409
}
 
410
#endif
 
411
 
 
412
/*
 
413
 *  Called directly by the Cluster upon detection of node online.
 
414
 */
 
415
void
 
416
machine_online_APIcallout(int Ipaddr)
 
417
{
 
418
  MachineStatusSM *msm = NEW(new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE));
 
419
 
 
420
  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
 
421
}
 
422
 
 
423
/*
 
424
 *  Called directly by the Cluster upon detection of node offline.
 
425
 */
 
426
void
 
427
machine_offline_APIcallout(int Ipaddr)
 
428
{
 
429
  MachineStatusSM *msm = NEW(new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_OFFLINE));
 
430
 
 
431
  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
 
432
}
 
433
 
 
434
/*
 
435
 *  Associate the given RPC function with the given key.
 
436
 *
 
437
 *  Note: Using blocking mutex since interface is synchronous and is only
 
438
 *        called at plugin load time.
 
439
 */
 
440
int
 
441
TSAddClusterRPCFunction(TSClusterRPCKey_t k, TSClusterRPCFunction func, TSClusterRPCHandle_t * h)
 
442
{
 
443
  RPCHandle_t handle;
 
444
  int n = RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k);
 
445
  EThread *e = this_ethread();
 
446
 
 
447
  ink_release_assert(func);
 
448
  ink_release_assert((n >= API_STARECT_CLUSTER_FUNCTION)
 
449
                     && (n <= API_END_CLUSTER_FUNCTION));
 
450
  Debug("cluster_api", "TSAddClusterRPCFunction: key %d func 0x%x", k, func);
 
451
 
 
452
  handle.u.internal.cluster_function = n;
 
453
  handle.u.internal.magic = RPC_HANDLE_MAGIC;
 
454
 
 
455
  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
 
456
  if (n < API_END_CLUSTER_FUNCTION)
 
457
    RPC_Functions[n] = func;
 
458
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
 
459
 
 
460
  *h = handle.u.external;
 
461
  return 0;
 
462
}
 
463
 
 
464
/*
 
465
 *  Remove the given RPC function added via TSAddClusterRPCFunction().
 
466
 *
 
467
 *  Note: Using blocking mutex since interface is synchronous and is only
 
468
 *        called at plugin unload time (unload currently not supported).
 
469
 */
 
470
int
 
471
TSDeleteClusterRPCFunction(TSClusterRPCHandle_t * rpch)
 
472
{
 
473
  RPCHandle_t *h = (RPCHandle_t *) rpch;
 
474
  EThread *e = this_ethread();
 
475
 
 
476
  ink_release_assert(((h->u.internal.cluster_function >= API_STARECT_CLUSTER_FUNCTION)
 
477
                      && (h->u.internal.cluster_function <= API_END_CLUSTER_FUNCTION)));
 
478
  Debug("cluster_api", "TSDeleteClusterRPCFunction: n %d", h->u.internal.cluster_function);
 
479
 
 
480
  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
 
481
  RPC_Functions[h->u.internal.cluster_function] = 0;
 
482
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
 
483
  return 0;
 
484
}
 
485
 
 
486
/*
 
487
 *  Cluster calls us here for each RPC API function.
 
488
 */
 
489
void
 
490
default_api_ClusterFunction(ClusterMachine * m, void *data, int len)
 
491
{
 
492
  Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data 0x%x len %d", DOT_SEPARATED(m->ip), data, len);
 
493
 
 
494
  TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *) data;
 
495
  RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
 
496
  int cluster_function = rpch->u.internal.cluster_function;
 
497
 
 
498
  ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
 
499
  ink_release_assert(((cluster_function >= API_STARECT_CLUSTER_FUNCTION)
 
500
                      && (cluster_function <= API_END_CLUSTER_FUNCTION)));
 
501
 
 
502
  if (cluster_function < API_END_CLUSTER_FUNCTION && RPC_Functions[cluster_function]) {
 
503
    int msg_data_len = len - SIZEOF_RPC_MSG_LESS_DATA;
 
504
    TSNodeHandle_t nh = IP_TO_NODE_HANDLE(m->ip);
 
505
    (*RPC_Functions[cluster_function]) (&nh, msg, msg_data_len);
 
506
  } else {
 
507
    clusterProcessor.free_remote_data((char *) data, len);
 
508
  }
 
509
}
 
510
 
 
511
/*
 
512
 *  Free TSClusterRPCMsg_t received via the RPC function.
 
513
 */
 
514
void
 
515
TSFreeRPCMsg(TSClusterRPCMsg_t * msg, int msg_data_len)
 
516
{
 
517
  RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
 
518
  ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC);
 
519
  Debug("cluster_api", "TSFreeRPCMsg: msg 0x%x msg_data_len %d", msg, msg_data_len);
 
520
 
 
521
  clusterProcessor.free_remote_data((char *) msg, msg_data_len + SIZEOF_RPC_MSG_LESS_DATA);
 
522
}
 
523
 
 
524
/*
 
525
 *  Allocate a message structure for use in the call to TSSendClusterRPC().
 
526
 */
 
527
TSClusterRPCMsg_t *
 
528
TSAllocClusterRPCMsg(TSClusterRPCHandle_t * h, int data_size)
 
529
{
 
530
  ink_debug_assert(data_size >= 4);
 
531
  if (data_size < 4) {
 
532
    /* Message must be at least 4 bytes in length */
 
533
    return (TSClusterRPCMsg_t *) 0;
 
534
  }
 
535
 
 
536
  TSClusterRPCMsg_t *rpcm;
 
537
  OutgoingControl *c = OutgoingControl::alloc();
 
538
 
 
539
  c->len = sizeof(OutgoingControl *) + SIZEOF_RPC_MSG_LESS_DATA + data_size;
 
540
  c->alloc_data();
 
541
  *((OutgoingControl **) c->data) = c;
 
542
 
 
543
  rpcm = (TSClusterRPCMsg_t *) (c->data + sizeof(OutgoingControl *));
 
544
  rpcm->m_handle = *h;
 
545
 
 
546
  /*
 
547
   * Note: We have carefully constructed TSClusterRPCMsg_t so
 
548
   *       m_data[] is 8 byte aligned.  This allows the user to
 
549
   *       cast m_data[] to any type without any consideration
 
550
   *       for alignment issues.
 
551
   */
 
552
  return rpcm;
 
553
}
 
554
 
 
555
/*
 
556
 *  Send the given message to the specified node.
 
557
 */
 
558
int
 
559
TSSendClusterRPC(TSNodeHandle_t * nh, TSClusterRPCMsg_t * msg)
 
560
{
 
561
  struct in_addr ipaddr = NODE_HANDLE_TO_IP(*nh);
 
562
  RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
 
563
 
 
564
  OutgoingControl *c = *((OutgoingControl **)
 
565
                         ((char *) msg - sizeof(OutgoingControl *)));
 
566
  ClusterConfiguration * cc = this_cluster()->current_configuration();
 
567
  ClusterMachine *m;
 
568
 
 
569
  ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC);
 
570
 
 
571
  if ((m = cc->find(ipaddr.s_addr))) {
 
572
    int len = c->len - sizeof(OutgoingControl *);
 
573
    ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
 
574
 
 
575
    clusterProcessor.invoke_remote(m, rpch->u.internal.cluster_function,
 
576
                                   msg, len, (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL));
 
577
    Debug("cluster_api", "TSSendClusterRPC: msg 0x%x dlen %d [%u.%u.%u.%u] sent", msg, len, DOT_SEPARATED(ipaddr.s_addr));
 
578
  } else {
 
579
    Debug("cluster_api", "TSSendClusterRPC: msg 0x%x to [%u.%u.%u.%u] dropped", msg, DOT_SEPARATED(ipaddr.s_addr));
 
580
    c->freeall();
 
581
  }
 
582
 
 
583
  return 0;
 
584
}
 
585
#endif /* NON_MODULAR */
 
586
 
 
587
/*
 
588
 *  End of ClusterAPI.cc
 
589
 */