2
* DEBUG: section 93 ICAP (RFC 3507) Client
8
#include "HttpRequest.h"
10
#include "ICAPServiceRep.h"
11
#include "ICAPInitiator.h"
12
#include "ICAPLauncher.h"
13
#include "ICAPModXact.h"
14
#include "ICAPClient.h"
15
#include "ChunkedCodingParser.h"
16
#include "TextException.h"
17
#include "AuthUserRequest.h"
18
#include "ICAPConfig.h"
19
#include "SquidTime.h"
21
// flow and terminology:
22
// HTTP| --> receive --> encode --> write --> |network
23
// end | <-- send <-- parse <-- read <-- |end
25
// TODO: replace gotEncapsulated() with something faster; we call it often
27
// CBDATA registration for the two classes implemented in this file.
CBDATA_CLASS_INIT(ICAPModXact);
28
CBDATA_CLASS_INIT(ICAPModXactLauncher);
30
// Upper bound on how much virgin body we are willing to keep as a backup;
// set to the body pipe capacity. Used by decideOnPreview() and
// canBackupEverything() in this file.
static const size_t TheBackupLimit = BodyPipe::MaxCapacity;
32
// Global ICAP configuration object, defined elsewhere.
extern ICAPConfig TheICAPConfig;
35
// Zero-initializes the State object by memset-ing the whole structure,
// so every field starts out as 0/false.
ICAPModXact::State::State()
37
memset(this, 0, sizeof(*this));
40
// Builds a transaction from its initiator, the virgin HTTP message header,
// the request that caused that message (may be NULL), and the ICAP service.
// The visible body forwards the "ICAPModXact" label, initiator, and service
// to the ICAPXaction base, starts with bypass disabled, records the virgin
// header and cause, and allocates an HttpReply with an "ICAP/" protocol
// prefix (the object later passed to parseHead() in parseIcapHead()).
ICAPModXact::ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader,
41
HttpRequest *virginCause, ICAPServiceRep::Pointer &aService):
42
ICAPXaction("ICAPModXact", anInitiator, aService),
46
canStartBypass(false) // too early
50
virgin.setHeader(virginHeader); // sets virgin.body_pipe if needed
51
virgin.setCause(virginCause); // may be NULL
53
// adapted header and body are initialized when we parse them
55
// writing and reading ends are handled by ICAPXaction
58
// nothing to do because we are using temporary buffers
61
icapReply = new HttpReply;
62
icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
64
debugs(93,7, "ICAPModXact initialized." << status());
67
// initiator wants us to start
68
// Begins the transaction. The visible statements estimate the virgin body
// and copy the service's bypass setting into canStartBypass.
// NOTE(review): the statements that act on the service/OPTIONS state are not
// visible in this extract; only their comments are.
void ICAPModXact::start()
72
estimateVirginBody(); // before virgin disappears!
74
canStartBypass = service().bypass;
76
// it is an ICAP violation to send request to a service w/o known OPTIONS
83
// XXX: If commConnectStart in startWriting fails, we may get here
84
//_after_ the object got destroyed. Somebody please fix commConnectStart!
85
// TODO: Does re-entrance protection in callStart() solve the above?
89
// Plain-function callback registered with callWhenReady() in
// waitForService(). Casts the opaque data pointer back to the transaction
// and forwards to its noteServiceReady() method; the service pointer
// argument is unnamed and unused.
void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
91
ICAPModXact *x = static_cast<ICAPModXact*>(data);
93
x->noteServiceReady();
96
// Marks the transaction as waiting for the ICAP service and asks the service
// to invoke ICAPModXact_noteServiceReady with this object once it is ready.
// Asserts (Must) that we are not already waiting.
void ICAPModXact::waitForService()
98
Must(!state.serviceWaiting);
99
debugs(93, 7, "ICAPModXact will wait for the ICAP service" << status());
100
state.serviceWaiting = true;
101
service().callWhenReady(&ICAPModXact_noteServiceReady, this);
104
// Called (via ICAPModXact_noteServiceReady) after waitForService().
// Requires that we were waiting, clears the waiting flag, and then checks
// whether the service is up.
// NOTE(review): the lines between the service().up() check and the throw are
// not visible in this extract; presumably the "ICAP service is unusable"
// exception belongs to the not-up branch -- confirm against the full file.
void ICAPModXact::noteServiceReady()
106
ICAPXaction_Enter(noteServiceReady);
108
Must(state.serviceWaiting);
109
state.serviceWaiting = false;
111
if (service().up()) {
115
throw TexcHere("ICAP service is unusable");
121
// Moves the writing state machine to writingConnect and decides whether a
// preview will be offered. Per the comments below, the connection is opened
// afterwards and nothing may follow that call because the connect callback
// can fire immediately (the call itself is not visible in this extract).
void ICAPModXact::startWriting()
123
state.writing = State::writingConnect;
125
decideOnPreview(); // must be decided before we decideOnRetries
129
// put nothing here as openConnection calls commConnectStart
130
// and that may call us back without waiting for the next select loop
133
// connection with the ICAP service established
134
// Connection-established handler. Requires the writingConnect state, starts
// reading right away, builds the ICAP request headers into requestBuf, logs
// them, switches to writingHeaders, and schedules the write of requestBuf.
// (The declaration/initialization of requestBuf is not visible here.)
void ICAPModXact::handleCommConnected()
136
Must(state.writing == State::writingConnect);
138
startReading(); // wait for early errors from the ICAP server
143
makeRequestHeaders(requestBuf);
144
debugs(93, 9, "ICAPModXact ICAP will write" << status() << ":\n" <<
145
(requestBuf.terminate(), requestBuf.content()));
148
state.writing = State::writingHeaders;
149
scheduleWrite(requestBuf);
152
// Write-completion handler; sz is the number of bytes written (only logged
// here). Calls handleCommWroteHeaders() when we were writing headers.
// NOTE(review): handleCommWroteBody() is presumably the alternative branch;
// the line between the two calls is not visible in this extract.
void ICAPModXact::handleCommWrote(size_t sz)
154
debugs(93, 5, HERE << "Wrote " << sz << " bytes");
156
if (state.writing == State::writingHeaders)
157
handleCommWroteHeaders();
159
handleCommWroteBody();
162
// Runs after the ICAP request headers have been written (requires the
// writingHeaders state) and selects the next writing state:
//  - with preview enabled: writingPaused if the preview is already done,
//    otherwise writingPreview;
//  - writingPrime when a virgin body is expected.
// NOTE(review): how these two checks chain (else/else-if) and what happens
// when neither applies is not visible in this extract.
void ICAPModXact::handleCommWroteHeaders()
164
Must(state.writing == State::writingHeaders);
166
// determine next step
167
if (preview.enabled())
168
state.writing = preview.done() ? State::writingPaused : State::writingPreview;
170
if (virginBody.expected())
171
state.writing = State::writingPrime;
180
// Writing state-machine driver. Checks for a write already in progress
// (writer), then dispatches on state.writing; the trailing comments on each
// case describe what that state is waiting for. An unrecognized state raises
// "ICAPModXact in bad writing state".
// NOTE(review): the statements executed for each case (returns, calls) are
// not visible in this extract.
void ICAPModXact::writeMore()
182
debugs(93, 5, HERE << "checking whether to write more" << status());
184
if (writer) // already writing something
187
switch (state.writing) {
189
case State::writingInit: // waiting for service OPTIONS
190
Must(state.serviceWaiting);
192
case State::writingConnect: // waiting for the connection to establish
194
case State::writingHeaders: // waiting for the headers to be written
196
case State::writingPaused: // waiting for the ICAP server response
198
case State::writingReallyDone: // nothing more to write
201
case State::writingAlmostDone: // was waiting for the last write
205
case State::writingPreview:
209
case State::writingPrime:
214
throw TexcHere("ICAPModXact in bad writing state");
218
// Writes (part of) the preview body. Requires the writingPreview state and
// a virgin body pipe. The amount offered to writeSomeBody() is the smaller
// of the remaining preview debt and the bytes currently buffered in the
// virgin body pipe.
// NOTE(review): lines between the "wrote entire Preview body" log and the
// writingPaused assignment are not visible in this extract, so the exact
// condition for pausing should be confirmed against the full file.
void ICAPModXact::writePreviewBody()
220
debugs(93, 8, HERE << "will write Preview body from " <<
221
virgin.body_pipe << status());
222
Must(state.writing == State::writingPreview);
223
Must(virgin.body_pipe != NULL);
225
const size_t sizeMax = (size_t)virgin.body_pipe->buf().contentSize();
226
const size_t size = XMIN(preview.debt(), sizeMax);
227
writeSomeBody("preview body", size);
229
// change state once preview is written
231
if (preview.done()) {
232
debugs(93, 7, "ICAPModXact wrote entire Preview body" << status());
237
state.writing = State::writingPaused;
241
// Writes the post-preview ("prime") virgin body. Requires the writingPrime
// state and an active virginBodyWriting activity. Offers everything that is
// currently buffered in the virgin body pipe to writeSomeBody(), then checks
// whether the end of the virgin body has been reached.
// NOTE(review): what is done after the "wrote entire body" log is not
// visible in this extract.
void ICAPModXact::writePrimeBody()
243
Must(state.writing == State::writingPrime);
244
Must(virginBodyWriting.active());
246
const size_t size = (size_t)virgin.body_pipe->buf().contentSize();
247
writeSomeBody("prime virgin body", size);
249
if (virginBodyEndReached(virginBodyWriting)) {
250
debugs(93, 5, HERE << "wrote entire body");
255
// Shared body writer for preview and prime bodies.
//   label - human-readable name of what is being written (used in logs)
//   size  - maximum number of body bytes the caller wants written now
// Requires no write in progress, a writing state before writingAlmostDone,
// and a virgin body pipe. The chunk size is the smaller of `size` and the
// virgin content available to the writing activity. Visible steps: wrap the
// data in a chunk (openChunk / append / closeChunk), advance
// virginBodyWriting, update preview accounting when in writingPreview,
// append a last-chunk, and schedule the write if the buffer has content.
// NOTE(review): the conditions guarding the chunk-writing, "no writable
// content", and last-chunk parts are not visible in this extract.
void ICAPModXact::writeSomeBody(const char *label, size_t size)
257
Must(!writer && state.writing < state.writingAlmostDone);
258
Must(virgin.body_pipe != NULL);
259
debugs(93, 8, HERE << "will write up to " << size << " bytes of " <<
262
MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
264
writeBuf.init(); // note: we assume that last-chunk will fit
266
const size_t writableSize = virginContentSize(virginBodyWriting);
267
const size_t chunkSize = XMIN(writableSize, size);
270
debugs(93, 7, HERE << "will write " << chunkSize <<
271
"-byte chunk of " << label);
273
openChunk(writeBuf, chunkSize, false);
274
writeBuf.append(virginContentData(virginBodyWriting), chunkSize);
275
closeChunk(writeBuf);
277
virginBodyWriting.progress(chunkSize);
280
debugs(93, 7, "ICAPModXact has no writable " << label << " content");
283
const bool wroteEof = virginBodyEndReached(virginBodyWriting);
284
bool lastChunk = wroteEof;
285
if (state.writing == State::writingPreview) {
286
preview.wrote(chunkSize, wroteEof); // even if wrote nothing
287
lastChunk = lastChunk || preview.done();
291
debugs(93, 8, HERE << "will write last-chunk of " << label);
292
addLastRequestChunk(writeBuf);
295
debugs(93, 7, HERE << "will write " << writeBuf.contentSize()
296
<< " raw bytes of " << label);
298
if (writeBuf.hasContent()) {
299
scheduleWrite(writeBuf); // comm will free the chunk
305
// Appends the zero-size last-chunk header to buf. The "ieof" chunk
// extension is requested only while writing a preview whose accounting
// reports ieof.
// NOTE(review): any statement following openChunk() (e.g. closing the
// chunk) is not visible in this extract.
void ICAPModXact::addLastRequestChunk(MemBuf &buf)
307
const bool ieof = state.writing == State::writingPreview && preview.ieof();
308
openChunk(buf, 0, ieof);
312
// Appends a chunk header to buf: the chunk size in hexadecimal followed by
// CRLF, with a "; ieof" extension inserted before the CRLF when ieof is true.
void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize, bool ieof)
314
buf.Printf((ieof ? "%x; ieof\r\n" : "%x\r\n"), (int) chunkSize);
317
// Appends the two-byte CRLF that terminates a chunk to buf.
void ICAPModXact::closeChunk(MemBuf &buf)
319
buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
322
// did the activity reach the end of the virgin body?
323
// The visible expression is true when the given activity is no longer
// active, or when the virgin body pipe expects no more data after the
// activity's current offset.
bool ICAPModXact::virginBodyEndReached(const VirginBodyAct &act) const
326
!act.active() || // did all (assuming it was originally planned)
327
!virgin.body_pipe->expectMoreAfter(act.offset()); // won't have more
330
// the size of buffered virgin body data available for the specified activity
331
// if this size is zero, we may be done or may be waiting for more data
332
// Returns how many buffered virgin body bytes lie at or after the given
// activity's offset. Offsets are absolute: the buffered range is
// [virginConsumed, virginConsumed + buffered size], and the activity offset
// must fall inside it (enforced with Must).
size_t ICAPModXact::virginContentSize(const VirginBodyAct &act) const
335
// absolute start of unprocessed data
336
const uint64_t start = act.offset();
337
// absolute end of buffered data
338
const uint64_t end = virginConsumed + virgin.body_pipe->buf().contentSize();
339
Must(virginConsumed <= start && start <= end);
340
return static_cast<size_t>(end - start);
343
// pointer to buffered virgin body data available for the specified activity
344
// Returns a pointer into the virgin body pipe buffer at the position that
// corresponds to the activity's absolute offset, i.e. buffer start plus
// (offset - virginConsumed). The offset must not precede virginConsumed.
const char *ICAPModXact::virginContentData(const VirginBodyAct &act) const
347
const uint64_t start = act.offset();
348
Must(virginConsumed <= start);
349
return virgin.body_pipe->buf().content() + static_cast<size_t>(start-virginConsumed);
352
// Releases virgin body bytes that no activity needs any more.
// Visible logic: bail out when there is no virgin body pipe; compute the
// highest offset that may be consumed as the end of buffered data, lowered
// to the offsets of the writing and sending activities when they are active;
// then, if that offset is past virginConsumed, advance virginConsumed by the
// difference and disable bypass ("consumed content").
// NOTE(review): several statements are not visible in this extract -- the
// condition guarding the "may have to retry later" return, the return after
// the "postponing consumption" log, and the call that actually consumes
// from the pipe. Confirm against the full file.
void ICAPModXact::virginConsume()
354
debugs(93, 9, "consumption guards: " << !virgin.body_pipe << isRetriable);
356
if (!virgin.body_pipe)
357
return; // nothing to consume
360
return; // do not consume if we may have to retry later
362
BodyPipe &bp = *virgin.body_pipe;
364
// Why > 2? HttpState does not use the last bytes in the buffer
365
// because delayAwareRead() is arguably broken. See
366
// HttpStateData::maybeReadVirginBody for more details.
367
if (canStartBypass && bp.buf().spaceSize() > 2) {
368
// Postponing may increase memory footprint and slow the HTTP side
369
// down. Not postponing may increase the number of ICAP errors
370
// if the ICAP service fails. We may also use "potential" space to
371
// postpone more aggressively. Should the trade-off be configurable?
372
debugs(93, 8, HERE << "postponing consumption from " << bp.status());
376
const size_t have = static_cast<size_t>(bp.buf().contentSize());
377
const uint64_t end = virginConsumed + have;
378
uint64_t offset = end;
380
debugs(93, 9, HERE << "max virgin consumption offset=" << offset <<
381
" acts " << virginBodyWriting.active() << virginBodySending.active() <<
382
" consumed=" << virginConsumed <<
383
" from " << virgin.body_pipe->status());
385
if (virginBodyWriting.active())
386
offset = XMIN(virginBodyWriting.offset(), offset);
388
if (virginBodySending.active())
389
offset = XMIN(virginBodySending.offset(), offset);
391
Must(virginConsumed <= offset && offset <= end);
393
if (const size_t size = static_cast<size_t>(offset - virginConsumed)) {
394
debugs(93, 8, HERE << "consuming " << size << " out of " << have <<
395
" virgin body bytes");
397
virginConsumed += size;
398
Must(!isRetriable); // or we should not be consuming
399
disableBypass("consumed content");
403
// Body-write completion handler; called from handleCommWrote().
// NOTE(review): the body of this function is not visible in this extract.
void ICAPModXact::handleCommWroteBody()
408
// Called when we do not expect to call comm_write anymore.
409
// We may have a pending write though.
410
// If stopping nicely, we will just wait for that pending write, if any.
411
// nicely: whether to wait for a pending last write (see comments above).
// Visible behavior, going by the log messages: one path records
// writingAlmostDone and waits for the last write; another path gives up on
// the last write, disables connection reuse, and sets ignoreLastWrite; the
// final path disables an active virginBodyWriting activity and records
// writingReallyDone. A transaction already in writingReallyDone is detected
// up front.
// NOTE(review): the conditions selecting between these paths and the early
// returns are not visible in this extract.
void ICAPModXact::stopWriting(bool nicely)
413
if (state.writing == State::writingReallyDone)
418
debugs(93, 7, HERE << "will wait for the last write" << status());
419
state.writing = State::writingAlmostDone; // may already be set
423
debugs(93, 3, HERE << "will NOT wait for the last write" << status());
425
// Comm does not have an interface to clear the writer callback nicely,
426
// but without clearing the writer we cannot recycle the connection.
427
// We prevent connection reuse and hope that we can handle a callback
428
// call at any time, usually in the middle of the destruction sequence!
429
// Somebody should add comm_remove_write_handler() to comm API.
430
reuseConnection = false;
431
ignoreLastWrite = true;
434
debugs(93, 7, HERE << "will no longer write" << status());
435
if (virginBodyWriting.active()) {
436
virginBodyWriting.disable();
439
state.writing = State::writingReallyDone;
443
// Stops keeping a backup of the virgin body by disabling the
// virginBodySending activity. The activity's state is checked first
// (presumably returning early when it is already inactive; the statement
// under the check is not visible in this extract).
void ICAPModXact::stopBackup()
445
if (!virginBodySending.active())
448
debugs(93, 7, "ICAPModXact will no longer backup" << status());
449
virginBodySending.disable();
453
// True only when the base transaction is done, we are not waiting for the
// service, and both reading and writing are finished.
// NOTE(review): one line of this expression is not visible in this extract,
// so additional conditions may apply.
bool ICAPModXact::doneAll() const
455
return ICAPXaction::doneAll() && !state.serviceWaiting &&
457
doneReading() && state.doneWriting();
460
// Preconditions for reading the ICAP response: a valid connection
// descriptor and no adapted header or adapted body pipe yet.
// NOTE(review): the statements that follow the checks are not visible in
// this extract.
void ICAPModXact::startReading()
462
Must(connection >= 0);
464
Must(!adapted.header);
465
Must(!adapted.body_pipe);
467
// we use the same buffer for headers and body and then consume headers
471
// Decides whether another read should be scheduled. The visible checks are:
// a read already pending or reading finished; the adapted body pipe having
// no potential space left; and whether readBuf has space.
// NOTE(review): the actions taken after each check (returns, scheduling the
// read) are not visible in this extract.
void ICAPModXact::readMore()
473
if (reader || doneReading()) {
474
debugs(93,3,HERE << "returning from readMore because reader or doneReading()");
478
// do not fill readBuf if we have no space to store the result
479
if (adapted.body_pipe != NULL &&
480
!adapted.body_pipe->buf().hasPotentialSpace()) {
481
debugs(93,3,HERE << "not reading because ICAP reply pipe is full");
485
if (readBuf.hasSpace())
488
debugs(93,3,HERE << "nothing to do because !readBuf.hasSpace()");
491
// comm module read a portion of the ICAP response for us
492
// Read-completion handler; the byte count argument is unnamed and unused.
// Requires that parsing is not finished yet.
// NOTE(review): the statements after the check are not visible here.
void ICAPModXact::handleCommRead(size_t)
494
Must(!state.doneParsing());
499
// Copies buffered virgin body data into the adapted body pipe ("echoing").
// Requires the sendingVirgin state, an adapted body pipe, and an active
// virginBodySending activity. Offers all virgin content available to the
// sending activity to putMoreData(), advances the activity by the number of
// bytes actually accepted, and disables bypass ("echoed content").
// Finally distinguishes "echoed all" from "expects more to echo".
// NOTE(review): the guards around these steps and the follow-up actions in
// the two final branches are not visible in this extract.
void ICAPModXact::echoMore()
501
Must(state.sending == State::sendingVirgin);
502
Must(adapted.body_pipe != NULL);
503
Must(virginBodySending.active());
505
const size_t sizeMax = virginContentSize(virginBodySending);
506
debugs(93,5, HERE << "will echo up to " << sizeMax << " bytes from " <<
507
virgin.body_pipe->status());
508
debugs(93,5, HERE << "will echo up to " << sizeMax << " bytes to " <<
509
adapted.body_pipe->status());
512
const size_t size = adapted.body_pipe->putMoreData(virginContentData(virginBodySending), sizeMax);
513
debugs(93,5, HERE << "echoed " << size << " out of " << sizeMax <<
515
virginBodySending.progress(size);
517
disableBypass("echoed content");
520
if (virginBodyEndReached(virginBodySending)) {
521
debugs(93, 5, "ICAPModXact echoed all" << status());
524
debugs(93, 5, "ICAPModXact has " <<
525
virgin.body_pipe->buf().contentSize() << " bytes " <<
526
"and expects more to echo" << status());
527
// TODO: timeout if virgin or adapted pipes are broken
531
// True once the sending state machine has reached sendingDone.
bool ICAPModXact::doneSending() const
533
return state.sending == State::sendingDone;
536
// stop (or do not start) sending adapted message body
537
// nicely: passed on to stopProducingFor(), but only when no debts are left.
// When sending had been decided and an adapted body pipe exists, disables
// virginBodySending and stops producing for the pipe; "leftDebts" is true
// when the pipe still needs more data. When sending was never decided, there
// must be no adapted body pipe. The visible final statement records
// sendingDone.
// NOTE(review): any guard before the first check (e.g. already done) is not
// visible in this extract.
void ICAPModXact::stopSending(bool nicely)
542
if (state.sending != State::sendingUndecided) {
543
debugs(93, 7, "ICAPModXact will no longer send" << status());
544
if (adapted.body_pipe != NULL) {
545
virginBodySending.disable();
546
// we may leave debts if we were echoing and the virgin
547
// body_pipe got exhausted before we echoed all planned bytes
548
const bool leftDebts = adapted.body_pipe->needsMoreData();
549
stopProducingFor(adapted.body_pipe, nicely && !leftDebts);
552
debugs(93, 7, "ICAPModXact will not start sending" << status());
553
Must(!adapted.body_pipe);
556
state.sending = State::sendingDone;
560
// should be called after certain state.writing or state.sending changes
561
// Stops consuming from the virgin body pipe once nothing uses it any more.
// The guard covers "no pipe" and "not done consuming virgin yet"
// (presumably returning early; the statement under the guard is not visible
// in this extract).
void ICAPModXact::checkConsuming()
563
// quit if we already stopped or are still using the pipe
564
if (!virgin.body_pipe || !state.doneConsumingVirgin())
567
debugs(93, 7, HERE << "will stop consuming" << status());
568
stopConsumingFrom(virgin.body_pipe);
571
// Parsing driver: logs how much is buffered in readBuf, then checks whether
// we are still parsing headers and whether we are in the body-parsing state.
// NOTE(review): the calls made under those two checks are not visible in
// this extract (presumably parseHeaders() and parseBody() -- confirm).
void ICAPModXact::parseMore()
573
debugs(93, 5, HERE << "have " << readBuf.contentSize() << " bytes to parse" <<
575
debugs(93, 5, HERE << "\n" << readBuf.content());
577
if (state.parsingHeaders())
580
if (state.parsing == State::psBody)
584
// Exception hook. When bypass is not possible (canStartBypass is false) or
// the transaction is still retriable, the exception is handed to the base
// class. Otherwise the exception is logged as being bypassed; a
// TextException raised during the bypass attempt is handed to the base
// class instead.
// NOTE(review): the try block contents and the early return are not visible
// in this extract.
void ICAPModXact::callException(const TextException &e)
586
if (!canStartBypass || isRetriable) {
587
ICAPXaction::callException(e);
592
debugs(93, 3, "bypassing ICAPModXact::" << inCall << " exception: " <<
593
e.message << ' ' << status());
596
catch (const TextException &bypassE) {
597
ICAPXaction::callException(bypassE);
601
// Starts bypassing a service failure. Disables further bypass attempts,
// requires that the transaction is no longer retriable, stops writing
// (nicely), and, if a connection exists, prevents its reuse and cancels the
// pending read.
// NOTE(review): other steps of this function, and the condition for the
// final warning log, are not visible in this extract.
void ICAPModXact::bypassFailure()
603
disableBypass("already started to bypass");
605
Must(!isRetriable); // or we should not be bypassing
611
// end all activities associated with the ICAP server
615
stopWriting(true); // or should we force it?
616
if (connection >= 0) {
617
reuseConnection = false; // be conservative
618
cancelRead(); // may not work; and we cannot stop connecting either
620
debugs(93, 7, "Warning: bypass failed to stop I/O" << status());
624
// Permanently turns off the bypass option if it is still on.
//   reason - short explanation, used only in the debug log
void ICAPModXact::disableBypass(const char *reason)
626
if (canStartBypass) {
627
debugs(93,7, HERE << "will never start bypass because " << reason);
628
canStartBypass = false;
634
// note that allocation for echoing is done in handle204NoContent()
635
// Allocates the adapted HTTP message header object if it does not exist
// yet: an HttpReply when the ICAP response encapsulates "res-hdr", an
// HttpRequest when it encapsulates "req-hdr". The visible throw reports that
// neither section is present.
void ICAPModXact::maybeAllocateHttpMsg()
637
if (adapted.header) // already allocated
640
if (gotEncapsulated("res-hdr")) {
641
adapted.setHeader(new HttpReply);
642
} else if (gotEncapsulated("req-hdr")) {
643
adapted.setHeader(new HttpRequest);
645
throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
648
// Header-parsing step. Requires that we are in a header-parsing state and
// handles the ICAP header stage and the HTTP header stage in that order;
// if we are still parsing headers afterwards, more data is needed.
// NOTE(review): the parse calls and follow-up actions under each check are
// not visible in this extract.
void ICAPModXact::parseHeaders()
650
Must(state.parsingHeaders());
652
if (state.parsing == State::psIcapHeader) {
653
debugs(93, 5, HERE << "parse ICAP headers");
657
if (state.parsing == State::psHttpHeader) {
658
debugs(93, 5, HERE << "parse HTTP headers");
662
if (state.parsingHeaders()) { // need more data
670
// called after parsing all headers or when bypassing an exception
671
// Delivers the adapted header to the initiator via sendAnswer() after
// disabling bypass ("sent headers"), then checks for the sendingVirgin
// state.
// NOTE(review): the statement under the sendingVirgin check is not visible
// in this extract.
void ICAPModXact::startSending()
673
disableBypass("sent headers");
674
sendAnswer(adapted.header);
676
if (state.sending == State::sendingVirgin)
680
// Parses the ICAP response header into icapReply and dispatches on the ICAP
// status code. Requires that the sending decision has not been made yet.
// A "close" Connection directive in the reply disables connection reuse.
// NOTE(review): most `case` labels and several handler calls of the status
// switch are not visible in this extract, nor is the statement executed when
// writing is paused after a non-100 status.
void ICAPModXact::parseIcapHead()
682
Must(state.sending == State::sendingUndecided);
684
if (!parseHead(icapReply))
687
if (httpHeaderHasConnDir(&icapReply->header, "close")) {
688
debugs(93, 5, HERE << "found connection close");
689
reuseConnection = false;
692
switch (icapReply->sline.status) {
699
case 201: // Symantec Scan Engine 5.0 and later when modifying HTTP msg
701
if (!validate200Ok()) {
702
throw TexcHere("Invalid ICAP Response");
710
handle204NoContent();
714
debugs(93, 5, HERE << "ICAP status " << icapReply->sline.status);
715
handleUnknownScode();
719
// handle100Continue() manages state.writing on its own.
720
// Non-100 status means the server needs no postPreview data from us.
721
if (state.writing == State::writingPaused)
725
// Checks that the encapsulated sections of the ICAP response match the
// service method: RESPMOD is checked for "res-hdr"; REQMOD is checked for
// either "res-hdr" or "req-hdr".
// NOTE(review): the return statements are not visible in this extract;
// presumably a missing section yields false -- confirm.
bool ICAPModXact::validate200Ok()
727
if (ICAP::methodRespmod == service().method) {
728
if (!gotEncapsulated("res-hdr"))
734
if (ICAP::methodReqmod == service().method) {
735
if (!gotEncapsulated("res-hdr") && !gotEncapsulated("req-hdr"))
744
// Handles an ICAP 100 (Continue) status. Requires that writing is paused
// and that an enabled preview finished without ieof. Resets parsing to the
// ICAP header stage (another ICAP response will follow) and resumes writing
// in the writingPrime state.
// NOTE(review): the statement under the allowedPostview204 check and other
// lines are not visible in this extract.
void ICAPModXact::handle100Continue()
746
Must(state.writing == State::writingPaused);
747
// server must not respond before the end of preview: we may send ieof
748
Must(preview.enabled() && preview.done() && !preview.ieof());
750
// 100 "Continue" cancels our preview commitment, not 204s outside preview
751
if (!state.allowedPostview204)
754
state.parsing = State::psIcapHeader; // eventually
757
state.writing = State::writingPrime;
762
// Handles a successful ICAP status that carries an adapted message: moves
// parsing on to the encapsulated HTTP header and marks that we will send
// adapted content.
// NOTE(review): further statements may follow that are not visible here.
void ICAPModXact::handle200Ok()
764
state.parsing = State::psHttpHeader;
765
state.sending = State::sendingAdapted;
770
// Handler for the ICAP 204 (No Content) status; called from the status
// switch in parseIcapHead().
// NOTE(review): the body of this function is not visible in this extract.
void ICAPModXact::handle204NoContent()
776
// Called when we receive a 204 No Content response and
777
// when we are trying to bypass a service failure.
778
// We actually start sending (echoing or not) in startSending.
779
// Prepares to send the virgin message back unchanged. Disables bypass,
// packs the virgin header into httpBuf, allocates a fresh HttpRequest or
// HttpReply (matching the virgin header's dynamic type) as the adapted
// header, and re-parses the packed bytes into it; the parse must consume the
// whole buffer. For requests, inheritVirginProperties() copies the non-HTTP
// state. If the virgin message has a body pipe, plans virginBodySending
// (when not already active), enters sendingVirgin, and creates the adapted
// body pipe, copying the body size when it is known.
// NOTE(review): the declaration of httpBuf, the assignment of newHead in
// the request branch, and some log continuation lines are not visible in
// this extract.
void ICAPModXact::prepEchoing()
781
disableBypass("preparing to echo content");
783
// We want to clone the HTTP message, but we do not want
784
// to copy some non-HTTP state parts that HttpMsg kids carry in them.
785
// Thus, we cannot use a smart pointer, copy constructor, or equivalent.
786
// Instead, we simply write the HTTP message and "clone" it by parsing.
788
HttpMsg *oldHead = virgin.header;
789
debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
793
// write the virgin message into a memory buffer
795
packHead(httpBuf, oldHead);
797
// allocate the adapted message and copy metainfo
798
Must(!adapted.header);
799
HttpMsg *newHead = NULL;
800
if (const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(oldHead)) {
801
HttpRequest *newR = new HttpRequest;
802
inheritVirginProperties(*newR, *oldR);
805
if (dynamic_cast<const HttpReply*>(oldHead))
806
newHead = new HttpReply;
809
adapted.setHeader(newHead);
811
// parse the buffer back
812
http_status error = HTTP_STATUS_NONE;
814
Must(newHead->parse(&httpBuf, true, &error));
816
Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
820
debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " <<
823
// setup adapted body pipe if needed
824
if (oldHead->body_pipe != NULL) {
825
debugs(93, 7, HERE << "will echo virgin body from " <<
827
if (!virginBodySending.active())
828
virginBodySending.plan(); // will throw if not possible
829
state.sending = State::sendingVirgin;
832
// TODO: optimize: is it possible to just use the oldHead pipe and
833
// remove ICAP from the loop? This echoing is probably a common case!
834
makeAdaptedBodyPipe("echoed virgin response");
835
if (oldHead->body_pipe->bodySizeKnown())
836
adapted.body_pipe->setBodySize(oldHead->body_pipe->bodySize());
837
debugs(93, 7, HERE << "will echo virgin body to " <<
840
debugs(93, 7, HERE << "no virgin body to echo");
845
// Handles an ICAP status code we do not support by throwing
// "Unsupported ICAP status code", which terminates the transaction.
// NOTE(review): statements before the throw, if any, are not visible here.
void ICAPModXact::handleUnknownScode()
849
// TODO: mark connection as "bad"
851
// Terminate the transaction; we do not know how to handle this response.
852
throw TexcHere("Unsupported ICAP status code");
855
// Parses the encapsulated HTTP header of the ICAP response, if one was
// announced ("res-hdr" or "req-hdr"). Allocates the adapted header on
// demand, returns early when more header data is needed, and, for an
// adapted request, copies non-HTTP properties from the virgin request.
// Ends by deciding whether a body should be parsed.
// NOTE(review): oldR comes from a dynamic_cast and is dereferenced below;
// a null check may exist on a line not visible in this extract -- confirm.
void ICAPModXact::parseHttpHead()
857
if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
858
maybeAllocateHttpMsg();
860
if (!parseHead(adapted.header))
861
return; // need more header data
863
if (HttpRequest *newHead = dynamic_cast<HttpRequest*>(adapted.header)) {
864
const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(virgin.header);
866
// TODO: the adapted request did not really originate from the
867
// client; give proxy admin an option to prevent copying of
868
// sensitive client information here. See the following thread:
869
// http://www.squid-cache.org/mail-archive/squid-dev/200703/0040.html
870
inheritVirginProperties(*newHead, *oldR);
874
decideOnParsingBody();
877
// parses both HTTP and ICAP headers
878
// Parses one message header (ICAP or HTTP) from readBuf into `head`.
// A failed parse without an error code means more data is needed; a failed
// parse with an error code trips the Must() check. On success the parsed
// header bytes (head->hdr_sz) are consumed from readBuf. Per the log
// messages, the result is false when more data is needed and true on
// success (the return statements are not visible in this extract).
bool ICAPModXact::parseHead(HttpMsg *head)
881
debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse" <<
882
"; state: " << state.parsing);
884
http_status error = HTTP_STATUS_NONE;
885
const bool parsed = head->parse(&readBuf, commEof, &error);
886
Must(parsed || !error); // success or need more data
888
if (!parsed) { // need more data
889
debugs(93, 5, HERE << "parse failed, need more data, return false");
894
debugs(93, 5, HERE << "parse success, consume " << head->hdr_sz << " bytes, return true");
895
readBuf.consume(head->hdr_sz);
899
// TODO: Move this method to HttpRequest?
900
// Copies non-HTTP request state from oldR to newR: client address and port,
// local (my) address and port, the adaptation-immune subset of the request
// flags, and the authentication request pointer (taking a lock on it when
// present).
void ICAPModXact::inheritVirginProperties(HttpRequest &newR, const HttpRequest &oldR) {
902
newR.client_addr = oldR.client_addr;
903
newR.client_port = oldR.client_port;
905
newR.my_addr = oldR.my_addr;
906
newR.my_port = oldR.my_port;
908
// This may be too conservative for the 204 No Content case
909
// may eventually need cloneNullAdaptationImmune() for that.
910
newR.flags = oldR.flags.cloneAdaptationImmune();
912
if (oldR.auth_user_request) {
913
newR.auth_user_request = oldR.auth_user_request;
914
AUTHUSERREQUESTLOCK(newR.auth_user_request, "newR in ICAPModXact");
918
// Decides whether the ICAP response carries an encapsulated body. When
// "res-body" or "req-body" is present: enters the psBody parsing state,
// creates a chunked-coding body parser and the adapted body pipe, and
// requires that we are sending adapted content.
// NOTE(review): the statements on the "not expecting a body" path are not
// visible in this extract.
void ICAPModXact::decideOnParsingBody() {
919
if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
920
debugs(93, 5, HERE << "expecting a body");
921
state.parsing = State::psBody;
922
bodyParser = new ChunkedCodingParser;
923
makeAdaptedBodyPipe("adapted response from the ICAP server");
924
Must(state.sending == State::sendingAdapted);
926
debugs(93, 5, HERE << "not expecting a body");
932
// Parses chunked adapted body data from readBuf into the adapted body pipe
// (through a BodyPipeCheckout). Requires the psBody state. Once the pipe
// holds any content, bypass is disabled ("sent adapted content"). The
// remaining visible code handles the parser's needsMoreData() and
// needsMoreSpace() conditions.
// NOTE(review): the condition guarding stopSending(true) (presumably
// `parsed`), the checkout commit, and the actions taken in the
// needsMoreData/needsMoreSpace branches are not visible in this extract.
void ICAPModXact::parseBody()
934
Must(state.parsing == State::psBody);
937
debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse");
939
// the parser will throw on errors
940
BodyPipeCheckout bpc(*adapted.body_pipe);
941
const bool parsed = bodyParser->parse(&readBuf, &bpc.buf);
944
debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
945
"parse; parsed all: " << parsed);
947
// TODO: expose BodyPipe::putSize() to make this check simpler and clearer
948
if (adapted.body_pipe->buf().contentSize() > 0) // parsed something sometime
949
disableBypass("sent adapted content");
953
stopSending(true); // the parser succeeds only if all parsed data fits
957
debugs(93,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData());
959
if (bodyParser->needsMoreData()) {
960
debugs(93,3,HERE << this);
965
if (bodyParser->needsMoreSpace()) {
966
Must(!doneSending()); // can hope for more space
967
Must(adapted.body_pipe->buf().contentSize() > 0); // paranoid
968
// TODO: there should be a timeout in case the sink is broken
969
// or cannot consume partial content (while we need more space)
973
// Ends parsing of the ICAP response by recording the psDone state. The
// current state is checked first (presumably returning early when already
// done; the statement under the check and any cleanup between the log and
// the assignment are not visible in this extract).
void ICAPModXact::stopParsing()
975
if (state.parsing == State::psDone)
978
debugs(93, 7, "ICAPModXact will no longer parse" << status());
984
state.parsing = State::psDone;
987
// HTTP side added virgin body data
988
// Body pipe notification: the producer added virgin body data. The pipe
// argument is unnamed and unused.
// NOTE(review): the calls made here (including the one under the
// sendingVirgin check) are not visible in this extract.
void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe &)
990
ICAPXaction_Enter(noteMoreBodyDataAvailable);
994
if (state.sending == State::sendingVirgin)
1000
// HTTP side sent us all virgin info
1001
// Body pipe notification: the producer finished the virgin body. Requires
// that the virgin body pipe reports production as ended.
// NOTE(review): the calls that "push writer and sender" are not visible in
// this extract.
void ICAPModXact::noteBodyProductionEnded(BodyPipe &)
1003
ICAPXaction_Enter(noteBodyProductionEnded);
1005
Must(virgin.body_pipe->productionEnded());
1007
// push writer and sender in case we were waiting for the last-chunk
1010
if (state.sending == State::sendingVirgin)
1016
// body producer aborted, but the initiator may still want to know
1017
// the answer, even though the HTTP message has been truncated
1018
// Body pipe notification: the virgin body producer aborted. Requires that
// the virgin body pipe reports production as ended; the visible structure
// mirrors noteBodyProductionEnded().
// NOTE(review): the calls that "push writer and sender" are not visible in
// this extract.
void ICAPModXact::noteBodyProducerAborted(BodyPipe &)
1020
ICAPXaction_Enter(noteBodyProducerAborted);
1022
Must(virgin.body_pipe->productionEnded());
1024
// push writer and sender in case we were waiting for the last-chunk
1027
if (state.sending == State::sendingVirgin)
1033
// adapted body consumer wants more adapted data and
1034
// possibly freed some buffer space
1035
// Body pipe notification: the adapted body consumer freed buffer space.
// Dispatches on the sending state (sendingVirgin vs. sendingAdapted); the
// remaining path asserts sendingUndecided.
// NOTE(review): the calls made in each branch are not visible in this
// extract.
void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe &)
1037
ICAPXaction_Enter(noteMoreBodySpaceAvailable);
1039
if (state.sending == State::sendingVirgin)
1041
else if (state.sending == State::sendingAdapted)
1044
Must(state.sending == State::sendingUndecided);
1049
// adapted body consumer aborted
1050
// Body pipe notification: the adapted body consumer went away. Requests
// that the transaction stop, with the reason "adapted body consumer
// aborted".
void ICAPModXact::noteBodyConsumerAborted(BodyPipe &)
1052
ICAPXaction_Enter(noteBodyConsumerAborted);
1054
mustStop("adapted body consumer aborted");
1060
// Final cleanup hook; logs and then delegates to ICAPXaction::swanSong().
// NOTE(review): the cleanup statements between the log and the base-class
// call are not visible in this extract.
void ICAPModXact::swanSong()
1062
debugs(93, 5, HERE << "swan sings" << status());
1072
ICAPXaction::swanSong();
1075
// Builds the complete ICAP request head in buf: the request line, Host,
// Date, optional "Connection: close", forwarded Proxy-Authenticate /
// Proxy-Authorization headers, the Encapsulated header, optional Preview
// and "Allow: 204" headers, optional X-Client-IP and username headers, the
// blank line ending the ICAP header, and finally the encapsulated HTTP
// header bytes accumulated in httpBuf.
// Side effects visible here: virginBodySending is planned when a preview or
// 204 is offered and a virgin body is expected; state.allowedPostview204 is
// set when 204 is allowed outside of preview.
// NOTE(review): the declaration of httpBuf, parts of the `request`
// selection expression, and several if/else lines are not visible in this
// extract.
void ICAPModXact::makeRequestHeaders(MemBuf &buf)
1078
* XXX These should use HttpHdr interfaces instead of Printfs
1080
const ICAPServiceRep &s = service();
1081
buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
1082
buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
1083
buf.Printf("Date: %s\r\n", mkrfc1123(squid_curtime));
1085
if (!TheICAPConfig.reuse_connections)
1086
buf.Printf("Connection: close\r\n");
1088
// we must forward "Proxy-Authenticate" and "Proxy-Authorization"
1090
if (virgin.header->header.has(HDR_PROXY_AUTHENTICATE))
1091
buf.Printf("Proxy-Authenticate: %s\r\n",
1092
virgin.header->header.getByName("Proxy-Authenticate").buf());
1094
if (virgin.header->header.has(HDR_PROXY_AUTHORIZATION))
1095
buf.Printf("Proxy-Authorization: %s\r\n",
1096
virgin.header->header.getByName("Proxy-Authorization").buf());
1098
buf.Printf("Encapsulated: ");
1104
// build HTTP request header, if any
1105
ICAP::Method m = s.method;
1107
const HttpRequest *request = virgin.cause ?
1109
dynamic_cast<const HttpRequest*>(virgin.header);
1111
// to simplify, we could assume that request is always available
1115
urlPath = request->urlpath;
1116
if (ICAP::methodRespmod == m)
1117
encapsulateHead(buf, "req-hdr", httpBuf, request);
1119
if (ICAP::methodReqmod == m)
1120
encapsulateHead(buf, "req-hdr", httpBuf, virgin.header);
1123
if (ICAP::methodRespmod == m)
1124
if (const HttpMsg *prime = virgin.header)
1125
encapsulateHead(buf, "res-hdr", httpBuf, prime);
1127
if (!virginBody.expected())
1128
buf.Printf("null-body=%d", (int) httpBuf.contentSize());
1129
else if (ICAP::methodReqmod == m)
1130
buf.Printf("req-body=%d", (int) httpBuf.contentSize());
1132
buf.Printf("res-body=%d", (int) httpBuf.contentSize());
1134
buf.append(ICAP::crlf, 2); // terminate Encapsulated line
1136
if (preview.enabled()) {
1137
buf.Printf("Preview: %d\r\n", (int)preview.ad());
1138
if (virginBody.expected()) // there is a body to preview
1139
virginBodySending.plan();
1141
finishNullOrEmptyBodyPreview(httpBuf);
1144
if (shouldAllow204()) {
1145
debugs(93,5, HERE << "will allow 204s outside of preview");
1146
state.allowedPostview204 = true;
1147
buf.Printf("Allow: 204\r\n");
1148
if (virginBody.expected()) // there is a body to echo
1149
virginBodySending.plan();
1152
if (TheICAPConfig.send_client_ip && request)
1153
if (request->client_addr.s_addr != any_addr.s_addr &&
1154
request->client_addr.s_addr != no_addr.s_addr)
1155
buf.Printf("X-Client-IP: %s\r\n", inet_ntoa(request->client_addr));
1157
if (TheICAPConfig.send_client_username && request)
1158
makeUsernameHeader(request, buf);
1160
// fprintf(stderr, "%s\n", buf.content());
1162
buf.append(ICAP::crlf, 2); // terminate ICAP header
1164
// start ICAP request body with encapsulated HTTP headers
1165
buf.append(httpBuf.content(), httpBuf.contentSize());
1170
// Appends the client username header to buf when the request has an
// authentication record with a username. The header name comes from
// TheICAPConfig.client_username_header; the value is base64-encoded when
// TheICAPConfig.client_username_encode is set, otherwise sent as is.
// NOTE(review): the remaining Printf arguments are not visible in this
// extract.
void ICAPModXact::makeUsernameHeader(const HttpRequest *request, MemBuf &buf) {
1171
if (const AuthUserRequest *auth = request->auth_user_request) {
1172
if (char const *name = auth->username()) {
1173
const char *value = TheICAPConfig.client_username_encode ?
1174
base64_encode(name) : name;
1175
buf.Printf("%s: %s\r\n", TheICAPConfig.client_username_header,
1181
// Adds one encapsulated HTTP header section to an ICAP request.
//   icapBuf - ICAP header under construction; receives "section=offset, "
//             where offset is the current size of httpBuf
//   section - Encapsulated section name, e.g. "req-hdr" or "res-hdr"
//   httpBuf - accumulates the packed HTTP headers
//   head    - the HTTP message whose header is encapsulated
// Works on a clone of head: a new HttpRequest (URL re-parsed from the
// canonical URL, HTTP version and non-HTTP properties copied) or a new
// HttpReply (status line copied). All header entries are cloned into it,
// hop-by-hop entries are removed, and the result is packed into httpBuf.
// NOTE(review): handling of a head that is neither request nor reply, and
// the release of headClone, are not visible in this extract.
void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
1183
// update ICAP header
1184
icapBuf.Printf("%s=%d, ", section, (int) httpBuf.contentSize());
1187
HttpMsg *headClone = NULL;
1189
if (const HttpRequest* old_request = dynamic_cast<const HttpRequest*>(head)) {
1190
HttpRequest* new_request = new HttpRequest;
1191
urlParse(old_request->method, old_request->canonical,new_request);
1192
new_request->http_ver = old_request->http_ver;
1193
inheritVirginProperties(*new_request, *old_request);
1194
headClone = new_request;
1196
else if (const HttpReply *old_reply = dynamic_cast<const HttpReply*>(head)) {
1197
HttpReply* new_reply = new HttpReply;
1198
new_reply->sline = old_reply->sline;
1199
headClone = new_reply;
1204
HttpHeaderPos pos = HttpHeaderInitPos;
1205
HttpHeaderEntry* p_head_entry = NULL;
1206
while (NULL != (p_head_entry = head->header.getEntry(&pos)) )
1207
headClone->header.addEntry(p_head_entry->clone());
1211
// remove all hop-by-hop headers from the clone
1212
headClone->header.removeHopByHopEntries();
1214
// pack polished HTTP header
1215
packHead(httpBuf, headClone);
1220
// Serializes the header of `head` into httpBuf using a Packer bound to that
// buffer. (The Packer declaration and its cleanup are not visible in this
// extract.)
void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
1223
packerToMemInit(&p, &httpBuf);
1224
head->packInto(&p, true);
1228
// decides whether to offer a preview and calculates its size
1229
// Decides whether to offer an ICAP preview and how large it should be.
// Preview is skipped when disabled in configuration or when the service
// does not want one for this URL path. Otherwise the advertised size starts
// as the service's wanted size capped by TheBackupLimit, and is further
// capped by the virgin body size when that size is known.
// NOTE(review): the declaration of wantedSize, the early returns, the
// statement under the !virginBody.expected() check, and the call that
// enables the preview are not visible in this extract.
void ICAPModXact::decideOnPreview()
1231
if (!TheICAPConfig.preview_enable) {
1232
debugs(93, 5, HERE << "preview disabled by squid.conf");
1236
const HttpRequest *request = virgin.cause ?
1238
dynamic_cast<const HttpRequest*>(virgin.header);
1239
const String urlPath = request ? request->urlpath : String();
1241
if (!service().wantsPreview(urlPath, wantedSize)) {
1242
debugs(93, 5, "ICAPModXact should not offer preview for " << urlPath);
1246
// we decided to do preview, now compute its size
1248
Must(wantedSize >= 0);
1250
// cannot preview more than we can backup
1251
size_t ad = XMIN(wantedSize, TheBackupLimit);
1253
if (!virginBody.expected())
1256
if (virginBody.knownSize())
1257
ad = XMIN(static_cast<uint64_t>(ad), virginBody.size()); // not more than we have
1259
debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
1260
"(service wanted " << wantedSize << ")");
1263
Must(preview.enabled());
1266
// decides whether to allow 204 responses
1267
// Decides whether to advertise "Allow: 204". Checks whether the service
// allows 204 responses (presumably returning false when it does not; that
// statement is not visible in this extract) and otherwise answers whether
// the whole virgin body can be backed up.
bool ICAPModXact::shouldAllow204()
1269
if (!service().allows204())
1272
return canBackupEverything();
1275
// used by shouldAllow204 and decideOnRetries
1276
// Answers whether the entire virgin body can be kept as a backup: true when
// no body is expected; with a known body size, true only when that size is
// strictly below TheBackupLimit.
// NOTE(review): the result for an unknown body size is not visible in this
// extract (presumably false -- confirm).
bool ICAPModXact::canBackupEverything() const
1278
if (!virginBody.expected())
1279
return true; // no body means no problems with backup
1281
// if there is a body, check whether we can backup it all
1283
if (!virginBody.knownSize())
1286
// or should we have a different backup limit?
1287
// note that '<' allows for 0-termination of the "full" backup buffer
1288
return virginBody.size() < TheBackupLimit;
1291
// Decide whether this transaction can be retried if pconn fails
1292
// Must be called after decideOnPreview and before openConnection()
// Retries stay allowed when preview is enabled or when the whole virgin
// body can be backed up; otherwise disableRetries() is called.
1293
void ICAPModXact::decideOnRetries()
1296
return; // no, already decided
1298
if (preview.enabled())
1299
return; // yes, because preview provides enough guarantees
1301
if (canBackupEverything())
1302
return; // yes, because we can back everything up
1304
disableRetries(); // no, because we cannot back everything up
1307
// Normally, the body-writing code handles preview body. It can deal with
1308
// bodies of unexpected size, including those that turn out to be empty.
1309
// However, that code assumes that the body was expected and body control
1310
// structures were initialized. This is not the case when there is no body
1311
// or the body is known to be empty, because the virgin message will lack a
1312
// body_pipe. So we handle preview of null-body and zero-size bodies here.
// The visible statements do not write anything to buf; they only verify
// preconditions and update the preview state.
1313
void ICAPModXact::finishNullOrEmptyBodyPreview(MemBuf &buf)
1315
Must(!virginBodyWriting.active()); // one reason we handle it here
1316
Must(!virgin.body_pipe); // another reason we handle it here
1317
Must(!preview.ad()); // the advertised preview size must be zero
1319
// do not add last-chunk because our Encapsulated header says null-body
1320
// addLastRequestChunk(buf);
1321
// record a zero-byte preview write that also reached EOF
preview.wrote(0, true);
1323
// after that write the preview must be finished, in the ieof state
Must(preview.done());
1324
Must(preview.ieof());
1327
// Appends markers describing still-pending activity to buf, after the
// markers added by ICAPXaction::fillPendingStatus(). Markers visible here:
//   w(N) - writing state N (not done writing and past writingInit)
//   P(N) - preview enabled but not done; N is the remaining preview debt
//   p(N) - parsing state N (not done parsing and past psIcapHeader)
//   S(N) - sending state N (not done sending and already decided)
void ICAPModXact::fillPendingStatus(MemBuf &buf) const
1329
ICAPXaction::fillPendingStatus(buf);
1331
if (state.serviceWaiting)
1334
if (virgin.body_pipe != NULL)
1337
if (connection > 0 && !doneReading())
1340
if (!state.doneWriting() && state.writing != State::writingInit)
1341
buf.Printf("w(%d)", state.writing);
1343
if (preview.enabled()) {
1344
if (!preview.done())
1345
buf.Printf("P(%d)", (int) preview.debt());
1348
if (virginBodySending.active())
1351
if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1352
buf.Printf("p(%d)", state.parsing);
1354
if (!doneSending() && state.sending != State::sendingUndecided)
1355
buf.Printf("S(%d)", state.sending);
1361
// Appends markers describing completed activity to buf, after the markers
// added by ICAPXaction::fillDoneStatus(). The only marker text visible here
// is "P" for preview, which gets an "(ieof)" suffix when preview.ieof()
// is true.
void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1363
ICAPXaction::fillDoneStatus(buf);
1365
if (!virgin.body_pipe)
1368
if (state.doneWriting())
1371
if (preview.enabled()) {
1373
buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1379
if (state.doneParsing())
1386
// Returns true when looking up `section` as a member of the ','-separated
// "Encapsulated" header of icapReply yields a non-empty value.
// section: name of the list member to look for.
bool ICAPModXact::gotEncapsulated(const char *section) const
1388
return icapReply->header.getByNameListMember("Encapsulated",
1389
section, ',').size() > 0;
1392
// calculate whether there is a virgin HTTP body and
1393
// whether its expected size is known
1394
// TODO: rename because we do not just estimate
// When a non-empty body is expected, records the expected size in
// virginBody, plans virgin body writing, and registers this object as the
// consumer of virgin.body_pipe.
1395
void ICAPModXact::estimateVirginBody()
1397
// note: lack of size info may disable previews and 204s
1399
HttpMsg *msg = virgin.header;
1405
// the method comes from virgin.cause, or from msg itself when msg is an
// HttpRequest; METHOD_NONE is the fallback
method = virgin.cause->method;
1407
if (HttpRequest *req = dynamic_cast<HttpRequest*>(msg))
1408
method = req->method;
1410
method = METHOD_NONE;
1413
// expectingBody returns true for zero-sized bodies, but we will not
1414
// get a pipe for that body, so we treat the message as bodyless
1415
if (method != METHOD_NONE && msg->expectingBody(method, size) && size) {
1416
debugs(93, 6, "ICAPModXact expects virgin body from " <<
1417
virgin.body_pipe << "; size: " << size);
1419
virginBody.expect(size);
1420
virginBodyWriting.plan();
1422
// sign up as a body consumer
1423
Must(msg->body_pipe != NULL);
1424
Must(msg->body_pipe == virgin.body_pipe);
1425
Must(virgin.body_pipe->setConsumerIfNotLate(this));
1427
// make sure TheBackupLimit is in-sync with the buffer size
1428
Must(TheBackupLimit <= static_cast<size_t>(msg->body_pipe->buf().max_capacity));
1430
debugs(93, 6, "ICAPModXact does not expect virgin body");
1431
// without an expected body the message must not carry a body pipe
Must(msg->body_pipe == NULL);
1436
// Creates a new BodyPipe (constructed with this object) and stores it in
// both adapted.header->body_pipe and adapted.body_pipe. Neither of the two
// may already be set.
// what: description of the data to be supplied; used only in the debug
// message.
void ICAPModXact::makeAdaptedBodyPipe(const char *what) {
1437
Must(!adapted.body_pipe);
1438
Must(!adapted.header->body_pipe);
1439
adapted.header->body_pipe = new BodyPipe(this);
1440
adapted.body_pipe = adapted.header->body_pipe;
1441
debugs(93, 7, HERE << "will supply " << what << " via " <<
1442
adapted.body_pipe << " pipe");
1446
// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1448
// A new estimate starts out as dtUnexpected, which expected() reports as
// "not expected".
SizedEstimate::SizedEstimate()
1449
: theData(dtUnexpected)
1452
// Marks the data as expected. A non-negative aSize is stored as the size;
// a negative aSize is stored as dtUnknown.
void SizedEstimate::expect(int64_t aSize)
1454
theData = (aSize >= 0) ? aSize : (int64_t)dtUnknown;
1457
// Returns true unless theData still holds dtUnexpected.
bool SizedEstimate::expected() const
1459
return theData != dtUnexpected;
1462
// Returns true when theData is not dtUnknown.
// NOTE(review): presumably meant to be called only when expected() is
// true -- confirm.
bool SizedEstimate::knownSize() const
1465
return theData != dtUnknown;
1468
// Returns theData converted to uint64_t.
// NOTE(review): the result is presumably meaningful only when knownSize()
// is true -- confirm.
uint64_t SizedEstimate::size() const
1471
return static_cast<uint64_t>(theData);
1476
// A new activity starts at theStart 0 in the stUndecided state.
VirginBodyAct::VirginBodyAct(): theStart(0), theState(stUndecided)
1479
// Switches the activity to stActive; theStart must still be zero.
void VirginBodyAct::plan()
1482
Must(!theStart); // not started
1483
theState = stActive;
1486
// Switches the activity to stDisabled.
void VirginBodyAct::disable()
1488
theState = stDisabled;
1491
// Advances theStart by size.
void VirginBodyAct::progress(size_t size)
1495
theStart += static_cast<int64_t>(size);
1498
// Returns theStart (the running total maintained by progress()) as an
// unsigned 64-bit value.
uint64_t VirginBodyAct::offset() const
1501
return static_cast<uint64_t>(theStart);
1505
// A new preview is in the stDisabled state, with theWritten and theAd
// both zero.
ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1508
// Puts the preview into the stWriting state, after which enabled()
// returns true.
// NOTE(review): anAd is presumably stored as the advertised preview size
// (theAd) -- confirm.
void ICAPPreview::enable(size_t anAd)
1510
// TODO: check for anAd not exceeding preview size limit
1514
theState = stWriting;
1517
// Returns true in every state other than stDisabled.
bool ICAPPreview::enabled() const
1519
return theState != stDisabled;
1522
// NOTE(review): presumably returns the advertised preview size (theAd) --
// confirm.
size_t ICAPPreview::ad() const
1528
// Returns true when theState compares greater than or equal to stIeof,
// so the result depends on the declaration order of the state values.
bool ICAPPreview::done() const
1531
return theState >= stIeof;
1534
// Returns true only when theState is exactly stIeof.
bool ICAPPreview::ieof() const
1537
return theState == stIeof;
1540
// Returns how many advertised bytes have not been written yet
// (theAd - theWritten), or 0 once the preview is done.
size_t ICAPPreview::debt() const
1543
return done() ? 0 : (theAd - theWritten);
1546
// Records preview writing progress.
// size: number of bytes written; wroteEof: whether EOF was written too.
// NOTE(review): presumably theWritten grows by size and stIeof is entered
// only when wroteEof is true -- confirm.
void ICAPPreview::wrote(size_t size, bool wroteEof)
1552
// never write more than was advertised
Must(theWritten <= theAd);
1555
theState = stIeof; // written size is irrelevant
1557
// everything advertised has been written
if (theWritten >= theAd)
1561
// Has the virgin header write its first line into mb via firstLineBuf().
// virgin.header is checked for NULL first.
// NOTE(review): presumably returns false when there is no virgin header
// and true after mb has been filled -- confirm.
bool ICAPModXact::fillVirginHttpHeader(MemBuf &mb) const
1563
if (virgin.header == NULL)
1566
virgin.header->firstLineBuf(mb);
1572
/* ICAPModXactLauncher */
1574
// Passes the string "ICAPModXactLauncher", the initiator and the service
// to the ICAPLauncher base, then stores the virgin header and its cause
// (a request) in `virgin`. createXaction() later reads both from there.
ICAPModXactLauncher::ICAPModXactLauncher(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &aService):
1575
ICAPLauncher("ICAPModXactLauncher", anInitiator, aService)
1577
virgin.setHeader(virginHeader);
1578
virgin.setCause(virginCause);
1581
// Returns a newly allocated ICAPModXact that has this launcher as its
// initiator and is given the stored virgin header, virgin cause and
// theService.
ICAPXaction *ICAPModXactLauncher::createXaction()
1583
return new ICAPModXact(this, virgin.header, virgin.cause, theService);