24
25
//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
26
Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Initiator *anInitiator, Adaptation::Icap::ServiceRep::Pointer &aService):
27
Adaptation::Icap::Xaction::Xaction(const char *aTypeName,
28
Adaptation::Icap::ServiceRep::Pointer &aService):
27
29
AsyncJob(aTypeName),
28
Adaptation::Initiate(aTypeName, anInitiator),
30
Adaptation::Initiate(aTypeName),
50
52
debugs(93,3, typeName << " destructed, this=" << this <<
51
53
" [icapx" << id << ']'); // we should not call virtual status() here
52
54
HTTPMSGUNLOCK(icapRequest);
53
HTTPMSGUNLOCK(icapReply);
56
57
Adaptation::Icap::ServiceRep &
104
105
// fake the connect callback
105
106
// TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
106
107
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
107
Dialer dialer(this, &Adaptation::Icap::Xaction::noteCommConnected);
108
CbcPointer<Xaction> self(this);
109
Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
108
110
dialer.params.fd = connection;
109
111
dialer.params.flag = COMM_OK;
110
112
// fake other parameters by copying from the existing connection
116
118
disableRetries(); // we only retry pconn failures
118
120
IpAddress outgoing;
121
if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) {
122
debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address.");
123
dieOnConnectionFailure(); // throws
125
/* split-stack for now requires default IPv4-only socket */
126
if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) {
119
130
connection = comm_open(SOCK_STREAM, 0, outgoing,
120
131
COMM_NONBLOCKING, s.cfg().uri.termedBuf());
127
138
// TODO: service bypass status may differ from that of a transaction
128
139
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
129
AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
130
TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
140
AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
141
TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
132
142
commSetTimeout(connection, TheConfig.connect_timeout(
133
143
service().cfg().bypass), timeoutCall);
135
145
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
136
closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
137
CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
146
closer = JobCallback(93, 5,
147
CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
138
148
comm_add_close_handler(connection, closer);
140
150
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
141
connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
142
ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
151
connector = JobCallback(93,3,
152
ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
143
153
commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
224
234
// comm module will free the buffer
225
235
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
226
writer = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommWrote",
227
Dialer(this, &Adaptation::Icap::Xaction::noteCommWrote));
236
writer = JobCallback(93,3,
237
Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
229
239
comm_write_mbuf(connection, &buf, writer);
259
269
theService->cfg().methodStr() << " " <<
260
270
theService->cfg().uri << status());
261
271
reuseConnection = false;
262
service().noteFailure();
264
throw TexcHere(connector != NULL ?
273
const bool whileConnecting = connector != NULL;
274
closeConnection(); // so that late Comm callbacks do not disturb bypass
275
throw TexcHere(whileConnecting ?
265
276
"timed out while connecting to the ICAP service" :
266
277
"timed out while talking to the ICAP service");
306
317
// XXX: why does Config.Timeout lacks a write timeout?
307
318
// TODO: service bypass status may differ from that of a transaction
308
319
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
309
AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
310
TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
320
AsyncCall::Pointer call = JobCallback(93,5,
321
TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
312
323
commSetTimeout(connection,
313
324
TheConfig.io_timeout(service().cfg().bypass), call);
330
341
* here instead of reading directly into readBuf.buf.
332
343
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
333
reader = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommRead",
334
Dialer(this, &Adaptation::Icap::Xaction::noteCommRead));
344
reader = JobCallback(93,3,
345
Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
336
347
comm_read(connection, commBuf, readBuf.spaceSize(), reader);
346
357
Must(io.flag == COMM_OK);
347
358
Must(io.size >= 0);
349
al.icap.bytesRead+=io.size;
353
debugs(93, 3, HERE << "read " << io.size << " bytes");
356
* See comments in Adaptation::Icap::Xaction.h about why we use commBuf
357
* here instead of reading directly into readBuf.buf.
362
reuseConnection = false;
364
// detect a pconn race condition: eof on the first pconn read
365
if (!al.icap.bytesRead && retriable()) {
367
mustStop("pconn race");
372
al.icap.bytesRead+=io.size;
376
debugs(93, 3, HERE << "read " << io.size << " bytes");
379
* See comments in Adaptation::Icap::Xaction.h about why we use commBuf
380
* here instead of reading directly into readBuf.buf.
361
383
readBuf.append(commBuf, io.size);
362
384
disableRetries(); // because pconn did not fail
364
reuseConnection = false;
368
387
handleCommRead(io.size);
465
483
void Adaptation::Icap::Xaction::tellQueryAborted()
467
Adaptation::Icap::Launcher *l = dynamic_cast<Adaptation::Icap::Launcher*>(theInitiator.ptr());
468
Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply, retriable(), repeatable());
469
CallJob(91, 5, __FILE__, __LINE__,
470
"Adaptation::Icap::Launcher::noteXactAbort",
471
XactAbortCall(l, &Adaptation::Icap::Launcher::noteXactAbort, abortInfo) );
485
if (theInitiator.set()) {
486
Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply,
487
retriable(), repeatable());
488
Launcher *launcher = dynamic_cast<Launcher*>(theInitiator.get());
489
// launcher may be nil if initiator is invalid
490
CallJobHere1(91,5, CbcPointer<Launcher>(launcher),
491
Launcher, noteXactAbort, abortInfo);