~clint-fewbar/ubuntu/precise/squid3/ignore-sighup-early

« back to all changes in this revision

Viewing changes to src/ICAP/ICAPModXact.cc

  • Committer: Bazaar Package Importer
  • Author(s): Luigi Gangitano
  • Date: 2010-05-04 11:15:49 UTC
  • mfrom: (1.3.1 upstream)
  • mto: (20.3.1 squeeze) (21.2.1 sid)
  • mto: This revision was merged to the branch mainline in revision 21.
  • Revision ID: james.westby@ubuntu.com-20100504111549-1apjh2g5sndki4te
Tags: upstream-3.1.3
ImportĀ upstreamĀ versionĀ 3.1.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * DEBUG: section 93    ICAP (RFC 3507) Client
3
 
 */
4
 
 
5
 
#include "squid.h"
6
 
#include "comm.h"
7
 
#include "HttpMsg.h"
8
 
#include "HttpRequest.h"
9
 
#include "HttpReply.h"
10
 
#include "ICAPServiceRep.h"
11
 
#include "ICAPInitiator.h"
12
 
#include "ICAPLauncher.h"
13
 
#include "ICAPModXact.h"
14
 
#include "ICAPClient.h"
15
 
#include "ChunkedCodingParser.h"
16
 
#include "TextException.h"
17
 
#include "AuthUserRequest.h"
18
 
#include "ICAPConfig.h"
19
 
#include "SquidTime.h"
20
 
 
21
 
// flow and terminology:
22
 
//     HTTP| --> receive --> encode --> write --> |network
23
 
//     end | <-- send    <-- parse  <-- read  <-- |end
24
 
 
25
 
// TODO: replace gotEncapsulated() with something faster; we call it often
26
 
 
27
 
CBDATA_CLASS_INIT(ICAPModXact);
28
 
CBDATA_CLASS_INIT(ICAPModXactLauncher);
29
 
 
30
 
static const size_t TheBackupLimit = BodyPipe::MaxCapacity;
31
 
 
32
 
extern ICAPConfig TheICAPConfig;
33
 
 
34
 
 
35
 
ICAPModXact::State::State()
36
 
{
37
 
    memset(this, 0, sizeof(*this));
38
 
}
39
 
 
40
 
ICAPModXact::ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader,
41
 
    HttpRequest *virginCause, ICAPServiceRep::Pointer &aService):
42
 
    ICAPXaction("ICAPModXact", anInitiator, aService),
43
 
    icapReply(NULL),
44
 
    virginConsumed(0),
45
 
    bodyParser(NULL),
46
 
    canStartBypass(false) // too early
47
 
{
48
 
    assert(virginHeader);
49
 
 
50
 
    virgin.setHeader(virginHeader); // sets virgin.body_pipe if needed
51
 
    virgin.setCause(virginCause); // may be NULL
52
 
 
53
 
    // adapted header and body are initialized when we parse them
54
 
 
55
 
    // writing and reading ends are handled by ICAPXaction
56
 
 
57
 
    // encoding
58
 
    // nothing to do because we are using temporary buffers
59
 
 
60
 
    // parsing
61
 
    icapReply = new HttpReply;
62
 
    icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
63
 
 
64
 
    debugs(93,7, "ICAPModXact initialized." << status());
65
 
}
66
 
 
67
 
// initiator wants us to start
68
 
void ICAPModXact::start()
69
 
{
70
 
    ICAPXaction::start();
71
 
 
72
 
    estimateVirginBody(); // before virgin disappears!
73
 
 
74
 
    canStartBypass = service().bypass;
75
 
 
76
 
    // it is an ICAP violation to send request to a service w/o known OPTIONS
77
 
 
78
 
    if (service().up())
79
 
        startWriting();
80
 
    else
81
 
        waitForService();
82
 
 
83
 
    // XXX: If commConnectStart in startWriting fails, we may get here
84
 
    //_after_ the object got destroyed. Somebody please fix commConnectStart!
85
 
    // TODO: Does re-entrance protection in callStart() solve the above?
86
 
}
87
 
 
88
 
static
89
 
void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
90
 
{
91
 
    ICAPModXact *x = static_cast<ICAPModXact*>(data);
92
 
    assert(x);
93
 
    x->noteServiceReady();
94
 
}
95
 
 
96
 
void ICAPModXact::waitForService()
97
 
{
98
 
    Must(!state.serviceWaiting);
99
 
    debugs(93, 7, "ICAPModXact will wait for the ICAP service" << status());
100
 
    state.serviceWaiting = true;
101
 
    service().callWhenReady(&ICAPModXact_noteServiceReady, this);
102
 
}
103
 
 
104
 
void ICAPModXact::noteServiceReady()
105
 
{
106
 
    ICAPXaction_Enter(noteServiceReady);
107
 
 
108
 
    Must(state.serviceWaiting);
109
 
    state.serviceWaiting = false;
110
 
 
111
 
    if (service().up()) {
112
 
        startWriting();
113
 
    } else {
114
 
        disableRetries();
115
 
        throw TexcHere("ICAP service is unusable");
116
 
    }
117
 
 
118
 
    ICAPXaction_Exit();
119
 
}
120
 
 
121
 
void ICAPModXact::startWriting()
122
 
{
123
 
    state.writing = State::writingConnect;
124
 
 
125
 
    decideOnPreview(); // must be decided before we decideOnRetries
126
 
    decideOnRetries();
127
 
 
128
 
    openConnection();
129
 
    // put nothing here as openConnection calls commConnectStart
130
 
    // and that may call us back without waiting for the next select loop
131
 
}
132
 
 
133
 
// connection with the ICAP service established
134
 
void ICAPModXact::handleCommConnected()
135
 
{
136
 
    Must(state.writing == State::writingConnect);
137
 
 
138
 
    startReading(); // wait for early errors from the ICAP server
139
 
 
140
 
    MemBuf requestBuf;
141
 
    requestBuf.init();
142
 
 
143
 
    makeRequestHeaders(requestBuf);
144
 
    debugs(93, 9, "ICAPModXact ICAP will write" << status() << ":\n" <<
145
 
           (requestBuf.terminate(), requestBuf.content()));
146
 
 
147
 
    // write headers
148
 
    state.writing = State::writingHeaders;
149
 
    scheduleWrite(requestBuf);
150
 
}
151
 
 
152
 
void ICAPModXact::handleCommWrote(size_t sz)
153
 
{
154
 
    debugs(93, 5, HERE << "Wrote " << sz << " bytes");
155
 
 
156
 
    if (state.writing == State::writingHeaders)
157
 
        handleCommWroteHeaders();
158
 
    else
159
 
        handleCommWroteBody();
160
 
}
161
 
 
162
 
void ICAPModXact::handleCommWroteHeaders()
163
 
{
164
 
    Must(state.writing == State::writingHeaders);
165
 
 
166
 
    // determine next step
167
 
    if (preview.enabled())
168
 
        state.writing = preview.done() ? State::writingPaused : State::writingPreview;
169
 
    else
170
 
    if (virginBody.expected())
171
 
        state.writing = State::writingPrime;
172
 
    else {
173
 
        stopWriting(true);
174
 
        return;
175
 
    }
176
 
 
177
 
    writeMore();
178
 
}
179
 
 
180
 
void ICAPModXact::writeMore()
181
 
{
182
 
    debugs(93, 5, HERE << "checking whether to write more" << status());
183
 
 
184
 
    if (writer) // already writing something
185
 
        return;
186
 
 
187
 
    switch (state.writing) {
188
 
 
189
 
    case State::writingInit:    // waiting for service OPTIONS
190
 
        Must(state.serviceWaiting);
191
 
 
192
 
    case State::writingConnect: // waiting for the connection to establish
193
 
 
194
 
    case State::writingHeaders: // waiting for the headers to be written
195
 
 
196
 
    case State::writingPaused:  // waiting for the ICAP server response
197
 
 
198
 
    case State::writingReallyDone: // nothing more to write
199
 
        return;
200
 
 
201
 
    case State::writingAlmostDone: // was waiting for the last write
202
 
        stopWriting(false);
203
 
        return;
204
 
 
205
 
    case State::writingPreview:
206
 
        writePreviewBody();
207
 
        return;
208
 
 
209
 
    case State::writingPrime:
210
 
        writePrimeBody();
211
 
        return;
212
 
 
213
 
    default:
214
 
        throw TexcHere("ICAPModXact in bad writing state");
215
 
    }
216
 
}
217
 
 
218
 
void ICAPModXact::writePreviewBody()
219
 
{
220
 
    debugs(93, 8, HERE << "will write Preview body from " <<
221
 
        virgin.body_pipe << status());
222
 
    Must(state.writing == State::writingPreview);
223
 
    Must(virgin.body_pipe != NULL);
224
 
 
225
 
    const size_t sizeMax = (size_t)virgin.body_pipe->buf().contentSize();
226
 
    const size_t size = XMIN(preview.debt(), sizeMax);
227
 
    writeSomeBody("preview body", size);
228
 
 
229
 
    // change state once preview is written
230
 
 
231
 
    if (preview.done()) {
232
 
        debugs(93, 7, "ICAPModXact wrote entire Preview body" << status());
233
 
 
234
 
        if (preview.ieof())
235
 
            stopWriting(true);
236
 
        else
237
 
            state.writing = State::writingPaused;
238
 
    }
239
 
}
240
 
 
241
 
void ICAPModXact::writePrimeBody()
242
 
{
243
 
    Must(state.writing == State::writingPrime);
244
 
    Must(virginBodyWriting.active());
245
 
 
246
 
    const size_t size = (size_t)virgin.body_pipe->buf().contentSize();
247
 
    writeSomeBody("prime virgin body", size);
248
 
 
249
 
    if (virginBodyEndReached(virginBodyWriting)) {
250
 
        debugs(93, 5, HERE << "wrote entire body");
251
 
        stopWriting(true);
252
 
    }
253
 
}
254
 
 
255
 
void ICAPModXact::writeSomeBody(const char *label, size_t size)
256
 
{
257
 
    Must(!writer && state.writing < state.writingAlmostDone);
258
 
    Must(virgin.body_pipe != NULL);
259
 
    debugs(93, 8, HERE << "will write up to " << size << " bytes of " <<
260
 
           label);
261
 
 
262
 
    MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
263
 
 
264
 
    writeBuf.init(); // note: we assume that last-chunk will fit
265
 
 
266
 
    const size_t writableSize = virginContentSize(virginBodyWriting);
267
 
    const size_t chunkSize = XMIN(writableSize, size);
268
 
 
269
 
    if (chunkSize) {
270
 
        debugs(93, 7, HERE << "will write " << chunkSize <<
271
 
               "-byte chunk of " << label);
272
 
 
273
 
        openChunk(writeBuf, chunkSize, false);
274
 
        writeBuf.append(virginContentData(virginBodyWriting), chunkSize);
275
 
        closeChunk(writeBuf);
276
 
 
277
 
        virginBodyWriting.progress(chunkSize);
278
 
        virginConsume();
279
 
    } else {
280
 
        debugs(93, 7, "ICAPModXact has no writable " << label << " content");
281
 
    }
282
 
 
283
 
    const bool wroteEof = virginBodyEndReached(virginBodyWriting);
284
 
    bool lastChunk = wroteEof;
285
 
    if (state.writing == State::writingPreview) {
286
 
        preview.wrote(chunkSize, wroteEof); // even if wrote nothing
287
 
        lastChunk = lastChunk || preview.done();
288
 
    }
289
 
 
290
 
    if (lastChunk) {
291
 
        debugs(93, 8, HERE << "will write last-chunk of " << label);
292
 
        addLastRequestChunk(writeBuf);
293
 
    }
294
 
 
295
 
    debugs(93, 7, HERE << "will write " << writeBuf.contentSize()
296
 
           << " raw bytes of " << label);
297
 
 
298
 
    if (writeBuf.hasContent()) {
299
 
        scheduleWrite(writeBuf); // comm will free the chunk
300
 
    } else {
301
 
        writeBuf.clean();
302
 
    }
303
 
}
304
 
 
305
 
void ICAPModXact::addLastRequestChunk(MemBuf &buf)
306
 
{
307
 
    const bool ieof = state.writing == State::writingPreview && preview.ieof();
308
 
    openChunk(buf, 0, ieof);
309
 
    closeChunk(buf);
310
 
}
311
 
 
312
 
void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize, bool ieof)
313
 
{
314
 
    buf.Printf((ieof ? "%x; ieof\r\n" : "%x\r\n"), (int) chunkSize);
315
 
}
316
 
 
317
 
void ICAPModXact::closeChunk(MemBuf &buf)
318
 
{
319
 
    buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
320
 
}
321
 
 
322
 
// did the activity reached the end of the virgin body?
323
 
bool ICAPModXact::virginBodyEndReached(const VirginBodyAct &act) const
324
 
{
325
 
    return 
326
 
        !act.active() || // did all (assuming it was originally planned)
327
 
        !virgin.body_pipe->expectMoreAfter(act.offset()); // wont have more
328
 
}
329
 
 
330
 
// the size of buffered virgin body data available for the specified activity
331
 
// if this size is zero, we may be done or may be waiting for more data
332
 
size_t ICAPModXact::virginContentSize(const VirginBodyAct &act) const
333
 
{
334
 
    Must(act.active());
335
 
    // asbolute start of unprocessed data
336
 
    const uint64_t start = act.offset();
337
 
    // absolute end of buffered data
338
 
    const uint64_t end = virginConsumed + virgin.body_pipe->buf().contentSize();
339
 
    Must(virginConsumed <= start && start <= end);
340
 
    return static_cast<size_t>(end - start);
341
 
}
342
 
 
343
 
// pointer to buffered virgin body data available for the specified activity
344
 
const char *ICAPModXact::virginContentData(const VirginBodyAct &act) const
345
 
{
346
 
    Must(act.active());
347
 
    const uint64_t start = act.offset();
348
 
    Must(virginConsumed <= start);
349
 
    return virgin.body_pipe->buf().content() + static_cast<size_t>(start-virginConsumed);
350
 
}
351
 
 
352
 
void ICAPModXact::virginConsume()
353
 
{
354
 
    debugs(93, 9, "consumption guards: " << !virgin.body_pipe << isRetriable);
355
 
 
356
 
    if (!virgin.body_pipe)
357
 
        return; // nothing to consume
358
 
 
359
 
    if (isRetriable)
360
 
        return; // do not consume if we may have to retry later
361
 
 
362
 
    BodyPipe &bp = *virgin.body_pipe;
363
 
 
364
 
    // Why > 2? HttpState does not use the last bytes in the buffer
365
 
    // because delayAwareRead() is arguably broken. See 
366
 
    // HttpStateData::maybeReadVirginBody for more details.
367
 
    if (canStartBypass && bp.buf().spaceSize() > 2) {
368
 
        // Postponing may increase memory footprint and slow the HTTP side
369
 
        // down. Not postponing may increase the number of ICAP errors 
370
 
        // if the ICAP service fails. We may also use "potential" space to
371
 
        // postpone more aggressively. Should the trade-off be configurable?
372
 
        debugs(93, 8, HERE << "postponing consumption from " << bp.status());
373
 
        return;
374
 
    }
375
 
 
376
 
    const size_t have = static_cast<size_t>(bp.buf().contentSize());
377
 
    const uint64_t end = virginConsumed + have;
378
 
    uint64_t offset = end;
379
 
 
380
 
    debugs(93, 9, HERE << "max virgin consumption offset=" << offset <<
381
 
        " acts " << virginBodyWriting.active() << virginBodySending.active() <<
382
 
        " consumed=" << virginConsumed << 
383
 
        " from " << virgin.body_pipe->status());
384
 
 
385
 
    if (virginBodyWriting.active())
386
 
        offset = XMIN(virginBodyWriting.offset(), offset);
387
 
 
388
 
    if (virginBodySending.active())
389
 
        offset = XMIN(virginBodySending.offset(), offset);
390
 
 
391
 
    Must(virginConsumed <= offset && offset <= end);
392
 
 
393
 
    if (const size_t size = static_cast<size_t>(offset - virginConsumed)) {
394
 
        debugs(93, 8, HERE << "consuming " << size << " out of " << have <<
395
 
               " virgin body bytes");
396
 
        bp.consume(size);
397
 
        virginConsumed += size;
398
 
        Must(!isRetriable); // or we should not be consuming
399
 
        disableBypass("consumed content");
400
 
    }
401
 
}
402
 
 
403
 
void ICAPModXact::handleCommWroteBody()
404
 
{
405
 
    writeMore();
406
 
}
407
 
 
408
 
// Called when we do not expect to call comm_write anymore.
409
 
// We may have a pending write though.
410
 
// If stopping nicely, we will just wait for that pending write, if any.
411
 
void ICAPModXact::stopWriting(bool nicely)
412
 
{
413
 
    if (state.writing == State::writingReallyDone)
414
 
        return;
415
 
 
416
 
    if (writer) {
417
 
        if (nicely) {
418
 
            debugs(93, 7, HERE << "will wait for the last write" << status());
419
 
            state.writing = State::writingAlmostDone; // may already be set
420
 
            checkConsuming();
421
 
            return;
422
 
        }
423
 
        debugs(93, 3, HERE << "will NOT wait for the last write" << status());
424
 
 
425
 
        // Comm does not have an interface to clear the writer callback nicely,
426
 
        // but without clearing the writer we cannot recycle the connection.
427
 
        // We prevent connection reuse and hope that we can handle a callback
428
 
        // call at any time, usually in the middle of the destruction sequence!
429
 
        // Somebody should add comm_remove_write_handler() to comm API.
430
 
        reuseConnection = false;
431
 
        ignoreLastWrite = true;
432
 
    }
433
 
 
434
 
    debugs(93, 7, HERE << "will no longer write" << status());
435
 
    if (virginBodyWriting.active()) {
436
 
        virginBodyWriting.disable();
437
 
        virginConsume();
438
 
    }
439
 
    state.writing = State::writingReallyDone;
440
 
    checkConsuming();
441
 
}
442
 
 
443
 
void ICAPModXact::stopBackup()
444
 
{
445
 
    if (!virginBodySending.active())
446
 
        return;
447
 
 
448
 
    debugs(93, 7, "ICAPModXact will no longer backup" << status());
449
 
    virginBodySending.disable();
450
 
    virginConsume();
451
 
}
452
 
 
453
 
bool ICAPModXact::doneAll() const
454
 
{
455
 
    return ICAPXaction::doneAll() && !state.serviceWaiting &&
456
 
           doneSending() &&
457
 
           doneReading() && state.doneWriting();
458
 
}
459
 
 
460
 
void ICAPModXact::startReading()
461
 
{
462
 
    Must(connection >= 0);
463
 
    Must(!reader);
464
 
    Must(!adapted.header);
465
 
    Must(!adapted.body_pipe);
466
 
 
467
 
    // we use the same buffer for headers and body and then consume headers
468
 
    readMore();
469
 
}
470
 
 
471
 
void ICAPModXact::readMore()
472
 
{
473
 
    if (reader || doneReading()) {
474
 
        debugs(93,3,HERE << "returning from readMore because reader or doneReading()");
475
 
        return;
476
 
    }
477
 
 
478
 
    // do not fill readBuf if we have no space to store the result
479
 
    if (adapted.body_pipe != NULL &&
480
 
        !adapted.body_pipe->buf().hasPotentialSpace()) {
481
 
        debugs(93,3,HERE << "not reading because ICAP reply pipe is full");
482
 
        return;
483
 
    }
484
 
 
485
 
    if (readBuf.hasSpace())
486
 
        scheduleRead();
487
 
    else
488
 
        debugs(93,3,HERE << "nothing to do because !readBuf.hasSpace()");
489
 
}
490
 
 
491
 
// comm module read a portion of the ICAP response for us
492
 
void ICAPModXact::handleCommRead(size_t)
493
 
{
494
 
    Must(!state.doneParsing());
495
 
    parseMore();
496
 
    readMore();
497
 
}
498
 
 
499
 
void ICAPModXact::echoMore()
500
 
{
501
 
    Must(state.sending == State::sendingVirgin);
502
 
    Must(adapted.body_pipe != NULL);
503
 
    Must(virginBodySending.active());
504
 
 
505
 
    const size_t sizeMax = virginContentSize(virginBodySending);
506
 
    debugs(93,5, HERE << "will echo up to " << sizeMax << " bytes from " <<
507
 
        virgin.body_pipe->status());
508
 
    debugs(93,5, HERE << "will echo up to " << sizeMax << " bytes to   " <<
509
 
        adapted.body_pipe->status());
510
 
 
511
 
    if (sizeMax > 0) {
512
 
        const size_t size = adapted.body_pipe->putMoreData(virginContentData(virginBodySending), sizeMax);
513
 
        debugs(93,5, HERE << "echoed " << size << " out of " << sizeMax <<
514
 
           " bytes");
515
 
        virginBodySending.progress(size);
516
 
        virginConsume();
517
 
        disableBypass("echoed content");
518
 
    }
519
 
 
520
 
    if (virginBodyEndReached(virginBodySending)) {
521
 
        debugs(93, 5, "ICAPModXact echoed all" << status());
522
 
        stopSending(true);
523
 
    } else {
524
 
        debugs(93, 5, "ICAPModXact has " <<
525
 
            virgin.body_pipe->buf().contentSize() << " bytes " <<
526
 
            "and expects more to echo" << status());
527
 
        // TODO: timeout if virgin or adapted pipes are broken
528
 
    }
529
 
}
530
 
 
531
 
bool ICAPModXact::doneSending() const
532
 
{
533
 
    return state.sending == State::sendingDone;
534
 
}
535
 
 
536
 
// stop (or do not start) sending adapted message body
537
 
void ICAPModXact::stopSending(bool nicely)
538
 
{
539
 
    if (doneSending())
540
 
        return;
541
 
 
542
 
    if (state.sending != State::sendingUndecided) {
543
 
        debugs(93, 7, "ICAPModXact will no longer send" << status());
544
 
        if (adapted.body_pipe != NULL) {
545
 
            virginBodySending.disable();
546
 
            // we may leave debts if we were echoing and the virgin
547
 
            // body_pipe got exhausted before we echoed all planned bytes
548
 
            const bool leftDebts = adapted.body_pipe->needsMoreData();
549
 
            stopProducingFor(adapted.body_pipe, nicely && !leftDebts);
550
 
        }
551
 
    } else {
552
 
        debugs(93, 7, "ICAPModXact will not start sending" << status());
553
 
        Must(!adapted.body_pipe);
554
 
    }
555
 
 
556
 
    state.sending = State::sendingDone;
557
 
    checkConsuming();
558
 
}
559
 
 
560
 
// should be called after certain state.writing or state.sending changes
561
 
void ICAPModXact::checkConsuming()
562
 
{
563
 
    // quit if we already stopped or are still using the pipe
564
 
    if (!virgin.body_pipe || !state.doneConsumingVirgin())
565
 
        return;
566
 
 
567
 
    debugs(93, 7, HERE << "will stop consuming" << status());
568
 
    stopConsumingFrom(virgin.body_pipe);
569
 
}
570
 
 
571
 
void ICAPModXact::parseMore()
572
 
{
573
 
    debugs(93, 5, HERE << "have " << readBuf.contentSize() << " bytes to parse" <<
574
 
           status());
575
 
    debugs(93, 5, HERE << "\n" << readBuf.content());
576
 
 
577
 
    if (state.parsingHeaders())
578
 
        parseHeaders();
579
 
 
580
 
    if (state.parsing == State::psBody)
581
 
        parseBody();
582
 
}
583
 
 
584
 
void ICAPModXact::callException(const TextException &e)
585
 
{
586
 
    if (!canStartBypass || isRetriable) {
587
 
        ICAPXaction::callException(e);
588
 
        return;
589
 
    }
590
 
 
591
 
    try {
592
 
        debugs(93, 3, "bypassing ICAPModXact::" << inCall << " exception: " <<
593
 
           e.message << ' ' << status());
594
 
        bypassFailure();
595
 
    }
596
 
    catch (const TextException &bypassE) {
597
 
        ICAPXaction::callException(bypassE);
598
 
    }
599
 
}
600
 
 
601
 
void ICAPModXact::bypassFailure()
602
 
{
603
 
    disableBypass("already started to bypass");
604
 
 
605
 
    Must(!isRetriable); // or we should not be bypassing
606
 
 
607
 
    prepEchoing();
608
 
 
609
 
    startSending();
610
 
 
611
 
    // end all activities associated with the ICAP server
612
 
 
613
 
    stopParsing();
614
 
 
615
 
    stopWriting(true); // or should we force it?
616
 
    if (connection >= 0) {
617
 
        reuseConnection = false; // be conservative
618
 
        cancelRead(); // may not work; and we cannot stop connecting either
619
 
        if (!doneWithIo())
620
 
            debugs(93, 7, "Warning: bypass failed to stop I/O" << status());
621
 
    }
622
 
}
623
 
 
624
 
void ICAPModXact::disableBypass(const char *reason)
625
 
{
626
 
    if (canStartBypass) {
627
 
        debugs(93,7, HERE << "will never start bypass because " << reason);
628
 
        canStartBypass = false;
629
 
    }
630
 
}
631
 
 
632
 
 
633
 
 
634
 
// note that allocation for echoing is done in handle204NoContent()
635
 
void ICAPModXact::maybeAllocateHttpMsg()
636
 
{
637
 
    if (adapted.header) // already allocated
638
 
        return;
639
 
 
640
 
    if (gotEncapsulated("res-hdr")) {
641
 
        adapted.setHeader(new HttpReply);
642
 
    } else if (gotEncapsulated("req-hdr")) {
643
 
        adapted.setHeader(new HttpRequest);
644
 
    } else
645
 
        throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
646
 
}
647
 
 
648
 
void ICAPModXact::parseHeaders()
649
 
{
650
 
    Must(state.parsingHeaders());
651
 
 
652
 
    if (state.parsing == State::psIcapHeader) {
653
 
        debugs(93, 5, HERE << "parse ICAP headers");
654
 
        parseIcapHead();
655
 
    }
656
 
 
657
 
    if (state.parsing == State::psHttpHeader) {
658
 
        debugs(93, 5, HERE << "parse HTTP headers");
659
 
        parseHttpHead();
660
 
    }
661
 
 
662
 
    if (state.parsingHeaders()) { // need more data
663
 
        Must(mayReadMore());
664
 
        return;
665
 
    }
666
 
 
667
 
    startSending();
668
 
}
669
 
 
670
 
// called after parsing all headers or when bypassing an exception
671
 
void ICAPModXact::startSending()
672
 
{
673
 
    disableBypass("sent headers");
674
 
    sendAnswer(adapted.header);
675
 
 
676
 
    if (state.sending == State::sendingVirgin)
677
 
        echoMore();
678
 
}
679
 
 
680
 
void ICAPModXact::parseIcapHead()
681
 
{
682
 
    Must(state.sending == State::sendingUndecided);
683
 
 
684
 
    if (!parseHead(icapReply))
685
 
        return;
686
 
 
687
 
    if (httpHeaderHasConnDir(&icapReply->header, "close")) {
688
 
        debugs(93, 5, HERE << "found connection close");
689
 
        reuseConnection = false;
690
 
    }
691
 
 
692
 
    switch (icapReply->sline.status) {
693
 
 
694
 
    case 100:
695
 
        handle100Continue();
696
 
        break;
697
 
 
698
 
    case 200:
699
 
    case 201: // Symantec Scan Engine 5.0 and later when modifying HTTP msg
700
 
 
701
 
        if (!validate200Ok()) {
702
 
            throw TexcHere("Invalid ICAP Response");
703
 
        } else {
704
 
            handle200Ok();
705
 
        }
706
 
 
707
 
        break;
708
 
 
709
 
    case 204:
710
 
        handle204NoContent();
711
 
        break;
712
 
 
713
 
    default:
714
 
        debugs(93, 5, HERE << "ICAP status " << icapReply->sline.status);
715
 
        handleUnknownScode();
716
 
        break;
717
 
    }
718
 
 
719
 
    // handle100Continue() manages state.writing on its own.
720
 
    // Non-100 status means the server needs no postPreview data from us.
721
 
    if (state.writing == State::writingPaused)
722
 
        stopWriting(true);
723
 
}
724
 
 
725
 
bool ICAPModXact::validate200Ok()
726
 
{
727
 
    if (ICAP::methodRespmod == service().method) {
728
 
        if (!gotEncapsulated("res-hdr"))
729
 
            return false;
730
 
 
731
 
        return true;
732
 
    }
733
 
 
734
 
    if (ICAP::methodReqmod == service().method) {
735
 
        if (!gotEncapsulated("res-hdr") && !gotEncapsulated("req-hdr"))
736
 
            return false;
737
 
 
738
 
        return true;
739
 
    }
740
 
 
741
 
    return false;
742
 
}
743
 
 
744
 
void ICAPModXact::handle100Continue()
745
 
{
746
 
    Must(state.writing == State::writingPaused);
747
 
    // server must not respond before the end of preview: we may send ieof
748
 
    Must(preview.enabled() && preview.done() && !preview.ieof());
749
 
 
750
 
    // 100 "Continue" cancels our preview commitment, not 204s outside preview
751
 
    if (!state.allowedPostview204)
752
 
        stopBackup();
753
 
 
754
 
    state.parsing = State::psIcapHeader; // eventually
755
 
    icapReply->reset();
756
 
 
757
 
    state.writing = State::writingPrime;
758
 
 
759
 
    writeMore();
760
 
}
761
 
 
762
 
void ICAPModXact::handle200Ok()
763
 
{
764
 
    state.parsing = State::psHttpHeader;
765
 
    state.sending = State::sendingAdapted;
766
 
    stopBackup();
767
 
    checkConsuming();
768
 
}
769
 
 
770
 
void ICAPModXact::handle204NoContent()
771
 
{
772
 
    stopParsing();
773
 
    prepEchoing();
774
 
}
775
 
 
776
 
// Called when we receive a 204 No Content response and
777
 
// when we are trying to bypass a service failure.
778
 
// We actually start sending (echoig or not) in startSending.
779
 
void ICAPModXact::prepEchoing()
780
 
{
781
 
    disableBypass("preparing to echo content");
782
 
 
783
 
    // We want to clone the HTTP message, but we do not want
784
 
    // to copy some non-HTTP state parts that HttpMsg kids carry in them.
785
 
    // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
786
 
    // Instead, we simply write the HTTP message and "clone" it by parsing.
787
 
 
788
 
    HttpMsg *oldHead = virgin.header;
789
 
    debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
790
 
 
791
 
    MemBuf httpBuf;
792
 
 
793
 
    // write the virgin message into a memory buffer
794
 
    httpBuf.init();
795
 
    packHead(httpBuf, oldHead);
796
 
 
797
 
    // allocate the adapted message and copy metainfo
798
 
    Must(!adapted.header);
799
 
    HttpMsg *newHead = NULL;
800
 
    if (const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(oldHead)) {
801
 
        HttpRequest *newR = new HttpRequest;
802
 
        inheritVirginProperties(*newR, *oldR);
803
 
        newHead = newR;
804
 
    } else
805
 
    if (dynamic_cast<const HttpReply*>(oldHead))
806
 
        newHead = new HttpReply;
807
 
    Must(newHead);
808
 
 
809
 
    adapted.setHeader(newHead);
810
 
 
811
 
    // parse the buffer back
812
 
    http_status error = HTTP_STATUS_NONE;
813
 
 
814
 
    Must(newHead->parse(&httpBuf, true, &error));
815
 
 
816
 
    Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
817
 
 
818
 
    httpBuf.clean();
819
 
 
820
 
    debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " <<
821
 
        newHead);
822
 
 
823
 
    // setup adapted body pipe if needed
824
 
    if (oldHead->body_pipe != NULL) {
825
 
        debugs(93, 7, HERE << "will echo virgin body from " <<
826
 
            oldHead->body_pipe);
827
 
        if (!virginBodySending.active())
828
 
            virginBodySending.plan(); // will throw if not possible
829
 
        state.sending = State::sendingVirgin;
830
 
        checkConsuming();
831
 
 
832
 
        // TODO: optimize: is it possible to just use the oldHead pipe and
833
 
        // remove ICAP from the loop? This echoing is probably a common case!
834
 
        makeAdaptedBodyPipe("echoed virgin response");
835
 
        if (oldHead->body_pipe->bodySizeKnown())
836
 
            adapted.body_pipe->setBodySize(oldHead->body_pipe->bodySize());
837
 
        debugs(93, 7, HERE << "will echo virgin body to " <<
838
 
            adapted.body_pipe);
839
 
    } else {
840
 
        debugs(93, 7, HERE << "no virgin body to echo");
841
 
        stopSending(true);
842
 
    }
843
 
}
844
 
 
845
 
void ICAPModXact::handleUnknownScode()
846
 
{
847
 
    stopParsing();
848
 
    stopBackup();
849
 
    // TODO: mark connection as "bad"
850
 
 
851
 
    // Terminate the transaction; we do not know how to handle this response.
852
 
    throw TexcHere("Unsupported ICAP status code");
853
 
}
854
 
 
855
 
void ICAPModXact::parseHttpHead()
856
 
{
857
 
    if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
858
 
        maybeAllocateHttpMsg();
859
 
 
860
 
        if (!parseHead(adapted.header))
861
 
            return; // need more header data
862
 
 
863
 
        if (HttpRequest *newHead = dynamic_cast<HttpRequest*>(adapted.header)) {
864
 
            const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(virgin.header);
865
 
            Must(oldR);
866
 
            // TODO: the adapted request did not really originate from the 
867
 
            // client; give proxy admin an option to prevent copying of 
868
 
            // sensitive client information here. See the following thread:
869
 
            // http://www.squid-cache.org/mail-archive/squid-dev/200703/0040.html
870
 
            inheritVirginProperties(*newHead, *oldR);
871
 
        }
872
 
    }
873
 
 
874
 
    decideOnParsingBody();
875
 
}
876
 
 
877
 
// parses both HTTP and ICAP headers
878
 
bool ICAPModXact::parseHead(HttpMsg *head)
879
 
{
880
 
    Must(head);
881
 
    debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse" <<
882
 
           "; state: " << state.parsing);
883
 
 
884
 
    http_status error = HTTP_STATUS_NONE;
885
 
    const bool parsed = head->parse(&readBuf, commEof, &error);
886
 
    Must(parsed || !error); // success or need more data
887
 
 
888
 
    if (!parsed) { // need more data
889
 
        debugs(93, 5, HERE << "parse failed, need more data, return false");
890
 
        head->reset();
891
 
        return false;
892
 
    }
893
 
 
894
 
    debugs(93, 5, HERE << "parse success, consume " << head->hdr_sz << " bytes, return true");
895
 
    readBuf.consume(head->hdr_sz);
896
 
    return true;
897
 
}
898
 
 
899
 
// TODO: Move this method to HttpRequest?
900
 
void ICAPModXact::inheritVirginProperties(HttpRequest &newR, const HttpRequest &oldR) {
901
 
 
902
 
    newR.client_addr = oldR.client_addr;
903
 
    newR.client_port = oldR.client_port;
904
 
 
905
 
    newR.my_addr = oldR.my_addr;
906
 
    newR.my_port = oldR.my_port;
907
 
 
908
 
    // This may be too conservative for the 204 No Content case
909
 
    // may eventually need cloneNullAdaptationImmune() for that.
910
 
    newR.flags = oldR.flags.cloneAdaptationImmune();
911
 
 
912
 
    if (oldR.auth_user_request) {
913
 
        newR.auth_user_request = oldR.auth_user_request;
914
 
        AUTHUSERREQUESTLOCK(newR.auth_user_request, "newR in ICAPModXact");
915
 
    }
916
 
}
917
 
 
918
 
void ICAPModXact::decideOnParsingBody() {
919
 
    if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
920
 
        debugs(93, 5, HERE << "expecting a body");
921
 
        state.parsing = State::psBody;
922
 
        bodyParser = new ChunkedCodingParser;
923
 
        makeAdaptedBodyPipe("adapted response from the ICAP server");
924
 
        Must(state.sending == State::sendingAdapted);
925
 
    } else {
926
 
        debugs(93, 5, HERE << "not expecting a body");
927
 
        stopParsing();
928
 
        stopSending(true);
929
 
    }
930
 
}
931
 
 
932
 
void ICAPModXact::parseBody()
933
 
{
934
 
    Must(state.parsing == State::psBody);
935
 
    Must(bodyParser);
936
 
 
937
 
    debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse");
938
 
 
939
 
    // the parser will throw on errors
940
 
    BodyPipeCheckout bpc(*adapted.body_pipe);
941
 
    const bool parsed = bodyParser->parse(&readBuf, &bpc.buf);
942
 
    bpc.checkIn();
943
 
 
944
 
    debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
945
 
           "parse; parsed all: " << parsed);
946
 
 
947
 
    // TODO: expose BodyPipe::putSize() to make this check simpler and clearer
948
 
    if (adapted.body_pipe->buf().contentSize() > 0) // parsed something sometime
949
 
        disableBypass("sent adapted content");
950
 
 
951
 
    if (parsed) {
952
 
        stopParsing();
953
 
        stopSending(true); // the parser succeeds only if all parsed data fits
954
 
        return;
955
 
    }
956
 
 
957
 
    debugs(93,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData());
958
 
 
959
 
    if (bodyParser->needsMoreData()) {
960
 
        debugs(93,3,HERE << this);
961
 
        Must(mayReadMore());
962
 
        readMore();
963
 
    }
964
 
 
965
 
    if (bodyParser->needsMoreSpace()) {
966
 
        Must(!doneSending()); // can hope for more space
967
 
        Must(adapted.body_pipe->buf().contentSize() > 0); // paranoid
968
 
        // TODO: there should be a timeout in case the sink is broken
969
 
        // or cannot consume partial content (while we need more space)
970
 
    }
971
 
}
972
 
 
973
 
void ICAPModXact::stopParsing()
974
 
{
975
 
    if (state.parsing == State::psDone)
976
 
        return;
977
 
 
978
 
    debugs(93, 7, "ICAPModXact will no longer parse" << status());
979
 
 
980
 
    delete bodyParser;
981
 
 
982
 
    bodyParser = NULL;
983
 
 
984
 
    state.parsing = State::psDone;
985
 
}
986
 
 
987
 
// HTTP side added virgin body data
988
 
void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe &)
989
 
{
990
 
    ICAPXaction_Enter(noteMoreBodyDataAvailable);
991
 
 
992
 
    writeMore();
993
 
 
994
 
    if (state.sending == State::sendingVirgin)
995
 
        echoMore();
996
 
 
997
 
    ICAPXaction_Exit();
998
 
}
999
 
 
1000
 
// HTTP side sent us all virgin info
1001
 
void ICAPModXact::noteBodyProductionEnded(BodyPipe &)
1002
 
{
1003
 
    ICAPXaction_Enter(noteBodyProductionEnded);
1004
 
 
1005
 
    Must(virgin.body_pipe->productionEnded());
1006
 
 
1007
 
    // push writer and sender in case we were waiting for the last-chunk
1008
 
    writeMore();
1009
 
 
1010
 
    if (state.sending == State::sendingVirgin)
1011
 
        echoMore();
1012
 
 
1013
 
    ICAPXaction_Exit();
1014
 
}
1015
 
 
1016
 
// body producer aborted, but the initiator may still want to know 
1017
 
// the answer, even though the HTTP message has been truncated
1018
 
void ICAPModXact::noteBodyProducerAborted(BodyPipe &)
1019
 
{
1020
 
    ICAPXaction_Enter(noteBodyProducerAborted);
1021
 
 
1022
 
    Must(virgin.body_pipe->productionEnded());
1023
 
 
1024
 
    // push writer and sender in case we were waiting for the last-chunk
1025
 
    writeMore();
1026
 
 
1027
 
    if (state.sending == State::sendingVirgin)
1028
 
        echoMore();
1029
 
 
1030
 
    ICAPXaction_Exit();
1031
 
}
1032
 
 
1033
 
// adapted body consumer wants more adapted data and 
1034
 
// possibly freed some buffer space
1035
 
void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe &)
1036
 
{
1037
 
    ICAPXaction_Enter(noteMoreBodySpaceAvailable);
1038
 
 
1039
 
    if (state.sending == State::sendingVirgin)
1040
 
        echoMore();
1041
 
    else if (state.sending == State::sendingAdapted)
1042
 
        parseMore();
1043
 
    else
1044
 
        Must(state.sending == State::sendingUndecided);
1045
 
 
1046
 
    ICAPXaction_Exit();
1047
 
}
1048
 
 
1049
 
// adapted body consumer aborted
1050
 
void ICAPModXact::noteBodyConsumerAborted(BodyPipe &)
1051
 
{
1052
 
    ICAPXaction_Enter(noteBodyConsumerAborted);
1053
 
 
1054
 
    mustStop("adapted body consumer aborted");
1055
 
 
1056
 
    ICAPXaction_Exit();
1057
 
}
1058
 
 
1059
 
// internal cleanup
1060
 
void ICAPModXact::swanSong()
1061
 
{
1062
 
    debugs(93, 5, HERE << "swan sings" << status());
1063
 
 
1064
 
    stopWriting(false);
1065
 
    stopSending(false);
1066
 
 
1067
 
    if (icapReply) {
1068
 
        delete icapReply;
1069
 
        icapReply = NULL;
1070
 
    }
1071
 
 
1072
 
    ICAPXaction::swanSong();
1073
 
}
1074
 
 
1075
 
void ICAPModXact::makeRequestHeaders(MemBuf &buf)
1076
 
{
1077
 
    /*
1078
 
     * XXX These should use HttpHdr interfaces instead of Printfs
1079
 
     */
1080
 
    const ICAPServiceRep &s = service();
1081
 
    buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
1082
 
    buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
1083
 
    buf.Printf("Date: %s\r\n", mkrfc1123(squid_curtime));
1084
 
 
1085
 
    if (!TheICAPConfig.reuse_connections)
1086
 
        buf.Printf("Connection: close\r\n");
1087
 
 
1088
 
    // we must forward "Proxy-Authenticate" and "Proxy-Authorization"
1089
 
    // as ICAP headers.
1090
 
    if (virgin.header->header.has(HDR_PROXY_AUTHENTICATE)) 
1091
 
        buf.Printf("Proxy-Authenticate: %s\r\n", 
1092
 
                virgin.header->header.getByName("Proxy-Authenticate").buf());    
1093
 
    
1094
 
    if (virgin.header->header.has(HDR_PROXY_AUTHORIZATION)) 
1095
 
        buf.Printf("Proxy-Authorization: %s\r\n", 
1096
 
                virgin.header->header.getByName("Proxy-Authorization").buf());
1097
 
 
1098
 
    buf.Printf("Encapsulated: ");
1099
 
 
1100
 
    MemBuf httpBuf;
1101
 
 
1102
 
    httpBuf.init();
1103
 
 
1104
 
    // build HTTP request header, if any
1105
 
    ICAP::Method m = s.method;
1106
 
 
1107
 
    const HttpRequest *request = virgin.cause ?
1108
 
        virgin.cause :
1109
 
        dynamic_cast<const HttpRequest*>(virgin.header);
1110
 
 
1111
 
    // to simplify, we could assume that request is always available
1112
 
 
1113
 
    String urlPath;
1114
 
    if (request) {
1115
 
        urlPath = request->urlpath;
1116
 
        if (ICAP::methodRespmod == m)
1117
 
            encapsulateHead(buf, "req-hdr", httpBuf, request);
1118
 
        else
1119
 
        if (ICAP::methodReqmod == m)
1120
 
            encapsulateHead(buf, "req-hdr", httpBuf, virgin.header);
1121
 
    }
1122
 
 
1123
 
    if (ICAP::methodRespmod == m)
1124
 
        if (const HttpMsg *prime = virgin.header)
1125
 
            encapsulateHead(buf, "res-hdr", httpBuf, prime);
1126
 
 
1127
 
    if (!virginBody.expected())
1128
 
        buf.Printf("null-body=%d", (int) httpBuf.contentSize());
1129
 
    else if (ICAP::methodReqmod == m)
1130
 
        buf.Printf("req-body=%d", (int) httpBuf.contentSize());
1131
 
    else
1132
 
        buf.Printf("res-body=%d", (int) httpBuf.contentSize());
1133
 
 
1134
 
    buf.append(ICAP::crlf, 2); // terminate Encapsulated line
1135
 
 
1136
 
    if (preview.enabled()) {
1137
 
        buf.Printf("Preview: %d\r\n", (int)preview.ad());
1138
 
        if (virginBody.expected()) // there is a body to preview
1139
 
            virginBodySending.plan();
1140
 
        else
1141
 
            finishNullOrEmptyBodyPreview(httpBuf);
1142
 
    }
1143
 
 
1144
 
    if (shouldAllow204()) {
1145
 
        debugs(93,5, HERE << "will allow 204s outside of preview");
1146
 
        state.allowedPostview204 = true;
1147
 
        buf.Printf("Allow: 204\r\n");
1148
 
        if (virginBody.expected()) // there is a body to echo
1149
 
            virginBodySending.plan();
1150
 
    }
1151
 
 
1152
 
    if (TheICAPConfig.send_client_ip && request)
1153
 
        if (request->client_addr.s_addr != any_addr.s_addr &&
1154
 
            request->client_addr.s_addr != no_addr.s_addr)
1155
 
            buf.Printf("X-Client-IP: %s\r\n", inet_ntoa(request->client_addr));
1156
 
 
1157
 
    if (TheICAPConfig.send_client_username && request)
1158
 
        makeUsernameHeader(request, buf);
1159
 
 
1160
 
    // fprintf(stderr, "%s\n", buf.content());
1161
 
 
1162
 
    buf.append(ICAP::crlf, 2); // terminate ICAP header
1163
 
 
1164
 
    // start ICAP request body with encapsulated HTTP headers
1165
 
    buf.append(httpBuf.content(), httpBuf.contentSize());
1166
 
 
1167
 
    httpBuf.clean();
1168
 
}
1169
 
 
1170
 
void ICAPModXact::makeUsernameHeader(const HttpRequest *request, MemBuf &buf) {
1171
 
    if (const AuthUserRequest *auth = request->auth_user_request) {
1172
 
        if (char const *name = auth->username()) {
1173
 
            const char *value = TheICAPConfig.client_username_encode ?
1174
 
                base64_encode(name) : name;
1175
 
            buf.Printf("%s: %s\r\n", TheICAPConfig.client_username_header,
1176
 
                value);
1177
 
        }
1178
 
    }
1179
 
}
1180
 
 
1181
 
void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
1182
 
{
1183
 
    // update ICAP header
1184
 
    icapBuf.Printf("%s=%d, ", section, (int) httpBuf.contentSize());
1185
 
 
1186
 
    // begin cloning
1187
 
    HttpMsg *headClone = NULL;
1188
 
    
1189
 
    if (const HttpRequest* old_request = dynamic_cast<const HttpRequest*>(head)) {
1190
 
        HttpRequest* new_request = new HttpRequest;
1191
 
        urlParse(old_request->method, old_request->canonical,new_request);
1192
 
        new_request->http_ver = old_request->http_ver;
1193
 
        inheritVirginProperties(*new_request, *old_request);
1194
 
        headClone = new_request;
1195
 
    } 
1196
 
    else if (const HttpReply *old_reply = dynamic_cast<const HttpReply*>(head)) {
1197
 
        HttpReply* new_reply = new HttpReply;
1198
 
        new_reply->sline = old_reply->sline;
1199
 
        headClone = new_reply;
1200
 
    }
1201
 
    
1202
 
    Must(headClone);
1203
 
    
1204
 
    HttpHeaderPos pos = HttpHeaderInitPos;
1205
 
    HttpHeaderEntry* p_head_entry = NULL;
1206
 
    while (NULL != (p_head_entry = head->header.getEntry(&pos)) )
1207
 
        headClone->header.addEntry(p_head_entry->clone());
1208
 
 
1209
 
    // end cloning
1210
 
        
1211
 
    // remove all hop-by-hop headers from the clone
1212
 
    headClone->header.removeHopByHopEntries();
1213
 
 
1214
 
    // pack polished HTTP header
1215
 
    packHead(httpBuf, headClone);
1216
 
 
1217
 
    delete headClone;
1218
 
}
1219
 
 
1220
 
void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
1221
 
{
1222
 
    Packer p;
1223
 
    packerToMemInit(&p, &httpBuf);
1224
 
    head->packInto(&p, true);
1225
 
    packerClean(&p);
1226
 
}
1227
 
 
1228
 
// decides whether to offer a preview and calculates its size
1229
 
void ICAPModXact::decideOnPreview()
1230
 
{
1231
 
    if (!TheICAPConfig.preview_enable) {
1232
 
        debugs(93, 5, HERE << "preview disabled by squid.conf");
1233
 
        return;
1234
 
    }
1235
 
 
1236
 
    const HttpRequest *request = virgin.cause ?
1237
 
        virgin.cause :
1238
 
        dynamic_cast<const HttpRequest*>(virgin.header);
1239
 
    const String urlPath = request ? request->urlpath : String();
1240
 
    size_t wantedSize;
1241
 
    if (!service().wantsPreview(urlPath, wantedSize)) {
1242
 
        debugs(93, 5, "ICAPModXact should not offer preview for " << urlPath);
1243
 
        return;
1244
 
    }
1245
 
 
1246
 
    // we decided to do preview, now compute its size
1247
 
 
1248
 
    Must(wantedSize >= 0);
1249
 
 
1250
 
    // cannot preview more than we can backup
1251
 
    size_t ad = XMIN(wantedSize, TheBackupLimit);
1252
 
 
1253
 
    if (!virginBody.expected())
1254
 
        ad = 0;
1255
 
    else
1256
 
    if (virginBody.knownSize())
1257
 
        ad = XMIN(static_cast<uint64_t>(ad), virginBody.size()); // not more than we have
1258
 
 
1259
 
    debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
1260
 
           "(service wanted " << wantedSize << ")");
1261
 
 
1262
 
    preview.enable(ad);
1263
 
    Must(preview.enabled());
1264
 
}
1265
 
 
1266
 
// decides whether to allow 204 responses
1267
 
bool ICAPModXact::shouldAllow204()
1268
 
{
1269
 
    if (!service().allows204())
1270
 
        return false;
1271
 
 
1272
 
    return canBackupEverything();
1273
 
}
1274
 
 
1275
 
// used by shouldAllow204 and decideOnRetries
1276
 
bool ICAPModXact::canBackupEverything() const
1277
 
{
1278
 
    if (!virginBody.expected())
1279
 
        return true; // no body means no problems with backup
1280
 
 
1281
 
    // if there is a body, check whether we can backup it all
1282
 
 
1283
 
    if (!virginBody.knownSize())
1284
 
        return false;
1285
 
 
1286
 
    // or should we have a different backup limit?
1287
 
    // note that '<' allows for 0-termination of the "full" backup buffer
1288
 
    return virginBody.size() < TheBackupLimit;
1289
 
}
1290
 
 
1291
 
// Decide whether this transaction can be retried if pconn fails
1292
 
// Must be called after decideOnPreview and before openConnection()
1293
 
void ICAPModXact::decideOnRetries()
1294
 
{
1295
 
    if (!isRetriable)
1296
 
        return; // no, already decided
1297
 
 
1298
 
    if (preview.enabled())
1299
 
        return; // yes, because preview provides enough guarantees
1300
 
 
1301
 
    if (canBackupEverything())
1302
 
        return; // yes, because we can back everything up
1303
 
 
1304
 
    disableRetries(); // no, because we cannot back everything up
1305
 
}
1306
 
 
1307
 
// Normally, the body-writing code handles preview body. It can deal with
1308
 
// bodies of unexpected size, including those that turn out to be empty.
1309
 
// However, that code assumes that the body was expected and body control
1310
 
// structures were initialized. This is not the case when there is no body
1311
 
// or the body is known to be empty, because the virgin message will lack a
1312
 
// body_pipe. So we handle preview of null-body and zero-size bodies here.
1313
 
void ICAPModXact::finishNullOrEmptyBodyPreview(MemBuf &buf)
1314
 
{
1315
 
    Must(!virginBodyWriting.active()); // one reason we handle it here
1316
 
    Must(!virgin.body_pipe);          // another reason we handle it here
1317
 
    Must(!preview.ad());
1318
 
 
1319
 
    // do not add last-chunk because our Encapsulated header says null-body
1320
 
    // addLastRequestChunk(buf);
1321
 
    preview.wrote(0, true);
1322
 
 
1323
 
    Must(preview.done());
1324
 
    Must(preview.ieof());
1325
 
}
1326
 
 
1327
 
void ICAPModXact::fillPendingStatus(MemBuf &buf) const
1328
 
{
1329
 
    ICAPXaction::fillPendingStatus(buf);
1330
 
 
1331
 
    if (state.serviceWaiting)
1332
 
        buf.append("U", 1);
1333
 
 
1334
 
    if (virgin.body_pipe != NULL)
1335
 
        buf.append("R", 1);
1336
 
 
1337
 
    if (connection > 0 && !doneReading())
1338
 
        buf.append("r", 1);
1339
 
 
1340
 
    if (!state.doneWriting() && state.writing != State::writingInit)
1341
 
        buf.Printf("w(%d)", state.writing);
1342
 
 
1343
 
    if (preview.enabled()) {
1344
 
        if (!preview.done())
1345
 
            buf.Printf("P(%d)", (int) preview.debt());
1346
 
    }
1347
 
 
1348
 
    if (virginBodySending.active())
1349
 
        buf.append("B", 1);
1350
 
 
1351
 
    if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1352
 
        buf.Printf("p(%d)", state.parsing);
1353
 
 
1354
 
    if (!doneSending() && state.sending != State::sendingUndecided)
1355
 
        buf.Printf("S(%d)", state.sending);
1356
 
 
1357
 
    if (canStartBypass)
1358
 
       buf.append("Y", 1);
1359
 
}
1360
 
 
1361
 
void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1362
 
{
1363
 
    ICAPXaction::fillDoneStatus(buf);
1364
 
 
1365
 
    if (!virgin.body_pipe)
1366
 
        buf.append("R", 1);
1367
 
 
1368
 
    if (state.doneWriting())
1369
 
        buf.append("w", 1);
1370
 
 
1371
 
    if (preview.enabled()) {
1372
 
        if (preview.done())
1373
 
            buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1374
 
    }
1375
 
 
1376
 
    if (doneReading())
1377
 
        buf.append("r", 1);
1378
 
 
1379
 
    if (state.doneParsing())
1380
 
        buf.append("p", 1);
1381
 
 
1382
 
    if (doneSending())
1383
 
        buf.append("S", 1);
1384
 
}
1385
 
 
1386
 
bool ICAPModXact::gotEncapsulated(const char *section) const
1387
 
{
1388
 
    return icapReply->header.getByNameListMember("Encapsulated",
1389
 
            section, ',').size() > 0;
1390
 
}
1391
 
 
1392
 
// calculate whether there is a virgin HTTP body and
1393
 
// whether its expected size is known
1394
 
// TODO: rename because we do not just estimate
1395
 
void ICAPModXact::estimateVirginBody()
1396
 
{
1397
 
    // note: lack of size info may disable previews and 204s
1398
 
 
1399
 
    HttpMsg *msg = virgin.header;
1400
 
    Must(msg);
1401
 
 
1402
 
    method_t method;
1403
 
 
1404
 
    if (virgin.cause)
1405
 
        method = virgin.cause->method;
1406
 
    else
1407
 
    if (HttpRequest *req = dynamic_cast<HttpRequest*>(msg))
1408
 
        method = req->method;
1409
 
    else
1410
 
        method = METHOD_NONE;
1411
 
 
1412
 
    int64_t size;
1413
 
    // expectingBody returns true for zero-sized bodies, but we will not
1414
 
    // get a pipe for that body, so we treat the message as bodyless
1415
 
    if (method != METHOD_NONE && msg->expectingBody(method, size) && size) {
1416
 
        debugs(93, 6, "ICAPModXact expects virgin body from " << 
1417
 
            virgin.body_pipe << "; size: " << size);
1418
 
 
1419
 
        virginBody.expect(size);
1420
 
        virginBodyWriting.plan();
1421
 
 
1422
 
        // sign up as a body consumer
1423
 
        Must(msg->body_pipe != NULL);
1424
 
        Must(msg->body_pipe == virgin.body_pipe);
1425
 
        Must(virgin.body_pipe->setConsumerIfNotLate(this));
1426
 
 
1427
 
        // make sure TheBackupLimit is in-sync with the buffer size
1428
 
        Must(TheBackupLimit <= static_cast<size_t>(msg->body_pipe->buf().max_capacity));
1429
 
    } else {
1430
 
        debugs(93, 6, "ICAPModXact does not expect virgin body");
1431
 
        Must(msg->body_pipe == NULL);
1432
 
        checkConsuming();
1433
 
    }
1434
 
}
1435
 
 
1436
 
void ICAPModXact::makeAdaptedBodyPipe(const char *what) {
1437
 
    Must(!adapted.body_pipe);
1438
 
    Must(!adapted.header->body_pipe);
1439
 
    adapted.header->body_pipe = new BodyPipe(this);
1440
 
    adapted.body_pipe = adapted.header->body_pipe;
1441
 
    debugs(93, 7, HERE << "will supply " << what << " via " <<
1442
 
        adapted.body_pipe << " pipe");
1443
 
}
1444
 
 
1445
 
 
1446
 
// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1447
 
 
1448
 
SizedEstimate::SizedEstimate()
1449
 
        : theData(dtUnexpected)
1450
 
{}
1451
 
 
1452
 
void SizedEstimate::expect(int64_t aSize)
1453
 
{
1454
 
    theData = (aSize >= 0) ? aSize : (int64_t)dtUnknown;
1455
 
}
1456
 
 
1457
 
bool SizedEstimate::expected() const
1458
 
{
1459
 
    return theData != dtUnexpected;
1460
 
}
1461
 
 
1462
 
bool SizedEstimate::knownSize() const
1463
 
{
1464
 
    Must(expected());
1465
 
    return theData != dtUnknown;
1466
 
}
1467
 
 
1468
 
uint64_t SizedEstimate::size() const
1469
 
{
1470
 
    Must(knownSize());
1471
 
    return static_cast<uint64_t>(theData);
1472
 
}
1473
 
 
1474
 
 
1475
 
 
1476
 
VirginBodyAct::VirginBodyAct(): theStart(0), theState(stUndecided)
1477
 
{}
1478
 
 
1479
 
void VirginBodyAct::plan()
1480
 
{
1481
 
    Must(!disabled());
1482
 
    Must(!theStart); // not started
1483
 
    theState = stActive;
1484
 
}
1485
 
 
1486
 
void VirginBodyAct::disable()
1487
 
{
1488
 
    theState = stDisabled;
1489
 
}
1490
 
 
1491
 
void VirginBodyAct::progress(size_t size)
1492
 
{
1493
 
    Must(active());
1494
 
    Must(size >= 0);
1495
 
    theStart += static_cast<int64_t>(size);
1496
 
}
1497
 
 
1498
 
uint64_t VirginBodyAct::offset() const
1499
 
{
1500
 
    Must(active());
1501
 
    return static_cast<uint64_t>(theStart);
1502
 
}
1503
 
 
1504
 
 
1505
 
ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1506
 
{}
1507
 
 
1508
 
void ICAPPreview::enable(size_t anAd)
1509
 
{
1510
 
    // TODO: check for anAd not exceeding preview size limit
1511
 
    Must(anAd >= 0);
1512
 
    Must(!enabled());
1513
 
    theAd = anAd;
1514
 
    theState = stWriting;
1515
 
}
1516
 
 
1517
 
bool ICAPPreview::enabled() const
1518
 
{
1519
 
    return theState != stDisabled;
1520
 
}
1521
 
 
1522
 
size_t ICAPPreview::ad() const
1523
 
{
1524
 
    Must(enabled());
1525
 
    return theAd;
1526
 
}
1527
 
 
1528
 
bool ICAPPreview::done() const
1529
 
{
1530
 
    Must(enabled());
1531
 
    return theState >= stIeof;
1532
 
}
1533
 
 
1534
 
bool ICAPPreview::ieof() const
1535
 
{
1536
 
    Must(enabled());
1537
 
    return theState == stIeof;
1538
 
}
1539
 
 
1540
 
size_t ICAPPreview::debt() const
1541
 
{
1542
 
    Must(enabled());
1543
 
    return done() ? 0 : (theAd - theWritten);
1544
 
}
1545
 
 
1546
 
void ICAPPreview::wrote(size_t size, bool wroteEof)
1547
 
{
1548
 
    Must(enabled());
1549
 
 
1550
 
    theWritten += size;
1551
 
 
1552
 
        Must(theWritten <= theAd);
1553
 
 
1554
 
        if (wroteEof)
1555
 
                theState = stIeof; // written size is irrelevant
1556
 
        else
1557
 
    if (theWritten >= theAd)
1558
 
        theState = stDone;
1559
 
}
1560
 
 
1561
 
bool ICAPModXact::fillVirginHttpHeader(MemBuf &mb) const
1562
 
{
1563
 
    if (virgin.header == NULL)
1564
 
        return false;
1565
 
 
1566
 
    virgin.header->firstLineBuf(mb);
1567
 
 
1568
 
    return true;
1569
 
}
1570
 
 
1571
 
 
1572
 
/* ICAPModXactLauncher */
1573
 
 
1574
 
ICAPModXactLauncher::ICAPModXactLauncher(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &aService):
1575
 
    ICAPLauncher("ICAPModXactLauncher", anInitiator, aService)
1576
 
{
1577
 
    virgin.setHeader(virginHeader);
1578
 
    virgin.setCause(virginCause);
1579
 
}
1580
 
 
1581
 
ICAPXaction *ICAPModXactLauncher::createXaction()
1582
 
{
1583
 
    return new ICAPModXact(this, virgin.header, virgin.cause, theService);
1584
 
}