~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to proxy/ICP.cc

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
#include "ink_unused.h"  /* MAGIC_EDITING_TAG */
 
25
 
 
26
 
 
27
/****************************************************************************
 
28
 
 
29
  ICP.cc
 
30
 
 
31
 
 
32
****************************************************************************/
 
33
 
 
34
#include "libts.h"
 
35
#include "Main.h"
 
36
#include "P_EventSystem.h"
 
37
#include "P_Cache.h"
 
38
#include "P_Net.h"
 
39
#include "MgmtUtils.h"
 
40
#include "P_RecProcess.h"
 
41
#include "ICP.h"
 
42
#include "ICPProcessor.h"
 
43
#include "ICPlog.h"
 
44
#include "logging/Log.h"
 
45
#include "logging/LogAccessICP.h"
 
46
#include "BaseManager.h"
 
47
#include "HdrUtils.h"
 
48
 
 
49
#if defined (_WIN32)
 
50
extern long glShutdownInProgress;
 
51
#endif
 
52
 
 
53
extern CacheLookupHttpConfig global_cache_lookup_config;
 
54
HTTPHdr gclient_request;
 
55
 
 
56
//****************************************************************************
 
57
//  File Overview:
 
58
//  ==============
 
59
//      ICP files
 
60
//        ICP.h           -- All ICP class definitions.
 
61
//        ICPlog.h        -- ICP log object for logging system
 
62
//        ICP.cc          -- Incoming/outgoing ICP request and ICP configuration
 
63
//                           data base management.
 
64
//        ICPConfig.cc    -- ICP interface to Traffic Server configuration
 
65
//                           management, member functions for ICPlog (object
 
66
//                           passed to logging system) along with
 
67
//                           miscellaneous support routines.
 
68
//        ICPevents.h     -- Event definitions specific to ICP.
 
69
//        ICPProcessor.h  -- ICP external interface for other subsystems.
 
70
//                           External subsystems only need to include this
 
71
//                           header to use ICP.
 
72
//        ICPProcessor.cc -- ICP external interface implementation.
 
73
//        ICPStats.cc     -- ICP statistic callback registration.
 
74
//
 
75
//
 
76
//  Class Overview:
 
77
//  ===============
 
78
//    ICPConfigData  -- Manages global ICP data from the TS configuration
 
79
//                      manager.
 
80
//    PeerConfigData -- Manages  ICP peer data from the TS configuration
 
81
//                      manager.
 
82
//    ICPConfigUpdateCont -- Used by
 
83
//                        ICPConfiguration::icp_config_change_callback()
 
84
//                        to retry callout after a delay in cases where
 
85
//                        we cannot acquire the configuration lock.
 
86
//    ICPConfiguration -- Overall manager of ICP configuration from TS
 
87
//                        configuration.  Acts as interface and uses
 
88
//                        ICPConfigData and PeerConfigData to implement
 
89
//                        actions.  Also fields/processes TS configuration
 
90
//                        callouts for "icp.config" changes.  ICP classes only
 
91
//                        see ICPConfiguration when dealing with TS
 
92
//                        configuration info.
 
93
//
 
94
//    Peer (base class) -- abstract base class
 
95
//      ParentSiblingPeer : Peer  -- ICP object describing parent/sibling
 
96
//                                   peer which is initialized from the
 
97
//                                   TS configuration data.
 
98
//      MultiCastPeer : Peer -- ICP object describing MultiCast peer.
 
99
//                              Object is initialized from the TS
 
100
//                              configuration data.
 
101
//
 
102
//    BitMap -- Generic bit map management class
 
103
//
 
104
//    ICPProcessor -- Central class which starts all periodic events
 
105
//                 and maintains ICP configuration database.  Delegates
 
106
//                 incoming data processing to ICPHandlerCont and
 
107
//                 outgoing data processing to ICPRequestCont. Implements
 
108
//                 reconfiguration actions and query requests from the
 
109
//                 external interface.
 
110
//
 
111
//    ICPRequestCont -- Implements the state machine which processes
 
112
//                 locally generated ICP queries.  Generates message
 
113
//                 queries and processes query responses.  Responses
 
114
//                 received via callout from ICPPeerReadCont.
 
115
//
 
116
//    PeriodicCont (base class) -- abstract base class
 
117
//      ICPPeriodicCont : PeriodicCont -- Periodic which looks for ICP
 
118
//                 configuration changes sent by the Traffic Server
 
119
//                 configuration manager, and initiates ICP reconfiguration
 
120
//                 in the event we have a valid configuration change via
 
121
//                 ICPProcessor::ReconfigureStateMachine().
 
122
//
 
123
//      ICPHandlerCont : PeriodicCont -- Periodic which monitors incoming
 
124
//                 ICP sockets and starts processing of the incoming ICP data.
 
125
//
 
126
//    ICPPeerReadCont -- Implements the incoming data state machine.
 
127
//                 Processes remote ICP query requests and passes query
 
128
//                 responses to ICPRequestCont via a callout.
 
129
//    ICPlog -- Logging object which encapsulates ICP query info required
 
130
//              by the new logging subsystem to produce squid access log
 
131
//              data for ICP queries.
 
132
//
 
133
//****************************************************************************
 
134
//
 
135
//  ICP is integrated into HTTP miss processing as follows.
 
136
//
 
137
//  if (HTTP Traffic Server Miss) {
 
138
//    if (proxy.config.icp.enabled) {
 
139
//      Status = QueryICP(URL, &target_ip);
 
140
//      if (Status == ICP_HIT)
 
141
//        Issue Http Request to (target_ip, proxy_port);
 
142
//    }
 
143
//    if (proxy.config.http.parent_proxy_routing_enable) {
 
144
//      Issue Http Request to (proxy.config.http.parent_proxy_hostname,
 
145
//                             proxy.config.http.parent_proxy_port)
 
146
//    }
 
147
//    else
 
148
//      Issue Http Request to Origin Server
 
149
//  }
 
150
//
 
151
//****************************************************************************
 
152
 
 
153
// VC++ 5.0 is rather picky
 
154
typedef int (ICPPeerReadCont::*ICPPeerReadContHandler) (int, void *);
 
155
typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *);
 
156
typedef int (ICPHandlerCont::*ICPHandlerContHandler) (int, void *);
 
157
typedef int (ICPRequestCont::*ICPRequestContHandler) (int, void *);
 
158
 
 
159
// Plugin freshness function
 
160
PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc) NULL;
 
161
 
 
162
//---------------------------------------
 
163
// Class ICPHandlerCont member functions
 
164
//      Deal with incoming ICP data
 
165
//---------------------------------------
 
166
 
 
167
// Static data declarations
 
168
//Allocator *ICPHandlerCont::IncomingICPDataBuf;
 
169
int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex;
 
170
static ClassAllocator <ICPPeerReadCont::PeerReadData>PeerReadDataAllocator("PeerReadDataAllocator");
 
171
static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator");
 
172
 
 
173
static Action *default_action = NULL;
 
174
 
 
175
 
 
176
ICPHandlerCont::ICPHandlerCont(ICPProcessor * icpP)
 
177
 : PeriodicCont(icpP)
 
178
{
 
179
}
 
180
 
 
181
// do nothing continuation handler
 
182
int
 
183
ICPHandlerCont::TossEvent(int event, Event * e)
 
184
{
 
185
  NOWARN_UNUSED(event);
 
186
  NOWARN_UNUSED(e);
 
187
  return EVENT_DONE;
 
188
}
 
189
 
 
190
int
 
191
ICPHandlerCont::PeriodicEvent(int event, Event * e)
 
192
{
 
193
  NOWARN_UNUSED(event);
 
194
  NOWARN_UNUSED(e);
 
195
  int n_peer, valid_peers;
 
196
  Peer *P;
 
197
 
 
198
  // Periodic handler which initiates incoming message processing
 
199
  // on the defined peers.
 
200
 
 
201
  valid_peers = _ICPpr->GetRecvPeers();
 
202
 
 
203
  // get peer info from the completionEvent token.
 
204
  switch (event) {
 
205
  case EVENT_POLL:
 
206
  case EVENT_INTERVAL:
 
207
    {
 
208
      // start read I/Os on peers which don't have outstanding I/Os
 
209
      for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
 
210
        P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
 
211
        if (!P || (P && !P->IsOnline()))
 
212
          continue;
 
213
        if (P->shouldStartRead()) {
 
214
          P->startingRead();
 
215
          ///////////////////////////////////////////
 
216
          // Setup state machine
 
217
          ///////////////////////////////////////////
 
218
          ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
 
219
          int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
 
220
 
 
221
          s->init(_ICPpr, P, local_lookup);
 
222
          RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
 
223
 
 
224
          ///////////////////////////////////////////
 
225
          // Start processing
 
226
          ///////////////////////////////////////////
 
227
          s->handleEvent(EVENT_INTERVAL, (Event *) 0);
 
228
        }
 
229
      }
 
230
      break;
 
231
    }
 
232
  default:
 
233
    {
 
234
      ink_release_assert(!"unexpected event");
 
235
      break;
 
236
    }
 
237
  }                             // End of switch
 
238
  return EVENT_CONT;
 
239
}
 
240
 
 
241
//***************************************************************************
 
242
// Nested Class PeerReadData member functions
 
243
//      Used by ICPPeerReadCont to encapsulate the data required by
 
244
//      PeerReadStateMachine
 
245
//***************************************************************************
 
246
ICPPeerReadCont::PeerReadData::PeerReadData()
 
247
{
 
248
  init();
 
249
}
 
250
 
 
251
void
 
252
ICPPeerReadCont::PeerReadData::init()
 
253
{
 
254
  _start_time = 0;
 
255
  _mycont = 0;
 
256
  _peer = 0;
 
257
  _next_state = READ_ACTIVE;
 
258
  _cache_lookup_local = 0;
 
259
  _buf = 0;
 
260
  _rICPmsg = 0;
 
261
  _rICPmsg_len = 0;
 
262
  _cachelookupURL.clear();
 
263
  _queryResult = 0;
 
264
  _ICPReqCont = 0;
 
265
  _bytesReceived = 0;
 
266
#ifdef DEBUG_ICP
 
267
  _nhistory = 0;
 
268
#endif
 
269
  memset((void *) &_sender, 0, sizeof(_sender));
 
270
}
 
271
 
 
272
ICPPeerReadCont::PeerReadData::~PeerReadData()
 
273
{
 
274
  reset(1);
 
275
}
 
276
 
 
277
void
 
278
ICPPeerReadCont::PeerReadData::reset(int full_reset)
 
279
{
 
280
  if (full_reset) {
 
281
    _peer = 0;
 
282
    _buf = 0;
 
283
  }
 
284
  if (_rICPmsg) {
 
285
    _rICPmsg = 0;
 
286
    _rICPmsg_len = 0;
 
287
  }
 
288
 
 
289
  if (_cachelookupURL.valid()) {
 
290
    _cachelookupURL.destroy();
 
291
  }
 
292
}
 
293
 
 
294
//***************************************************************************
 
295
 
 
296
//------------------------------------------------------------------------
 
297
// ICPPeerReadCont -- ICP incoming message processing state machine
 
298
//------------------------------------------------------------------------
 
299
ICPPeerReadCont::ICPPeerReadCont():Continuation(0), _object_vc(NULL), _object_read(NULL),
 
300
_cache_req_hdr_heap_handle(NULL), _cache_resp_hdr_heap_handle(NULL), _ICPpr(NULL), _state(NULL),
 
301
_start_time(0), _recursion_depth(0)
 
302
{
 
303
}
 
304
 
 
305
void
 
306
ICPPeerReadCont::init(ICPProcessor * ICPpr, Peer * p, int lookup_local)
 
307
{
 
308
  PeerReadData *s = PeerReadDataAllocator.alloc();
 
309
  s->init();
 
310
  s->_start_time = ink_get_hrtime();
 
311
  s->_peer = p;
 
312
  s->_next_state = READ_ACTIVE;
 
313
  s->_cache_lookup_local = lookup_local;
 
314
  SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
 
315
  _ICPpr = ICPpr;
 
316
  _state = s;
 
317
  _recursion_depth = -1;
 
318
  _object_vc = NULL;
 
319
  _object_read = NULL;
 
320
  _cache_req_hdr_heap_handle = NULL;
 
321
  _cache_resp_hdr_heap_handle = NULL;
 
322
  mutex = new_ProxyMutex();
 
323
}
 
324
 
 
325
ICPPeerReadCont::~ICPPeerReadCont()
 
326
{
 
327
  reset(1);                     // Full reset
 
328
}
 
329
 
 
330
void
 
331
ICPPeerReadCont::reset(int full_reset)
 
332
{
 
333
  mutex = 0;
 
334
  if (this->_state) {
 
335
    this->_state->reset(full_reset);
 
336
    PeerReadDataAllocator.free(this->_state);
 
337
  }
 
338
  if (_cache_req_hdr_heap_handle) {
 
339
    xfree(_cache_req_hdr_heap_handle);
 
340
    _cache_req_hdr_heap_handle = NULL;
 
341
  }
 
342
  if (_cache_resp_hdr_heap_handle) {
 
343
    xfree(_cache_resp_hdr_heap_handle);
 
344
    _cache_resp_hdr_heap_handle = NULL;
 
345
  }
 
346
}
 
347
 
 
348
int
 
349
ICPPeerReadCont::ICPPeerReadEvent(int event, Event * e)
 
350
{
 
351
  switch (event) {
 
352
  case EVENT_INTERVAL:
 
353
  case EVENT_IMMEDIATE:
 
354
    {
 
355
      break;
 
356
    }
 
357
  case NET_EVENT_DATAGRAM_WRITE_COMPLETE:
 
358
  case NET_EVENT_DATAGRAM_READ_COMPLETE:
 
359
  case NET_EVENT_DATAGRAM_READ_ERROR:
 
360
  case NET_EVENT_DATAGRAM_WRITE_ERROR:
 
361
    {
 
362
      ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE)
 
363
                 || (_state->_next_state == READ_DATA_DONE));
 
364
      ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE)
 
365
                 || (_state->_next_state == WRITE_DONE));
 
366
 
 
367
      ink_release_assert(this == (ICPPeerReadCont *)
 
368
                         completionUtil::getHandle(e));
 
369
      break;
 
370
    }
 
371
  case CACHE_EVENT_LOOKUP_FAILED:
 
372
  case CACHE_EVENT_LOOKUP:
 
373
    {
 
374
      ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE);
 
375
      break;
 
376
    }
 
377
  default:
 
378
    {
 
379
      ink_release_assert(!"unexpected event");
 
380
    }
 
381
  }                             // End of switch
 
382
 
 
383
  // Front end to PeerReadStateMachine(), invoked by Event subsystem.
 
384
  if (PeerReadStateMachine(_state, e) == EVENT_CONT) {
 
385
    eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP);
 
386
    return EVENT_DONE;
 
387
 
 
388
  } else if (_state->_next_state == READ_PROCESSING_COMPLETE) {
 
389
    _state->_peer->cancelRead();
 
390
    this->reset(1);             // Full reset
 
391
    ICPPeerReadContAllocator.free(this);
 
392
    return EVENT_DONE;
 
393
 
 
394
  } else {
 
395
    return EVENT_DONE;
 
396
  }
 
397
}
 
398
 
 
399
int
 
400
ICPPeerReadCont::StaleCheck(int event, Event * e)
 
401
{
 
402
  NOWARN_UNUSED(e);
 
403
  ink_release_assert(mutex->thread_holding == this_ethread());
 
404
 
 
405
  Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s:%d]",
 
406
        event, _state->_rICPmsg->h.requestno,
 
407
        _state->_rICPmsg->un.query.URL, inet_ntoa(_state->_sender.sin_addr), ntohs(_state->_sender.sin_port));
 
408
 
 
409
  switch (event) {
 
410
  case ICP_STALE_OBJECT:
 
411
    {
 
412
      _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
 
413
      break;
 
414
    }
 
415
  case ICP_FRESH_OBJECT:
 
416
    {
 
417
      _state->_queryResult = CACHE_EVENT_LOOKUP;
 
418
      break;
 
419
    }
 
420
  default:
 
421
    {
 
422
      Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d\n", event);
 
423
      _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
 
424
      break;
 
425
    }
 
426
  }
 
427
  _object_vc->do_io(VIO::CLOSE);
 
428
  _object_vc = 0;
 
429
  SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
 
430
  return handleEvent(_state->_queryResult, 0);
 
431
}
 
432
 
 
433
int
 
434
ICPPeerReadCont::ICPPeerQueryEvent(int event, Event * e)
 
435
{
 
436
  NOWARN_UNUSED(e);
 
437
  Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s:%d]",
 
438
        event, _state->_rICPmsg->h.requestno,
 
439
        _state->_rICPmsg->un.query.URL, inet_ntoa(_state->_sender.sin_addr), ntohs(_state->_sender.sin_port));
 
440
  if (pluginFreshnessCalcFunc) {
 
441
    switch (event) {
 
442
    case CACHE_EVENT_OPEN_READ:
 
443
      {
 
444
        _object_vc = (CacheVConnection *) e;
 
445
        SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::StaleCheck);
 
446
        _object_vc->get_http_info(&_object_read);
 
447
        (*pluginFreshnessCalcFunc) ((void *) this);
 
448
        return EVENT_DONE;
 
449
      }
 
450
    case CACHE_EVENT_OPEN_READ_FAILED:
 
451
      {
 
452
        event = CACHE_EVENT_LOOKUP_FAILED;
 
453
        break;
 
454
      }
 
455
    default:
 
456
      break;
 
457
    }
 
458
  }
 
459
  // Process result
 
460
  _state->_queryResult = event;
 
461
  SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
 
462
  return handleEvent(event, e);
 
463
}
 
464
 
 
465
int
 
466
ICPPeerReadCont::ICPPeerQueryCont(int event, Event * e)
 
467
{
 
468
  NOWARN_UNUSED(event);
 
469
  NOWARN_UNUSED(e);
 
470
 
 
471
  Action *a;
 
472
 
 
473
  // Perform lookup()/open_read() on behalf of PeerReadStateMachine()
 
474
 
 
475
  ((char *) _state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0; // null terminate
 
476
  _state->_cachelookupURL.create(NULL);
 
477
  const char *qurl = (const char *) _state->_rICPmsg->un.query.URL;
 
478
  _state->_cachelookupURL.parse(qurl, strlen(qurl));
 
479
  Debug("icp", "Remote Query for id=%d, [%s] from [%s:%d]",
 
480
        _state->_rICPmsg->h.requestno,
 
481
        _state->_rICPmsg->un.query.URL, inet_ntoa(_state->_sender.sin_addr), ntohs(_state->_sender.sin_port));
 
482
 
 
483
  SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerQueryEvent);
 
484
  if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) {
 
485
    _state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED;
 
486
    _start_time = ink_get_hrtime();
 
487
    if (pluginFreshnessCalcFunc && _ICPpr->GetConfig()->globalConfig()->ICPStaleLookup()) {
 
488
      //////////////////////////////////////////////////////////////
 
489
      // Note: _cache_lookup_local is ignored in this case, since
 
490
      //       cache clustering is not used with stale lookup.
 
491
      //////////////////////////////////////////////////////////////
 
492
      a = cacheProcessor.open_read(this, &_state->_cachelookupURL,
 
493
                                   &gclient_request, &global_cache_lookup_config, (time_t) 0);
 
494
    } else {
 
495
      a = cacheProcessor.lookup(this, &_state->_cachelookupURL, _state->_cache_lookup_local);
 
496
    }
 
497
    if (!a) {
 
498
      a = ACTION_IO_ERROR;
 
499
    }
 
500
    if (a == ACTION_RESULT_DONE) {
 
501
      return EVENT_DONE;        // callback complete
 
502
    } else if (a == ACTION_IO_ERROR) {
 
503
      handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
 
504
      return EVENT_DONE;        // callback complete
 
505
    } else {
 
506
      return EVENT_CONT;        // callback pending
 
507
    }
 
508
  } else {
 
509
    // Null URL, return failed lookup
 
510
    handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
 
511
    return EVENT_DONE;          // callback done
 
512
  }
 
513
}
 
514
 
 
515
struct AutoReference
 
516
{
 
517
  AutoReference(int *cnt)
 
518
  {
 
519
    _cnt = cnt;
 
520
    (*_cnt)++;
 
521
  }
 
522
   ~AutoReference()
 
523
  {
 
524
    (*_cnt)--;
 
525
  }
 
526
  int *_cnt;
 
527
};
 
528
 
 
529
int
 
530
ICPPeerReadCont::PeerReadStateMachine(PeerReadData * s, Event * e)
 
531
{
 
532
  AutoReference l(&_recursion_depth);
 
533
  //-----------------------------------------------------------
 
534
  // State machine to process ICP data received on UDP socket
 
535
  //-----------------------------------------------------------
 
536
  MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
 
537
  if (!lock) {
 
538
    // we didn't get the lock, so we don't need to unlock it
 
539
    // coverity[missing_unlock]
 
540
    return EVENT_CONT;          // try again later
 
541
  }
 
542
 
 
543
  while (1) {                   // loop forever
 
544
 
 
545
    switch (s->_next_state) {
 
546
    case READ_ACTIVE:
 
547
      {
 
548
        ink_release_assert(_recursion_depth == 0);
 
549
        if (!_ICPpr->Lock())
 
550
          return EVENT_CONT;    // unable to get lock, try again later
 
551
 
 
552
        bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer);
 
553
 
 
554
        if (valid_peer && _ICPpr->AllowICPQueries()
 
555
            && _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) {
 
556
 
 
557
          // Note pending incoming ICP request or response
 
558
          _ICPpr->IncPendingQuery();
 
559
          _ICPpr->Unlock();
 
560
 
 
561
          s->_next_state = READ_DATA;
 
562
          RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA);
 
563
          break;                // move to next_state
 
564
 
 
565
        } else {
 
566
          _ICPpr->Unlock();
 
567
 
 
568
          // ICP NOT enabled, do nothing
 
569
          s->_next_state = READ_PROCESSING_COMPLETE;
 
570
          RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE);
 
571
          return EVENT_DONE;
 
572
        }
 
573
      }
 
574
#if !defined(_WIN32) && !defined(__GNUC__)
 
575
    _end_case_read_active:     // fix DEC warnings
 
576
#endif
 
577
      ink_release_assert(0);    // Should never happen
 
578
 
 
579
    case READ_DATA:
 
580
      {
 
581
        ink_release_assert(_recursion_depth == 0);
 
582
 
 
583
        // Assumption of one outstanding read per peer...
 
584
        // Setup read from FD
 
585
        ink_assert(s->_peer->buf == NULL);
 
586
        Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock();
 
587
        buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex);
 
588
        s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr);
 
589
        buf->fill(sizeof(ICPMsg_t));    // reserve space for decoding
 
590
        char *be = buf->buf_end() - 1;
 
591
        be[0] = 0;              // null terminate buffer
 
592
        s->_next_state = READ_DATA_DONE;
 
593
        RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE);
 
594
        ink_assert(s->_peer->readAction == NULL);
 
595
        Action *a = s->_peer->RecvFrom_re(this, this, buf,
 
596
                                          buf->write_avail() - 1,
 
597
                                          (struct sockaddr *)
 
598
                                          &s->_peer->fromaddr,
 
599
                                          &s->_peer->fromaddrlen);
 
600
        if (!a) {
 
601
          a = ACTION_IO_ERROR;
 
602
        }
 
603
        if (a == ACTION_RESULT_DONE) {
 
604
          // we will have been called back already and our state updated
 
605
          // appropriately.
 
606
          // move to next state
 
607
          ink_assert(s->_next_state == PROCESS_READ_DATA);
 
608
          break;
 
609
        } else if (a == ACTION_IO_ERROR) {
 
610
          // actually, this *could* be taken care of by the main handler, but
 
611
          // error processing makes more sense at this point.  Therefore,
 
612
          // the main handler ignores the errors.
 
613
          //
 
614
          // No data, terminate read loop.
 
615
          //
 
616
          ICP_INCREMENT_DYN_STAT(no_data_read_stat);
 
617
          s->_peer->buf = NULL; // release reference
 
618
          s->_next_state = READ_NOT_ACTIVE_EXIT;
 
619
          RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
 
620
          // move to next state
 
621
          break;
 
622
        } else {
 
623
          s->_peer->readAction = a;
 
624
          return EVENT_DONE;
 
625
        }
 
626
      }
 
627
#if !defined(_WIN32) && !defined(__GNUC__)
 
628
    _end_case_read_data:       // fix DEC warnings
 
629
#endif
 
630
      ink_release_assert(0);    // Should never happen
 
631
 
 
632
    case READ_DATA_DONE:
 
633
      {
 
634
        // Convert ICP message from network to host format
 
635
        if (s->_peer->readAction != NULL) {
 
636
          ink_assert(s->_peer->readAction == e);
 
637
          s->_peer->readAction = NULL;
 
638
        }
 
639
        s->_bytesReceived = completionUtil::getBytesTransferred(e);
 
640
 
 
641
        if (s->_bytesReceived >= 0) {
 
642
          s->_next_state = PROCESS_READ_DATA;
 
643
          RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA);
 
644
        } else {
 
645
          ICP_INCREMENT_DYN_STAT(no_data_read_stat);
 
646
          s->_peer->buf = NULL; // release reference
 
647
          s->_next_state = READ_NOT_ACTIVE_EXIT;
 
648
          RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
 
649
        }
 
650
        if (_recursion_depth > 0) {
 
651
          return EVENT_DONE;
 
652
        } else {
 
653
          break;
 
654
        }
 
655
      }
 
656
#if !defined(_WIN32) && !defined(__GNUC__)
 
657
    _end_case_read_data_done:  // fix DEC warnings
 
658
#endif
 
659
      ink_release_assert(0);    // Should never happen
 
660
 
 
661
    case PROCESS_READ_DATA:
 
662
    case ADD_PEER:
 
663
      {
 
664
        ink_release_assert(_recursion_depth == 0);
 
665
 
 
666
        Ptr<IOBufferBlock> bufblock = s->_peer->buf;
 
667
        char *buf = bufblock->start();
 
668
 
 
669
        if (s->_next_state == PROCESS_READ_DATA) {
 
670
          ICPRequestCont::NetToHostICPMsg((ICPMsg_t *)
 
671
                                          (buf + sizeof(ICPMsg_t)), (ICPMsg_t *) buf);
 
672
 
 
673
          // adjust buffer pointers to point to decoded message.
 
674
          bufblock->reset();
 
675
          bufblock->fill(s->_bytesReceived);
 
676
 
 
677
          // Validate message length for sanity
 
678
          if (s->_bytesReceived < ((ICPMsg_t *) buf)->h.msglen) {
 
679
            //
 
680
            // Short read, terminate
 
681
            //
 
682
            ICP_INCREMENT_DYN_STAT(short_read_stat);
 
683
            s->_peer->buf = NULL;
 
684
            s->_next_state = READ_NOT_ACTIVE;
 
685
            RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
686
            break;              // move to next_state
 
687
          }
 
688
        }
 
689
        // Validate receiver and convert the received sockaddr
 
690
        //   to internal sockaddr format.
 
691
        struct sockaddr_in from;
 
692
        if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr, &from)) {
 
693
          int status;
 
694
          ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
 
695
          ICPMsg_t *ICPmsg = (ICPMsg_t *) buf;
 
696
 
 
697
          if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) &&
 
698
              cfg->ICPReplyToUnknownPeer() &&
 
699
              ((ICPmsg->h.version == ICP_VERSION_2) ||
 
700
               (ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) {
 
701
 
 
702
            //
 
703
            // Add the unknown Peer to our database to
 
704
            // allow us to resolve the lookup request.
 
705
            //
 
706
            if (!_ICPpr->GetConfig()->Lock()) {
 
707
              s->_next_state = ADD_PEER;
 
708
              RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER);
 
709
              return EVENT_CONT;
 
710
            }
 
711
            if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) {
 
712
              Warning("ICP Peer limit exceeded");
 
713
              REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded");
 
714
              _ICPpr->GetConfig()->Unlock();
 
715
              goto invalid_message;
 
716
            }
 
717
 
 
718
            int icp_reply_port = cfg->ICPDefaultReplyPort();
 
719
            if (!icp_reply_port) {
 
720
              icp_reply_port = ntohs(s->_peer->fromaddr.sin_port);
 
721
            }
 
722
            PeerConfigData *Pcfg = NEW(new PeerConfigData(PeerConfigData::CTYPE_SIBLING,
 
723
                                                          &s->_peer->fromaddr.sin_addr, 0,
 
724
                                                          icp_reply_port));
 
725
            ParentSiblingPeer *P = NEW(new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true));
 
726
            status = _ICPpr->AddPeer(P);
 
727
            ink_release_assert(status);
 
728
            status = _ICPpr->AddPeerToSendList(P);
 
729
            ink_release_assert(status);
 
730
 
 
731
            P->GetChan()->setRemote(P->GetIP()->s_addr, P->GetPort());
 
732
 
 
733
            // coverity[uninit_use_in_call]
 
734
            Note("ICP Peer added ip=%u.%u.%u.%u port=%d", PRINT_IP(P->GetIP()->s_addr), P->GetPort());
 
735
            from = s->_peer->fromaddr;
 
736
          } else {
 
737
          invalid_message:
 
738
            //
 
739
            // Sender does not exist in ICP configuration, terminate
 
740
            //
 
741
            ICP_INCREMENT_DYN_STAT(invalid_sender_stat);
 
742
            Debug("icp", "Received msg from invalid sender [%s:%d]",
 
743
                  inet_ntoa(s->_peer->fromaddr.sin_addr), ntohs(s->_peer->fromaddr.sin_port));
 
744
 
 
745
            s->_peer->buf = NULL;
 
746
            s->_next_state = READ_NOT_ACTIVE;
 
747
            RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
748
            break;              // move to next_state
 
749
          }
 
750
        }
 
751
        // we hand off the decoded buffer from the Peer to the PeerReadData
 
752
        s->_sender = from;
 
753
        s->_rICPmsg_len = s->_bytesReceived;
 
754
        ink_assert(s->_buf == NULL);
 
755
        s->_buf = s->_peer->buf;
 
756
        s->_rICPmsg = (ICPMsg_t *) s->_buf->start();
 
757
        s->_peer->buf = NULL;
 
758
 
 
759
        //
 
760
        // Handle only ICP_VERSION_2/3 messages.  Reject all others.
 
761
        //
 
762
        if ((s->_rICPmsg->h.version != ICP_VERSION_2)
 
763
            && (s->_rICPmsg->h.version != ICP_VERSION_3)) {
 
764
          ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat);
 
765
          Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s:%d]",
 
766
                (uint32_t) s->_rICPmsg->h.version, inet_ntoa(from.sin_addr), ntohs(from.sin_port));
 
767
 
 
768
          s->_rICPmsg = NULL;
 
769
          s->_buf = NULL;
 
770
          s->_next_state = READ_NOT_ACTIVE;
 
771
          RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
772
          break;                // move to next_state
 
773
        }
 
774
        //
 
775
        // If this is a query message, redirect to
 
776
        // the query specific handlers.
 
777
        //
 
778
        if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) {
 
779
          ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat);
 
780
          ink_assert(!s->_mycont);
 
781
          s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE;
 
782
          RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE);
 
783
 
 
784
          if (ICPPeerQueryCont(0, (Event *) 0) == EVENT_DONE) {
 
785
            break;              // Callback complete
 
786
          } else {
 
787
            return EVENT_DONE;  // Callback pending
 
788
          }
 
789
        } else {
 
790
          // We have a response message for an ICP query.
 
791
          Debug("icp", "Response for Id=%d, from [%s:%d]",
 
792
                s->_rICPmsg->h.requestno, inet_ntoa(s->_sender.sin_addr), ntohs(s->_sender.sin_port));
 
793
          ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat);
 
794
          s->_next_state = GET_ICP_REQUEST;
 
795
          RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
 
796
          break;                // move to next_state
 
797
        }
 
798
      }
 
799
#if !defined(_WIN32) && !defined(__GNUC__)
 
800
    _end_case_process_data_read:       // fix DEC warnings
 
801
#endif
 
802
      ink_release_assert(0);    // Should never happen
 
803
 
 
804
    case AWAITING_CACHE_LOOKUP_RESPONSE:
 
805
      {
 
806
        int status = 0;
 
807
        void *data = s->_rICPmsg->un.query.URL;
 
808
        int datalen = strlen((const char *) data) + 1;
 
809
 
 
810
        if (s->_queryResult == CACHE_EVENT_LOOKUP) {
 
811
          // Use the received ICP data buffer for the response message
 
812
          Debug("icp", "Sending ICP_OP_HIT for id=%d, [%s] to [%s:%d]",
 
813
                s->_rICPmsg->h.requestno, data, inet_ntoa(s->_sender.sin_addr), ntohs(s->_sender.sin_port));
 
814
          ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat);
 
815
          status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT,
 
816
                                               s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ ,
 
817
                                               0 /* shostid */ ,
 
818
                                               data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
 
819
        } else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) {
 
820
          // Use the received ICP data buffer for response message
 
821
          Debug("icp", "Sending ICP_OP_MISS for id=%d, [%s] to [%s:%d]",
 
822
                s->_rICPmsg->h.requestno, data, inet_ntoa(s->_sender.sin_addr), ntohs(s->_sender.sin_port));
 
823
          ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat);
 
824
          status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS,
 
825
                                               s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ ,
 
826
                                               0 /* shostid */ ,
 
827
                                               data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
 
828
        } else {
 
829
          Warning("Bad cache lookup event: %d", s->_queryResult);
 
830
          ink_release_assert(!"Invalid cache lookup event");
 
831
        }
 
832
        ink_assert(status == 0);
 
833
 
 
834
        // Make system log entry for ICP query
 
835
        ICPlog logentry(s);
 
836
        LogAccessICP accessor(&logentry);
 
837
        Log::access(&accessor);
 
838
 
 
839
        s->_next_state = SEND_REPLY;
 
840
        RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY);
 
841
 
 
842
        if (_recursion_depth > 0) {
 
843
          return EVENT_DONE;
 
844
        } else {
 
845
          break;
 
846
        }
 
847
      }
 
848
#if !defined(_WIN32) && !defined(__GNUC__)
 
849
    _end_case_awaiting_cache_lookup_response:  // fix DEC warnings
 
850
#endif
 
851
      ink_release_assert(0);    // Should never happen
 
852
 
 
853
    case SEND_REPLY:
 
854
      {
 
855
        ink_release_assert(_recursion_depth == 0);
 
856
        //
 
857
        // Send the query response back to the sender
 
858
        //
 
859
        s->_next_state = WRITE_DONE;
 
860
        RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE);
 
861
        ink_assert(s->_peer->writeAction == NULL);
 
862
        Action *a = s->_peer->SendMsg_re(this, this,
 
863
                                         &s->_mhdr, &s->_sender);
 
864
        if (!a) {
 
865
          a = ACTION_IO_ERROR;
 
866
        }
 
867
        if (a == ACTION_RESULT_DONE) {
 
868
          // we have been called back already and our state updated
 
869
          // appropriately
 
870
          break;
 
871
 
 
872
        } else if (a == ACTION_IO_ERROR) {
 
873
          // Partial write.
 
874
          ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
 
875
          unsigned char x[4];
 
876
          *(uint32_t *) & x = (uint32_t) s->_sender.sin_addr.s_addr;
 
877
          // coverity[uninit_use_in_call]
 
878
          Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%d.%d.%d.%d",
 
879
                ntohs(s->_rICPmsg->h.msglen), -1, x[0], x[1], x[2], x[3]);
 
880
          s->_next_state = READ_NOT_ACTIVE;
 
881
          RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
882
          break;
 
883
        } else {
 
884
          s->_peer->writeAction = a;
 
885
          return EVENT_DONE;
 
886
        }
 
887
      }
 
888
#if !defined(_WIN32) && !defined(__GNUC__)
 
889
    _end_case_send_reply:      // fix DEC warnings
 
890
#endif
 
891
      ink_release_assert(0);    // Should never happen
 
892
 
 
893
    case WRITE_DONE:
 
894
      {
 
895
        s->_peer->writeAction = NULL;
 
896
        int len = completionUtil::getBytesTransferred(e);
 
897
 
 
898
        if (len == (int)ntohs(s->_rICPmsg->h.msglen)) {
 
899
          ICP_INCREMENT_DYN_STAT(query_response_write_stat);
 
900
          s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender);       // log query reply
 
901
        } else {
 
902
          // Partial write.
 
903
          ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
 
904
          unsigned char x[4];
 
905
          *(uint32_t *) & x = (uint32_t) s->_sender.sin_addr.s_addr;
 
906
          // coverity[uninit_use_in_call]
 
907
          Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%d.%d.%d.%d",
 
908
                ntohs(s->_rICPmsg->h.msglen), len, x[0], x[1], x[2], x[3]);
 
909
        }
 
910
        // Processing complete, perform completion actions
 
911
        s->_next_state = READ_NOT_ACTIVE;
 
912
        RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
913
        Debug("icp", "state->READ_NOT_ACTIVE");
 
914
 
 
915
        if (_recursion_depth > 0) {
 
916
          return EVENT_DONE;
 
917
        } else {
 
918
          break;                // move to next_state
 
919
        }
 
920
      }
 
921
#if !defined(_WIN32) && !defined(__GNUC__)
 
922
    _end_case_write_done:      // fix DEC warnings
 
923
#endif
 
924
      ink_release_assert(0);    // Should never happen
 
925
 
 
926
    case GET_ICP_REQUEST:
 
927
      {
 
928
        ink_release_assert(_recursion_depth == 0);
 
929
        ink_assert(s->_rICPmsg && s->_rICPmsg_len);     // Sanity check
 
930
 
 
931
        // Get ICP request associated with response message
 
932
        s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno);
 
933
        if (s->_ICPReqCont) {
 
934
          s->_next_state = GET_ICP_REQUEST_MUTEX;
 
935
          RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX);
 
936
          break;                // move to next_state
 
937
        }
 
938
        //
 
939
        // No ICP request for response message, log as "response
 
940
        // for non-existent ICP request" and terminate processing
 
941
        //
 
942
        Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno);
 
943
        ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat);
 
944
        Peer *p = _ICPpr->FindPeer(&s->_sender.sin_addr,
 
945
                                   ntohs(s->_sender.sin_port));
 
946
        p->LogRecvMsg(s->_rICPmsg, 0);
 
947
        s->_next_state = READ_NOT_ACTIVE;
 
948
        RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
949
        break;                  // move to next_state
 
950
      }
 
951
#if !defined(_WIN32) && !defined(__GNUC__)
 
952
    _end_case_get_icp_request: // fix DEC warnings
 
953
#endif
 
954
      ink_release_assert(0);    // Should never happen
 
955
 
 
956
    case GET_ICP_REQUEST_MUTEX:
 
957
      {
 
958
        ink_release_assert(_recursion_depth == 0);
 
959
        ink_assert(s->_ICPReqCont);
 
960
        Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex);
 
961
        EThread *ethread = this_ethread();
 
962
        ink_hrtime request_start_time;
 
963
 
 
964
        if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) {
 
965
          ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat);
 
966
          //
 
967
          // Unable to get ICP request mutex, delay and move back
 
968
          // to the GET_ICP_REQUEST state.  We need to do this
 
969
          // since the ICP request may be deallocated by the active
 
970
          // continuation.
 
971
          //
 
972
          s->_ICPReqCont = (ICPRequestCont *) 0;
 
973
          s->_next_state = GET_ICP_REQUEST;
 
974
          RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
 
975
          return EVENT_CONT;
 
976
        }
 
977
        // Log as "response for ICP request"
 
978
        Peer *p = _ICPpr->FindPeer(&s->_sender.sin_addr,
 
979
                                   ntohs(s->_sender.sin_port));
 
980
        p->LogRecvMsg(s->_rICPmsg, 1);
 
981
 
 
982
        // Process the ICP response for the given ICP request
 
983
        ICPRequestCont::ICPRequestEventArgs_t args;
 
984
        args.rICPmsg = s->_rICPmsg;
 
985
        args.rICPmsg_len = s->_rICPmsg_len;
 
986
        args.peer = p;
 
987
        if (!s->_ICPReqCont->GetActionPtr()->cancelled) {
 
988
          request_start_time = s->_ICPReqCont->GetRequestStartTime();
 
989
          Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno);
 
990
          s->_ICPReqCont->handleEvent((int) ICP_RESPONSE_MESSAGE, (void *) &args);
 
991
        } else {
 
992
          request_start_time = 0;
 
993
          delete s->_ICPReqCont;
 
994
          Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno);
 
995
        }
 
996
 
 
997
        // Note: s->_ICPReqCont is deallocated at this point.
 
998
        s->_ICPReqCont = 0;
 
999
 
 
1000
        MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread);
 
1001
        if (request_start_time) {
 
1002
          ICP_SUM_DYN_STAT(total_icp_response_time_stat, (ink_get_hrtime() - request_start_time));
 
1003
        }
 
1004
        RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
 
1005
        s->_next_state = READ_NOT_ACTIVE;
 
1006
        break;                  // move to next_state
 
1007
      }
 
1008
#if !defined(_WIN32) && !defined(__GNUC__)
 
1009
    _end_case_get_icp_request_mutex:   // fix DEC warnings
 
1010
#endif
 
1011
      ink_release_assert(0);    // Should never happen
 
1012
 
 
1013
    case READ_NOT_ACTIVE:
 
1014
    case READ_NOT_ACTIVE_EXIT:
 
1015
      {
 
1016
        ink_release_assert(_recursion_depth == 0);
 
1017
        if (!_ICPpr->Lock())
 
1018
          return EVENT_CONT;    // unable to get lock, try again later
 
1019
 
 
1020
        // Note incoming ICP request or response completion
 
1021
        _ICPpr->DecPendingQuery();
 
1022
        _ICPpr->Unlock();
 
1023
 
 
1024
        s->_buf = 0;
 
1025
        if (s->_next_state == READ_NOT_ACTIVE_EXIT) {
 
1026
          s->_next_state = READ_PROCESSING_COMPLETE;
 
1027
          return EVENT_DONE;
 
1028
        } else {
 
1029
          // Last read was valid, see if any more read data before exiting
 
1030
          s->reset();
 
1031
          s->_start_time = ink_get_hrtime();
 
1032
          s->_next_state = READ_ACTIVE;
 
1033
          RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE);
 
1034
          break;                // restart
 
1035
        }
 
1036
      }
 
1037
#if !defined(_WIN32) && !defined(__GNUC__)
 
1038
    _end_case_read_not_active: // fix DEC warnings
 
1039
#endif
 
1040
      ink_release_assert(0);    // Should never happen
 
1041
 
 
1042
    case READ_PROCESSING_COMPLETE:
 
1043
    default:
 
1044
      ink_release_assert(0);    // Should never happen
 
1045
 
 
1046
    }                           // End of switch
 
1047
 
 
1048
  }                             // End of while(1)
 
1049
}
 
1050
 
 
1051
//------------------------------------------------------------------------
 
1052
// Class ICPRequestCont member functions
 
1053
//      Implements the state machine which processes locally generated
 
1054
//      ICP queries.
 
1055
//------------------------------------------------------------------------
 
1056
ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator");
 
1057
 
 
1058
ICPRequestCont::ICPRequestCont(ICPProcessor * pr, Continuation * c, URL * u)
 
1059
  : Continuation(0), _cont(c), _url(u), _start_time(0),
 
1060
    _ICPpr(pr), _timeout(0),
 
1061
    npending_actions(0), pendingActions(NULL),
 
1062
    _sequence_number(0), _expected_replies(0),
 
1063
    _expected_replies_list(MAX_DEFINED_PEERS), _received_replies(0), _next_state(ICP_START)
 
1064
{
 
1065
  memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr));
 
1066
  _ret_status = ICP_LOOKUP_FAILED;
 
1067
  _act.cancelled = false;
 
1068
  _act = c;
 
1069
  memset((void *) &_ICPmsg, 0, sizeof(_ICPmsg));
 
1070
  memset((void *) &_sendMsgHdr, 0, sizeof(_sendMsgHdr));
 
1071
  memset((void *) &_sendMsgIOV, 0, sizeof(_sendMsgIOV[MSG_IOVECS]));
 
1072
 
 
1073
  if (c)
 
1074
    this->mutex = c->mutex;
 
1075
}
 
1076
 
 
1077
ICPRequestCont::~ICPRequestCont()
 
1078
{
 
1079
  _act = NULL;
 
1080
  this->mutex = NULL;
 
1081
 
 
1082
  if (_timeout) {
 
1083
    _timeout->cancel(this);
 
1084
    _timeout = 0;
 
1085
  }
 
1086
  RemoveICPRequest(_sequence_number);
 
1087
 
 
1088
  if (_ICPmsg.h.opcode == ICP_OP_QUERY) {
 
1089
    if (_ICPmsg.un.query.URL) {
 
1090
      xfree(_ICPmsg.un.query.URL);
 
1091
    }
 
1092
  }
 
1093
  if (pendingActions) {
 
1094
    delete pendingActions;
 
1095
    pendingActions = 0;
 
1096
  }
 
1097
}
 
1098
 
 
1099
void
 
1100
ICPRequestCont::remove_from_pendingActions(Action * a)
 
1101
{
 
1102
  if (!pendingActions) {
 
1103
    npending_actions--;
 
1104
    return;
 
1105
  }
 
1106
  for (intptr_t i = 0; i < pendingActions->length(); i++) {
 
1107
    if ((*pendingActions)[i] == a) {
 
1108
      for (intptr_t j = i; j < pendingActions->length() - 1; j++)
 
1109
        (*pendingActions)[j] = (*pendingActions)[j + 1];
 
1110
      pendingActions->set_length(pendingActions->length() - 1);
 
1111
      npending_actions--;
 
1112
      return;
 
1113
    }
 
1114
  }
 
1115
  npending_actions--;           // completed inline
 
1116
}
 
1117
 
 
1118
void
 
1119
ICPRequestCont::remove_all_pendingActions()
 
1120
{
 
1121
  int active_pendingActions = 0;
 
1122
 
 
1123
  if (!pendingActions) {
 
1124
    return;
 
1125
  }
 
1126
  for (intptr_t i = 0; i < pendingActions->length(); i++) {
 
1127
    if ((*pendingActions)[i]
 
1128
        && ((*pendingActions)[i] != ACTION_IO_ERROR)) {
 
1129
      ((*pendingActions)[i])->cancel();
 
1130
      (*pendingActions)[i] = 0;
 
1131
      npending_actions--;
 
1132
      active_pendingActions++;
 
1133
    } else {
 
1134
      (*pendingActions)[i] = 0;
 
1135
    }
 
1136
  }
 
1137
  pendingActions->set_length(pendingActions->length() - active_pendingActions);
 
1138
}
 
1139
 
 
1140
int
 
1141
ICPRequestCont::ICPRequestEvent(int event, Event * e)
 
1142
{
 
1143
  // Note: Passed parameter 'e' is not an Event *
 
1144
  //       if event == ICP_RESPONSE_MESSAGE
 
1145
 
 
1146
  ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE ||
 
1147
             event == NET_EVENT_DATAGRAM_WRITE_ERROR ||
 
1148
             event == EVENT_IMMEDIATE || event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE);
 
1149
  // handle reentrant callback
 
1150
  if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE)
 
1151
      || (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) {
 
1152
    ink_assert(npending_actions > 0);
 
1153
    remove_from_pendingActions((Action *) e);
 
1154
    return EVENT_DONE;
 
1155
  }
 
1156
  // Start of user ICP query request processing.  We start here after
 
1157
  // the reschedule in ICPProcessor::ICPQuery().
 
1158
  switch (_next_state) {
 
1159
  case ICP_START:
 
1160
  case ICP_OFF_TERMINATE:
 
1161
  case ICP_QUEUE_REQUEST:
 
1162
  case ICP_AWAITING_RESPONSE:
 
1163
  case ICP_DEQUEUE_REQUEST:
 
1164
  case ICP_POST_COMPLETION:
 
1165
  case ICP_REQUEST_NOT_ACTIVE:
 
1166
    {
 
1167
      if (ICPStateMachine(event, (void *) e) == EVENT_CONT) {
 
1168
        //
 
1169
        // Unable to acquire lock, reschedule continuation
 
1170
        //
 
1171
        eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP);
 
1172
        return EVENT_CONT;
 
1173
 
 
1174
      } else if (_next_state == ICP_DONE) {
 
1175
        //
 
1176
        // ICP request processing complete.
 
1177
        //
 
1178
        delete this;
 
1179
        break;
 
1180
      } else {
 
1181
        break;
 
1182
      }
 
1183
    }
 
1184
#if !defined(_WIN32) && !defined(__GNUC__)
 
1185
  _end_case:                   // fix DEC warnings
 
1186
#endif
 
1187
    ink_release_assert(0);      // should never happen
 
1188
 
 
1189
  case ICP_DONE:
 
1190
  default:
 
1191
    ink_release_assert(0);      // should never happen
 
1192
  }                             // End of switch
 
1193
 
 
1194
  return EVENT_DONE;
 
1195
}
 
1196
 
 
1197
int
 
1198
ICPRequestCont::NopICPRequestEvent(int event, Event * e)
 
1199
{
 
1200
  NOWARN_UNUSED(event);
 
1201
  NOWARN_UNUSED(e);
 
1202
  delete this;
 
1203
  return EVENT_DONE;
 
1204
}
 
1205
 
 
1206
int
 
1207
ICPRequestCont::ICPStateMachine(int event, void *d)
 
1208
{
 
1209
  //*******************************************
 
1210
  // ICP message processing state machine
 
1211
  //*******************************************
 
1212
  ICPConfiguration *ICPcf = _ICPpr->GetConfig();
 
1213
 
 
1214
  while (1) {                   // loop forever
 
1215
 
 
1216
    switch (_next_state) {
 
1217
    case ICP_START:
 
1218
      {
 
1219
        // User may have cancelled request, if so abort request.
 
1220
        if (_act.cancelled) {
 
1221
          _next_state = ICP_DONE;
 
1222
          return EVENT_DONE;
 
1223
        }
 
1224
 
 
1225
        if (!_ICPpr->Lock())
 
1226
          return EVENT_CONT;    // Unable to get lock, try again later
 
1227
 
 
1228
        if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) {
 
1229
 
 
1230
          // Reject NULL pointer or "localhost" URLs
 
1231
          if (_url->valid()) {
 
1232
            int host_len;
 
1233
            const char *host = _url->host_get(&host_len);
 
1234
            if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) {
 
1235
              _ICPpr->Unlock();
 
1236
 
 
1237
              // NULL pointer or "localhost" URL, terminate request
 
1238
              _next_state = ICP_OFF_TERMINATE;
 
1239
              Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number);
 
1240
              break;            // move to next_state
 
1241
            }
 
1242
          }
 
1243
          // Note pending ICP request
 
1244
          _ICPpr->IncPendingQuery();
 
1245
          _ICPpr->Unlock();
 
1246
 
 
1247
          // Build the ICP query message
 
1248
          char *urlstr = _url->string_get(NULL);
 
1249
          int urlstr_len = strlen(urlstr) + 1;
 
1250
 
 
1251
          int status = BuildICPMsg(ICP_OP_QUERY,
 
1252
                                   _sequence_number = ICPReqSeqNumber(),
 
1253
                                   0 /* optflags */ , 0 /* optdata */ ,
 
1254
                                   0 /* shostid */ ,
 
1255
                                   (void *) urlstr, urlstr_len,
 
1256
                                   &_sendMsgHdr, _sendMsgIOV,
 
1257
                                   &_ICPmsg);
 
1258
          // urlstr memory freed in destructor
 
1259
          ink_assert(status == 0);
 
1260
          Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number);
 
1261
 
 
1262
          _next_state = ICP_QUEUE_REQUEST;
 
1263
          break;                // move to next_state
 
1264
 
 
1265
        } else {
 
1266
          ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat);
 
1267
          _ICPpr->Unlock();
 
1268
 
 
1269
          // ICP NOT enabled, terminate request
 
1270
          _next_state = ICP_OFF_TERMINATE;
 
1271
          break;                // move to next_state
 
1272
        }
 
1273
      }
 
1274
#if !defined(_WIN32) && !defined(__GNUC__)
 
1275
    _end_case_icp_start:       // fix DEC warnings
 
1276
#endif
 
1277
      ink_release_assert(0);    // should never happen
 
1278
 
 
1279
    case ICP_OFF_TERMINATE:
 
1280
      {
 
1281
        if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
 
1282
          return EVENT_CONT;    // unable to get lock, delay and retry
 
1283
        }
 
1284
        Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number);
 
1285
 
 
1286
        // ICP NOT enabled, post completion on request
 
1287
        if (!_act.cancelled) {
 
1288
          _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
 
1289
        }
 
1290
        MUTEX_UNTAKE_LOCK(mutex, this_ethread());
 
1291
 
 
1292
        _next_state = ICP_DONE;
 
1293
        return EVENT_DONE;
 
1294
      }
 
1295
#if !defined(_WIN32) && !defined(__GNUC__)
 
1296
    _end_case_icp_off_terminate:       // fix DEC warnings
 
1297
#endif
 
1298
      ink_release_assert(0);    // should never happen
 
1299
 
 
1300
    case ICP_QUEUE_REQUEST:
 
1301
      {
 
1302
        // Place ICP request on the pending request queue
 
1303
        int ret = AddICPRequest(_sequence_number, this);
 
1304
        ink_assert(ret == 0);
 
1305
 
 
1306
        // Generate ICP requests to peers
 
1307
        int bias = _ICPpr->GetStartingSendPeerBias();
 
1308
        int SendPeers = _ICPpr->GetSendPeers();
 
1309
        npending_actions = 0;
 
1310
        while (SendPeers > 0) {
 
1311
          Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias);
 
1312
          if (!P->IsOnline()) {
 
1313
            SendPeers--;
 
1314
            continue;
 
1315
          }
 
1316
          //
 
1317
          // Send query request to Peers
 
1318
          //
 
1319
 
 
1320
          // because of reentrancy, we have to do this first, just
 
1321
          // in case we get called back immediately.
 
1322
          int was_expected = P->ExpectedReplies(&_expected_replies_list);
 
1323
          _expected_replies += was_expected;
 
1324
          npending_actions++;
 
1325
          Action *a = P->SendMsg_re(this,
 
1326
                                    P,
 
1327
                                    &_sendMsgHdr, (struct sockaddr_in *) 0);
 
1328
          if (!a) {
 
1329
            a = ACTION_IO_ERROR;
 
1330
          }
 
1331
          if (a != ACTION_IO_ERROR) {
 
1332
            if (a != ACTION_RESULT_DONE) {
 
1333
              if (!pendingActions) {
 
1334
                pendingActions = NEW(new DynArray<Action *>(&default_action));
 
1335
              }
 
1336
              (*pendingActions) (npending_actions) = a;
 
1337
            }
 
1338
            P->LogSendMsg(&_ICPmsg, (struct sockaddr_in *) 0);  // log as send query
 
1339
            Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s:%d]",
 
1340
                  _sequence_number, inet_ntoa(*P->GetIP()), P->GetPort());
 
1341
          } else {
 
1342
            _expected_replies_list.ClearBit(P->GetPeerID());
 
1343
            _expected_replies -= was_expected;
 
1344
            // Partial or failed write.
 
1345
            ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat);
 
1346
            unsigned char x[4];
 
1347
            *(uint32_t *) & x = (uint32_t) (*P->GetIP()).s_addr;
 
1348
            // coverity[uninit_use_in_call]
 
1349
            Debug("icp_warn",
 
1350
                  "ICP query send, res=%d, ip=%d.%d.%d.%d", ntohs(_ICPmsg.h.msglen), x[0], x[1], x[2], x[3]);
 
1351
          }
 
1352
          SendPeers--;
 
1353
        }
 
1354
 
 
1355
        Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies);
 
1356
        if (!_expected_replies) {
 
1357
          //
 
1358
          // Nothing to wait for, terminate ICP processing
 
1359
          //
 
1360
          ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat);
 
1361
          _next_state = ICP_DEQUEUE_REQUEST;
 
1362
          break;                // move to next_state
 
1363
        }
 
1364
        ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies);
 
1365
 
 
1366
        //
 
1367
        // Setup ICP request response timeout
 
1368
        //
 
1369
        int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout();
 
1370
        _timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP);
 
1371
 
 
1372
        _next_state = ICP_AWAITING_RESPONSE;
 
1373
        return EVENT_DONE;
 
1374
      }
 
1375
#if !defined(_WIN32) && !defined(__GNUC__)
 
1376
    _end_case_icp_queue_request:       // fix DEC warnings
 
1377
#endif
 
1378
      ink_release_assert(0);    // should never happen
 
1379
 
 
1380
    case ICP_AWAITING_RESPONSE:
 
1381
      {
 
1382
        Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number);
 
1383
        ink_assert(d);
 
1384
        ICPRequestEventArgs_t dummyArgs;
 
1385
        ICPRequestEventArgs_t *args = 0;
 
1386
 
 
1387
        if (event == ICP_RESPONSE_MESSAGE) {
 
1388
          args = (ICPRequestEventArgs_t *) d;
 
1389
        } else if (event == EVENT_INTERVAL) {
 
1390
          memset((void *) &dummyArgs, 0, sizeof(dummyArgs));
 
1391
          args = &dummyArgs;
 
1392
        } else {
 
1393
          ink_release_assert(0);        // should never happen
 
1394
        }
 
1395
 
 
1396
        // Process ICP response
 
1397
        if (ICPResponseMessage(event, args->rICPmsg, args->rICPmsg_len, args->peer) == EVENT_DONE) {
 
1398
          // ICP Request processing is complete, do completion actions
 
1399
          _next_state = ICP_DEQUEUE_REQUEST;
 
1400
          break;                // move to next_state
 
1401
 
 
1402
        } else {
 
1403
          // Continue to wait for additional replies
 
1404
          return EVENT_DONE;
 
1405
        }
 
1406
      }
 
1407
#if !defined(_WIN32) && !defined(__GNUC__)
 
1408
    _end_case_icp_awaiting_response:   // fix DEC warnings
 
1409
#endif
 
1410
      ink_release_assert(0);    // should never happen
 
1411
 
 
1412
    case ICP_DEQUEUE_REQUEST:
 
1413
      {
 
1414
        // Remove ICP request from active queue
 
1415
        int ret = RemoveICPRequest(_sequence_number);
 
1416
        Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number);
 
1417
        ink_assert(ret == 0);
 
1418
        //_sequence_number = 0; // moved to REQUEST_NOT_ACTIVE
 
1419
        _next_state = ICP_POST_COMPLETION;
 
1420
        break;                  // move to next_state
 
1421
      }
 
1422
#if !defined(_WIN32) && !defined(__GNUC__)
 
1423
    _end_case_icp_dequeue_request:     // fix DEC warnings
 
1424
#endif
 
1425
      ink_release_assert(0);    // should never happen
 
1426
 
 
1427
    case ICP_POST_COMPLETION:
 
1428
      {
 
1429
        if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
 
1430
          return EVENT_CONT;    // unable to get lock, delay and retry
 
1431
        }
 
1432
        Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number);
 
1433
 
 
1434
        // Post completion on the ICP request.
 
1435
        if (!_act.cancelled) {
 
1436
          _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
 
1437
        }
 
1438
        MUTEX_UNTAKE_LOCK(mutex, this_ethread());
 
1439
        ICP_SUM_DYN_STAT(total_icp_request_time_stat, (ink_get_hrtime() - _start_time));
 
1440
 
 
1441
        _next_state = ICP_WAIT_SEND_COMPLETE;
 
1442
        break;                  // move to next_state
 
1443
      }
 
1444
#if !defined(_WIN32) && !defined(__GNUC__)
 
1445
    _end_case_icp_post_completion:     // fix DEC warnings
 
1446
#endif
 
1447
      ink_release_assert(0);    // should never happen
 
1448
    case ICP_WAIT_SEND_COMPLETE:
 
1449
      {
 
1450
        // wait for all the sends to complete.
 
1451
        if (npending_actions > 0) {
 
1452
          Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions);
 
1453
        } else {
 
1454
          _next_state = ICP_REQUEST_NOT_ACTIVE;
 
1455
          // move to next state
 
1456
          break;
 
1457
        }
 
1458
      }
 
1459
      break;
 
1460
#if !defined(_WIN32) && !defined(__GNUC__)
 
1461
    _end_case_icp_wait_send_complete:  // fix DEC warnings
 
1462
#endif
 
1463
      ink_release_assert(0);    // should never happen
 
1464
    case ICP_REQUEST_NOT_ACTIVE:
 
1465
      {
 
1466
        Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number);
 
1467
        _sequence_number = 0;
 
1468
        if (!_ICPpr->Lock())
 
1469
          return EVENT_CONT;    // Unable to get lock, try again later
 
1470
 
 
1471
        // Note pending ICP request completion
 
1472
        _ICPpr->DecPendingQuery();
 
1473
        _ICPpr->Unlock();
 
1474
 
 
1475
        _next_state = ICP_DONE;
 
1476
        return EVENT_DONE;
 
1477
      }
 
1478
#if !defined(_WIN32) && !defined(__GNUC__)
 
1479
    _end_case_icp_request_not_active:  // fix DEC warnings
 
1480
#endif
 
1481
      ink_release_assert(0);    // should never happen
 
1482
 
 
1483
    case ICP_DONE:
 
1484
    default:
 
1485
      ink_release_assert(0);    // should never happen
 
1486
 
 
1487
    }                           // End of switch
 
1488
 
 
1489
  }                             // End of while(1)
 
1490
}
 
1491
 
 
1492
int
 
1493
ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t * m, int ICPMsg_len, Peer * peer)
 
1494
{
 
1495
  NOWARN_UNUSED(ICPMsg_len);
 
1496
  if (event == EVENT_INTERVAL) {
 
1497
    _timeout = 0;
 
1498
    remove_all_pendingActions();
 
1499
 
 
1500
    // ICP request response timeout, if we received a response from
 
1501
    // any parent, return it to resolve the miss.
 
1502
 
 
1503
    if (_received_replies) {
 
1504
      int NumParentPeers = _ICPpr->GetParentPeers();
 
1505
      if (NumParentPeers > 0) {
 
1506
        int n;
 
1507
        Peer *pp;
 
1508
        for (n = 0; n < NumParentPeers; n++) {
 
1509
          pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
 
1510
          if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID())
 
1511
              && pp->isUp()) {
 
1512
            _ret_sockaddr.sin_addr.s_addr = (pp->GetIP())->s_addr;
 
1513
            _ret_sockaddr.sin_port = htons(((ParentSiblingPeer *) pp)->GetProxyPort());
 
1514
            _ret_status = ICP_LOOKUP_FOUND;
 
1515
 
 
1516
            Debug("icp",
 
1517
                  "ICP timeout using parent Id=%d from [%s:%d] return [%s:%d]",
 
1518
                  _sequence_number, inet_ntoa(*pp->GetIP()),
 
1519
                  pp->GetPort(), inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
 
1520
            return EVENT_DONE;
 
1521
          }
 
1522
        }
 
1523
      }
 
1524
    }
 
1525
    // Timeout received on ICP request, return ICP_LOOKUP_FAILED
 
1526
    Debug("icp", "ICP Response timeout for Id=%d", _sequence_number);
 
1527
    return EVENT_DONE;
 
1528
 
 
1529
  } else {
 
1530
    // We have received a response to our ICP query request.
 
1531
    // See if this response resolves the ICP query.
 
1532
    //
 
1533
    ink_assert(m->h.requestno == _sequence_number);
 
1534
 
 
1535
    switch (m->h.opcode) {
 
1536
    case ICP_OP_HIT:
 
1537
    case ICP_OP_HIT_OBJ:
 
1538
      {
 
1539
        // Kill timeout event
 
1540
        _timeout->cancel(this);
 
1541
        _timeout = 0;
 
1542
 
 
1543
        ICP_INCREMENT_DYN_STAT(icp_query_hits_stat);
 
1544
        ++_received_replies;
 
1545
        _ret_sockaddr.sin_addr.s_addr = (peer->GetIP())->s_addr;
 
1546
        _ret_sockaddr.sin_port = htons(((ParentSiblingPeer *) peer)->GetProxyPort());
 
1547
        _ret_status = ICP_LOOKUP_FOUND;
 
1548
 
 
1549
        Debug("icp",
 
1550
              "ICP Response HIT for Id=%d from [%s:%d] return [%s:%d]",
 
1551
              _sequence_number, inet_ntoa(*peer->GetIP()), peer->GetPort(),
 
1552
              inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
 
1553
        return EVENT_DONE;
 
1554
      }
 
1555
    case ICP_OP_MISS:
 
1556
    case ICP_OP_ERR:
 
1557
    case ICP_OP_MISS_NOFETCH:
 
1558
    case ICP_OP_DENIED:
 
1559
      {
 
1560
        Debug("icp", "ICP MISS response for Id=%d from [%s:%d]",
 
1561
              _sequence_number, inet_ntoa(*peer->GetIP()), peer->GetPort());
 
1562
        // "received_replies" is only for Peers who we expect a reply
 
1563
        //  from (Peers which are in the expected_replies_list).
 
1564
        int Id = peer->GetPeerID();
 
1565
        if (_expected_replies_list.IsBitSet(Id)) {
 
1566
          // Clear bit to note receipt of reply
 
1567
          _expected_replies_list.ClearBit(Id);
 
1568
          ++_received_replies;
 
1569
        }
 
1570
 
 
1571
        if (_received_replies < _expected_replies)
 
1572
          return EVENT_CONT;    // wait for more responses
 
1573
 
 
1574
        // Kill timeout event
 
1575
        _timeout->cancel(this);
 
1576
        _timeout = 0;
 
1577
 
 
1578
        ICP_INCREMENT_DYN_STAT(icp_query_misses_stat);
 
1579
        //
 
1580
        // All responders have returned ICP_OP_MISS.
 
1581
        // If parents exists, select one to resolve the request.
 
1582
        //
 
1583
        if (_ICPpr->GetParentPeers() > 0) {
 
1584
          // In cases where multiple parents exist, we use
 
1585
          // a round robin scheme.
 
1586
          Peer *p = NULL;
 
1587
          // try to find an UP parent, if none, return ICP_LOOKUP_FAILED
 
1588
          {
 
1589
            int i;
 
1590
            for (i = 0; i < _ICPpr->GetParentPeers(); i++) {
 
1591
              p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
 
1592
              // find an UP parent
 
1593
              if (p->isUp())
 
1594
                break;
 
1595
            }
 
1596
            // if no parent is selected, then return ICP_LOOKUP_FAILED
 
1597
            if (i >= _ICPpr->GetParentPeers()) {
 
1598
              Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers());
 
1599
              p = NULL;
 
1600
            }
 
1601
          }
 
1602
          if (p) {
 
1603
            _ret_sockaddr.sin_addr.s_addr = (p->GetIP())->s_addr;
 
1604
            _ret_sockaddr.sin_port = htons(((ParentSiblingPeer *) p)->GetProxyPort());
 
1605
            _ret_status = ICP_LOOKUP_FOUND;
 
1606
 
 
1607
            Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s:%d]",
 
1608
                  _sequence_number, inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
 
1609
            return EVENT_DONE;
 
1610
          }
 
1611
        }
 
1612
        Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s:%d]",
 
1613
              _sequence_number, inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
 
1614
        return EVENT_DONE;
 
1615
      }
 
1616
    default:
 
1617
      {
 
1618
        ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat);
 
1619
        unsigned char x[4];
 
1620
        *(uint32_t *) & x = (uint32_t) peer->GetIP()->s_addr;
 
1621
        // coverity[uninit_use_in_call]
 
1622
        Warning("Invalid ICP response, op=%d reqno=%d ip=%d.%d.%d.%d",
 
1623
                m->h.opcode, m->h.requestno, x[0], x[1], x[2], x[3]);
 
1624
        return EVENT_CONT;      // wait for more responses
 
1625
      }
 
1626
 
 
1627
    }                           // End of switch
 
1628
  }
 
1629
}
 
1630
 
 
1631
//------------------------------------------------
 
1632
// Class ICPRequestCont static member functions
 
1633
//------------------------------------------------
 
1634
 
 
1635
// Static member function
 
1636
void
 
1637
ICPRequestCont::NetToHostICPMsg(ICPMsg_t * in, ICPMsg_t * out)
 
1638
{
 
1639
  out->h.opcode = in->h.opcode;
 
1640
  out->h.version = in->h.version;
 
1641
  out->h.msglen = ntohs(in->h.msglen);
 
1642
  out->h.requestno = ntohl(in->h.requestno);
 
1643
  out->h.optionflags = ntohl(in->h.optionflags);
 
1644
  out->h.optiondata = ntohl(in->h.optiondata);
 
1645
  out->h.shostid = ntohl(in->h.shostid);
 
1646
 
 
1647
  switch (in->h.opcode) {
 
1648
  case ICP_OP_QUERY:
 
1649
    {
 
1650
      memcpy((char *) &out->un.query.rhostid,
 
1651
             (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid)), sizeof(out->un.query.rhostid));
 
1652
      out->un.query.rhostid = ntohl(out->un.query.rhostid);
 
1653
      out->un.query.URL = (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid));
 
1654
      break;
 
1655
    }
 
1656
  case ICP_OP_HIT:
 
1657
    {
 
1658
      out->un.hit.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
 
1659
      break;
 
1660
    }
 
1661
  case ICP_OP_MISS:
 
1662
    {
 
1663
      out->un.miss.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
 
1664
      break;
 
1665
    }
 
1666
  case ICP_OP_HIT_OBJ:
 
1667
    {
 
1668
      out->un.hitobj.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
 
1669
 
 
1670
      // strlen() is bounded since buffer in null terminated.
 
1671
      out->un.hitobj.p_objsize = (char *) (out->un.hitobj.URL + strlen(out->un.hitobj.URL));
 
1672
      memcpy((char *) &out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize));
 
1673
      out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize);
 
1674
      out->un.hitobj.data = (char *) (out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize));
 
1675
      break;
 
1676
    }
 
1677
  default:
 
1678
    break;
 
1679
  }
 
1680
}
 
1681
 
 
1682
int
 
1683
ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno,
 
1684
                            int optflags, int optdata, int shostid,
 
1685
                            void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg)
 
1686
{
 
1687
  // Build ICP message for transmission in network byte order.
 
1688
  if (op == ICP_OP_QUERY) {
 
1689
    icpmsg->un.query.rhostid = htonl(0);
 
1690
    icpmsg->un.query.URL = (char *) data;
 
1691
 
 
1692
    mhdr->msg_iov = iov;
 
1693
    mhdr->msg_iovlen = 3;
 
1694
 
 
1695
    iov[0].iov_base = (caddr_t) icpmsg;
 
1696
    iov[0].iov_len = sizeof(ICPMsgHdr_t);
 
1697
 
 
1698
    iov[1].iov_base = (caddr_t) & icpmsg->un.query.rhostid;
 
1699
    iov[1].iov_len = sizeof(icpmsg->un.query.rhostid);
 
1700
 
 
1701
    iov[2].iov_base = (caddr_t) data;
 
1702
    iov[2].iov_len = datalen;
 
1703
    icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
 
1704
 
 
1705
  } else if (op == ICP_OP_HIT) {
 
1706
    icpmsg->un.hit.URL = (char *) data;
 
1707
 
 
1708
    mhdr->msg_iov = iov;
 
1709
    mhdr->msg_iovlen = 2;
 
1710
 
 
1711
    iov[0].iov_base = (caddr_t) icpmsg;
 
1712
    iov[0].iov_len = sizeof(ICPMsgHdr_t);
 
1713
 
 
1714
    iov[1].iov_base = (caddr_t) data;
 
1715
    iov[1].iov_len = datalen;
 
1716
    icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
 
1717
 
 
1718
  } else if (op == ICP_OP_MISS) {
 
1719
    icpmsg->un.miss.URL = (char *) data;
 
1720
 
 
1721
    mhdr->msg_iov = iov;
 
1722
    mhdr->msg_iovlen = 2;
 
1723
 
 
1724
    iov[0].iov_base = (caddr_t) icpmsg;
 
1725
    iov[0].iov_len = sizeof(ICPMsgHdr_t);
 
1726
 
 
1727
    iov[1].iov_base = (caddr_t) data;
 
1728
    iov[1].iov_len = datalen;
 
1729
    icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
 
1730
 
 
1731
  } else {
 
1732
    ink_release_assert(0);
 
1733
    return 1;                   // failed
 
1734
  }
 
1735
 
 
1736
  mhdr->msg_name = (caddr_t) 0;
 
1737
  mhdr->msg_namelen = 0;
 
1738
  // TODO: The following is just awkward
 
1739
#if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris)
 
1740
  mhdr->msg_accrights = (caddr_t) 0;
 
1741
  mhdr->msg_accrightslen = 0;
 
1742
#elif !defined(solaris)
 
1743
  mhdr->msg_control = 0;
 
1744
  mhdr->msg_controllen = 0;
 
1745
  mhdr->msg_flags = 0;
 
1746
#endif
 
1747
 
 
1748
  icpmsg->h.opcode = op;
 
1749
  icpmsg->h.version = ICP_VERSION_2;
 
1750
  icpmsg->h.requestno = htonl(seqno);
 
1751
  icpmsg->h.optionflags = htonl(optflags);
 
1752
  icpmsg->h.optiondata = htonl(optdata);
 
1753
  icpmsg->h.shostid = htonl(shostid);
 
1754
 
 
1755
  return 0;                     // Success
 
1756
}
 
1757
 
 
1758
// Static ICPRequestCont data declarations
 
1759
unsigned int
 
1760
  ICPRequestCont::ICPRequestSeqno = 1;
 
1761
Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE];
 
1762
 
 
1763
// Static member function
 
1764
unsigned int
 
1765
ICPRequestCont::ICPReqSeqNumber()
 
1766
{
 
1767
  // Generate ICP request sequence numbers.  This must be unique.
 
1768
  unsigned int res = 0;
 
1769
  do {
 
1770
    res = (unsigned int) ink_atomic_increment((int *) &ICPRequestSeqno, 1);
 
1771
  } while (!res);
 
1772
 
 
1773
  return res;
 
1774
}
 
1775
 
 
1776
// Static member function
 
1777
inline int
 
1778
ICPRequestCont::ICPRequestHash(unsigned int seqno)
 
1779
{
 
1780
  // ICPRequestQueue hash
 
1781
  return seqno % ICP_REQUEST_HASH_SIZE;
 
1782
}
 
1783
 
 
1784
// Static member function
 
1785
int
 
1786
ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont * r)
 
1787
{
 
1788
  // Add ICP request to ICP outstanding queue (ICPRequestQueue).
 
1789
  // return: 0 - success
 
1790
 
 
1791
  ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r);
 
1792
  return 0;                     // Success
 
1793
}
 
1794
 
 
1795
// Static member function
 
1796
ICPRequestCont *
 
1797
ICPRequestCont::FindICPRequest(unsigned int seqno)
 
1798
{
 
1799
  // Find ICP request on outstanding queue with the given sequence number
 
1800
  int hash = ICPRequestHash(seqno);
 
1801
  ICPRequestCont *r;
 
1802
 
 
1803
  for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
 
1804
    if (r->_sequence_number == seqno)
 
1805
      return r;
 
1806
  }
 
1807
  return (ICPRequestCont *) 0;  // Not found
 
1808
}
 
1809
 
 
1810
// Static member function
 
1811
int
 
1812
ICPRequestCont::RemoveICPRequest(unsigned int seqno)
 
1813
{
 
1814
  // Remove ICP request from outstanding queue with the given
 
1815
  //  sequence number
 
1816
  // Return: 0 - success; 1 - not found
 
1817
 
 
1818
  if (!seqno) {
 
1819
    return 1;                   // Not found
 
1820
  }
 
1821
  int hash = ICPRequestHash(seqno);
 
1822
  ICPRequestCont *r;
 
1823
 
 
1824
  for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
 
1825
    if (r->_sequence_number == seqno) {
 
1826
      ICPRequestQueue[hash].remove(r);
 
1827
      return 0;
 
1828
    }
 
1829
  }
 
1830
  return 1;                     // Not found
 
1831
}
 
1832
 
 
1833
//------------------------------------------------------------------------
 
1834
// Class ICPProcessor member functions
 
1835
//      Central class which initializes the ICP world.
 
1836
//      Delegates incoming message processing to ICPHandlerCont
 
1837
//      and outgoing message processing to ICPRequestCont.
 
1838
//      Manages the ICP configuration database derived from TS
 
1839
//      configuration info.
 
1840
//------------------------------------------------------------------------
 
1841
 
 
1842
// Static data declarations for ICPProcessor
 
1843
void
 
1844
initialize_thread_for_icp(EThread * e)
 
1845
{
 
1846
  (void) e;
 
1847
}
 
1848
 
 
1849
ICPProcessor icpProcessorInternal;
 
1850
ICPProcessorExt icpProcessor(&icpProcessorInternal);
 
1851
 
 
1852
ICPProcessor::ICPProcessor()
 
1853
 : _l(0), _Initialized(0), _AllowIcpQueries(0),
 
1854
   _PendingIcpQueries(0), _ICPConfig(0), _ICPPeriodic(0), _ICPHandler(0),
 
1855
   _mcastCB_handler(NULL), _PeriodicEvent(0), _ICPHandlerEvent(0),
 
1856
   _nPeerList(-1), _LocalPeer(0),
 
1857
   _curSendPeer(0), _nSendPeerList(-1),
 
1858
   _curRecvPeer(0), _nRecvPeerList(-1), _curParentPeer(0), _nParentPeerList(-1), _ValidPollData(0), _last_recv_peer_bias(0)
 
1859
{
 
1860
  memset((void *)_PeerList, 0, sizeof(_PeerList[PEER_LIST_SIZE]));
 
1861
  memset((void *)_SendPeerList, 0, sizeof(_SendPeerList[SEND_PEER_LIST_SIZE]));
 
1862
  memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList[RECV_PEER_LIST_SIZE]));
 
1863
  memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList[PARENT_PEER_LIST_SIZE]));
 
1864
  memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
 
1865
}
 
1866
 
 
1867
ICPProcessor::~ICPProcessor()
 
1868
{
 
1869
#if defined (_WIN32)
 
1870
  if (0 < glShutdownInProgress)
 
1871
    return;
 
1872
#endif
 
1873
  if (_ICPPeriodic) {
 
1874
    MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread());
 
1875
    _PeriodicEvent->cancel();
 
1876
    Mutex_unlock(_ICPPeriodic->mutex, this_ethread());
 
1877
  }
 
1878
 
 
1879
  if (_ICPHandler) {
 
1880
    MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread());
 
1881
    _ICPHandlerEvent->cancel();
 
1882
    Mutex_unlock(_ICPHandler->mutex, this_ethread());
 
1883
  }
 
1884
}
 
1885
 
 
1886
void
 
1887
ICPProcessor::start()
 
1888
{
 
1889
  //*****************************************************
 
1890
  // Perform initialization actions for ICPProcessor
 
1891
  // (called at system startup)
 
1892
  //*****************************************************
 
1893
  if (_Initialized)             // Do only once
 
1894
    return;
 
1895
 
 
1896
  //
 
1897
  // Setup ICPProcessor lock, required since ICPProcessor is instantiated
 
1898
  //  as static object.
 
1899
  //
 
1900
  _l = NEW(new AtomicLock());
 
1901
 
 
1902
  //
 
1903
  // Setup custom allocators
 
1904
  //
 
1905
  // replaced with generic IOBufferBlock allocator
 
1906
  ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
 
1907
 
 
1908
  //
 
1909
  // Setup ICP stats callbacks
 
1910
  //
 
1911
  InitICPStatCallbacks();
 
1912
 
 
1913
  //
 
1914
  // Create ICP configuration objects
 
1915
  //
 
1916
  _ICPConfig = NEW(new ICPConfiguration());
 
1917
 
 
1918
  _mcastCB_handler = NEW(new ICPHandlerCont(this));
 
1919
  SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
 
1920
 
 
1921
 
 
1922
  //
 
1923
  // Build ICP peer list and setup listen sockets
 
1924
  //
 
1925
  if (_ICPConfig->globalConfig()->ICPconfigured()) {
 
1926
    if (BuildPeerList() == 0) {
 
1927
      if (SetupListenSockets() == 0) {
 
1928
        _AllowIcpQueries = 1;   // allow receipt of queries
 
1929
      }
 
1930
    }
 
1931
  }
 
1932
  DumpICPConfig();
 
1933
 
 
1934
  //
 
1935
  // Start ICP configuration monitor (periodic continuation)
 
1936
  //
 
1937
  _ICPPeriodic = NEW(new ICPPeriodicCont(this));
 
1938
  SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
 
1939
  _PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
 
1940
 
 
1941
  //
 
1942
  // Start ICP receive handler continuation
 
1943
  //
 
1944
  _ICPHandler = NEW(new ICPHandlerCont(this));
 
1945
  SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
 
1946
  _ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
 
1947
                                                   HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
 
1948
  //
 
1949
  // Stale lookup data initializations
 
1950
  //
 
1951
  if (!gclient_request.valid()) {
 
1952
    gclient_request.create(HTTP_TYPE_REQUEST);
 
1953
  }
 
1954
  _Initialized = 1;
 
1955
}
 
1956
 
 
1957
Action *
 
1958
ICPProcessor::ICPQuery(Continuation * c, URL * url)
 
1959
{
 
1960
  //**************************************
 
1961
  // HTTP state machine interface to ICP
 
1962
  //**************************************
 
1963
 
 
1964
  // Build continuation to process ICP request
 
1965
  EThread *thread = this_ethread();
 
1966
  ProxyMutex *mutex = thread->mutex;
 
1967
  ICPRequestCont *rc = new(ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url);
 
1968
 
 
1969
  ICP_INCREMENT_DYN_STAT(icp_query_requests_stat);
 
1970
  
 
1971
  rc->SetRequestStartTime();
 
1972
  SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler) & ICPRequestCont::ICPRequestEvent);
 
1973
  eventProcessor.schedule_imm(rc, ET_ICP);
 
1974
 
 
1975
  return rc->GetActionPtr();
 
1976
}
 
1977
 
 
1978
int
 
1979
ICPProcessor::BuildPeerList()
 
1980
{
 
1981
  // Returns 0 on Success
 
1982
 
 
1983
  //
 
1984
  //---------------------------------------------------------------------
 
1985
  //  We always place all allocated Peer elements onto PeerList[],
 
1986
  //  which is used to track allocated elements and validate (ip, port)
 
1987
  //  uniqueness in the ICP configuration.
 
1988
  //
 
1989
  //  All MultiCastPeer(s) link the underlying ParentSiblingPeer structures
 
1990
  //  using a singly linked list off the MultiCastPeer.
 
1991
  //
 
1992
  //  Peer elements placed onto SendPeerList[] are elements which are
 
1993
  //  the target of ICP queries.
 
1994
  //  In the case where MultiCasting is used, a pseudo peer element
 
1995
  //  (MultiCastPeer) is placed onto the SendPeerList[] to act as a place
 
1996
  //  holder for the underlying Peers.
 
1997
  //
 
1998
  //  RecvPeerList[] is the list of Peer(s) we perform reads on for
 
1999
  //  ICP messages.  In the case of MultiCast, the pseudo MultiCast peer
 
2000
  //  element (MultiCastPeer) is placed on this list.  Since we currently
 
2001
  //  funnel all unicast receives through the local peer UDP socket,
 
2002
  //  only the local peer and any pseudo MultiCastPeer structures reside
 
2003
  //  on this list.
 
2004
  //
 
2005
  //  Parent (PEER_PARENT) Peer elements are also added to ParentPeerList
 
2006
  //  which is used to select a parent in the case where all ICP queries
 
2007
  //  have returned ICP_MISS.
 
2008
  //---------------------------------------------------------------------
 
2009
  //
 
2010
  PeerConfigData *Pcfg;
 
2011
  Peer *P;
 
2012
  Peer *mcP;
 
2013
  int index;
 
2014
  int status;
 
2015
  PeerType_t type;
 
2016
 
 
2017
  //
 
2018
  // From the working copy of the ICP configuration data, build the
 
2019
  // internal Peer data structures for ICP processing.
 
2020
  // First, establish the Local Peer descriptor before processing
 
2021
  // parents and siblings.
 
2022
  //
 
2023
  Pcfg = _ICPConfig->indexToPeerConfigData(0);
 
2024
  ink_strncpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
 
2025
  Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
 
2026
 
 
2027
  // Get IP address for given interface
 
2028
  if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &Pcfg->_ip_addr)) {
 
2029
    // No IP address for given interface
 
2030
    Warning("ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
 
2031
    REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface has no IP address");
 
2032
    Pcfg->_ip_addr.s_addr = 0;
 
2033
  } else {
 
2034
    Pcfg->_my_ip_addr.s_addr = Pcfg->_ip_addr.s_addr;
 
2035
  }
 
2036
  Pcfg->_proxy_port = 0;
 
2037
  Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
 
2038
  Pcfg->_mc_member = 0;
 
2039
  Pcfg->_mc_ip_addr.s_addr = 0;
 
2040
  Pcfg->_mc_ttl = 0;
 
2041
 
 
2042
  //***************************************************
 
2043
  // Descriptor for local host, add to PeerList and
 
2044
  // RecvPeerList
 
2045
  //***************************************************
 
2046
  P = NEW(new ParentSiblingPeer(PEER_LOCAL, Pcfg, this));
 
2047
  status = AddPeer(P);
 
2048
  ink_release_assert(status);
 
2049
  status = AddPeerToRecvList(P);
 
2050
  ink_release_assert(status);
 
2051
  _LocalPeer = P;
 
2052
 
 
2053
  for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
 
2054
    Pcfg = _ICPConfig->indexToPeerConfigData(index);
 
2055
    type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
 
2056
    //
 
2057
    // Ignore parent and sibling entries corresponding to "localhost".
 
2058
    // This is possible in a cluster configuration where parents and
 
2059
    // siblings are cluster members.  Note that in a cluster
 
2060
    // configuration, "icp.config" is shared by all nodes.
 
2061
    //
 
2062
    if (Pcfg->GetIP()->s_addr == _LocalPeer->GetIP()->s_addr)
 
2063
      continue;                 // ignore
 
2064
 
 
2065
    if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
 
2066
 
 
2067
      if (Pcfg->MultiCastMember()) {
 
2068
        mcP = FindPeer(Pcfg->GetMultiCastIP(), Pcfg->GetICPPort());
 
2069
        if (!mcP) {
 
2070
          //*********************************
 
2071
          // Create multicast peer structure
 
2072
          //*********************************
 
2073
          mcP = NEW(new MultiCastPeer(Pcfg->GetMultiCastIP(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this));
 
2074
          status = AddPeer(mcP);
 
2075
          ink_assert(status);
 
2076
          status = AddPeerToSendList(mcP);
 
2077
          ink_assert(status);
 
2078
          status = AddPeerToRecvList(mcP);
 
2079
          ink_assert(status);
 
2080
        }
 
2081
        //*****************************
 
2082
        // Add child to MultiCast peer
 
2083
        //*****************************
 
2084
        P = NEW(new ParentSiblingPeer(type, Pcfg, this));
 
2085
        status = AddPeer(P);
 
2086
        ink_assert(status);
 
2087
        status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
 
2088
        ink_assert(status);
 
2089
 
 
2090
      } else {
 
2091
        //*****************************
 
2092
        // Add parent/sibling peer
 
2093
        //*****************************
 
2094
        P = NEW(new ParentSiblingPeer(type, Pcfg, this));
 
2095
        status = AddPeer(P);
 
2096
        ink_assert(status);
 
2097
        status = AddPeerToSendList(P);
 
2098
        ink_assert(status);
 
2099
      }
 
2100
      //****************************************
 
2101
      // Also, add parent peers to parent list.
 
2102
      //****************************************
 
2103
      if (type == PEER_PARENT) {
 
2104
        status = AddPeerToParentList(P);
 
2105
        ink_assert(status);
 
2106
      }
 
2107
    }
 
2108
  }
 
2109
  return 0;                     // Success
 
2110
}
 
2111
 
 
2112
void
 
2113
ICPProcessor::FreePeerList()
 
2114
{
 
2115
  // Deallocate all Peer structures
 
2116
  int index;
 
2117
  for (index = 0; index < (_nPeerList + 1); ++index) {
 
2118
    if (_PeerList[index]) {
 
2119
      _PeerList[index] = 0;
 
2120
    }
 
2121
  }
 
2122
  // Reset all control data
 
2123
  _nPeerList = -1;
 
2124
  _LocalPeer = (Peer *) 0;
 
2125
  _curSendPeer = 0;
 
2126
  _nSendPeerList = -1;
 
2127
  _curRecvPeer = 0;
 
2128
  _nRecvPeerList = -1;
 
2129
  _curParentPeer = 0;
 
2130
  _nParentPeerList = -1;
 
2131
  _ValidPollData = 0;
 
2132
  _last_recv_peer_bias = 0;
 
2133
 
 
2134
  for (index = 0; index < PEER_LIST_SIZE; index++) {
 
2135
    _PeerList[index] = 0;
 
2136
  }
 
2137
  for (index = 0; index < SEND_PEER_LIST_SIZE; index++) {
 
2138
    _SendPeerList[index] = 0;
 
2139
  }
 
2140
  for (index = 0; index < RECV_PEER_LIST_SIZE; index++) {
 
2141
    _RecvPeerList[index] = 0;
 
2142
  }
 
2143
  for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) {
 
2144
    _ParentPeerList[index] = 0;
 
2145
  }
 
2146
  memset((void *) _PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
 
2147
}
 
2148
 
 
2149
int
 
2150
ICPProcessor::SetupListenSockets()
 
2151
{
 
2152
  int allow_null_configuration;
 
2153
 
 
2154
  if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
 
2155
      && _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
 
2156
    allow_null_configuration = 1;
 
2157
  } else {
 
2158
    allow_null_configuration = 0;
 
2159
  }
 
2160
 
 
2161
  // Returns 0 on Success.
 
2162
 
 
2163
  //
 
2164
  // Perform some basic sanity checks on the ICP configuration.
 
2165
  //
 
2166
  if (!_LocalPeer) {
 
2167
    Warning("ICP setup, no defined local Peer");
 
2168
    REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
 
2169
    return 1;                   // Failed
 
2170
  }
 
2171
 
 
2172
  if (GetSendPeers() == 0) {
 
2173
    if (!allow_null_configuration) {
 
2174
      Warning("ICP setup, no defined send Peer(s)");
 
2175
      REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
 
2176
      return 1;                 // Failed
 
2177
    }
 
2178
  }
 
2179
  if (GetRecvPeers() == 0) {
 
2180
    if (!allow_null_configuration) {
 
2181
      Warning("ICP setup, no defined receive Peer(s)");
 
2182
      REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
 
2183
      return 1;                 // Failed
 
2184
    }
 
2185
  }
 
2186
  //
 
2187
  // Establish the required sockets for elements on the PeerList[].
 
2188
  //
 
2189
  Peer *P;
 
2190
  int status;
 
2191
  int index;
 
2192
  for (index = 0; index < (_nPeerList + 1); ++index) {
 
2193
    if ((P = _PeerList[index])) {
 
2194
 
 
2195
      if ((P->GetType() == PEER_PARENT)
 
2196
          || (P->GetType() == PEER_SIBLING)) {
 
2197
        ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
 
2198
 
 
2199
        pPS->GetChan()->setRemote(pPS->GetIP()->s_addr, pPS->GetPort());
 
2200
 
 
2201
      } else if (P->GetType() == PEER_MULTICAST) {
 
2202
        MultiCastPeer *pMC = (MultiCastPeer *) P;
 
2203
        ink_assert(_mcastCB_handler != NULL);
 
2204
        status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP()->s_addr, pMC->GetPort(), _LocalPeer->GetIP()->s_addr,
 
2205
#ifdef _WIN32
 
2206
                                                   _LocalPeer->GetPort(),
 
2207
#else
 
2208
                                                   0,
 
2209
#endif
 
2210
                                                   NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
 
2211
        if (status) {
 
2212
          unsigned char x[4], y[4];
 
2213
          *(uint32_t *) & x = (uint32_t) pMC->GetIP()->s_addr;
 
2214
          *(uint32_t *) & y = (uint32_t) _LocalPeer->GetIP()->s_addr;
 
2215
          // coverity[uninit_use_in_call]
 
2216
          Warning("ICP MC send setup failed, res=%d, ip=%d.%d.%d.%d:%d bind_ip=%d.%d.%d.%d:%d",
 
2217
                  status, x[0], x[1], x[2], x[3], pMC->GetPort(), y[0], y[1], y[2], y[3], 0);
 
2218
          REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed");
 
2219
          return 1;             // Failed
 
2220
        }
 
2221
 
 
2222
        status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP()->s_addr,
 
2223
                                                      pMC->GetPort(),
 
2224
                                                      NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
 
2225
        if (status) {
 
2226
          unsigned char x[4];
 
2227
          *(uint32_t *) & x = (uint32_t) pMC->GetIP()->s_addr;
 
2228
          // coverity[uninit_use_in_call]
 
2229
          Warning("ICP MC recv setup failed, res=%d, ip=%d.%d.%d.%d:%d",
 
2230
                  status, x[0], x[1], x[2], x[3], pMC->GetPort());
 
2231
          REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed");
 
2232
          return 1;             // Failed
 
2233
        }
 
2234
      }
 
2235
    }
 
2236
  }
 
2237
  //
 
2238
  // Setup the socket for the local host.
 
2239
  // We funnel all unicast sends and receives through
 
2240
  // the local peer UDP socket.
 
2241
  //
 
2242
  ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
 
2243
 
 
2244
  pPS->GetChan()->setRemote(pPS->GetIP()->s_addr, pPS->GetPort());
 
2245
  return 0;                     // Success
 
2246
}
 
2247
 
 
2248
void
 
2249
ICPProcessor::ShutdownListenSockets()
 
2250
{
 
2251
  //
 
2252
  // Close all open sockets for elements on the PeerList[]
 
2253
  //
 
2254
  ink_assert(!PendingQuery());
 
2255
  Peer *P;
 
2256
 
 
2257
  int index;
 
2258
  for (index = 0; index < (_nPeerList + 1); ++index) {
 
2259
    if ((P = _PeerList[index])) {
 
2260
      if (P->GetType() == PEER_LOCAL) {
 
2261
        ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
 
2262
        (void) pPS->GetChan()->close();
 
2263
 
 
2264
      } else if (P->GetType() == PEER_MULTICAST) {
 
2265
        MultiCastPeer *pMC = (MultiCastPeer *) P;
 
2266
        (void) pMC->GetSendChan()->close();
 
2267
        (void) pMC->GetRecvChan()->close();
 
2268
      }
 
2269
    }
 
2270
  }
 
2271
}
 
2272
 
 
2273
int
 
2274
ICPProcessor::Reconfigure(int global_config_changed, int peer_config_changed)
 
2275
{
 
2276
  // Returns 0 on Success
 
2277
 
 
2278
  NOWARN_UNUSED(global_config_changed);
 
2279
  NOWARN_UNUSED(peer_config_changed);
 
2280
  //
 
2281
  // At this point, ICP requests processing is disabled and
 
2282
  // no pending ICP requests exist.
 
2283
  //
 
2284
  ink_assert(_ICPConfig->HaveLock());
 
2285
  ink_assert(!AllowICPQueries());
 
2286
  ink_assert(!PendingQuery());
 
2287
  //
 
2288
  // Shutdown and deallocate all structures associated with the
 
2289
  // current configuration.
 
2290
  //
 
2291
  ShutdownListenSockets();
 
2292
  FreePeerList();
 
2293
  //
 
2294
  // Copy the new configuration into the working copy and
 
2295
  // rebuild all associated structures.
 
2296
  //
 
2297
  _ICPConfig->UpdateGlobalConfig();
 
2298
  _ICPConfig->UpdatePeerConfig();
 
2299
 
 
2300
  int status;
 
2301
  if ((status = BuildPeerList()) == 0) {
 
2302
    status = SetupListenSockets();
 
2303
  }
 
2304
  DumpICPConfig();
 
2305
  return status;
 
2306
}
 
2307
 
 
2308
ICPProcessor::ReconfigState_t
 
2309
  ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed)
 
2310
{
 
2311
  //*****************************************************************
 
2312
  // State machine which performs the ICP reconfiguration actions.
 
2313
  // Defined states are as follows:
 
2314
  //  1) (RC_RECONFIG) disable ICP, reconfigure if no request pending,
 
2315
  //     else delay and retry.  Reconfigure and if success move to
 
2316
  //     RC_ENABLE_ICP else RC_DONE.
 
2317
  //  2) (RC_ENABLE_ICP) enable ICP, free ICP configuration lock.
 
2318
  //  3) (RC_DONE) free ICP configuration lock.
 
2319
  //*****************************************************************
 
2320
  ink_assert(_ICPConfig->HaveLock());
 
2321
  int reconfig_status;
 
2322
 
 
2323
  while (1) {
 
2324
 
 
2325
    switch (s) {
 
2326
    case RC_RECONFIG:
 
2327
      {
 
2328
        if (!Lock())
 
2329
          return RC_RECONFIG;   // Unable to get lock, try again
 
2330
 
 
2331
        if (PendingQuery()) {
 
2332
          DisableICPQueries();  // disable ICP processing
 
2333
          Unlock();
 
2334
          CancelPendingReads();
 
2335
          return RC_RECONFIG;   // Pending requests, delay and retry
 
2336
 
 
2337
        } else {
 
2338
          DisableICPQueries();  // disable ICP processing
 
2339
          Unlock();
 
2340
          // No pending ICP queries, perform reconfiguration
 
2341
          reconfig_status = Reconfigure(gconfig_changed, pconfig_changed);
 
2342
 
 
2343
          if (reconfig_status == 0) {
 
2344
            s = RC_ENABLE_ICP;  // reconfig OK, enable ICP
 
2345
          } else {
 
2346
            s = RC_DONE;        // reconfig failed, do not enable ICP
 
2347
          }
 
2348
          break;                // move to next state
 
2349
        }
 
2350
      }
 
2351
 
 
2352
    case RC_ENABLE_ICP:
 
2353
      {
 
2354
        if (!Lock())
 
2355
          return RC_ENABLE_ICP; // Unable to get lock, try again
 
2356
 
 
2357
        EnableICPQueries();     // Enable ICP processing
 
2358
        Unlock();
 
2359
 
 
2360
        s = RC_DONE;
 
2361
        break;                  // move to next state
 
2362
      }
 
2363
 
 
2364
    case RC_DONE:
 
2365
      {
 
2366
        // Release configuration lock
 
2367
        _ICPConfig->Unlock();
 
2368
        return RC_DONE;         // Reconfiguration complete
 
2369
      }
 
2370
    default:
 
2371
      {
 
2372
        ink_release_assert(0);  // Should never happen
 
2373
      }
 
2374
 
 
2375
    }                           // End of switch
 
2376
 
 
2377
  }                             // End of while
 
2378
#if !defined(_WIN32) && !defined(__GNUC__)
 
2379
_exit_while:                   // fix DEC warnings
 
2380
#endif
 
2381
  return RC_DONE;
 
2382
}
 
2383
 
 
2384
void
 
2385
ICPProcessor::CancelPendingReads()
 
2386
{
 
2387
  // Cancel pending ICP read by sending a bogus message to
 
2388
  //  the local ICP port.
 
2389
 
 
2390
  ICPRequestCont *r = new(ICPRequestCont_allocator.alloc())
 
2391
    ICPRequestCont(this, NULL, NULL);
 
2392
  SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler) & ICPRequestCont::NopICPRequestEvent);
 
2393
  r->mutex = new_ProxyMutex();
 
2394
 
 
2395
  // TODO: Check return value?
 
2396
  ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0 /* optflags */ , 0 /* optdata */ , 0 /* shostid */ ,
 
2397
                                       (void *) 0, 0, &r->_sendMsgHdr, r->_sendMsgIOV, &r->_ICPmsg);
 
2398
  r->_sendMsgHdr.msg_iovlen = 1;
 
2399
  r->_ICPmsg.h.version = ~r->_ICPmsg.h.version; // bogus message
 
2400
 
 
2401
  Peer *lp = GetLocalPeer();
 
2402
  r->_sendMsgHdr.msg_name = (caddr_t) & (lp->GetSendChan())->sa;
 
2403
  r->_sendMsgHdr.msg_namelen = sizeof((lp->GetSendChan())->sa);
 
2404
  udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr);
 
2405
}
 
2406
 
 
2407
Peer *
 
2408
ICPProcessor::GenericFindListPeer(struct in_addr *ip, int port, int validListItems, Ptr<Peer> *List)
 
2409
{
 
2410
  Peer *P;
 
2411
  for (int n = 0; n < validListItems; ++n) {
 
2412
    if ((P = List[n])) {
 
2413
      if ((P->GetIP()->s_addr == ip->s_addr)
 
2414
          && ((port == -1) || (P->GetPort() == port)))
 
2415
        return P;
 
2416
    }
 
2417
  }
 
2418
  return (Peer *) 0;
 
2419
}
 
2420
 
 
2421
Peer *
 
2422
ICPProcessor::FindPeer(struct in_addr * ip, int port)
 
2423
{
 
2424
  // Find (Peer *) with the given (ip,port) on the global list (PeerList)
 
2425
  return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList);
 
2426
}
 
2427
 
 
2428
Peer *
 
2429
ICPProcessor::FindSendListPeer(struct in_addr * ip, int port)
 
2430
{
 
2431
  // Find (Peer *) with the given (ip,port) on the
 
2432
  //  scheduler list (SendPeerList)
 
2433
  return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList);
 
2434
}
 
2435
 
 
2436
Peer *
 
2437
ICPProcessor::FindRecvListPeer(struct in_addr * ip, int port)
 
2438
{
 
2439
  // Find (Peer *) with the given (ip,port) on the
 
2440
  //  receive list (RecvPeerList)
 
2441
  return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList);
 
2442
}
 
2443
 
 
2444
int
 
2445
ICPProcessor::AddPeer(Peer * P)
 
2446
{
 
2447
  // Add (Peer *) to the global list (PeerList).  Make sure (ip,port) is
 
2448
  //  unique.
 
2449
  // Returns 1 - added; 0 - Not added
 
2450
 
 
2451
  //
 
2452
  // Make sure no duplicate exists
 
2453
  //
 
2454
  if (FindPeer(P->GetIP(), P->GetPort())) {
 
2455
    unsigned char x[4];
 
2456
    *(uint32_t *) & x = (uint32_t) P->GetIP()->s_addr;
 
2457
    // coverity[uninit_use_in_call]
 
2458
    Warning("bad icp.config, multiple peer definitions for ip=%d.%d.%d.%d", x[0], x[1], x[2], x[3]);
 
2459
    REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions");
 
2460
 
 
2461
    return 0;                   // Not added
 
2462
  } else {
 
2463
    // Valid entry
 
2464
    if (_nPeerList + 1 < PEER_LIST_SIZE) {
 
2465
      _nPeerList++;
 
2466
      _PeerList[_nPeerList] = P;
 
2467
      P->SetPeerID(_nPeerList);
 
2468
      return 1;                 // Added
 
2469
    } else {
 
2470
      return 0;                 // Not added
 
2471
    }
 
2472
  }
 
2473
}
 
2474
 
 
2475
int
 
2476
ICPProcessor::AddPeerToRecvList(Peer * P)
 
2477
{
 
2478
  // Add (Peer *) to the listen list (RecvPeerList).
 
2479
  //  Make sure (ip,port) is unique.
 
2480
  // Returns 1 - added; 0 - Not added
 
2481
 
 
2482
  // Assert that no duplicate exists
 
2483
  ink_assert(FindRecvListPeer(P->GetIP(), P->GetPort()) == 0);
 
2484
 
 
2485
  if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) {
 
2486
    _nRecvPeerList++;
 
2487
    _RecvPeerList[_nRecvPeerList] = P;
 
2488
    return 1;                   // Added
 
2489
  } else {
 
2490
    return 0;                   // Not added
 
2491
  }
 
2492
}
 
2493
 
 
2494
int
 
2495
ICPProcessor::AddPeerToSendList(Peer * P)
 
2496
{
 
2497
  // Add (Peer *) to the scheduler list (SendPeerList).
 
2498
  //  Make sure (ip,port) is unique.
 
2499
  // Returns 1 - added; 0 - Not added
 
2500
 
 
2501
  // Assert that no duplicate exists
 
2502
  ink_assert(FindSendListPeer(P->GetIP(), P->GetPort()) == 0);
 
2503
 
 
2504
  if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) {
 
2505
    _nSendPeerList++;
 
2506
    _SendPeerList[_nSendPeerList] = P;
 
2507
    return 1;                   // Added
 
2508
  } else {
 
2509
    return 0;                   // Not added
 
2510
  }
 
2511
}
 
2512
 
 
2513
int
 
2514
ICPProcessor::AddPeerToParentList(Peer * P)
 
2515
{
 
2516
  // Add (Peer *) to the parent list (ParentPeerList).
 
2517
  // Returns 1 - added; 0 - Not added
 
2518
 
 
2519
  if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) {
 
2520
    _nParentPeerList++;
 
2521
    _ParentPeerList[_nParentPeerList] = P;
 
2522
    return 1;                   // Added
 
2523
  } else {
 
2524
    return 0;                   // Not added
 
2525
  }
 
2526
}
 
2527
 
 
2528
// End of ICP.cc