3
ICP request handling: processing of incoming ICP messages from peers and of locally generated ICP queries.
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
24
#include "ink_unused.h" /* MAGIC_EDITING_TAG */
27
/****************************************************************************
32
****************************************************************************/
36
#include "P_EventSystem.h"
39
#include "MgmtUtils.h"
40
#include "P_RecProcess.h"
42
#include "ICPProcessor.h"
44
#include "logging/Log.h"
45
#include "logging/LogAccessICP.h"
46
#include "BaseManager.h"
50
// Defined in another translation unit; not referenced in the visible part of
// this file. Presumably a shutdown-in-progress flag -- TODO confirm.
extern long glShutdownInProgress;
53
// Cache lookup configuration defined elsewhere; its address is handed to
// cacheProcessor.open_read() by ICPPeerReadCont::ICPPeerQueryCont().
extern CacheLookupHttpConfig global_cache_lookup_config;
54
// File-scope HTTP header object; its address is handed to
// cacheProcessor.open_read() by ICPPeerReadCont::ICPPeerQueryCont().
HTTPHdr gclient_request;
56
//****************************************************************************
60
// ICP.h -- All ICP class definitions.
61
// ICPlog.h -- ICP log object for logging system
62
// ICP.cc -- Incoming/outgoing ICP request and ICP configuration
63
// data base management.
64
// ICPConfig.cc -- ICP interface to Traffic Server configuration
65
// management, member functions for ICPlog (object
66
// passed to logging system) along with
67
// miscellaneous support routines.
68
// ICPevents.h -- Event definitions specific to ICP.
69
// ICPProcessor.h -- ICP external interface for other subsystems.
70
// External subsystems only need to include this header.
72
// ICPProcessor.cc -- ICP external interface implementation.
73
// ICPStats.cc -- ICP statistic callback registration.
78
// ICPConfigData -- Manages global ICP data from the TS configuration
80
// PeerConfigData -- Manages ICP peer data from the TS configuration
82
// ICPConfigUpdateCont -- Used by
83
// ICPConfiguration::icp_config_change_callback()
84
// to retry callout after a delay in cases where
85
// we cannot acquire the configuration lock.
86
// ICPConfiguration -- Overall manager of ICP configuration from TS
87
// configuration. Acts as interface and uses
88
// ICPConfigData and PeerConfigData to implement
89
// actions. Also fields/processes TS configuration
90
// callouts for "icp.config" changes. ICP classes only
91
// see ICPConfiguration when dealing with TS
92
// configuration info.
94
// Peer (base class) -- abstract base class
95
// ParentSiblingPeer : Peer -- ICP object describing parent/sibling
96
// peer which is initialized from the
97
// TS configuration data.
98
// MultiCastPeer : Peer -- ICP object describing MultiCast peer.
99
// Object is initialized from the TS
100
// configuration data.
102
// BitMap -- Generic bit map management class
104
// ICPProcessor -- Central class which starts all periodic events
105
// and maintains ICP configuration database. Delegates
106
// incoming data processing to ICPHandlerCont and
107
// outgoing data processing to ICPRequestCont. Implements
108
// reconfiguration actions and query requests from the
109
// external interface.
111
// ICPRequestCont -- Implements the state machine which processes
112
// locally generated ICP queries. Generates message
113
// queries and processes query responses. Responses
114
// received via callout from ICPPeerReadCont.
116
// PeriodicCont (base class) -- abstract base class
117
// ICPPeriodicCont : PeriodicCont -- Periodic which looks for ICP
118
// configuration changes sent by the Traffic Server
119
// configuration manager, and initiates ICP reconfiguration
120
// in the event we have a valid configuration change via
121
// ICPProcessor::ReconfigureStateMachine().
123
// ICPHandlerCont : PeriodicCont -- Periodic which monitors incoming
124
// ICP sockets and starts processing of the incoming ICP data.
126
// ICPPeerReadCont -- Implements the incoming data state machine.
127
// Processes remote ICP query requests and passes query
128
// responses to ICPRequestCont via a callout.
129
// ICPlog -- Logging object which encapsulates ICP query info required
130
// by the new logging subsystem to produce squid access log
131
// data for ICP queries.
133
//****************************************************************************
135
// ICP is integrated into HTTP miss processing as follows.
137
// if (HTTP Traffic Server Miss) {
138
// if (proxy.config.icp.enabled) {
139
// Status = QueryICP(URL, &target_ip);
140
// if (Status == ICP_HIT)
141
// Issue Http Request to (target_ip, proxy_port);
143
// if (proxy.config.http.parent_proxy_routing_enable) {
144
// Issue Http Request to (proxy.config.http.parent_proxy_hostname,
145
// proxy.config.http.parent_proxy_port)
148
// Issue Http Request to Origin Server
151
//****************************************************************************
153
// VC++ 5.0 is rather picky
154
// Pointer-to-member handler types, one per ICP continuation class.
// ICPPeerReadContHandler is the cast used by the SET_HANDLER() calls on
// ICPPeerReadCont in this file.
typedef int (ICPPeerReadCont::*ICPPeerReadContHandler) (int, void *);
155
typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *);
156
typedef int (ICPHandlerCont::*ICPHandlerContHandler) (int, void *);
157
typedef int (ICPRequestCont::*ICPRequestContHandler) (int, void *);
159
// Plugin freshness function
// Optional hook, NULL by default. When it is set,
// ICPPeerReadCont::ICPPeerQueryEvent() calls it on CACHE_EVENT_OPEN_READ,
// passing the ICPPeerReadCont as a void *.
160
PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc) NULL;
162
//---------------------------------------
163
// Class ICPHandlerCont member functions
164
// Deal with incoming ICP data
165
//---------------------------------------
167
// Static data declarations
168
//Allocator *ICPHandlerCont::IncomingICPDataBuf;
169
// Size index passed to IOBufferBlock::alloc() when
// ICPPeerReadCont::PeerReadStateMachine() sets up a receive buffer.
int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex;
170
// Allocator for the per-read state object; used by ICPPeerReadCont::init()
// (alloc) and ICPPeerReadCont::reset() (free).
static ClassAllocator <ICPPeerReadCont::PeerReadData>PeerReadDataAllocator("PeerReadDataAllocator");
171
// Allocator for the read state machines created in
// ICPHandlerCont::PeriodicEvent() and released in
// ICPPeerReadCont::ICPPeerReadEvent().
static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator");
173
// NOTE(review): not referenced anywhere in the visible code -- confirm it is
// still needed.
static Action *default_action = NULL;
176
ICPHandlerCont::ICPHandlerCont(ICPProcessor * icpP)
181
// do nothing continuation handler
// The event code is explicitly marked as unused below.
183
ICPHandlerCont::TossEvent(int event, Event * e)
185
NOWARN_UNUSED(event);
191
// Periodic handler for incoming ICP traffic. Walks the receive peers reported
// by the ICPProcessor and, for every online peer for which shouldStartRead()
// is true, allocates an ICPPeerReadCont, initializes it and delivers
// EVENT_INTERVAL to it to start the read state machine.
ICPHandlerCont::PeriodicEvent(int event, Event * e)
193
// NOTE(review): 'event' is marked unused here but is passed to
// RECORD_ICP_STATE_CHANGE() further down.
NOWARN_UNUSED(event);
195
int n_peer, valid_peers;
198
// Periodic handler which initiates incoming message processing
199
// on the defined peers.
201
valid_peers = _ICPpr->GetRecvPeers();
203
// get peer info from the completionEvent token.
208
// start read I/Os on peers which don't have outstanding I/Os
209
for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
210
P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
211
// Missing or offline peer. NOTE(review): the 'P &&' term is redundant once
// '!P' has been tested.
if (!P || (P && !P->IsOnline()))
213
if (P->shouldStartRead()) {
215
///////////////////////////////////////////
216
// Setup state machine
217
///////////////////////////////////////////
218
ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
219
// Local-lookup setting is read from the global ICP configuration and stored
// in the new state machine by init().
int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
221
s->init(_ICPpr, P, local_lookup);
222
RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
224
///////////////////////////////////////////
226
///////////////////////////////////////////
227
// Deliver the first event to the new state machine directly.
s->handleEvent(EVENT_INTERVAL, (Event *) 0);
234
ink_release_assert(!"unexpected event");
241
//***************************************************************************
242
// Nested Class PeerReadData member functions
243
// Used by ICPPeerReadCont to encapsulate the data required by
244
// PeerReadStateMachine
245
//***************************************************************************
246
ICPPeerReadCont::PeerReadData::PeerReadData()
252
// Puts the read state into its initial condition: next state READ_ACTIVE,
// local cache lookup disabled, lookup URL cleared, sender address zeroed.
ICPPeerReadCont::PeerReadData::init()
257
_next_state = READ_ACTIVE;
258
_cache_lookup_local = 0;
262
_cachelookupURL.clear();
269
memset((void *) &_sender, 0, sizeof(_sender));
272
ICPPeerReadCont::PeerReadData::~PeerReadData()
278
// Releases per-read resources; the visible part destroys the cache lookup URL
// if one was created.
//   full_reset -- passed through from ICPPeerReadCont::reset(); its use is
//                 not visible in this part of the function.
ICPPeerReadCont::PeerReadData::reset(int full_reset)
289
if (_cachelookupURL.valid()) {
290
_cachelookupURL.destroy();
294
//***************************************************************************
296
//------------------------------------------------------------------------
297
// ICPPeerReadCont -- ICP incoming message processing state machine
298
//------------------------------------------------------------------------
299
// Default constructor: every pointer member starts out NULL and every counter
// at 0, with no mutex passed to Continuation. The working setup is done by
// init().
ICPPeerReadCont::ICPPeerReadCont():Continuation(0), _object_vc(NULL), _object_read(NULL),
300
_cache_req_hdr_heap_handle(NULL), _cache_resp_hdr_heap_handle(NULL), _ICPpr(NULL), _state(NULL),
301
_start_time(0), _recursion_depth(0)
306
// Prepares this continuation to read from one peer.
//   ICPpr        -- owning ICP processor
//   p            -- peer to read from
//   lookup_local -- stored in the state as _cache_lookup_local and later
//                   passed to cacheProcessor.lookup()
ICPPeerReadCont::init(ICPProcessor * ICPpr, Peer * p, int lookup_local)
308
// Fresh per-read state object; released again in reset().
PeerReadData *s = PeerReadDataAllocator.alloc();
310
s->_start_time = ink_get_hrtime();
312
s->_next_state = READ_ACTIVE;
313
s->_cache_lookup_local = lookup_local;
314
SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
317
// PeerReadStateMachine() wraps this counter in an AutoReference and asserts
// that it is 0 in its states; presumably AutoReference increments on entry,
// which is why it starts at -1 -- TODO confirm against AutoReference.
_recursion_depth = -1;
320
_cache_req_hdr_heap_handle = NULL;
321
_cache_resp_hdr_heap_handle = NULL;
322
// Each read state machine gets its own mutex.
mutex = new_ProxyMutex();
325
// Destructor: performs a full reset, releasing the state object and header
// heap handles.
ICPPeerReadCont::~ICPPeerReadCont()
327
reset(1); // Full reset
331
// Releases everything this continuation owns: resets the PeerReadData and
// returns it to its allocator, then frees both header heap handles and
// clears the pointers.
//   full_reset -- forwarded to PeerReadData::reset()
ICPPeerReadCont::reset(int full_reset)
335
this->_state->reset(full_reset);
336
PeerReadDataAllocator.free(this->_state);
338
if (_cache_req_hdr_heap_handle) {
339
xfree(_cache_req_hdr_heap_handle);
340
_cache_req_hdr_heap_handle = NULL;
342
if (_cache_resp_hdr_heap_handle) {
343
xfree(_cache_resp_hdr_heap_handle);
344
_cache_resp_hdr_heap_handle = NULL;
349
// Main event handler for the read continuation. Sanity-checks the incoming
// event against the current state, then runs PeerReadStateMachine(). If the
// state machine asks to be retried (EVENT_CONT) the continuation is
// rescheduled; if it reached READ_PROCESSING_COMPLETE the continuation cancels
// the peer read, resets itself and returns itself to its allocator.
ICPPeerReadCont::ICPPeerReadEvent(int event, Event * e)
353
case EVENT_IMMEDIATE:
357
case NET_EVENT_DATAGRAM_WRITE_COMPLETE:
358
case NET_EVENT_DATAGRAM_READ_COMPLETE:
359
case NET_EVENT_DATAGRAM_READ_ERROR:
360
case NET_EVENT_DATAGRAM_WRITE_ERROR:
362
// A read completion is only legal in READ_DATA_DONE, a write completion only
// in WRITE_DONE.
ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE)
363
|| (_state->_next_state == READ_DATA_DONE));
364
ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE)
365
|| (_state->_next_state == WRITE_DONE));
367
// The completion event must carry this continuation as its handle.
ink_release_assert(this == (ICPPeerReadCont *)
368
completionUtil::getHandle(e));
371
case CACHE_EVENT_LOOKUP_FAILED:
372
case CACHE_EVENT_LOOKUP:
374
ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE);
379
ink_release_assert(!"unexpected event");
383
// Front end to PeerReadStateMachine(), invoked by Event subsystem.
384
if (PeerReadStateMachine(_state, e) == EVENT_CONT) {
385
// NOTE(review): RETRY_INTERVAL is passed as-is here, while
// ICPRequestCont::ICPRequestEvent() passes HRTIME_MSECONDS(RETRY_INTERVAL).
// One of the two is probably using the wrong unit -- check the definition
// of RETRY_INTERVAL.
eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP);
388
} else if (_state->_next_state == READ_PROCESSING_COMPLETE) {
389
_state->_peer->cancelRead();
390
this->reset(1); // Full reset
391
// 'this' must not be touched after the free below.
ICPPeerReadContAllocator.free(this);
400
// Handler installed while the plugin freshness function is evaluating a
// cached object. Maps the freshness verdict onto a cache lookup result
// (ICP_FRESH_OBJECT -> CACHE_EVENT_LOOKUP, ICP_STALE_OBJECT and anything else
// -> CACHE_EVENT_LOOKUP_FAILED), closes the cache VC, restores the normal
// handler and re-enters it with the mapped result.
ICPPeerReadCont::StaleCheck(int event, Event * e)
403
// Must be called with this continuation's mutex held by the current thread.
ink_release_assert(mutex->thread_holding == this_ethread());
405
Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s:%d]",
406
event, _state->_rICPmsg->h.requestno,
407
_state->_rICPmsg->un.query.URL, inet_ntoa(_state->_sender.sin_addr), ntohs(_state->_sender.sin_port));
410
case ICP_STALE_OBJECT:
412
_state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
415
case ICP_FRESH_OBJECT:
417
_state->_queryResult = CACHE_EVENT_LOOKUP;
422
Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d\n", event);
423
_state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
427
// The cache object was only opened for the freshness check.
_object_vc->do_io(VIO::CLOSE);
429
SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
430
return handleEvent(_state->_queryResult, 0);
434
// Callback for the cache operation started by ICPPeerQueryCont(). When a
// plugin freshness function is installed, a successful open_read hands the
// object to that function (with StaleCheck as the handler) and a failed
// open_read is converted into CACHE_EVENT_LOOKUP_FAILED. The resulting event
// is stored as the query result and forwarded to ICPPeerReadEvent().
ICPPeerReadCont::ICPPeerQueryEvent(int event, Event * e)
437
Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s:%d]",
438
event, _state->_rICPmsg->h.requestno,
439
_state->_rICPmsg->un.query.URL, inet_ntoa(_state->_sender.sin_addr), ntohs(_state->_sender.sin_port));
440
if (pluginFreshnessCalcFunc) {
442
case CACHE_EVENT_OPEN_READ:
444
// For CACHE_EVENT_OPEN_READ the event payload is the cache VC.
_object_vc = (CacheVConnection *) e;
445
SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::StaleCheck);
446
_object_vc->get_http_info(&_object_read);
447
(*pluginFreshnessCalcFunc) ((void *) this);
450
case CACHE_EVENT_OPEN_READ_FAILED:
452
event = CACHE_EVENT_LOOKUP_FAILED;
460
_state->_queryResult = event;
461
SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
462
return handleEvent(event, e);
466
// Starts the cache lookup for a remote ICP query. Parses the URL carried in
// the received message, installs ICPPeerQueryEvent as the handler and issues
// either cacheProcessor.open_read() (freshness plugin present and stale
// lookup configured) or cacheProcessor.lookup().
// Returns EVENT_DONE when the callback has already run (including the error
// and empty-URL cases, which are reported as CACHE_EVENT_LOOKUP_FAILED) and
// EVENT_CONT when the callback is still pending.
ICPPeerReadCont::ICPPeerQueryCont(int event, Event * e)
468
NOWARN_UNUSED(event);
473
// Perform lookup()/open_read() on behalf of PeerReadStateMachine()
475
// Force termination at the last byte of the maximum message size so the
// strlen() below cannot run past that point.
((char *) _state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0; // null terminate
476
_state->_cachelookupURL.create(NULL);
477
const char *qurl = (const char *) _state->_rICPmsg->un.query.URL;
478
_state->_cachelookupURL.parse(qurl, strlen(qurl));
479
Debug("icp", "Remote Query for id=%d, [%s] from [%s:%d]",
480
_state->_rICPmsg->h.requestno,
481
_state->_rICPmsg->un.query.URL, inet_ntoa(_state->_sender.sin_addr), ntohs(_state->_sender.sin_port));
483
SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerQueryEvent);
484
// NOTE(review): the URL pointer has already been dereferenced by strlen()
// above, so the NULL test here comes too late to protect that call.
if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) {
485
// Preset the result to a value that is not CACHE_EVENT_LOOKUP_FAILED; the
// callback overwrites it.
_state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED;
486
_start_time = ink_get_hrtime();
487
if (pluginFreshnessCalcFunc && _ICPpr->GetConfig()->globalConfig()->ICPStaleLookup()) {
488
//////////////////////////////////////////////////////////////
489
// Note: _cache_lookup_local is ignored in this case, since
490
// cache clustering is not used with stale lookup.
491
//////////////////////////////////////////////////////////////
492
a = cacheProcessor.open_read(this, &_state->_cachelookupURL,
493
&gclient_request, &global_cache_lookup_config, (time_t) 0);
495
a = cacheProcessor.lookup(this, &_state->_cachelookupURL, _state->_cache_lookup_local);
500
if (a == ACTION_RESULT_DONE) {
501
return EVENT_DONE; // callback complete
502
} else if (a == ACTION_IO_ERROR) {
503
handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
504
return EVENT_DONE; // callback complete
506
return EVENT_CONT; // callback pending
509
// Null URL, return failed lookup
510
handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
511
return EVENT_DONE; // callback done
517
AutoReference(int *cnt)
530
// State machine that processes one peer's incoming ICP traffic. It loops over
// s->_next_state until it either has to wait or is finished.
//   s -- per-read state (buffers, sender address, next state)
//   e -- event that triggered this call; used as the completion token in the
//        read/write completion states
// Return values visible in this function: EVENT_CONT when a lock could not be
// acquired and the caller should retry later, EVENT_DONE when a cache lookup
// callback is still pending.
ICPPeerReadCont::PeerReadStateMachine(PeerReadData * s, Event * e)
532
// Ties _recursion_depth to the lifetime of this call; the states below
// assert on its value.
AutoReference l(&_recursion_depth);
533
//-----------------------------------------------------------
534
// State machine to process ICP data received on UDP socket
535
//-----------------------------------------------------------
536
MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
538
// we didn't get the lock, so we don't need to unlock it
539
// coverity[missing_unlock]
540
return EVENT_CONT; // try again later
543
while (1) { // loop forever
545
switch (s->_next_state) {
548
ink_release_assert(_recursion_depth == 0);
550
return EVENT_CONT; // unable to get lock, try again later
552
// The peer must still be the one registered under its ID, and ICP queries
// must be both allowed and configured, before any data is read.
bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer);
554
if (valid_peer && _ICPpr->AllowICPQueries()
555
&& _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) {
557
// Note pending incoming ICP request or response
558
_ICPpr->IncPendingQuery();
561
s->_next_state = READ_DATA;
562
RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA);
563
break; // move to next_state
568
// ICP NOT enabled, do nothing
569
s->_next_state = READ_PROCESSING_COMPLETE;
570
RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE);
574
#if !defined(_WIN32) && !defined(__GNUC__)
575
_end_case_read_active: // fix DEC warnings
577
ink_release_assert(0); // Should never happen
581
ink_release_assert(_recursion_depth == 0);
583
// Assumption of one outstanding read per peer...
584
// Setup read from FD
585
ink_assert(s->_peer->buf == NULL);
586
Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock();
587
buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex);
588
s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr);
589
// The first sizeof(ICPMsg_t) bytes are kept free for the host-order copy
// produced in PROCESS_READ_DATA; the raw datagram is received after them.
buf->fill(sizeof(ICPMsg_t)); // reserve space for decoding
590
char *be = buf->buf_end() - 1;
591
be[0] = 0; // null terminate buffer
592
// The next state is set before the receive is issued because the completion
// callback may run before RecvFrom_re() returns.
s->_next_state = READ_DATA_DONE;
593
RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE);
594
ink_assert(s->_peer->readAction == NULL);
595
Action *a = s->_peer->RecvFrom_re(this, this, buf,
596
buf->write_avail() - 1,
599
&s->_peer->fromaddrlen);
603
if (a == ACTION_RESULT_DONE) {
604
// we will have been called back already and our state updated
606
// move to next state
607
ink_assert(s->_next_state == PROCESS_READ_DATA);
609
} else if (a == ACTION_IO_ERROR) {
610
// actually, this *could* be taken care of by the main handler, but
611
// error processing makes more sense at this point. Therefore,
612
// the main handler ignores the errors.
614
// No data, terminate read loop.
616
ICP_INCREMENT_DYN_STAT(no_data_read_stat);
617
s->_peer->buf = NULL; // release reference
618
s->_next_state = READ_NOT_ACTIVE_EXIT;
619
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
620
// move to next state
623
// Read is still outstanding; remember the action so the completion can be
// matched against it.
s->_peer->readAction = a;
627
#if !defined(_WIN32) && !defined(__GNUC__)
628
_end_case_read_data: // fix DEC warnings
630
ink_release_assert(0); // Should never happen
634
// Convert ICP message from network to host format
635
if (s->_peer->readAction != NULL) {
636
ink_assert(s->_peer->readAction == e);
637
s->_peer->readAction = NULL;
639
s->_bytesReceived = completionUtil::getBytesTransferred(e);
641
// A non-negative byte count means data arrived; otherwise the read is
// counted as "no data" and the read loop ends.
if (s->_bytesReceived >= 0) {
642
s->_next_state = PROCESS_READ_DATA;
643
RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA);
645
ICP_INCREMENT_DYN_STAT(no_data_read_stat);
646
s->_peer->buf = NULL; // release reference
647
s->_next_state = READ_NOT_ACTIVE_EXIT;
648
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
650
if (_recursion_depth > 0) {
656
#if !defined(_WIN32) && !defined(__GNUC__)
657
_end_case_read_data_done: // fix DEC warnings
659
ink_release_assert(0); // Should never happen
661
case PROCESS_READ_DATA:
664
ink_release_assert(_recursion_depth == 0);
666
Ptr<IOBufferBlock> bufblock = s->_peer->buf;
667
char *buf = bufblock->start();
669
if (s->_next_state == PROCESS_READ_DATA) {
670
// Raw datagram sits after the reserved header-sized area; the converted
// message is written to the start of the buffer (presumably the argument
// order is source, destination -- TODO confirm against NetToHostICPMsg).
ICPRequestCont::NetToHostICPMsg((ICPMsg_t *)
671
(buf + sizeof(ICPMsg_t)), (ICPMsg_t *) buf);
673
// adjust buffer pointers to point to decoded message.
675
bufblock->fill(s->_bytesReceived);
677
// Validate message length for sanity
678
if (s->_bytesReceived < ((ICPMsg_t *) buf)->h.msglen) {
680
// Short read, terminate
682
ICP_INCREMENT_DYN_STAT(short_read_stat);
683
s->_peer->buf = NULL;
684
s->_next_state = READ_NOT_ACTIVE;
685
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
686
break; // move to next_state
689
// Validate receiver and convert the received sockaddr
690
// to internal sockaddr format.
691
struct sockaddr_in from;
692
if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr, &from)) {
694
ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
695
ICPMsg_t *ICPmsg = (ICPMsg_t *) buf;
697
// Unknown sender: only accepted when running receive-only with
// reply-to-unknown-peer enabled, and only for v2/v3 query messages.
if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) &&
698
cfg->ICPReplyToUnknownPeer() &&
699
((ICPmsg->h.version == ICP_VERSION_2) ||
700
(ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) {
703
// Add the unknown Peer to our database to
704
// allow us to resolve the lookup request.
706
if (!_ICPpr->GetConfig()->Lock()) {
707
s->_next_state = ADD_PEER;
708
RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER);
711
if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) {
712
Warning("ICP Peer limit exceeded");
713
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded");
714
_ICPpr->GetConfig()->Unlock();
715
goto invalid_message;
718
// Reply to the configured default port, or to the sender's source port when
// no default is configured.
int icp_reply_port = cfg->ICPDefaultReplyPort();
719
if (!icp_reply_port) {
720
icp_reply_port = ntohs(s->_peer->fromaddr.sin_port);
722
PeerConfigData *Pcfg = NEW(new PeerConfigData(PeerConfigData::CTYPE_SIBLING,
723
&s->_peer->fromaddr.sin_addr, 0,
725
ParentSiblingPeer *P = NEW(new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true));
726
status = _ICPpr->AddPeer(P);
727
ink_release_assert(status);
728
status = _ICPpr->AddPeerToSendList(P);
729
ink_release_assert(status);
731
P->GetChan()->setRemote(P->GetIP()->s_addr, P->GetPort());
733
// coverity[uninit_use_in_call]
734
Note("ICP Peer added ip=%u.%u.%u.%u port=%d", PRINT_IP(P->GetIP()->s_addr), P->GetPort());
735
from = s->_peer->fromaddr;
739
// Sender does not exist in ICP configuration, terminate
741
ICP_INCREMENT_DYN_STAT(invalid_sender_stat);
742
Debug("icp", "Received msg from invalid sender [%s:%d]",
743
inet_ntoa(s->_peer->fromaddr.sin_addr), ntohs(s->_peer->fromaddr.sin_port));
745
s->_peer->buf = NULL;
746
s->_next_state = READ_NOT_ACTIVE;
747
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
748
break; // move to next_state
751
// we hand off the decoded buffer from the Peer to the PeerReadData
753
s->_rICPmsg_len = s->_bytesReceived;
754
ink_assert(s->_buf == NULL);
755
s->_buf = s->_peer->buf;
756
s->_rICPmsg = (ICPMsg_t *) s->_buf->start();
757
s->_peer->buf = NULL;
760
// Handle only ICP_VERSION_2/3 messages. Reject all others.
762
if ((s->_rICPmsg->h.version != ICP_VERSION_2)
763
&& (s->_rICPmsg->h.version != ICP_VERSION_3)) {
764
ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat);
765
Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s:%d]",
766
(uint32_t) s->_rICPmsg->h.version, inet_ntoa(from.sin_addr), ntohs(from.sin_port));
770
s->_next_state = READ_NOT_ACTIVE;
771
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
772
break; // move to next_state
775
// If this is a query message, redirect to
776
// the query specific handlers.
778
if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) {
779
ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat);
780
ink_assert(!s->_mycont);
781
s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE;
782
RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE);
784
// EVENT_DONE means the lookup callback already ran, so the loop continues
// with the state it set; otherwise leave and wait for the callback.
if (ICPPeerQueryCont(0, (Event *) 0) == EVENT_DONE) {
785
break; // Callback complete
787
return EVENT_DONE; // Callback pending
790
// We have a response message for an ICP query.
791
Debug("icp", "Response for Id=%d, from [%s:%d]",
792
s->_rICPmsg->h.requestno, inet_ntoa(s->_sender.sin_addr), ntohs(s->_sender.sin_port));
793
ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat);
794
s->_next_state = GET_ICP_REQUEST;
795
RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
796
break; // move to next_state
799
#if !defined(_WIN32) && !defined(__GNUC__)
800
_end_case_process_data_read: // fix DEC warnings
802
ink_release_assert(0); // Should never happen
804
case AWAITING_CACHE_LOOKUP_RESPONSE:
807
// The reply echoes the queried URL, including its terminating NUL.
void *data = s->_rICPmsg->un.query.URL;
808
int datalen = strlen((const char *) data) + 1;
810
if (s->_queryResult == CACHE_EVENT_LOOKUP) {
811
// Use the received ICP data buffer for the response message
812
Debug("icp", "Sending ICP_OP_HIT for id=%d, [%s] to [%s:%d]",
813
s->_rICPmsg->h.requestno, data, inet_ntoa(s->_sender.sin_addr), ntohs(s->_sender.sin_port));
814
ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat);
815
status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT,
816
s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ ,
818
data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
819
} else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) {
820
// Use the received ICP data buffer for response message
821
Debug("icp", "Sending ICP_OP_MISS for id=%d, [%s] to [%s:%d]",
822
s->_rICPmsg->h.requestno, data, inet_ntoa(s->_sender.sin_addr), ntohs(s->_sender.sin_port));
823
ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat);
824
status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS,
825
s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ ,
827
data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
829
Warning("Bad cache lookup event: %d", s->_queryResult);
830
ink_release_assert(!"Invalid cache lookup event");
832
ink_assert(status == 0);
834
// Make system log entry for ICP query
836
LogAccessICP accessor(&logentry);
837
Log::access(&accessor);
839
s->_next_state = SEND_REPLY;
840
RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY);
842
if (_recursion_depth > 0) {
848
#if !defined(_WIN32) && !defined(__GNUC__)
849
_end_case_awaiting_cache_lookup_response: // fix DEC warnings
851
ink_release_assert(0); // Should never happen
855
ink_release_assert(_recursion_depth == 0);
857
// Send the query response back to the sender
859
// As with the read, the next state is set before the send is issued.
s->_next_state = WRITE_DONE;
860
RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE);
861
ink_assert(s->_peer->writeAction == NULL);
862
Action *a = s->_peer->SendMsg_re(this, this,
863
&s->_mhdr, &s->_sender);
867
if (a == ACTION_RESULT_DONE) {
868
// we have been called back already and our state updated
872
} else if (a == ACTION_IO_ERROR) {
874
ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
876
// Copies the IPv4 address into x so it can be printed byte by byte.
*(uint32_t *) & x = (uint32_t) s->_sender.sin_addr.s_addr;
877
// coverity[uninit_use_in_call]
878
Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%d.%d.%d.%d",
879
ntohs(s->_rICPmsg->h.msglen), -1, x[0], x[1], x[2], x[3]);
880
s->_next_state = READ_NOT_ACTIVE;
881
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
884
s->_peer->writeAction = a;
888
#if !defined(_WIN32) && !defined(__GNUC__)
889
_end_case_send_reply: // fix DEC warnings
891
ink_release_assert(0); // Should never happen
895
s->_peer->writeAction = NULL;
896
int len = completionUtil::getBytesTransferred(e);
898
// A complete write is one whose byte count equals the message length in the
// (network byte order) header; anything else is counted as a partial write.
if (len == (int)ntohs(s->_rICPmsg->h.msglen)) {
899
ICP_INCREMENT_DYN_STAT(query_response_write_stat);
900
s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender); // log query reply
903
ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
905
*(uint32_t *) & x = (uint32_t) s->_sender.sin_addr.s_addr;
906
// coverity[uninit_use_in_call]
907
Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%d.%d.%d.%d",
908
ntohs(s->_rICPmsg->h.msglen), len, x[0], x[1], x[2], x[3]);
910
// Processing complete, perform completion actions
911
s->_next_state = READ_NOT_ACTIVE;
912
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
913
Debug("icp", "state->READ_NOT_ACTIVE");
915
if (_recursion_depth > 0) {
918
break; // move to next_state
921
#if !defined(_WIN32) && !defined(__GNUC__)
922
_end_case_write_done: // fix DEC warnings
924
ink_release_assert(0); // Should never happen
926
case GET_ICP_REQUEST:
928
ink_release_assert(_recursion_depth == 0);
929
ink_assert(s->_rICPmsg && s->_rICPmsg_len); // Sanity check
931
// Get ICP request associated with response message
932
s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno);
933
if (s->_ICPReqCont) {
934
s->_next_state = GET_ICP_REQUEST_MUTEX;
935
RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX);
936
break; // move to next_state
939
// No ICP request for response message, log as "response
940
// for non-existent ICP request" and terminate processing
942
Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno);
943
ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat);
944
// NOTE(review): the FindPeer() result is dereferenced without a NULL check
// -- confirm it cannot fail for a sender that passed validation earlier.
Peer *p = _ICPpr->FindPeer(&s->_sender.sin_addr,
945
ntohs(s->_sender.sin_port));
946
p->LogRecvMsg(s->_rICPmsg, 0);
947
s->_next_state = READ_NOT_ACTIVE;
948
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
949
break; // move to next_state
951
#if !defined(_WIN32) && !defined(__GNUC__)
952
_end_case_get_icp_request: // fix DEC warnings
954
ink_release_assert(0); // Should never happen
956
case GET_ICP_REQUEST_MUTEX:
958
ink_release_assert(_recursion_depth == 0);
959
ink_assert(s->_ICPReqCont);
960
// Hold a reference to the request's mutex so it stays valid even if the
// request object is deleted below.
Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex);
961
EThread *ethread = this_ethread();
962
ink_hrtime request_start_time;
964
if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) {
965
ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat);
967
// Unable to get ICP request mutex, delay and move back
968
// to the GET_ICP_REQUEST state. We need to do this
969
// since the ICP request may be deallocated by the active
972
s->_ICPReqCont = (ICPRequestCont *) 0;
973
s->_next_state = GET_ICP_REQUEST;
974
RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
977
// Log as "response for ICP request"
978
// NOTE(review): as in GET_ICP_REQUEST, the FindPeer() result is used without
// a NULL check.
Peer *p = _ICPpr->FindPeer(&s->_sender.sin_addr,
979
ntohs(s->_sender.sin_port));
980
p->LogRecvMsg(s->_rICPmsg, 1);
982
// Process the ICP response for the given ICP request
983
ICPRequestCont::ICPRequestEventArgs_t args;
984
args.rICPmsg = s->_rICPmsg;
985
args.rICPmsg_len = s->_rICPmsg_len;
987
if (!s->_ICPReqCont->GetActionPtr()->cancelled) {
988
request_start_time = s->_ICPReqCont->GetRequestStartTime();
989
Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno);
990
s->_ICPReqCont->handleEvent((int) ICP_RESPONSE_MESSAGE, (void *) &args);
992
// Cancelled by the user: no response time is recorded and the request
// object is destroyed here.
request_start_time = 0;
993
delete s->_ICPReqCont;
994
Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno);
997
// Note: s->_ICPReqCont is deallocated at this point.
1000
MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread);
1001
if (request_start_time) {
1002
ICP_SUM_DYN_STAT(total_icp_response_time_stat, (ink_get_hrtime() - request_start_time));
1004
RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
1005
s->_next_state = READ_NOT_ACTIVE;
1006
break; // move to next_state
1008
#if !defined(_WIN32) && !defined(__GNUC__)
1009
_end_case_get_icp_request_mutex: // fix DEC warnings
1011
ink_release_assert(0); // Should never happen
1013
case READ_NOT_ACTIVE:
1014
case READ_NOT_ACTIVE_EXIT:
1016
ink_release_assert(_recursion_depth == 0);
1017
if (!_ICPpr->Lock())
1018
return EVENT_CONT; // unable to get lock, try again later
1020
// Note incoming ICP request or response completion
1021
_ICPpr->DecPendingQuery();
1025
// READ_NOT_ACTIVE_EXIT ends the read loop; READ_NOT_ACTIVE goes back to
// READ_ACTIVE to look for more data.
if (s->_next_state == READ_NOT_ACTIVE_EXIT) {
1026
s->_next_state = READ_PROCESSING_COMPLETE;
1029
// Last read was valid, see if any more read data before exiting
1031
s->_start_time = ink_get_hrtime();
1032
s->_next_state = READ_ACTIVE;
1033
RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE);
1037
#if !defined(_WIN32) && !defined(__GNUC__)
1038
_end_case_read_not_active: // fix DEC warnings
1040
ink_release_assert(0); // Should never happen
1042
case READ_PROCESSING_COMPLETE:
1044
ink_release_assert(0); // Should never happen
1048
} // End of while(1)
1051
//------------------------------------------------------------------------
1052
// Class ICPRequestCont member functions
1053
// Implements the state machine which processes locally generated ICP queries.
1055
//------------------------------------------------------------------------
1056
// Class allocator instance for ICPRequestCont objects.
ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator");
1058
// Constructs the state machine for one locally generated ICP query.
//   pr -- owning ICP processor
//   c  -- caller's continuation; this object shares its mutex
//   u  -- URL to query
// The request starts in state ICP_START with a result of ICP_LOOKUP_FAILED
// and with its return address, message, message header and iovec array
// zeroed.
ICPRequestCont::ICPRequestCont(ICPProcessor * pr, Continuation * c, URL * u)
1059
: Continuation(0), _cont(c), _url(u), _start_time(0),
1060
_ICPpr(pr), _timeout(0),
1061
npending_actions(0), pendingActions(NULL),
1062
_sequence_number(0), _expected_replies(0),
1063
_expected_replies_list(MAX_DEFINED_PEERS), _received_replies(0), _next_state(ICP_START)
1065
memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr));
1066
_ret_status = ICP_LOOKUP_FAILED;
1067
_act.cancelled = false;
1069
memset((void *) &_ICPmsg, 0, sizeof(_ICPmsg));
1070
memset((void *) &_sendMsgHdr, 0, sizeof(_sendMsgHdr));
1071
// Zero all MSG_IOVECS entries. The earlier sizeof(_sendMsgIOV[MSG_IOVECS])
// was the size of a single element, so only the first entry was cleared.
memset((void *) &_sendMsgIOV, 0, sizeof(_sendMsgIOV[0]) * MSG_IOVECS);
1074
this->mutex = c->mutex;
1077
// Destructor: cancels the timeout event, removes this request from the
// request table by sequence number, frees the query URL copy held in the
// outgoing message and deletes the pending-action list.
ICPRequestCont::~ICPRequestCont()
1083
_timeout->cancel(this);
1086
RemoveICPRequest(_sequence_number);
1088
// Only query messages carry a URL to free.
if (_ICPmsg.h.opcode == ICP_OP_QUERY) {
1089
if (_ICPmsg.un.query.URL) {
1090
xfree(_ICPmsg.un.query.URL);
1093
if (pendingActions) {
1094
delete pendingActions;
1100
// Removes action 'a' from the pending-action list by shifting the following
// entries down one slot and shrinking the list by one.
ICPRequestCont::remove_from_pendingActions(Action * a)
1102
if (!pendingActions) {
1106
for (intptr_t i = 0; i < pendingActions->length(); i++) {
1107
if ((*pendingActions)[i] == a) {
1108
// Close the gap left by the removed entry.
for (intptr_t j = i; j < pendingActions->length() - 1; j++)
1109
(*pendingActions)[j] = (*pendingActions)[j + 1];
1110
pendingActions->set_length(pendingActions->length() - 1);
1115
npending_actions--; // completed inline
1119
// Cancels every real action in the pending-action list (entries that are
// neither null nor ACTION_IO_ERROR), clears all slots and shortens the list
// by the counted number of entries.
ICPRequestCont::remove_all_pendingActions()
1121
int active_pendingActions = 0;
1123
if (!pendingActions) {
1126
for (intptr_t i = 0; i < pendingActions->length(); i++) {
1127
if ((*pendingActions)[i]
1128
&& ((*pendingActions)[i] != ACTION_IO_ERROR)) {
1129
((*pendingActions)[i])->cancel();
1130
(*pendingActions)[i] = 0;
1132
active_pendingActions++;
1134
(*pendingActions)[i] = 0;
1137
pendingActions->set_length(pendingActions->length() - active_pendingActions);
1141
// Continuation handler for a locally generated ICP query (installed by
// ICPProcessor::ICPQuery()). Accounts for send completions, then runs
// ICPStateMachine() and reschedules itself when the state machine asks for a
// retry.
//   event - one of the event codes asserted below
//   e     - event payload; an Action * for datagram write events, and not an
//           Event * for ICP_RESPONSE_MESSAGE (see note below)
ICPRequestCont::ICPRequestEvent(int event, Event * e)
1143
// Note: Passed parameter 'e' is not an Event *
1144
// if event == ICP_RESPONSE_MESSAGE
1146
ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE ||
1147
event == NET_EVENT_DATAGRAM_WRITE_ERROR ||
1148
event == EVENT_IMMEDIATE || event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE);
1149
// handle reentrant callback
1150
// A finished send (successful or not) is removed from the pending list;
// for these events `e` carries the Action pointer.
if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE)
1151
|| (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) {
1152
ink_assert(npending_actions > 0);
1153
remove_from_pendingActions((Action *) e);
1156
// Start of user ICP query request processing. We start here after
1157
// the reschedule in ICPProcessor::ICPQuery().
1158
switch (_next_state) {
1160
// All of the listed states are driven by the same state machine call.
case ICP_OFF_TERMINATE:
1161
case ICP_QUEUE_REQUEST:
1162
case ICP_AWAITING_RESPONSE:
1163
case ICP_DEQUEUE_REQUEST:
1164
case ICP_POST_COMPLETION:
1165
case ICP_REQUEST_NOT_ACTIVE:
1167
if (ICPStateMachine(event, (void *) e) == EVENT_CONT) {
1169
// Unable to acquire lock, reschedule continuation
1171
eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP);
1174
} else if (_next_state == ICP_DONE) {
1176
// ICP request processing complete.
1184
// The label below exists only for compilers other than Win32/GCC builds.
#if !defined(_WIN32) && !defined(__GNUC__)
1185
_end_case: // fix DEC warnings
1187
ink_release_assert(0); // should never happen
1191
ink_release_assert(0); // should never happen
1198
// Alternate event handler whose visible body only marks `event` as unused.
// NOTE(review): the name suggests a no-op handler; the rest of the body is not
// visible here - confirm.
ICPRequestCont::NopICPRequestEvent(int event, Event * e)
1200
NOWARN_UNUSED(event);
1207
// Core state machine for a locally generated ICP query. Loops on _next_state;
// each state either advances _next_state and breaks back to the loop, or
// returns. The visible return paths hand EVENT_CONT back to ICPRequestEvent()
// when a lock could not be taken, which makes the caller retry later.
//   event - triggering event code (timeout, response message, send completion, ...)
//   d     - event payload; read as ICPRequestEventArgs_t * when
//           event == ICP_RESPONSE_MESSAGE
// NOTE(review): the bare-number lines interleaved below look like leftover
// line numbers from an extraction step, not program text - confirm.
ICPRequestCont::ICPStateMachine(int event, void *d)
1209
//*******************************************
1210
// ICP message processing state machine
1211
//*******************************************
1212
ICPConfiguration *ICPcf = _ICPpr->GetConfig();
1214
while (1) { // loop forever
1216
switch (_next_state) {
1219
// NOTE(review): presumably the ICP_START case (its label line is not visible;
// the debug tags below say [ICP_START]).
// User may have cancelled request, if so abort request.
1220
if (_act.cancelled) {
1221
_next_state = ICP_DONE;
1225
if (!_ICPpr->Lock())
1226
return EVENT_CONT; // Unable to get lock, try again later
1228
// Queries are only sent when the processor allows them and ICP is
// configured for send+receive.
if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) {
1230
// Reject NULL pointer or "localhost" URLs
1231
if (_url->valid()) {
1233
const char *host = _url->host_get(&host_len);
1234
if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) {
1237
// NULL pointer or "localhost" URL, terminate request
1238
_next_state = ICP_OFF_TERMINATE;
1239
Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number);
1240
break; // move to next_state
1243
// Note pending ICP request
1244
_ICPpr->IncPendingQuery();
1247
// Build the ICP query message
1248
char *urlstr = _url->string_get(NULL);
1249
// Length includes the terminating NUL, so it is sent with the URL.
int urlstr_len = strlen(urlstr) + 1;
1251
// A fresh sequence number is drawn and stored as part of the call.
int status = BuildICPMsg(ICP_OP_QUERY,
1252
_sequence_number = ICPReqSeqNumber(),
1253
0 /* optflags */ , 0 /* optdata */ ,
1255
(void *) urlstr, urlstr_len,
1256
&_sendMsgHdr, _sendMsgIOV,
1258
// urlstr memory freed in destructor
1259
ink_assert(status == 0);
1260
Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number);
1262
_next_state = ICP_QUEUE_REQUEST;
1263
break; // move to next_state
1266
ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat);
1269
// ICP NOT enabled, terminate request
1270
_next_state = ICP_OFF_TERMINATE;
1271
break; // move to next_state
1274
#if !defined(_WIN32) && !defined(__GNUC__)
1275
_end_case_icp_start: // fix DEC warnings
1277
ink_release_assert(0); // should never happen
1279
case ICP_OFF_TERMINATE:
1281
// The callback is made under the mutex on behalf of _cont; if the try-lock
// fails the caller retries later.
if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
1282
return EVENT_CONT; // unable to get lock, delay and retry
1284
Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number);
1286
// ICP NOT enabled, post completion on request
1287
if (!_act.cancelled) {
1288
_cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
1290
MUTEX_UNTAKE_LOCK(mutex, this_ethread());
1292
_next_state = ICP_DONE;
1295
#if !defined(_WIN32) && !defined(__GNUC__)
1296
_end_case_icp_off_terminate: // fix DEC warnings
1298
ink_release_assert(0); // should never happen
1300
case ICP_QUEUE_REQUEST:
1302
// Place ICP request on the pending request queue
1303
int ret = AddICPRequest(_sequence_number, this);
1304
ink_assert(ret == 0);
1306
// Generate ICP requests to peers
1307
int bias = _ICPpr->GetStartingSendPeerBias();
1308
int SendPeers = _ICPpr->GetSendPeers();
1309
npending_actions = 0;
1310
while (SendPeers > 0) {
1311
Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias);
1312
if (!P->IsOnline()) {
1317
// Send query request to Peers
1320
// because of reentrancy, we have to do this first, just
1321
// in case we get called back immediately.
1322
int was_expected = P->ExpectedReplies(&_expected_replies_list);
1323
_expected_replies += was_expected;
1325
Action *a = P->SendMsg_re(this,
1327
&_sendMsgHdr, (struct sockaddr_in *) 0);
1329
// NOTE(review): the condition guarding this assignment is not visible;
// presumably a null Action is normalized to ACTION_IO_ERROR - confirm.
a = ACTION_IO_ERROR;
1331
if (a != ACTION_IO_ERROR) {
1332
// A send that did not complete immediately is remembered so it can be
// removed or cancelled later.
if (a != ACTION_RESULT_DONE) {
1333
if (!pendingActions) {
1334
pendingActions = NEW(new DynArray<Action *>(&default_action));
1336
(*pendingActions) (npending_actions) = a;
1338
P->LogSendMsg(&_ICPmsg, (struct sockaddr_in *) 0); // log as send query
1339
Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s:%d]",
1340
_sequence_number, inet_ntoa(*P->GetIP()), P->GetPort());
1342
// Send failed: undo the expected-reply bookkeeping done before the send.
_expected_replies_list.ClearBit(P->GetPeerID());
1343
_expected_replies -= was_expected;
1344
// Partial or failed write.
1345
ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat);
1347
// Copy the raw IPv4 address into x so it can be logged one byte at a time.
*(uint32_t *) & x = (uint32_t) (*P->GetIP()).s_addr;
1348
// coverity[uninit_use_in_call]
1350
"ICP query send, res=%d, ip=%d.%d.%d.%d", ntohs(_ICPmsg.h.msglen), x[0], x[1], x[2], x[3]);
1355
Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies);
1356
if (!_expected_replies) {
1358
// Nothing to wait for, terminate ICP processing
1360
ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat);
1361
_next_state = ICP_DEQUEUE_REQUEST;
1362
break; // move to next_state
1364
ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies);
1367
// Setup ICP request response timeout
1369
// The configured timeout is passed to HRTIME_SECONDS, i.e. it is in seconds.
int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout();
1370
_timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP);
1372
_next_state = ICP_AWAITING_RESPONSE;
1375
#if !defined(_WIN32) && !defined(__GNUC__)
1376
_end_case_icp_queue_request: // fix DEC warnings
1378
ink_release_assert(0); // should never happen
1380
case ICP_AWAITING_RESPONSE:
1382
Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number);
1384
ICPRequestEventArgs_t dummyArgs;
1385
ICPRequestEventArgs_t *args = 0;
1387
// A response message carries its arguments in `d`; a timeout has none, so a
// zeroed dummy structure is prepared instead.
if (event == ICP_RESPONSE_MESSAGE) {
1388
args = (ICPRequestEventArgs_t *) d;
1389
} else if (event == EVENT_INTERVAL) {
1390
memset((void *) &dummyArgs, 0, sizeof(dummyArgs));
1393
ink_release_assert(0); // should never happen
1396
// Process ICP response
1397
if (ICPResponseMessage(event, args->rICPmsg, args->rICPmsg_len, args->peer) == EVENT_DONE) {
1398
// ICP Request processing is complete, do completion actions
1399
_next_state = ICP_DEQUEUE_REQUEST;
1400
break; // move to next_state
1403
// Continue to wait for additional replies
1407
#if !defined(_WIN32) && !defined(__GNUC__)
1408
_end_case_icp_awaiting_response: // fix DEC warnings
1410
ink_release_assert(0); // should never happen
1412
case ICP_DEQUEUE_REQUEST:
1414
// Remove ICP request from active queue
1415
int ret = RemoveICPRequest(_sequence_number);
1416
Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number);
1417
ink_assert(ret == 0);
1418
//_sequence_number = 0; // moved to REQUEST_NOT_ACTIVE
1419
_next_state = ICP_POST_COMPLETION;
1420
break; // move to next_state
1422
#if !defined(_WIN32) && !defined(__GNUC__)
1423
_end_case_icp_dequeue_request: // fix DEC warnings
1425
ink_release_assert(0); // should never happen
1427
case ICP_POST_COMPLETION:
1429
if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
1430
return EVENT_CONT; // unable to get lock, delay and retry
1432
Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number);
1434
// Post completion on the ICP request.
1435
if (!_act.cancelled) {
1436
_cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
1438
MUTEX_UNTAKE_LOCK(mutex, this_ethread());
1439
// Elapsed time since _start_time is added to the request-time statistic.
ICP_SUM_DYN_STAT(total_icp_request_time_stat, (ink_get_hrtime() - _start_time));
1441
_next_state = ICP_WAIT_SEND_COMPLETE;
1442
break; // move to next_state
1444
#if !defined(_WIN32) && !defined(__GNUC__)
1445
_end_case_icp_post_completion: // fix DEC warnings
1447
ink_release_assert(0); // should never happen
1448
case ICP_WAIT_SEND_COMPLETE:
1450
// wait for all the sends to complete.
1451
if (npending_actions > 0) {
1452
Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions);
1454
_next_state = ICP_REQUEST_NOT_ACTIVE;
1455
// move to next state
1460
#if !defined(_WIN32) && !defined(__GNUC__)
1461
_end_case_icp_wait_send_complete: // fix DEC warnings
1463
ink_release_assert(0); // should never happen
1464
case ICP_REQUEST_NOT_ACTIVE:
1466
Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number);
1467
_sequence_number = 0;
1468
if (!_ICPpr->Lock())
1469
return EVENT_CONT; // Unable to get lock, try again later
1471
// Note pending ICP request completion
1472
_ICPpr->DecPendingQuery();
1475
_next_state = ICP_DONE;
1478
#if !defined(_WIN32) && !defined(__GNUC__)
1479
_end_case_icp_request_not_active: // fix DEC warnings
1481
ink_release_assert(0); // should never happen
1485
ink_release_assert(0); // should never happen
1489
} // End of while(1)
1493
// Evaluate one event against the outstanding query: either the response
// timeout (EVENT_INTERVAL) or a reply message `m` received from `peer`.
// When a peer is chosen to serve the request, _ret_status is set to
// ICP_LOOKUP_FOUND and _ret_sockaddr to that peer's IP and proxy port.
// The caller (ICP_AWAITING_RESPONSE) treats EVENT_DONE as "query resolved";
// EVENT_CONT is returned while more replies are awaited.
//   event      - EVENT_INTERVAL for a timeout, otherwise a response event
//   m          - received ICP message (host byte order is assumed by the
//                requestno comparison below - TODO confirm)
//   ICPMsg_len - unused
//   peer       - peer the message came from
// NOTE(review): the bare-number lines interleaved below look like leftover
// line numbers from an extraction step, not program text - confirm.
ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t * m, int ICPMsg_len, Peer * peer)
1495
NOWARN_UNUSED(ICPMsg_len);
1496
if (event == EVENT_INTERVAL) {
1498
// Timeout: stop tracking sends that are still outstanding.
remove_all_pendingActions();
1500
// ICP request response timeout, if we received a response from
1501
// any parent, return it to resolve the miss.
1503
if (_received_replies) {
1504
int NumParentPeers = _ICPpr->GetParentPeers();
1505
if (NumParentPeers > 0) {
1508
for (n = 0; n < NumParentPeers; n++) {
1509
pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
1510
// A cleared bit in the expected-replies list means this parent has replied.
if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID())
1512
_ret_sockaddr.sin_addr.s_addr = (pp->GetIP())->s_addr;
1513
_ret_sockaddr.sin_port = htons(((ParentSiblingPeer *) pp)->GetProxyPort());
1514
_ret_status = ICP_LOOKUP_FOUND;
1517
"ICP timeout using parent Id=%d from [%s:%d] return [%s:%d]",
1518
_sequence_number, inet_ntoa(*pp->GetIP()),
1519
pp->GetPort(), inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
1525
// Timeout received on ICP request, return ICP_LOOKUP_FAILED
1526
Debug("icp", "ICP Response timeout for Id=%d", _sequence_number);
1530
// We have received a response to our ICP query request.
1531
// See if this response resolves the ICP query.
1533
ink_assert(m->h.requestno == _sequence_number);
1535
switch (m->h.opcode) {
1537
case ICP_OP_HIT_OBJ:
1539
// Kill timeout event
1540
_timeout->cancel(this);
1543
ICP_INCREMENT_DYN_STAT(icp_query_hits_stat);
1544
++_received_replies;
1545
// A hit resolves the query: return the responding peer's IP and proxy port.
_ret_sockaddr.sin_addr.s_addr = (peer->GetIP())->s_addr;
1546
_ret_sockaddr.sin_port = htons(((ParentSiblingPeer *) peer)->GetProxyPort());
1547
_ret_status = ICP_LOOKUP_FOUND;
1550
"ICP Response HIT for Id=%d from [%s:%d] return [%s:%d]",
1551
_sequence_number, inet_ntoa(*peer->GetIP()), peer->GetPort(),
1552
inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
1557
case ICP_OP_MISS_NOFETCH:
1560
Debug("icp", "ICP MISS response for Id=%d from [%s:%d]",
1561
_sequence_number, inet_ntoa(*peer->GetIP()), peer->GetPort());
1562
// "received_replies" is only for Peers who we expect a reply
1563
// from (Peers which are in the expected_replies_list).
1564
int Id = peer->GetPeerID();
1565
if (_expected_replies_list.IsBitSet(Id)) {
1566
// Clear bit to note receipt of reply
1567
_expected_replies_list.ClearBit(Id);
1568
++_received_replies;
1571
if (_received_replies < _expected_replies)
1572
return EVENT_CONT; // wait for more responses
1574
// Kill timeout event
1575
_timeout->cancel(this);
1578
ICP_INCREMENT_DYN_STAT(icp_query_misses_stat);
1580
// All responders have returned ICP_OP_MISS.
1581
// If parents exists, select one to resolve the request.
1583
if (_ICPpr->GetParentPeers() > 0) {
1584
// In cases where multiple parents exist, we use
1585
// a round robin scheme.
1587
// try to find an UP parent, if none, return ICP_LOOKUP_FAILED
1590
for (i = 0; i < _ICPpr->GetParentPeers(); i++) {
1591
p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
1592
// find an UP parent
1596
// if no parent is selected, then return ICP_LOOKUP_FAILED
1597
// The loop running to completion (i reached the parent count) means no
// parent was picked.
if (i >= _ICPpr->GetParentPeers()) {
1598
Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers());
1603
_ret_sockaddr.sin_addr.s_addr = (p->GetIP())->s_addr;
1604
_ret_sockaddr.sin_port = htons(((ParentSiblingPeer *) p)->GetProxyPort());
1605
_ret_status = ICP_LOOKUP_FOUND;
1607
Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s:%d]",
1608
_sequence_number, inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
1612
Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s:%d]",
1613
_sequence_number, inet_ntoa(_ret_sockaddr.sin_addr), ntohs(_ret_sockaddr.sin_port));
1618
// Unrecognized opcode: count it, log the sender and keep waiting.
ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat);
1620
*(uint32_t *) & x = (uint32_t) peer->GetIP()->s_addr;
1621
// coverity[uninit_use_in_call]
1622
Warning("Invalid ICP response, op=%d reqno=%d ip=%d.%d.%d.%d",
1623
m->h.opcode, m->h.requestno, x[0], x[1], x[2], x[3]);
1624
return EVENT_CONT; // wait for more responses
1631
//------------------------------------------------
1632
// Class ICPRequestCont static member functions
1633
//------------------------------------------------
1635
// Static member function
1637
// Convert a received ICP message from network to host byte order.
//   in  - raw message as received
//   out - receives the converted header and per-opcode payload pointers
// The URL/data pointers stored in `out` are computed from `in`'s buffer, so
// `out` refers to `in`'s memory rather than owning a copy.
ICPRequestCont::NetToHostICPMsg(ICPMsg_t * in, ICPMsg_t * out)
1639
// Single-byte fields need no conversion.
out->h.opcode = in->h.opcode;
1640
out->h.version = in->h.version;
1641
out->h.msglen = ntohs(in->h.msglen);
1642
out->h.requestno = ntohl(in->h.requestno);
1643
out->h.optionflags = ntohl(in->h.optionflags);
1644
out->h.optiondata = ntohl(in->h.optiondata);
1645
out->h.shostid = ntohl(in->h.shostid);
1647
// The payload starts immediately after the shostid header field.
switch (in->h.opcode) {
1650
// Query payload: a requester host id followed by the URL. memcpy is used to
// read the host id from the buffer.
memcpy((char *) &out->un.query.rhostid,
1651
(char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid)), sizeof(out->un.query.rhostid));
1652
out->un.query.rhostid = ntohl(out->un.query.rhostid);
1653
out->un.query.URL = (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid));
1658
out->un.hit.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
1663
out->un.miss.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
1666
case ICP_OP_HIT_OBJ:
1668
out->un.hitobj.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
1670
// strlen() is bounded since the buffer is null terminated.
1671
// NOTE(review): p_objsize points at URL + strlen(URL), i.e. at the URL's
// terminating NUL rather than the byte after it - confirm against the ICP
// HIT_OBJ wire layout whether a +1 is missing.
out->un.hitobj.p_objsize = (char *) (out->un.hitobj.URL + strlen(out->un.hitobj.URL));
1672
memcpy((char *) &out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize));
1673
out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize);
1674
out->un.hitobj.data = (char *) (out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize));
1683
// Fill in an ICP message plus the msghdr/iovec pair used to send it.
//   op       - ICP_OP_QUERY, ICP_OP_HIT or ICP_OP_MISS; any other opcode hits
//              a release assert
//   seqno    - request number, stored in network byte order
//   optflags, optdata, shostid - header fields, stored in network byte order
//   data     - payload (the URL string); referenced, not copied
//   datalen  - payload length in bytes
//   mhdr     - msghdr to initialize; it is pointed at `iov`
//   iov      - iovec array to fill (3 entries for a query, 2 otherwise)
//   icpmsg   - message structure to fill; iov[0] points at its header
// Returns 0 on success.
ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno,
1684
int optflags, int optdata, int shostid,
1685
void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg)
1687
// Build ICP message for transmission in network byte order.
1688
if (op == ICP_OP_QUERY) {
1689
icpmsg->un.query.rhostid = htonl(0);
1690
icpmsg->un.query.URL = (char *) data;
1692
// Query layout: header, requester host id, URL.
mhdr->msg_iov = iov;
1693
mhdr->msg_iovlen = 3;
1695
iov[0].iov_base = (caddr_t) icpmsg;
1696
iov[0].iov_len = sizeof(ICPMsgHdr_t);
1698
iov[1].iov_base = (caddr_t) & icpmsg->un.query.rhostid;
1699
iov[1].iov_len = sizeof(icpmsg->un.query.rhostid);
1701
iov[2].iov_base = (caddr_t) data;
1702
iov[2].iov_len = datalen;
1703
// msglen is the sum of all segments.
icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
1705
} else if (op == ICP_OP_HIT) {
1706
icpmsg->un.hit.URL = (char *) data;
1708
// Hit layout: header, URL.
mhdr->msg_iov = iov;
1709
mhdr->msg_iovlen = 2;
1711
iov[0].iov_base = (caddr_t) icpmsg;
1712
iov[0].iov_len = sizeof(ICPMsgHdr_t);
1714
iov[1].iov_base = (caddr_t) data;
1715
iov[1].iov_len = datalen;
1716
icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
1718
} else if (op == ICP_OP_MISS) {
1719
icpmsg->un.miss.URL = (char *) data;
1721
// Miss layout: header, URL.
mhdr->msg_iov = iov;
1722
mhdr->msg_iovlen = 2;
1724
iov[0].iov_base = (caddr_t) icpmsg;
1725
iov[0].iov_len = sizeof(ICPMsgHdr_t);
1727
iov[1].iov_base = (caddr_t) data;
1728
iov[1].iov_len = datalen;
1729
icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
1732
ink_release_assert(0);
1736
// No destination address is set here.
mhdr->msg_name = (caddr_t) 0;
1737
mhdr->msg_namelen = 0;
1738
// TODO: The following is just awkward
1739
// Clear whichever ancillary-data fields this platform's msghdr has.
#if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris)
1740
mhdr->msg_accrights = (caddr_t) 0;
1741
mhdr->msg_accrightslen = 0;
1742
#elif !defined(solaris)
1743
mhdr->msg_control = 0;
1744
mhdr->msg_controllen = 0;
1745
mhdr->msg_flags = 0;
1748
// Common header fields; multi-byte values go out in network byte order.
icpmsg->h.opcode = op;
1749
icpmsg->h.version = ICP_VERSION_2;
1750
icpmsg->h.requestno = htonl(seqno);
1751
icpmsg->h.optionflags = htonl(optflags);
1752
icpmsg->h.optiondata = htonl(optdata);
1753
icpmsg->h.shostid = htonl(shostid);
1755
return 0; // Success
1758
// Static ICPRequestCont data declarations
1760
// Sequence-number counter used by ICPReqSeqNumber(); starts at 1.
ICPRequestCont::ICPRequestSeqno = 1;
1761
// Hash table of outstanding requests: one queue per bucket, indexed by
// ICPRequestHash(sequence number).
Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE];
1763
// Static member function
1765
// Produce the sequence number for a new ICP request by atomically
// incrementing the shared ICPRequestSeqno counter.
ICPRequestCont::ICPReqSeqNumber()
1767
// Generate ICP request sequence numbers. This must be unique.
1768
unsigned int res = 0;
1770
res = (unsigned int) ink_atomic_increment((int *) &ICPRequestSeqno, 1);
1776
// Static member function
1778
// Map a sequence number to its bucket index in ICPRequestQueue.
//   seqno - request sequence number
// Returns seqno modulo ICP_REQUEST_HASH_SIZE.
ICPRequestCont::ICPRequestHash(unsigned int seqno)
1780
// ICPRequestQueue hash
1781
return seqno % ICP_REQUEST_HASH_SIZE;
1784
// Static member function
1786
// Enqueue request `r` in the bucket selected by `seqno`.
//   seqno - sequence number used to pick the bucket
//   r     - request to enqueue
// Always returns 0 in the visible code.
ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont * r)
1788
// Add ICP request to ICP outstanding queue (ICPRequestQueue).
1789
// return: 0 - success
1791
ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r);
1792
return 0; // Success
1795
// Static member function
1797
// Look up an outstanding request by sequence number.
//   seqno - sequence number to search for
// Walks the bucket's queue comparing _sequence_number; returns a null pointer
// when no entry matches.
ICPRequestCont::FindICPRequest(unsigned int seqno)
1799
// Find ICP request on outstanding queue with the given sequence number
1800
int hash = ICPRequestHash(seqno);
1803
for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
1804
if (r->_sequence_number == seqno)
1807
return (ICPRequestCont *) 0; // Not found
1810
// Static member function
1812
// Remove the outstanding request with the given sequence number from its
// bucket queue.
//   seqno - sequence number of the request to remove
// Returns 0 on success, 1 when no matching request is queued (per the
// original comment below).
ICPRequestCont::RemoveICPRequest(unsigned int seqno)
1814
// Remove ICP request from outstanding queue with the given
1816
// Return: 0 - success; 1 - not found
1819
// NOTE(review): the condition guarding this early return is not visible;
// presumably it rejects an unset (zero) sequence number - confirm.
return 1; // Not found
1821
int hash = ICPRequestHash(seqno);
1824
for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
1825
if (r->_sequence_number == seqno) {
1826
ICPRequestQueue[hash].remove(r);
1830
return 1; // Not found
1833
//------------------------------------------------------------------------
1834
// Class ICPProcessor member functions
1835
// Central class which initializes the ICP world.
1836
// Delegates incoming message processing to ICPHandlerCont
1837
// and outgoing message processing to ICPRequestCont.
1838
// Manages the ICP configuration database derived from TS
1839
// configuration info.
1840
//------------------------------------------------------------------------
1842
// Static data declarations for ICPProcessor
1844
// Per-thread ICP initialization hook taking the event thread `e`.
// NOTE(review): neither the return type nor the body is visible here, so its
// behavior cannot be stated - confirm against the full file.
initialize_thread_for_icp(EThread * e)
1849
// The single ICPProcessor instance, defined as a static object.
ICPProcessor icpProcessorInternal;
1850
// External interface object constructed with a pointer to the instance above.
ICPProcessorExt icpProcessor(&icpProcessorInternal);
1852
// Construct an idle ICPProcessor: no lock object, no configuration, no
// continuations or scheduled events, and every peer list empty (list counts
// start at -1). The real setup is performed by start().
ICPProcessor::ICPProcessor()
  : _l(0), _Initialized(0), _AllowIcpQueries(0),
    _PendingIcpQueries(0), _ICPConfig(0), _ICPPeriodic(0), _ICPHandler(0),
    _mcastCB_handler(NULL), _PeriodicEvent(0), _ICPHandlerEvent(0),
    _nPeerList(-1), _LocalPeer(0),
    _curSendPeer(0), _nSendPeerList(-1),
    _curRecvPeer(0), _nRecvPeerList(-1), _curParentPeer(0), _nParentPeerList(-1), _ValidPollData(0), _last_recv_peer_bias(0)
{
  // sizeof(array) covers every slot. The previous sizeof(array[SIZE]) form is
  // the size of a single element, so only the first slot of each table was
  // being cleared.
  memset((void *)_PeerList, 0, sizeof(_PeerList));
  memset((void *)_SendPeerList, 0, sizeof(_SendPeerList));
  memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList));
  memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList));
  memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex));
}
1867
// Cancel the two periodic events created by start(), each under the mutex of
// the continuation it belongs to.
// NOTE(review): _ICPPeriodic and _ICPHandler are 0 until start() creates
// them; no null checks are visible around the dereferences below - confirm
// they are guarded in the full file.
ICPProcessor::~ICPProcessor()
1869
// Win32-only check of the global shutdown flag.
// NOTE(review): the statement controlled by this `if` and the closing #endif
// are not visible here.
#if defined (_WIN32)
1870
if (0 < glShutdownInProgress)
1874
MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread());
1875
_PeriodicEvent->cancel();
1876
Mutex_unlock(_ICPPeriodic->mutex, this_ethread());
1880
MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread());
1881
_ICPHandlerEvent->cancel();
1882
Mutex_unlock(_ICPHandler->mutex, this_ethread());
1887
// One-time startup of the ICP subsystem: creates the processor lock, the
// configuration object and the multicast callback handler, builds the peer
// list and sockets when ICP is configured, and schedules the periodic
// configuration monitor and the receive handler on ET_ICP.
ICPProcessor::start()
1889
//*****************************************************
1890
// Perform initialization actions for ICPProcessor
1891
// (called at system startup)
1892
//*****************************************************
1893
if (_Initialized) // Do only once
1897
// Setup ICPProcessor lock, required since ICPProcessor is instantiated
1898
// as static object.
1900
_l = NEW(new AtomicLock());
1903
// Setup custom allocators
1905
// replaced with generic IOBufferBlock allocator
1906
ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
1909
// Setup ICP stats callbacks
1911
InitICPStatCallbacks();
1914
// Create ICP configuration objects
1916
_ICPConfig = NEW(new ICPConfiguration());
1918
// Handler passed to the multicast channel setup calls in SetupListenSockets();
// its events are routed to ICPHandlerCont::TossEvent.
_mcastCB_handler = NEW(new ICPHandlerCont(this));
1919
SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
1923
// Build ICP peer list and setup listen sockets
1925
// Queries are only allowed once both the peer list and the sockets were set
// up successfully (both helpers return 0 on success).
if (_ICPConfig->globalConfig()->ICPconfigured()) {
1926
if (BuildPeerList() == 0) {
1927
if (SetupListenSockets() == 0) {
1928
_AllowIcpQueries = 1; // allow receipt of queries
1935
// Start ICP configuration monitor (periodic continuation)
1937
_ICPPeriodic = NEW(new ICPPeriodicCont(this));
1938
SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
1939
_PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
1942
// Start ICP receive handler continuation
1944
_ICPHandler = NEW(new ICPHandlerCont(this));
1945
SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
1946
_ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
1947
HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
1949
// Stale lookup data initializations
1951
// Create the shared request header once, if it does not exist yet.
if (!gclient_request.valid()) {
1952
gclient_request.create(HTTP_TYPE_REQUEST);
1958
// Entry point for issuing an ICP query on behalf of another subsystem.
//   c   - continuation to call back with the lookup result
//   url - URL to query peers for
// Allocates an ICPRequestCont, schedules it immediately on ET_ICP and returns
// the request's action pointer.
ICPProcessor::ICPQuery(Continuation * c, URL * url)
1960
//**************************************
1961
// HTTP state machine interface to ICP
1962
//**************************************
1964
// Build continuation to process ICP request
1965
EThread *thread = this_ethread();
1966
// NOTE(review): `mutex` is not referenced again in the visible code;
// presumably the stat macro below uses it - confirm.
ProxyMutex *mutex = thread->mutex;
1967
// Placement-construct the request in storage from the class allocator.
ICPRequestCont *rc = new(ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url);
1969
ICP_INCREMENT_DYN_STAT(icp_query_requests_stat);
1971
rc->SetRequestStartTime();
1972
SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler) & ICPRequestCont::ICPRequestEvent);
1973
// The request starts running from ICPRequestEvent() on an ICP thread.
eventProcessor.schedule_imm(rc, ET_ICP);
1975
return rc->GetActionPtr();
1979
// Build the internal Peer structures from the working copy of the ICP
// configuration: the local peer first, then every configured parent and
// sibling, including multicast groupings. Returns 0 on success.
// NOTE(review): the bare-number lines interleaved below look like leftover
// line numbers from an extraction step, not program text - confirm.
ICPProcessor::BuildPeerList()
1981
// Returns 0 on Success
1984
//---------------------------------------------------------------------
1985
// We always place all allocated Peer elements onto PeerList[],
1986
// which is used to track allocated elements and validate (ip, port)
1987
// uniqueness in the ICP configuration.
1989
// All MultiCastPeer(s) link the underlying ParentSiblingPeer structures
1990
// using a singly linked list off the MultiCastPeer.
1992
// Peer elements placed onto SendPeerList[] are elements which are
1993
// the target of ICP queries.
1994
// In the case where MultiCasting is used, a pseudo peer element
1995
// (MultiCastPeer) is placed onto the SendPeerList[] to act as a place
1996
// holder for the underlying Peers.
1998
// RecvPeerList[] is the list of Peer(s) we perform reads on for
1999
// ICP messages. In the case of MultiCast, the pseudo MultiCast peer
2000
// element (MultiCastPeer) is placed on this list. Since we currently
2001
// funnel all unicast receives through the local peer UDP socket,
2002
// only the local peer and any pseudo MultiCastPeer structures reside
2005
// Parent (PEER_PARENT) Peer elements are also added to ParentPeerList
2006
// which is used to select a parent in the case where all ICP queries
2007
// have returned ICP_MISS.
2008
//---------------------------------------------------------------------
2010
PeerConfigData *Pcfg;
2018
// From the working copy of the ICP configuration data, build the
2019
// internal Peer data structures for ICP processing.
2020
// First, establish the Local Peer descriptor before processing
2021
// parents and siblings.
2023
// Configuration slot 0 is filled in here as the local peer.
Pcfg = _ICPConfig->indexToPeerConfigData(0);
2024
ink_strncpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
2025
Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
2027
// Get IP address for given interface
2028
if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &Pcfg->_ip_addr)) {
2029
// No IP address for given interface
2030
Warning("ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
2031
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface has no IP address");
2032
Pcfg->_ip_addr.s_addr = 0;
2034
Pcfg->_my_ip_addr.s_addr = Pcfg->_ip_addr.s_addr;
2036
Pcfg->_proxy_port = 0;
2037
Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
2038
Pcfg->_mc_member = 0;
2039
Pcfg->_mc_ip_addr.s_addr = 0;
2042
//***************************************************
2043
// Descriptor for local host, add to PeerList and
2045
//***************************************************
2046
P = NEW(new ParentSiblingPeer(PEER_LOCAL, Pcfg, this));
2047
// Adding the local peer must succeed; a zero status aborts the process.
status = AddPeer(P);
2048
ink_release_assert(status);
2049
status = AddPeerToRecvList(P);
2050
ink_release_assert(status);
2053
// Remaining configuration slots hold the configured parents/siblings.
for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
2054
Pcfg = _ICPConfig->indexToPeerConfigData(index);
2055
type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
2057
// Ignore parent and sibling entries corresponding to "localhost".
2058
// This is possible in a cluster configuration where parents and
2059
// siblings are cluster members. Note that in a cluster
2060
// configuration, "icp.config" is shared by all nodes.
2062
if (Pcfg->GetIP()->s_addr == _LocalPeer->GetIP()->s_addr)
2065
if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
2067
if (Pcfg->MultiCastMember()) {
2068
// Look for an existing multicast peer with this group address and ICP port.
mcP = FindPeer(Pcfg->GetMultiCastIP(), Pcfg->GetICPPort());
2070
//*********************************
2071
// Create multicast peer structure
2072
//*********************************
2073
mcP = NEW(new MultiCastPeer(Pcfg->GetMultiCastIP(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this));
2074
status = AddPeer(mcP);
2076
status = AddPeerToSendList(mcP);
2078
status = AddPeerToRecvList(mcP);
2081
//*****************************
2082
// Add child to MultiCast peer
2083
//*****************************
2084
P = NEW(new ParentSiblingPeer(type, Pcfg, this));
2085
status = AddPeer(P);
2087
status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
2091
//*****************************
2092
// Add parent/sibling peer
2093
//*****************************
2094
P = NEW(new ParentSiblingPeer(type, Pcfg, this));
2095
status = AddPeer(P);
2097
status = AddPeerToSendList(P);
2100
//****************************************
2101
// Also, add parent peers to parent list.
2102
//****************************************
2103
if (type == PEER_PARENT) {
2104
status = AddPeerToParentList(P);
2109
return 0; // Success
2113
// Release every Peer referenced from the peer list and reset the peer
// bookkeeping (list counts back to -1, all list slots cleared).
ICPProcessor::FreePeerList()
2115
// Deallocate all Peer structures
2117
// _nPeerList + 1 is the bound used for the populated part of the list.
for (index = 0; index < (_nPeerList + 1); ++index) {
2118
if (_PeerList[index]) {
2119
_PeerList[index] = 0;
2122
// Reset all control data
2124
_LocalPeer = (Peer *) 0;
2126
_nSendPeerList = -1;
2128
_nRecvPeerList = -1;
2130
_nParentPeerList = -1;
2132
_last_recv_peer_bias = 0;
2134
// Clear every slot of each list, not just the populated part.
for (index = 0; index < PEER_LIST_SIZE; index++) {
2135
_PeerList[index] = 0;
2137
for (index = 0; index < SEND_PEER_LIST_SIZE; index++) {
2138
_SendPeerList[index] = 0;
2140
for (index = 0; index < RECV_PEER_LIST_SIZE; index++) {
2141
_RecvPeerList[index] = 0;
2143
for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) {
2144
_ParentPeerList[index] = 0;
2146
// NOTE(review): sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]) is the size
// of one element, so only the first entry is zeroed here; sizeof(_PeerIDtoPollIndex)
// looks like what was intended - confirm.
memset((void *) _PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
2150
// Sanity-check the peer configuration and set up the channels/sockets for the
// peers on PeerList[]. Returns 0 on success.
// NOTE(review): the bare-number lines interleaved below look like leftover
// line numbers from an extraction step, not program text - confirm.
ICPProcessor::SetupListenSockets()
2152
int allow_null_configuration;
2154
// A configuration with no send/receive peers is tolerated only in
// receive-only mode with "reply to unknown peer" enabled.
if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
2155
&& _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
2156
allow_null_configuration = 1;
2158
allow_null_configuration = 0;
2161
// Returns 0 on Success.
2164
// Perform some basic sanity checks on the ICP configuration.
2167
Warning("ICP setup, no defined local Peer");
2168
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
2172
if (GetSendPeers() == 0) {
2173
if (!allow_null_configuration) {
2174
Warning("ICP setup, no defined send Peer(s)");
2175
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
2179
if (GetRecvPeers() == 0) {
2180
if (!allow_null_configuration) {
2181
Warning("ICP setup, no defined receive Peer(s)");
2182
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
2187
// Establish the required sockets for elements on the PeerList[].
2192
for (index = 0; index < (_nPeerList + 1); ++index) {
2193
if ((P = _PeerList[index])) {
2195
// Parents and siblings only need their channel's remote address/port set.
if ((P->GetType() == PEER_PARENT)
2196
|| (P->GetType() == PEER_SIBLING)) {
2197
ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
2199
pPS->GetChan()->setRemote(pPS->GetIP()->s_addr, pPS->GetPort());
2201
} else if (P->GetType() == PEER_MULTICAST) {
2202
MultiCastPeer *pMC = (MultiCastPeer *) P;
2203
ink_assert(_mcastCB_handler != NULL);
2204
// Multicast send side: uses the group address/port plus the local peer's
// address/port.
status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP()->s_addr, pMC->GetPort(), _LocalPeer->GetIP()->s_addr,
2206
_LocalPeer->GetPort(),
2210
NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
2212
// x and y receive the raw IPv4 addresses so they can be logged byte by byte.
unsigned char x[4], y[4];
2213
*(uint32_t *) & x = (uint32_t) pMC->GetIP()->s_addr;
2214
*(uint32_t *) & y = (uint32_t) _LocalPeer->GetIP()->s_addr;
2215
// coverity[uninit_use_in_call]
2216
Warning("ICP MC send setup failed, res=%d, ip=%d.%d.%d.%d:%d bind_ip=%d.%d.%d.%d:%d",
2217
status, x[0], x[1], x[2], x[3], pMC->GetPort(), y[0], y[1], y[2], y[3], 0);
2218
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed");
2222
// Multicast receive side: set up against the send channel created above.
status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP()->s_addr,
2224
NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
2227
*(uint32_t *) & x = (uint32_t) pMC->GetIP()->s_addr;
2228
// coverity[uninit_use_in_call]
2229
Warning("ICP MC recv setup failed, res=%d, ip=%d.%d.%d.%d:%d",
2230
status, x[0], x[1], x[2], x[3], pMC->GetPort());
2231
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed");
2238
// Setup the socket for the local host.
2239
// We funnel all unicast sends and receives through
2240
// the local peer UDP socket.
2242
ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
2244
pPS->GetChan()->setRemote(pPS->GetIP()->s_addr, pPS->GetPort());
2245
return 0; // Success
2249
// Closes the channels owned by the local peer and by every multicast peer
// found on _PeerList[].
// NOTE(review): the return type, the braces and the declarations of `index`
// and `P` are not visible in this part of the file -- confirm against the
// complete definition.
ICPProcessor::ShutdownListenSockets()
2252
// Close all open sockets for elements on the PeerList[]
2254
// Sockets must not be closed while an ICP query is still outstanding.
ink_assert(!PendingQuery());
2258
// Slots 0 .. _nPeerList (inclusive) are examined.
for (index = 0; index < (_nPeerList + 1); ++index) {
2259
// Empty slots are skipped.
if ((P = _PeerList[index])) {
2260
if (P->GetType() == PEER_LOCAL) {
2261
// The local peer is accessed through the ParentSiblingPeer interface.
ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
2262
// The result of close() is deliberately discarded.
(void) pPS->GetChan()->close();
2264
} else if (P->GetType() == PEER_MULTICAST) {
2265
MultiCastPeer *pMC = (MultiCastPeer *) P;
2266
// A multicast peer has separate send and receive channels; close both.
(void) pMC->GetSendChan()->close();
2267
(void) pMC->GetRecvChan()->close();
2274
// Tears down the sockets of the current ICP configuration, pulls in the new
// global and peer configuration, then rebuilds the peer list and the
// listen sockets.
//
// global_config_changed / peer_config_changed: both are marked unused below;
// the visible code performs the same full rebuild regardless of their values.
//
// NOTE(review): the declaration of `status` and the final return are not
// visible in this part of the file; presumably `status` is what is returned
// (0 on success) -- confirm.
ICPProcessor::Reconfigure(int global_config_changed, int peer_config_changed)
2276
// Returns 0 on Success
2278
NOWARN_UNUSED(global_config_changed);
2279
NOWARN_UNUSED(peer_config_changed);
2281
// At this point, ICP request processing is disabled and
2282
// no pending ICP requests exist.
2284
// Preconditions: the caller holds the ICP configuration lock, ICP queries
// are disabled, and nothing is in flight.
ink_assert(_ICPConfig->HaveLock());
2285
ink_assert(!AllowICPQueries());
2286
ink_assert(!PendingQuery());
2288
// Shutdown and deallocate all structures associated with the
2289
// current configuration.
2291
ShutdownListenSockets();
2294
// Copy the new configuration into the working copy and
2295
// rebuild all associated structures.
2297
_ICPConfig->UpdateGlobalConfig();
2298
_ICPConfig->UpdatePeerConfig();
2301
// Sockets are only set up when the peer list was built successfully (0);
// otherwise `status` keeps the BuildPeerList() result.
if ((status = BuildPeerList()) == 0) {
2302
status = SetupListenSockets();
2308
// Executes the reconfiguration state `s` and returns a ReconfigState_t.
// The visible returns are RC_RECONFIG or RC_ENABLE_ICP when the step has to
// be retried, and RC_DONE once the configuration lock has been released.
//
// gconfig_changed / pconfig_changed are passed through to Reconfigure().
//
// NOTE(review): the loop/switch scaffolding, the case labels, the lock
// acquisition and several braces/else lines are not visible in this part of
// the file. The notes below describe only the statements that are shown.
ICPProcessor::ReconfigState_t
2309
ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed)
2311
//*****************************************************************
2312
// State machine which performs the ICP reconfiguration actions.
2313
// Defined states are as follows:
2314
// 1) (RC_RECONFIG) disable ICP, reconfigure if no request pending,
2315
// else delay and retry. Reconfigure and if success move to
2316
// RC_ENABLE_ICP else RC_DONE.
2317
// 2) (RC_ENABLE_ICP) enable ICP, free ICP configuration lock.
2318
// 3) (RC_DONE) free ICP configuration lock.
2319
//*****************************************************************
2320
// The ICP configuration lock must already be held on entry.
ink_assert(_ICPConfig->HaveLock());
2321
int reconfig_status;
2329
// Returning the current state name asks for this state to be run again.
return RC_RECONFIG; // Unable to get lock, try again
2331
// With queries still pending: stop accepting new ones, send the bogus
// message that cancels the outstanding read, and retry this state later.
if (PendingQuery()) {
2332
DisableICPQueries(); // disable ICP processing
2334
CancelPendingReads();
2335
return RC_RECONFIG; // Pending requests, delay and retry
2338
DisableICPQueries(); // disable ICP processing
2340
// No pending ICP queries, perform reconfiguration
2341
reconfig_status = Reconfigure(gconfig_changed, pconfig_changed);
2343
// Reconfigure() reports success as 0.
if (reconfig_status == 0) {
2344
s = RC_ENABLE_ICP; // reconfig OK, enable ICP
2346
s = RC_DONE; // reconfig failed, do not enable ICP
2348
break; // move to next state
2355
return RC_ENABLE_ICP; // Unable to get lock, try again
2357
EnableICPQueries(); // Enable ICP processing
2361
break; // move to next state
2366
// Release configuration lock
2367
_ICPConfig->Unlock();
2368
return RC_DONE; // Reconfiguration complete
2372
// Reached only for a state value none of the handled cases cover.
ink_release_assert(0); // Should never happen
2378
// The label below is compiled only for toolchains that are neither Windows
// nor GCC.
#if !defined(_WIN32) && !defined(__GNUC__)
2379
_exit_while: // fix DEC warnings
2385
// Sends a deliberately invalid ICP message to the local peer's own address,
// using a throw-away ICPRequestCont as both the continuation and the token
// for the send.
// NOTE(review): the return type and braces are not visible in this part of
// the file.
ICPProcessor::CancelPendingReads()
2387
// Cancel pending ICP read by sending a bogus message to
2388
// the local ICP port.
2390
// Placement-new the continuation into storage obtained from
// ICPRequestCont_allocator.
ICPRequestCont *r = new(ICPRequestCont_allocator.alloc())
2391
ICPRequestCont(this, NULL, NULL);
2392
// Completion events are routed to NopICPRequestEvent.
SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler) & ICPRequestCont::NopICPRequestEvent);
2393
r->mutex = new_ProxyMutex();
2395
// TODO: Check return value?
2396
// Build an ICP_OP_HIT message with all optional fields zero and no payload;
// the result of BuildICPMsg() is currently ignored.
ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0 /* optflags */ , 0 /* optdata */ , 0 /* shostid */ ,
2397
(void *) 0, 0, &r->_sendMsgHdr, r->_sendMsgIOV, &r->_ICPmsg);
2398
// Limit the send to the first iovec entry.
r->_sendMsgHdr.msg_iovlen = 1;
2399
// Inverting the version field is what makes the message invalid.
r->_ICPmsg.h.version = ~r->_ICPmsg.h.version; // bogus message
2401
// Address the datagram to the local peer's send-channel address and send it
// on the local peer's send descriptor. The result of sendmsg_re() is not
// captured here.
Peer *lp = GetLocalPeer();
2402
r->_sendMsgHdr.msg_name = (caddr_t) & (lp->GetSendChan())->sa;
2403
r->_sendMsgHdr.msg_namelen = sizeof((lp->GetSendChan())->sa);
2404
udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr);
2408
// Linear search of the first `validListItems` entries of `List` for a peer
// whose IP address equals `ip`.
//   ip             - address to match (compared via s_addr)
//   port           - port to match, or -1 to accept any port
//   validListItems - number of leading slots of `List` to examine
//   List           - array of peer pointers; empty slots are skipped
// NOTE(review): the return type, the declaration of `P` and the return
// statements are not visible in this part of the file; callers treat the
// result as a peer pointer that is null when nothing matches -- confirm.
ICPProcessor::GenericFindListPeer(struct in_addr *ip, int port, int validListItems, Ptr<Peer> *List)
2411
for (int n = 0; n < validListItems; ++n) {
2412
// Skip empty slots.
if ((P = List[n])) {
2413
// Match on address first, then on port unless the caller passed -1.
if ((P->GetIP()->s_addr == ip->s_addr)
2414
&& ((port == -1) || (P->GetPort() == port)))
2422
// Looks up a peer by (ip, port) on _PeerList via GenericFindListPeer().
// The search covers slots 0 .. _nPeerList inclusive, hence the `+ 1`.
ICPProcessor::FindPeer(struct in_addr * ip, int port)
2424
// Find (Peer *) with the given (ip,port) on the global list (PeerList)
2425
return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList);
2429
// Looks up a peer by (ip, port) on _SendPeerList via GenericFindListPeer().
// The search covers slots 0 .. _nSendPeerList inclusive, hence the `+ 1`.
ICPProcessor::FindSendListPeer(struct in_addr * ip, int port)
2431
// Find (Peer *) with the given (ip,port) on the
2432
// scheduler list (SendPeerList)
2433
return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList);
2437
// Looks up a peer by (ip, port) on _RecvPeerList via GenericFindListPeer().
// The search covers slots 0 .. _nRecvPeerList inclusive, hence the `+ 1`.
ICPProcessor::FindRecvListPeer(struct in_addr * ip, int port)
2439
// Find (Peer *) with the given (ip,port) on the
2440
// receive list (RecvPeerList)
2441
return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList);
2445
// Registers peer P on the global peer list after rejecting duplicates and
// checking capacity; on success P is also told its list index via
// SetPeerID().
// NOTE(review): the return type, the declaration of `x`, the success return
// and any update of _nPeerList are not visible in this part of the file.
// The `_nPeerList + 1` bounds used by the lookups suggest _nPeerList is the
// index of the last used slot -- confirm against the complete definition.
ICPProcessor::AddPeer(Peer * P)
2447
// Add (Peer *) to the global list (PeerList). Make sure (ip,port) is
2449
// Returns 1 - added; 0 - Not added
2452
// Make sure no duplicate exists
2454
// (ip,port) uniqueness is enforced here: an existing match means the peer
// is reported as a configuration error and not added.
if (FindPeer(P->GetIP(), P->GetPort())) {
2456
// View the 32-bit address as individual bytes for the dotted %d.%d.%d.%d
// output below.
*(uint32_t *) & x = (uint32_t) P->GetIP()->s_addr;
2457
// coverity[uninit_use_in_call]
2458
Warning("bad icp.config, multiple peer definitions for ip=%d.%d.%d.%d", x[0], x[1], x[2], x[3]);
2459
REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions");
2461
return 0; // Not added
2464
// Capacity check against PEER_LIST_SIZE before storing.
if (_nPeerList + 1 < PEER_LIST_SIZE) {
2466
_PeerList[_nPeerList] = P;
2467
// The peer's ID is its index on _PeerList.
P->SetPeerID(_nPeerList);
2470
return 0; // Not added
2476
// Appends peer P to _RecvPeerList if there is room.
// NOTE(review): the return type, the success return and any update of
// _nRecvPeerList are not visible in this part of the file -- confirm.
ICPProcessor::AddPeerToRecvList(Peer * P)
2478
// Add (Peer *) to the listen list (RecvPeerList).
2479
// Make sure (ip,port) is unique.
2480
// Returns 1 - added; 0 - Not added
2482
// Assert that no duplicate exists
2483
// Unlike AddPeer(), a duplicate is only asserted against here, not reported
// through the return value.
ink_assert(FindRecvListPeer(P->GetIP(), P->GetPort()) == 0);
2485
// Capacity check against RECV_PEER_LIST_SIZE before storing.
if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) {
2487
_RecvPeerList[_nRecvPeerList] = P;
2490
return 0; // Not added
2495
// Appends peer P to _SendPeerList if there is room.
// NOTE(review): the return type, the success return and any update of
// _nSendPeerList are not visible in this part of the file -- confirm.
ICPProcessor::AddPeerToSendList(Peer * P)
2497
// Add (Peer *) to the scheduler list (SendPeerList).
2498
// Make sure (ip,port) is unique.
2499
// Returns 1 - added; 0 - Not added
2501
// Assert that no duplicate exists
2502
// Unlike AddPeer(), a duplicate is only asserted against here, not reported
// through the return value.
ink_assert(FindSendListPeer(P->GetIP(), P->GetPort()) == 0);
2504
// Capacity check against SEND_PEER_LIST_SIZE before storing.
if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) {
2506
_SendPeerList[_nSendPeerList] = P;
2509
return 0; // Not added
2514
// Appends peer P to _ParentPeerList if there is room. No duplicate check is
// made in the visible code.
// NOTE(review): the return type, the success return and any update of
// _nParentPeerList are not visible in this part of the file -- confirm.
ICPProcessor::AddPeerToParentList(Peer * P)
2516
// Add (Peer *) to the parent list (ParentPeerList).
2517
// Returns 1 - added; 0 - Not added
2519
// Capacity check against PARENT_PEER_LIST_SIZE before storing.
if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) {
2521
_ParentPeerList[_nParentPeerList] = P;
2524
return 0; // Not added