~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to lib/records/P_RecCore.i

  • Committer: Package Import Robot
  • Author(s): Aron Xu
  • Date: 2013-05-09 01:00:04 UTC
  • mto: (1.1.11) (5.3.3 experimental)
  • mto: This revision was merged to the branch mainline in revision 15.
  • Revision ID: package-import@ubuntu.com-20130509010004-9fqq9n0adseg3f8w
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/** @file
2
 
 
3
 
  Private record core definitions
4
 
 
5
 
  @section license License
6
 
 
7
 
  Licensed to the Apache Software Foundation (ASF) under one
8
 
  or more contributor license agreements.  See the NOTICE file
9
 
  distributed with this work for additional information
10
 
  regarding copyright ownership.  The ASF licenses this file
11
 
  to you under the Apache License, Version 2.0 (the
12
 
  "License"); you may not use this file except in compliance
13
 
  with the License.  You may obtain a copy of the License at
14
 
 
15
 
      http://www.apache.org/licenses/LICENSE-2.0
16
 
 
17
 
  Unless required by applicable law or agreed to in writing, software
18
 
  distributed under the License is distributed on an "AS IS" BASIS,
19
 
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
 
  See the License for the specific language governing permissions and
21
 
  limitations under the License.
22
 
 */
23
 
 
24
 
#include "TextBuffer.h"
25
 
#include "Tokenizer.h"
26
 
#include "ink_string.h"
27
 
 
28
 
#include "P_RecCompatibility.h"
29
 
#include "P_RecUtils.h"
30
 
 
31
 
 
32
 
//-------------------------------------------------------------------------
33
 
// i_am_the_record_owner
34
 
//-------------------------------------------------------------------------
35
 
static bool
36
 
i_am_the_record_owner(RecT rec_type)
37
 
{
38
 
#if defined (REC_LOCAL)
39
 
 
40
 
  switch (rec_type) {
41
 
  case RECT_CONFIG:
42
 
  case RECT_NODE:
43
 
  case RECT_CLUSTER:
44
 
  case RECT_LOCAL:
45
 
    return true;
46
 
  case RECT_PROCESS:
47
 
  case RECT_PLUGIN:
48
 
    return false;
49
 
  default:
50
 
    ink_debug_assert(!"Unexpected RecT type");
51
 
    return false;
52
 
  }
53
 
 
54
 
#elif defined (REC_PROCESS)
55
 
 
56
 
  // g_mode_type is defined in either RecLocal.cc or RecProcess.cc.
57
 
  // We can access it since we're inlined by on of these two files.
58
 
  if (g_mode_type == RECM_CLIENT) {
59
 
    switch (rec_type) {
60
 
    case RECT_PROCESS:
61
 
    case RECT_PLUGIN:
62
 
      return true;
63
 
    case RECT_CONFIG:
64
 
    case RECT_NODE:
65
 
    case RECT_CLUSTER:
66
 
    case RECT_LOCAL:
67
 
      return false;
68
 
    default:
69
 
      ink_debug_assert(!"Unexpected RecT type");
70
 
      return false;
71
 
    }
72
 
  } else if (g_mode_type == RECM_STAND_ALONE) {
73
 
    switch (rec_type) {
74
 
    case RECT_CONFIG:
75
 
    case RECT_PROCESS:
76
 
    case RECT_NODE:
77
 
    case RECT_CLUSTER:
78
 
    case RECT_LOCAL:
79
 
    case RECT_PLUGIN:
80
 
      return true;
81
 
    default:
82
 
      ink_debug_assert(!"Unexpected RecT type");
83
 
      return false;
84
 
    }
85
 
  }
86
 
#else
87
 
 
88
 
#error "Required #define not specificed; expected REC_LOCAL or REC_PROCESS"
89
 
 
90
 
#endif
91
 
 
92
 
  return false;
93
 
}
94
 
 
95
 
 
96
 
//-------------------------------------------------------------------------
97
 
// send_set_message
98
 
//-------------------------------------------------------------------------
99
 
static int
100
 
send_set_message(RecRecord * record)
101
 
{
102
 
  RecMessage *m;
103
 
 
104
 
  rec_mutex_acquire(&(record->lock));
105
 
  m = RecMessageAlloc(RECG_SET);
106
 
  m = RecMessageMarshal_Realloc(m, record);
107
 
  RecDebug(DL_Note, "[send] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
108
 
  RecMessageSend(m);
109
 
  RecMessageFree(m);
110
 
  rec_mutex_release(&(record->lock));
111
 
 
112
 
  return REC_ERR_OKAY;
113
 
}
114
 
 
115
 
 
116
 
//-------------------------------------------------------------------------
117
 
// send_register_message
118
 
//-------------------------------------------------------------------------
119
 
static int
120
 
send_register_message(RecRecord * record)
121
 
{
122
 
  RecMessage *m;
123
 
 
124
 
  rec_mutex_acquire(&(record->lock));
125
 
  m = RecMessageAlloc(RECG_REGISTER);
126
 
  m = RecMessageMarshal_Realloc(m, record);
127
 
  RecDebug(DL_Note, "[send] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
128
 
  RecMessageSend(m);
129
 
  RecMessageFree(m);
130
 
  rec_mutex_release(&(record->lock));
131
 
 
132
 
  return REC_ERR_OKAY;
133
 
}
134
 
 
135
 
 
136
 
//-------------------------------------------------------------------------
137
 
// send_push_message
138
 
//-------------------------------------------------------------------------
139
 
static int
140
 
send_push_message()
141
 
{
142
 
  RecRecord *r;
143
 
  RecMessage *m;
144
 
  int i, num_records;
145
 
  bool send_msg = false;
146
 
 
147
 
  m = RecMessageAlloc(RECG_PUSH);
148
 
  num_records = g_num_records;
149
 
  for (i = 0; i < num_records; i++) {
150
 
    r = &(g_records[i]);
151
 
    rec_mutex_acquire(&(r->lock));
152
 
    if (i_am_the_record_owner(r->rec_type)) {
153
 
      if (r->sync_required & REC_PEER_SYNC_REQUIRED) {
154
 
        m = RecMessageMarshal_Realloc(m, r);
155
 
        r->sync_required = r->sync_required & ~REC_PEER_SYNC_REQUIRED;
156
 
        send_msg = true;
157
 
      }
158
 
    }
159
 
    rec_mutex_release(&(r->lock));
160
 
  }
161
 
  if (send_msg) {
162
 
    RecDebug(DL_Note, "[send] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
163
 
    RecMessageSend(m);
164
 
  }
165
 
  RecMessageFree(m);
166
 
 
167
 
  return REC_ERR_OKAY;
168
 
}
169
 
 
170
 
 
171
 
//-------------------------------------------------------------------------
172
 
// send_pull_message
173
 
//-------------------------------------------------------------------------
174
 
static int
175
 
send_pull_message(RecMessageT msg_type)
176
 
{
177
 
  RecRecord *r;
178
 
  RecMessage *m;
179
 
  int i, num_records;
180
 
 
181
 
  m = RecMessageAlloc(msg_type);
182
 
  switch (msg_type) {
183
 
 
184
 
  case RECG_PULL_REQ:
185
 
    // We're requesting all of the records from our peer.  No payload
186
 
    // here, just send the message.
187
 
    RecDebug(DL_Note, "[send] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
188
 
    break;
189
 
 
190
 
  case RECG_PULL_ACK:
191
 
    // Respond to a RECG_PULL_REQ message from our peer.  Send ALL
192
 
    // records!  Also be sure to send a response even if it has no
193
 
    // payload.  Our peer may be blocking and waiting for a response!
194
 
    num_records = g_num_records;
195
 
    for (i = 0; i < num_records; i++) {
196
 
      r = &(g_records[i]);
197
 
      if (i_am_the_record_owner(r->rec_type) ||
198
 
          (REC_TYPE_IS_STAT(r->rec_type) && !(r->registered)) ||
199
 
          (REC_TYPE_IS_STAT(r->rec_type) && !(r->stat_meta.persist_type != RECP_NON_PERSISTENT))) {
200
 
        rec_mutex_acquire(&(r->lock));
201
 
        m = RecMessageMarshal_Realloc(m, r);
202
 
        r->sync_required = r->sync_required & ~REC_PEER_SYNC_REQUIRED;
203
 
        rec_mutex_release(&(r->lock));
204
 
      }
205
 
    }
206
 
    RecDebug(DL_Note, "[send] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
207
 
    break;
208
 
 
209
 
  default:
210
 
    RecMessageFree(m);
211
 
    return REC_ERR_FAIL;
212
 
 
213
 
  }
214
 
 
215
 
  RecMessageSend(m);
216
 
  RecMessageFree(m);
217
 
 
218
 
  return REC_ERR_OKAY;
219
 
}
220
 
 
221
 
 
222
 
//-------------------------------------------------------------------------
223
 
// recv_message_cb
224
 
//-------------------------------------------------------------------------
225
 
static int
226
 
recv_message_cb(RecMessage * msg, RecMessageT msg_type, void *cookie)
227
 
{
228
 
  REC_NOWARN_UNUSED(cookie);
229
 
 
230
 
  RecRecord *r;
231
 
  RecMessageItr itr;
232
 
 
233
 
  switch (msg_type) {
234
 
 
235
 
  case RECG_SET:
236
 
 
237
 
    RecDebug(DL_Note, "[recv] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
238
 
    if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
239
 
      do {
240
 
        if (REC_TYPE_IS_STAT(r->rec_type)) {
241
 
          RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw));
242
 
        } else {
243
 
          RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), NULL);
244
 
        }
245
 
      } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
246
 
    }
247
 
    break;
248
 
 
249
 
  case RECG_REGISTER:
250
 
    RecDebug(DL_Note, "[recv] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
251
 
    if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
252
 
      do {
253
 
        if (REC_TYPE_IS_STAT(r->rec_type)) {
254
 
          RecRegisterStat(r->rec_type, r->name, r->data_type, r->data_default, r->stat_meta.persist_type);
255
 
        } else if (REC_TYPE_IS_CONFIG(r->rec_type)) {
256
 
          RecRegisterConfig(r->rec_type, r->name, r->data_type,
257
 
                            r->data_default, r->config_meta.update_type,
258
 
                            r->config_meta.check_type, r->config_meta.check_expr, r->config_meta.access_type);
259
 
        }
260
 
      } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
261
 
    }
262
 
    break;
263
 
 
264
 
  case RECG_PUSH:
265
 
    RecDebug(DL_Note, "[recv] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
266
 
    if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
267
 
      do {
268
 
        RecForceInsert(r);
269
 
      } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
270
 
    }
271
 
    break;
272
 
 
273
 
  case RECG_PULL_ACK:
274
 
    RecDebug(DL_Note, "[recv] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
275
 
    if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
276
 
      do {
277
 
        RecForceInsert(r);
278
 
      } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
279
 
    }
280
 
    break;
281
 
 
282
 
  case RECG_PULL_REQ:
283
 
    RecDebug(DL_Note, "[recv] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
284
 
    send_pull_message(RECG_PULL_ACK);
285
 
    break;
286
 
 
287
 
  default:
288
 
    ink_debug_assert(!"Unexpected RecG type");
289
 
    return REC_ERR_FAIL;
290
 
 
291
 
  }
292
 
 
293
 
  return REC_ERR_OKAY;
294
 
}
295
 
 
296
 
 
297
 
//-------------------------------------------------------------------------
298
 
// RecRegisterStatXXX
299
 
//-------------------------------------------------------------------------
300
 
#define REC_REGISTER_STAT_XXX(A, B) \
301
 
  ink_debug_assert((rec_type == RECT_NODE)    || \
302
 
                   (rec_type == RECT_CLUSTER) || \
303
 
                   (rec_type == RECT_PROCESS) || \
304
 
                   (rec_type == RECT_LOCAL)   || \
305
 
                   (rec_type == RECT_PLUGIN));   \
306
 
  RecRecord *r; \
307
 
  RecData my_data_default; \
308
 
  my_data_default.A = data_default; \
309
 
  if ((r = RecRegisterStat(rec_type, name, B, my_data_default, \
310
 
                           persist_type)) != NULL) { \
311
 
    if (i_am_the_record_owner(r->rec_type)) { \
312
 
      r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
313
 
    } else { \
314
 
      send_register_message(r); \
315
 
    } \
316
 
    return REC_ERR_OKAY; \
317
 
  } else { \
318
 
    return REC_ERR_FAIL; \
319
 
  }
320
 
 
321
 
int
322
 
RecRegisterStatInt(RecT rec_type, const char *name, RecInt data_default, RecPersistT persist_type)
323
 
{
324
 
  REC_REGISTER_STAT_XXX(rec_int, RECD_INT);
325
 
}
326
 
 
327
 
int
328
 
RecRegisterStatFloat(RecT rec_type, const char *name, RecFloat data_default, RecPersistT persist_type)
329
 
{
330
 
  REC_REGISTER_STAT_XXX(rec_float, RECD_FLOAT);
331
 
}
332
 
 
333
 
int
334
 
RecRegisterStatString(RecT rec_type, const char *name, RecString data_default, RecPersistT persist_type)
335
 
{
336
 
  REC_REGISTER_STAT_XXX(rec_string, RECD_STRING);
337
 
}
338
 
 
339
 
int
340
 
RecRegisterStatCounter(RecT rec_type, const char *name, RecCounter data_default, RecPersistT persist_type)
341
 
{
342
 
  REC_REGISTER_STAT_XXX(rec_counter, RECD_COUNTER);
343
 
}
344
 
 
345
 
 
346
 
//-------------------------------------------------------------------------
347
 
// RecRegisterConfigXXX
348
 
//-------------------------------------------------------------------------
349
 
#define REC_REGISTER_CONFIG_XXX(A, B) \
350
 
  RecRecord *r; \
351
 
  RecData my_data_default; \
352
 
  my_data_default.A = data_default; \
353
 
  if ((r = RecRegisterConfig(rec_type, name, B, my_data_default, \
354
 
                             update_type, check_type, \
355
 
                             check_regex, access_type)) != NULL) { \
356
 
    if (i_am_the_record_owner(r->rec_type)) { \
357
 
      r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
358
 
    } else { \
359
 
      send_register_message(r); \
360
 
    } \
361
 
    return REC_ERR_OKAY; \
362
 
  } else { \
363
 
    return REC_ERR_FAIL; \
364
 
  }
365
 
 
366
 
int
367
 
RecRegisterConfigInt(RecT rec_type, const char *name,
368
 
                     RecInt data_default, RecUpdateT update_type,
369
 
                     RecCheckT check_type, const char *check_regex, RecAccessT access_type)
370
 
{
371
 
  ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
372
 
  REC_REGISTER_CONFIG_XXX(rec_int, RECD_INT);
373
 
}
374
 
 
375
 
int
376
 
RecRegisterConfigFloat(RecT rec_type, const char *name,
377
 
                       RecFloat data_default, RecUpdateT update_type,
378
 
                       RecCheckT check_type, const char *check_regex, RecAccessT access_type)
379
 
{
380
 
  ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
381
 
  REC_REGISTER_CONFIG_XXX(rec_float, RECD_FLOAT);
382
 
}
383
 
 
384
 
 
385
 
int
386
 
RecRegisterConfigString(RecT rec_type, const char *name,
387
 
                        const char *data_default_tmp, RecUpdateT update_type,
388
 
                        RecCheckT check_type, const char *check_regex, RecAccessT access_type)
389
 
{
390
 
  RecString data_default = (RecString)data_default_tmp;
391
 
  ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
392
 
  REC_REGISTER_CONFIG_XXX(rec_string, RECD_STRING);
393
 
}
394
 
 
395
 
int
396
 
RecRegisterConfigCounter(RecT rec_type, const char *name,
397
 
                         RecCounter data_default, RecUpdateT update_type,
398
 
                         RecCheckT check_type, const char *check_regex, RecAccessT access_type)
399
 
{
400
 
  ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
401
 
  REC_REGISTER_CONFIG_XXX(rec_counter, RECD_COUNTER);
402
 
}
403
 
 
404
 
 
405
 
//-------------------------------------------------------------------------
406
 
// RecSetRecordXXX
407
 
//-------------------------------------------------------------------------
408
 
int
409
 
RecSetRecord(RecT rec_type, const char *name, RecDataT data_type, RecData *data, RecRawStat *data_raw, bool lock)
410
 
{
411
 
  int err = REC_ERR_OKAY;
412
 
  RecRecord *r1;
413
 
 
414
 
  // FIXME: Most of the time we set, we don't actually need to wrlock
415
 
  // since we are not modifying the g_records_ht.
416
 
  if (lock) {
417
 
    ink_rwlock_wrlock(&g_records_rwlock);
418
 
  }
419
 
 
420
 
  if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
421
 
    if (i_am_the_record_owner(r1->rec_type)) {
422
 
      rec_mutex_acquire(&(r1->lock));
423
 
      if ((data_type != RECD_NULL) && (r1->data_type != data_type)) {
424
 
        err = REC_ERR_FAIL;
425
 
      } else {
426
 
        if (data_type == RECD_NULL) {
427
 
          ink_assert(data->rec_string);
428
 
          switch (r1->data_type) {
429
 
          case RECD_INT:
430
 
            r1->data.rec_int = ink_atoi64(data->rec_string);
431
 
            data_type = RECD_INT;
432
 
            break;
433
 
          case RECD_FLOAT:
434
 
            r1->data.rec_float = atof(data->rec_string);
435
 
            data_type = RECD_FLOAT;
436
 
            break;
437
 
          case RECD_STRING:
438
 
            data_type = RECD_STRING;
439
 
            r1->data.rec_string = data->rec_string;
440
 
            break;
441
 
          case RECD_COUNTER:
442
 
            r1->data.rec_int = ink_atoi64(data->rec_string);
443
 
            data_type = RECD_COUNTER;
444
 
            break;
445
 
          default:
446
 
            err = REC_ERR_FAIL;
447
 
            break;
448
 
          }
449
 
        }
450
 
        g_num_update[r1->rec_type]++;
451
 
 
452
 
        if (RecDataSet(data_type, &(r1->data), data)) {
453
 
          r1->sync_required = REC_SYNC_REQUIRED;
454
 
          if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
455
 
            r1->config_meta.update_required = REC_UPDATE_REQUIRED;
456
 
          }
457
 
        }
458
 
        if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != NULL)) {
459
 
          r1->stat_meta.data_raw = *data_raw;
460
 
        }
461
 
      }
462
 
      rec_mutex_release(&(r1->lock));
463
 
    } else {
464
 
      // We don't need to ats_strdup() here as we will make copies of any
465
 
      // strings when we marshal them into our RecMessage buffer.
466
 
      RecRecord r2;
467
 
      memset(&r2, 0, sizeof(RecRecord));
468
 
      r2.rec_type = rec_type;
469
 
      r2.name = name;
470
 
      r2.data_type = (data_type != RECD_NULL) ? data_type : r1->data_type;
471
 
      r2.data = *data;
472
 
      if (REC_TYPE_IS_STAT(r2.rec_type) && (data_raw != NULL)) {
473
 
        r2.stat_meta.data_raw = *data_raw;
474
 
      }
475
 
      err = send_set_message(&r2);
476
 
    }
477
 
  } else {
478
 
    // Add the record but do not set the 'registered' flag, as this
479
 
    // record really hasn't been registered yet.  Also, in order to
480
 
    // add the record, we need to have a rec_type, so if the user
481
 
    // calls RecSetRecord on a record we haven't registered yet, we
482
 
    // should fail out here.
483
 
    if ((rec_type == RECT_NULL) || (data_type == RECD_NULL)) {
484
 
      err = REC_ERR_FAIL;
485
 
      goto Ldone;
486
 
    }
487
 
    r1 = RecAlloc(rec_type, name, data_type);
488
 
    RecDataSet(data_type, &(r1->data), data);
489
 
    if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != NULL)) {
490
 
      r1->stat_meta.data_raw = *data_raw;
491
 
    }
492
 
    if (i_am_the_record_owner(r1->rec_type)) {
493
 
      r1->sync_required = r1->sync_required | REC_PEER_SYNC_REQUIRED;
494
 
    } else {
495
 
      err = send_set_message(r1);
496
 
    }
497
 
    ink_hash_table_insert(g_records_ht, name, (void *) r1);
498
 
 
499
 
  }
500
 
 
501
 
Ldone:
502
 
  if (lock) {
503
 
    ink_rwlock_unlock(&g_records_rwlock);
504
 
  }
505
 
 
506
 
  return err;
507
 
}
508
 
 
509
 
int
510
 
RecSetRecordConvert(const char *name, const RecString rec_string, bool lock)
511
 
{
512
 
  RecData data;
513
 
  data.rec_string = rec_string;
514
 
  return RecSetRecord(RECT_NULL, name, RECD_NULL, &data, NULL, lock);
515
 
}
516
 
 
517
 
int
518
 
RecSetRecordInt(const char *name, RecInt rec_int, bool lock)
519
 
{
520
 
  RecData data;
521
 
  data.rec_int = rec_int;
522
 
  return RecSetRecord(RECT_NULL, name, RECD_INT, &data, NULL, lock);
523
 
}
524
 
 
525
 
int
526
 
RecSetRecordFloat(const char *name, RecFloat rec_float, bool lock)
527
 
{
528
 
  RecData data;
529
 
  data.rec_float = rec_float;
530
 
  return RecSetRecord(RECT_NULL, name, RECD_FLOAT, &data, NULL, lock);
531
 
}
532
 
 
533
 
int
534
 
RecSetRecordString(const char *name, const RecString rec_string, bool lock)
535
 
{
536
 
  RecData data;
537
 
  data.rec_string = rec_string;
538
 
  return RecSetRecord(RECT_NULL, name, RECD_STRING, &data, NULL, lock);
539
 
}
540
 
 
541
 
int
542
 
RecSetRecordCounter(const char *name, RecCounter rec_counter, bool lock)
543
 
{
544
 
  RecData data;
545
 
  data.rec_counter = rec_counter;
546
 
  return RecSetRecord(RECT_NULL, name, RECD_COUNTER, &data, NULL, lock);
547
 
}
548
 
 
549
 
 
550
 
//-------------------------------------------------------------------------
551
 
// RecReadStatsFile
552
 
//-------------------------------------------------------------------------
553
 
int
554
 
RecReadStatsFile()
555
 
{
556
 
  RecRecord *r;
557
 
  RecMessage *m;
558
 
  RecMessageItr itr;
559
 
 
560
 
  // lock our hash table
561
 
  ink_rwlock_wrlock(&g_records_rwlock);
562
 
 
563
 
  if ((m = RecMessageReadFromDisk(g_stats_snap_fpath)) != NULL) {
564
 
    if (RecMessageUnmarshalFirst(m, &itr, &r) != REC_ERR_FAIL) {
565
 
      do {
566
 
        if ((r->name == NULL) || (!strlen(r->name)))
567
 
          continue;
568
 
        RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), false);
569
 
      } while (RecMessageUnmarshalNext(m, &itr, &r) != REC_ERR_FAIL);
570
 
    }
571
 
  }
572
 
 
573
 
  ink_rwlock_unlock(&g_records_rwlock);
574
 
  ats_free(m);
575
 
 
576
 
  return REC_ERR_OKAY;
577
 
}
578
 
 
579
 
 
580
 
//-------------------------------------------------------------------------
581
 
// RecSyncStatsFile
582
 
//-------------------------------------------------------------------------
583
 
int
584
 
RecSyncStatsFile()
585
 
{
586
 
  RecRecord *r;
587
 
  RecMessage *m;
588
 
  int i, num_records;
589
 
  bool sync_to_disk;
590
 
 
591
 
  // g_mode_type is defined in either RecLocal.cc or RecProcess.cc.
592
 
  // We can access it since we're inlined by on of these two files.
593
 
  if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
594
 
    m = RecMessageAlloc(RECG_NULL);
595
 
    num_records = g_num_records;
596
 
    sync_to_disk = false;
597
 
    for (i = 0; i < num_records; i++) {
598
 
      r = &(g_records[i]);
599
 
      rec_mutex_acquire(&(r->lock));
600
 
      if (REC_TYPE_IS_STAT(r->rec_type)) {
601
 
        if (r->stat_meta.persist_type != RECP_NON_PERSISTENT) {
602
 
          m = RecMessageMarshal_Realloc(m, r);
603
 
          sync_to_disk = true;
604
 
        }
605
 
      }
606
 
      rec_mutex_release(&(r->lock));
607
 
    }
608
 
    if (sync_to_disk) {
609
 
      RecDebug(DL_Note, "Writing '%s' [%d bytes]", g_stats_snap_fpath, m->o_write - m->o_start + sizeof(RecMessageHdr));
610
 
      RecMessageWriteToDisk(m, g_stats_snap_fpath);
611
 
    }
612
 
    RecMessageFree(m);
613
 
  }
614
 
 
615
 
  return REC_ERR_OKAY;
616
 
}
617
 
 
618
 
 
619
 
//-------------------------------------------------------------------------
620
 
// RecReadConfigFile
621
 
//-------------------------------------------------------------------------
622
 
int
623
 
RecReadConfigFile()
624
 
{
625
 
  char *fbuf;
626
 
  int fsize;
627
 
 
628
 
  const char *line;
629
 
  int line_num;
630
 
 
631
 
  char *rec_type_str, *name_str, *data_type_str, *data_str;
632
 
  RecT rec_type;
633
 
  RecDataT data_type;
634
 
  RecData data;
635
 
 
636
 
  Tokenizer line_tok("\r\n");
637
 
  tok_iter_state line_tok_state;
638
 
 
639
 
  RecConfigFileEntry *cfe;
640
 
 
641
 
  RecDebug(DL_Note, "Reading '%s'", g_rec_config_fpath);
642
 
 
643
 
  // watch out, we're altering our g_rec_config_xxx structures
644
 
  ink_mutex_acquire(&g_rec_config_lock);
645
 
 
646
 
  if (RecFileImport_Xmalloc(g_rec_config_fpath, &fbuf, &fsize) == REC_ERR_FAIL) {
647
 
    RecLog(DL_Warning, "Could not import '%s'", g_rec_config_fpath);
648
 
    ink_mutex_release(&g_rec_config_lock);
649
 
    return REC_ERR_FAIL;
650
 
  }
651
 
  // clear our g_rec_config_contents_xxx structures
652
 
  while (!queue_is_empty(g_rec_config_contents_llq)) {
653
 
    cfe = (RecConfigFileEntry *) dequeue(g_rec_config_contents_llq);
654
 
    ats_free(cfe->entry);
655
 
    ats_free(cfe);
656
 
  }
657
 
  ink_hash_table_destroy(g_rec_config_contents_ht);
658
 
  g_rec_config_contents_ht = ink_hash_table_create(InkHashTableKeyType_String);
659
 
 
660
 
  // lock our hash table
661
 
  ink_rwlock_wrlock(&g_records_rwlock);
662
 
 
663
 
  memset(&data, 0, sizeof(RecData));
664
 
  line_tok.Initialize(fbuf, SHARE_TOKS);
665
 
  line = line_tok.iterFirst(&line_tok_state);
666
 
  line_num = 1;
667
 
  while (line) {
668
 
    char *lc = ats_strdup(line);
669
 
    char *lt = lc;
670
 
    char *ln;
671
 
 
672
 
    while (isspace(*lt))
673
 
      lt++;
674
 
    rec_type_str = ink_strtok_r(lt, " \t", &ln);
675
 
 
676
 
    // check for blank lines and comments
677
 
    if ((!rec_type_str) || (rec_type_str && (*rec_type_str == '#'))) {
678
 
      goto L_next_line;
679
 
    }
680
 
 
681
 
    name_str = ink_strtok_r(NULL, " \t", &ln);
682
 
    data_type_str = ink_strtok_r(NULL, " \t", &ln);
683
 
 
684
 
    // extract the string data (a little bit tricker since it can have spaces)
685
 
    if (ln) {
686
 
      // 'ln' will point to either the next token or a bunch of spaces
687
 
      // if the user didn't supply a value (e.g. 'STRING   ').  First
688
 
      // scan past all of the spaces.  If we hit a '\0', then we we
689
 
      // know we didn't have a valid value.  If not, set 'data_str' to
690
 
      // the start of the token and scan until we find the end.  Once
691
 
      // the end is found, back-peddle to remove any trailing spaces.
692
 
      while (isspace(*ln))
693
 
        ln++;
694
 
      if (*ln == '\0') {
695
 
        data_str = NULL;
696
 
      } else {
697
 
        data_str = ln;
698
 
        while (*ln != '\0')
699
 
          ln++;
700
 
        ln--;
701
 
        while (isspace(*ln) && (ln > data_str))
702
 
          ln--;
703
 
        ln++;
704
 
        *ln = '\0';
705
 
      }
706
 
    } else {
707
 
      data_str = NULL;
708
 
    }
709
 
 
710
 
    // check for errors
711
 
    if (!(rec_type_str && name_str && data_type_str && data_str)) {
712
 
      RecLog(DL_Warning, "Could not parse line at '%s:%d' -- skipping line: '%s'", g_rec_config_fpath, line_num, line);
713
 
      goto L_next_line;
714
 
    }
715
 
    // record type
716
 
    rec_type = RECT_NULL;
717
 
    if (strcmp(rec_type_str, "CONFIG") == 0) {
718
 
      rec_type = RECT_CONFIG;
719
 
    } else if (strcmp(rec_type_str, "PROCESS") == 0) {
720
 
      rec_type = RECT_PROCESS;
721
 
    } else if (strcmp(rec_type_str, "NODE") == 0) {
722
 
      rec_type = RECT_NODE;
723
 
    } else if (strcmp(rec_type_str, "CLUSTER") == 0) {
724
 
      rec_type = RECT_CLUSTER;
725
 
    } else if (strcmp(rec_type_str, "LOCAL") == 0) {
726
 
      rec_type = RECT_LOCAL;
727
 
    } else {
728
 
      RecLog(DL_Warning, "Unknown record type '%s' at '%s:%d' -- skipping line", rec_type_str, g_rec_config_fpath, line_num);
729
 
      goto L_next_line;
730
 
    }
731
 
 
732
 
    // data_type
733
 
    data_type = RECD_NULL;
734
 
    if (strcmp(data_type_str, "INT") == 0) {
735
 
      data_type = RECD_INT;
736
 
    } else if (strcmp(data_type_str, "FLOAT") == 0) {
737
 
      data_type = RECD_FLOAT;
738
 
    } else if (strcmp(data_type_str, "STRING") == 0) {
739
 
      data_type = RECD_STRING;
740
 
    } else if (strcmp(data_type_str, "COUNTER") == 0) {
741
 
      data_type = RECD_COUNTER;
742
 
    } else {
743
 
      RecLog(DL_Warning, "Unknown data type '%s' at '%s:%d' -- skipping line", data_type_str, g_rec_config_fpath, line_num);
744
 
      goto L_next_line;
745
 
    }
746
 
 
747
 
    // set the record
748
 
    RecDataSetFromString(data_type, &data, data_str);
749
 
    RecSetRecord(rec_type, name_str, data_type, &data, NULL, false);
750
 
    RecDataClear(data_type, &data);
751
 
 
752
 
    // update our g_rec_config_contents_xxx
753
 
    cfe = (RecConfigFileEntry *)ats_malloc(sizeof(RecConfigFileEntry));
754
 
    cfe->entry_type = RECE_RECORD;
755
 
    cfe->entry = ats_strdup(name_str);
756
 
    enqueue(g_rec_config_contents_llq, (void *) cfe);
757
 
    ink_hash_table_insert(g_rec_config_contents_ht, name_str, NULL);
758
 
    goto L_done;
759
 
 
760
 
  L_next_line:
761
 
    // store this line into g_rec_config_contents_llq so that we can
762
 
    // write it out later
763
 
    cfe = (RecConfigFileEntry *)ats_malloc(sizeof(RecConfigFileEntry));
764
 
    cfe->entry_type = RECE_COMMENT;
765
 
    cfe->entry = ats_strdup(line);
766
 
    enqueue(g_rec_config_contents_llq, (void *) cfe);
767
 
 
768
 
  L_done:
769
 
    line = line_tok.iterNext(&line_tok_state);
770
 
    line_num++;
771
 
    ats_free(lc);
772
 
  }
773
 
 
774
 
  // release our hash table
775
 
  ink_rwlock_unlock(&g_records_rwlock);
776
 
  ink_mutex_release(&g_rec_config_lock);
777
 
  ats_free(fbuf);
778
 
 
779
 
  return REC_ERR_OKAY;
780
 
}
781
 
 
782
 
 
783
 
//-------------------------------------------------------------------------
784
 
// RecSyncConfigFile
785
 
//-------------------------------------------------------------------------
786
 
int
787
 
RecSyncConfigToTB(textBuffer * tb)
788
 
{
789
 
  int err = REC_ERR_FAIL;
790
 
 
791
 
  // g_mode_type is defined in either RecLocal.cc or RecProcess.cc.
792
 
  // We can access it since we're inlined by on of these two files.
793
 
  if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
794
 
    RecRecord *r;
795
 
    int i, num_records;
796
 
    RecConfigFileEntry *cfe;
797
 
    bool sync_to_disk;
798
 
 
799
 
    ink_mutex_acquire(&g_rec_config_lock);
800
 
 
801
 
    num_records = g_num_records;
802
 
    sync_to_disk = false;
803
 
    for (i = 0; i < num_records; i++) {
804
 
      r = &(g_records[i]);
805
 
      rec_mutex_acquire(&(r->lock));
806
 
      if (REC_TYPE_IS_CONFIG(r->rec_type)) {
807
 
        if (r->sync_required & REC_DISK_SYNC_REQUIRED) {
808
 
          if (!ink_hash_table_isbound(g_rec_config_contents_ht, r->name)) {
809
 
            cfe = (RecConfigFileEntry *)ats_malloc(sizeof(RecConfigFileEntry));
810
 
            cfe->entry_type = RECE_RECORD;
811
 
            cfe->entry = ats_strdup(r->name);
812
 
            enqueue(g_rec_config_contents_llq, (void *) cfe);
813
 
            ink_hash_table_insert(g_rec_config_contents_ht, r->name, NULL);
814
 
          }
815
 
          r->sync_required = r->sync_required & ~REC_DISK_SYNC_REQUIRED;
816
 
          sync_to_disk = true;
817
 
        }
818
 
      }
819
 
      rec_mutex_release(&(r->lock));
820
 
    }
821
 
 
822
 
    if (sync_to_disk) {
823
 
      char b[1024];
824
 
 
825
 
      // okay, we're going to write into our textBuffer
826
 
      err = REC_ERR_OKAY;
827
 
      tb->reUse();
828
 
 
829
 
      ink_rwlock_rdlock(&g_records_rwlock);
830
 
 
831
 
      LLQrec *llq_rec = g_rec_config_contents_llq->head;
832
 
      while (llq_rec != NULL) {
833
 
        cfe = (RecConfigFileEntry *) llq_rec->data;
834
 
        if (cfe->entry_type == RECE_COMMENT) {
835
 
          tb->copyFrom(cfe->entry, strlen(cfe->entry));
836
 
          tb->copyFrom("\n", 1);
837
 
        } else {
838
 
          if (ink_hash_table_lookup(g_records_ht, cfe->entry, (void **) &r)) {
839
 
            rec_mutex_acquire(&(r->lock));
840
 
            // rec_type
841
 
            switch (r->rec_type) {
842
 
            case RECT_CONFIG:
843
 
              tb->copyFrom("CONFIG ", 7);
844
 
              break;
845
 
            case RECT_PROCESS:
846
 
              tb->copyFrom("PROCESS ", 8);
847
 
              break;
848
 
            case RECT_NODE:
849
 
              tb->copyFrom("NODE ", 5);
850
 
              break;
851
 
            case RECT_CLUSTER:
852
 
              tb->copyFrom("CLUSTER ", 8);
853
 
              break;
854
 
            case RECT_LOCAL:
855
 
              tb->copyFrom("LOCAL ", 6);
856
 
              break;
857
 
            default:
858
 
              ink_debug_assert(!"Unexpected RecT type");
859
 
              break;
860
 
            }
861
 
            // name
862
 
            tb->copyFrom(cfe->entry, strlen(cfe->entry));
863
 
            tb->copyFrom(" ", 1);
864
 
            // data_type and value
865
 
            switch (r->data_type) {
866
 
            case RECD_INT:
867
 
              tb->copyFrom("INT ", 4);
868
 
              snprintf(b, 1023, "%" PRId64 "", r->data.rec_int);
869
 
              tb->copyFrom(b, strlen(b));
870
 
              break;
871
 
            case RECD_FLOAT:
872
 
              tb->copyFrom("FLOAT ", 6);
873
 
              snprintf(b, 1023, "%f", r->data.rec_float);
874
 
              tb->copyFrom(b, strlen(b));
875
 
              break;
876
 
            case RECD_STRING:
877
 
              tb->copyFrom("STRING ", 7);
878
 
              if (r->data.rec_string) {
879
 
                tb->copyFrom(r->data.rec_string, strlen(r->data.rec_string));
880
 
              } else {
881
 
                tb->copyFrom("NULL", strlen("NULL"));
882
 
              }
883
 
              break;
884
 
            case RECD_COUNTER:
885
 
              tb->copyFrom("COUNTER ", 8);
886
 
              snprintf(b, 1023, "%" PRId64 "", r->data.rec_counter);
887
 
              tb->copyFrom(b, strlen(b));
888
 
              break;
889
 
            default:
890
 
              ink_debug_assert(!"Unexpected RecD type");
891
 
              break;
892
 
            }
893
 
            tb->copyFrom("\n", 1);
894
 
            rec_mutex_release(&(r->lock));
895
 
          }
896
 
        }
897
 
        llq_rec = llq_rec->next;
898
 
      }
899
 
      ink_rwlock_unlock(&g_records_rwlock);
900
 
    }
901
 
    ink_mutex_release(&g_rec_config_lock);
902
 
  }
903
 
 
904
 
  return err;
905
 
}
906
 
 
907
 
 
908
 
//-------------------------------------------------------------------------
909
 
// RecExecConifgUpdateCbs
910
 
//-------------------------------------------------------------------------
911
 
int
912
 
RecExecConfigUpdateCbs()
913
 
{
914
 
  RecRecord *r;
915
 
  int i, num_records;
916
 
  unsigned int update_required_type;
917
 
 
918
 
#if defined (REC_LOCAL)
919
 
  update_required_type = REC_LOCAL_UPDATE_REQUIRED;
920
 
#elif defined (REC_PROCESS)
921
 
  update_required_type = REC_PROCESS_UPDATE_REQUIRED;
922
 
#else
923
 
#error "Required #define not specificed; expected REC_LOCAL or REC_PROCESS"
924
 
#endif
925
 
 
926
 
  num_records = g_num_records;
927
 
  for (i = 0; i < num_records; i++) {
928
 
    r = &(g_records[i]);
929
 
    rec_mutex_acquire(&(r->lock));
930
 
    if (REC_TYPE_IS_CONFIG(r->rec_type)) {
931
 
      /* -- upgrade to support a list of callback functions
932
 
         if ((r->config_meta.update_required & update_required_type) &&
933
 
         (r->config_meta.update_cb)) {
934
 
         (*(r->config_meta.update_cb))(r->name, r->data_type, r->data,
935
 
         r->config_meta.update_cookie);
936
 
         r->config_meta.update_required =
937
 
         r->config_meta.update_required & ~update_required_type;
938
 
         }
939
 
       */
940
 
 
941
 
      if ((r->config_meta.update_required & update_required_type) && (r->config_meta.update_cb_list)) {
942
 
        RecConfigUpdateCbList *cur_callback = NULL;
943
 
        for (cur_callback = r->config_meta.update_cb_list; cur_callback; cur_callback = cur_callback->next) {
944
 
          (*(cur_callback->update_cb)) (r->name, r->data_type, r->data, cur_callback->update_cookie);
945
 
        }
946
 
        r->config_meta.update_required = r->config_meta.update_required & ~update_required_type;
947
 
      }
948
 
    }
949
 
    rec_mutex_release(&(r->lock));
950
 
  }
951
 
 
952
 
  return REC_ERR_OKAY;
953
 
}
954
 
 
955
 
 
956
 
//------------------------------------------------------------------------
957
 
// RecResetStatRecord
958
 
//------------------------------------------------------------------------
959
 
int
960
 
RecResetStatRecord(char *name)
961
 
{
962
 
  RecRecord *r1 = NULL;
963
 
  int err = REC_ERR_OKAY;
964
 
 
965
 
  if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
966
 
    if (i_am_the_record_owner(r1->rec_type)) {
967
 
      rec_mutex_acquire(&(r1->lock));
968
 
      RecDataSet(r1->data_type, &(r1->data), &(r1->data_default));
969
 
      rec_mutex_release(&(r1->lock));
970
 
      err = REC_ERR_OKAY;
971
 
    } else {
972
 
      RecRecord r2;
973
 
      memset(&r2, 0, sizeof(RecRecord));
974
 
      r2.rec_type = r1->rec_type;
975
 
      r2.name = r1->name;
976
 
      r2.data_type = r1->data_type;
977
 
      r2.data = r1->data_default;
978
 
 
979
 
      err = send_set_message(&r2);
980
 
    }
981
 
  } else {
982
 
    err = REC_ERR_FAIL;
983
 
  }
984
 
 
985
 
  return err;
986
 
}
987
 
 
988
 
 
989
 
//------------------------------------------------------------------------
990
 
// RecResetStatRecord
991
 
//------------------------------------------------------------------------
992
 
int
993
 
RecResetStatRecord(RecT type, bool all)
994
 
{
995
 
  int i, num_records;
996
 
  int err = REC_ERR_OKAY;
997
 
 
998
 
  RecDebug(DL_Note, "Reset Statistics Records");
999
 
 
1000
 
  num_records = g_num_records;
1001
 
  for (i = 0; i < num_records; i++) {
1002
 
    RecRecord *r1 = &(g_records[i]);
1003
 
 
1004
 
    if (REC_TYPE_IS_STAT(r1->rec_type) && ((type == RECT_NULL) || (r1->rec_type == type)) &&
1005
 
        (all || (r1->stat_meta.persist_type != RECP_NON_PERSISTENT)) &&
1006
 
        (r1->data_type != RECD_STRING)) {
1007
 
      if (i_am_the_record_owner(r1->rec_type)) {
1008
 
        rec_mutex_acquire(&(r1->lock));
1009
 
        if (!RecDataSet(r1->data_type, &(r1->data), &(r1->data_default))) {
1010
 
          err = REC_ERR_FAIL;
1011
 
        }
1012
 
        rec_mutex_release(&(r1->lock));
1013
 
      } else {
1014
 
        RecRecord r2;
1015
 
        memset(&r2, 0, sizeof(RecRecord));
1016
 
        r2.rec_type = r1->rec_type;
1017
 
        r2.name = r1->name;
1018
 
        r2.data_type = r1->data_type;
1019
 
        r2.data = r1->data_default;
1020
 
 
1021
 
        err = send_set_message(&r2);
1022
 
      }
1023
 
    }
1024
 
  }
1025
 
  return err;
1026
 
}
1027
 
 
1028
 
 
1029
 
int
1030
 
RecSetSyncRequired(char *name, bool lock)
1031
 
{
1032
 
  int err = REC_ERR_FAIL;
1033
 
  RecRecord *r1;
1034
 
 
1035
 
  // FIXME: Most of the time we set, we don't actually need to wrlock
1036
 
  // since we are not modifying the g_records_ht.
1037
 
  if (lock) {
1038
 
    ink_rwlock_wrlock(&g_records_rwlock);
1039
 
  }
1040
 
 
1041
 
  if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
1042
 
    if (i_am_the_record_owner(r1->rec_type)) {
1043
 
      rec_mutex_acquire(&(r1->lock));
1044
 
      r1->sync_required = REC_SYNC_REQUIRED;
1045
 
      if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
1046
 
        r1->config_meta.update_required = REC_UPDATE_REQUIRED;
1047
 
      }
1048
 
      rec_mutex_release(&(r1->lock));
1049
 
      err = REC_ERR_OKAY;
1050
 
    } else {
1051
 
      // No point of doing the following because our peer will
1052
 
      // set the value with RecDataSet. However, since
1053
 
      // r2.name == r1->name, the sync_required bit will not be
1054
 
      // set.
1055
 
 
1056
 
      /*
1057
 
         RecRecord r2;
1058
 
         memset(&r2, 0, sizeof(RecRecord));
1059
 
         r2.rec_type  = r1->rec_type;
1060
 
         r2.name      = r1->name;
1061
 
         r2.data_type = r1->data_type;
1062
 
         r2.data      = r1->data_default;
1063
 
 
1064
 
         err = send_set_message(&r2);
1065
 
       */
1066
 
    }
1067
 
  }
1068
 
 
1069
 
  if (lock) {
1070
 
    ink_rwlock_unlock(&g_records_rwlock);
1071
 
  }
1072
 
 
1073
 
  return err;
1074
 
}