~ubuntu-branches/ubuntu/oneiric/squid3/oneiric-security

« back to all changes in this revision

Viewing changes to src/BodyPipe.cc

  • Committer: Bazaar Package Importer
  • Author(s): Mahyuddin Susanto
  • Date: 2011-02-15 18:46:13 UTC
  • mfrom: (21.2.4 sid)
  • Revision ID: james.westby@ubuntu.com-20110215184613-1u3dh5sz4i055flk
Tags: 3.1.10-1ubuntu1
* Merge from debian unstable. (LP: #719283)  Remaining changes:
  - debian/patches/18-fix-ftbfs-binutils-gold.dpatch: Add library linker into
    LIBS instead to LDFLAGS to fixing FTBFS binutils-gold.
* Drop Ubuntu configuration for ufw which landed in Debian and sync it: 
  - debian/squid3.ufw.profile.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
 
2
2
#include "squid.h"
 
3
#include "base/AsyncJobCalls.h"
3
4
#include "BodyPipe.h"
4
5
#include "TextException.h"
5
6
 
40
41
public:
41
42
    typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
42
43
 
43
 
    BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
44
 
                       BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
 
44
    BodyProducerDialer(const BodyProducer::Pointer &aProducer,
 
45
                       Parent::Method aHandler, BodyPipe::Pointer bp):
 
46
            Parent(aProducer, aHandler, bp) {}
45
47
 
46
48
    virtual bool canDial(AsyncCall &call);
47
49
};
54
56
public:
55
57
    typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
56
58
 
57
 
    BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
58
 
                       BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
 
59
    BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
 
60
                       Parent::Method aHandler, BodyPipe::Pointer bp):
 
61
            Parent(aConsumer, aHandler, bp) {}
59
62
 
60
63
    virtual bool canDial(AsyncCall &call);
61
64
};
66
69
    if (!Parent::canDial(call))
67
70
        return false;
68
71
 
69
 
    BodyProducer *producer = object;
 
72
    const BodyProducer::Pointer &producer = job;
70
73
    BodyPipe::Pointer pipe = arg1;
71
74
    if (!pipe->stillProducing(producer)) {
72
75
        debugs(call.debugSection, call.debugLevel, HERE << producer <<
83
86
    if (!Parent::canDial(call))
84
87
        return false;
85
88
 
86
 
    BodyConsumer *consumer = object;
 
89
    const BodyConsumer::Pointer &consumer = job;
87
90
    BodyPipe::Pointer pipe = arg1;
88
91
    if (!pipe->stillConsuming(consumer)) {
89
92
        debugs(call.debugSection, call.debugLevel, HERE << consumer <<
126
129
BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
127
130
        theProducer(aProducer), theConsumer(0),
128
131
        thePutSize(0), theGetSize(0),
129
 
        mustAutoConsume(false), isCheckedOut(false)
 
132
        mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
130
133
{
131
134
    // TODO: teach MemBuf to start with zero minSize
132
135
    // TODO: limit maxSize by theBodySize, when known?
183
186
void
184
187
BodyPipe::clearProducer(bool atEof)
185
188
{
186
 
    if (theProducer) {
 
189
    if (theProducer.set()) {
187
190
        debugs(91,7, HERE << "clearing BodyPipe producer" << status());
188
 
        theProducer = NULL;
 
191
        theProducer.clear();
189
192
        if (atEof) {
190
193
            if (!bodySizeKnown())
191
194
                theBodySize = thePutSize;
215
218
}
216
219
 
217
220
bool
218
 
BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
 
221
BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
219
222
{
220
223
    assert(!theConsumer);
221
 
    assert(aConsumer);
 
224
    assert(aConsumer.set()); // but might be invalid
222
225
 
223
226
    // TODO: convert this into an exception and remove IfNotLate suffix
224
227
    // If there is something consumed already, we are in an auto-consuming mode
228
231
        return false;
229
232
    }
230
233
 
 
234
    Must(!abortedConsumption); // did not promise to never consume
 
235
 
231
236
    theConsumer = aConsumer;
232
237
    debugs(91,7, HERE << "set consumer" << status());
233
238
    if (theBuf.hasContent())
238
243
    return true;
239
244
}
240
245
 
241
 
// When BodyPipe consumer is gone, all events for that consumer must not
242
 
// reach the new consumer (if any). Otherwise, the calls may go out of order
243
 
// (if _some_ calls are dropped due to the ultimate destination being
244
 
// temporary NULL). The code keeps track of the number of outstanding
245
 
// events and skips that number if consumer leaves. TODO: when AscyncCall
246
 
// support is improved, should we just schedule calls directly to consumer?
247
246
void
248
247
BodyPipe::clearConsumer()
249
248
{
250
 
    if (theConsumer) {
 
249
    if (theConsumer.set()) {
251
250
        debugs(91,7, HERE << "clearing consumer" << status());
252
 
        theConsumer = NULL;
253
 
        if (consumedSize() && !exhausted()) {
254
 
            AsyncCall::Pointer call= asyncCall(91, 7,
255
 
                                               "BodyProducer::noteBodyConsumerAborted",
256
 
                                               BodyProducerDialer(theProducer,
257
 
                                                                  &BodyProducer::noteBodyConsumerAborted, this));
258
 
            ScheduleCallHere(call);
259
 
        }
 
251
        theConsumer.clear();
 
252
        // do not abort if we have not consumed so that HTTP or ICAP can retry
 
253
        // benign xaction failures due to persistent connection race conditions
 
254
        if (consumedSize())
 
255
            expectNoConsumption();
 
256
    }
 
257
}
 
258
 
 
259
void
 
260
BodyPipe::expectNoConsumption()
 
261
{
 
262
    Must(!theConsumer);
 
263
    if (!abortedConsumption && !exhausted()) {
 
264
        AsyncCall::Pointer call= asyncCall(91, 7,
 
265
                                           "BodyProducer::noteBodyConsumerAborted",
 
266
                                           BodyProducerDialer(theProducer,
 
267
                                                              &BodyProducer::noteBodyConsumerAborted, this));
 
268
        ScheduleCallHere(call);
 
269
        abortedConsumption = true;
260
270
    }
261
271
}
262
272
 
334
344
    // raw buffers should always check them in (possibly unchanged)
335
345
    // instead of relying on the automated undo mechanism of Checkout.
336
346
    // The code can always use a temporary buffer to accomplish that.
337
 
    assert(checkout.checkedOutSize == currentSize);
 
347
    Must(checkout.checkedOutSize == currentSize);
338
348
}
339
349
 
340
350
// TODO: Optimize: inform consumer/producer about more data/space only if
377
387
void
378
388
BodyPipe::scheduleBodyDataNotification()
379
389
{
380
 
    if (theConsumer) {
 
390
    if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
381
391
        AsyncCall::Pointer call = asyncCall(91, 7,
382
392
                                            "BodyConsumer::noteMoreBodyDataAvailable",
383
393
                                            BodyConsumerDialer(theConsumer,
389
399
void
390
400
BodyPipe::scheduleBodyEndNotification()
391
401
{
392
 
    if (theConsumer) {
 
402
    if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
393
403
        if (bodySizeKnown() && bodySize() == thePutSize) {
394
404
            AsyncCall::Pointer call = asyncCall(91, 7,
395
405
                                                "BodyConsumer::noteBodyProductionEnded",
423
433
    outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
424
434
 
425
435
    outputBuffer.Printf(" pipe%p", this);
426
 
    if (theProducer)
427
 
        outputBuffer.Printf(" prod%p", theProducer);
428
 
    if (theConsumer)
429
 
        outputBuffer.Printf(" cons%p", theConsumer);
 
436
    if (theProducer.set())
 
437
        outputBuffer.Printf(" prod%p", theProducer.get());
 
438
    if (theConsumer.set())
 
439
        outputBuffer.Printf(" cons%p", theConsumer.get());
430
440
 
431
441
    if (mustAutoConsume)
432
442
        outputBuffer.append(" A", 2);
 
443
    if (abortedConsumption)
 
444
        outputBuffer.append(" !C", 3);
433
445
    if (isCheckedOut)
434
446
        outputBuffer.append(" L", 2); // Locked
435
447
 
451
463
 
452
464
BodyPipeCheckout::~BodyPipeCheckout()
453
465
{
454
 
    if (!checkedIn)
455
 
        pipe.undoCheckOut(*this);
 
466
    if (!checkedIn) {
 
467
        // Do not pipe.undoCheckOut(*this) because it asserts or throws
 
468
        // TODO: consider implementing the long-term solution discussed at
 
469
        // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
 
470
        debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
 
471
        pipe.checkIn(*this);
 
472
    }
456
473
}
457
474
 
458
475
void