~clint-fewbar/ubuntu/precise/squid3/ignore-sighup-early

« back to all changes in this revision

Viewing changes to src/BodyPipe.cc

  • Committer: Bazaar Package Importer
  • Author(s): Luigi Gangitano
  • Date: 2009-09-24 14:51:06 UTC
  • mfrom: (1.1.12 upstream)
  • mto: (20.2.1 sid)
  • mto: This revision was merged to the branch mainline in revision 21.
  • Revision ID: james.westby@ubuntu.com-20090924145106-38jgrzmj0d73pha5
Tags: 3.1.0.13-1
* Upload to experimental

* New upstream release
  - Fixes Follow-X-Forwarded-For support (Closes: #523943)
  - Adds IPv6 support (Closes: #432351)

* debian/rules
  - Removed obsolete configuration options
  - Enable db and radius basic authentication modules

* debian/patches/01-cf.data.debian
  - Adapted to new upstream version

* debian/patches/02-makefile-defaults
  - Adapted to new upstream version

* debian/{squid.postinst,squid.rc,README.Debian,watch}
  - Updated references to squid 3.1

* debian/squid3.install
  - Install CSS file for error pages
  - Install manual pages for new authentication modules

* debian/squid3-common.install
  - Install documented version of configuration file in /usr/share/doc/squid3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
 
2
2
#include "squid.h"
3
3
#include "BodyPipe.h"
 
4
#include "TextException.h"
4
5
 
5
6
CBDATA_CLASS_INIT(BodyPipe);
6
7
 
 
8
// BodySink is a BodyConsumer class which  just consume and drops
 
9
// data from a BodyPipe
 
10
class BodySink: public BodyConsumer
 
11
{
 
12
    bool done;
 
13
public:
 
14
    BodySink():AsyncJob("BodySink"), done(false) {}
 
15
    virtual ~BodySink() {}
 
16
 
 
17
    virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
 
18
        size_t contentSize = bp->buf().contentSize();
 
19
        bp->consume(contentSize);
 
20
    }
 
21
    virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
 
22
        stopConsumingFrom(bp);
 
23
        done = true;
 
24
    }
 
25
    virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
 
26
        stopConsumingFrom(bp);
 
27
        done = true;
 
28
    }
 
29
    bool doneAll() const {return done && AsyncJob::doneAll();}
 
30
    CBDATA_CLASS2(BodySink);
 
31
};
 
32
 
 
33
CBDATA_CLASS_INIT(BodySink);
 
34
 
 
35
// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
 
36
// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
 
37
// the BodyPipe passed as argument
 
38
class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
 
39
{
 
40
public:
 
41
    typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
 
42
 
 
43
    BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
 
44
                       BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
 
45
 
 
46
    virtual bool canDial(AsyncCall &call);
 
47
};
 
48
 
 
49
// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
 
50
// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
 
51
// of the BodyPipe passed as argument
 
52
class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
 
53
{
 
54
public:
 
55
    typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
 
56
 
 
57
    BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
 
58
                       BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
 
59
 
 
60
    virtual bool canDial(AsyncCall &call);
 
61
};
 
62
 
 
63
bool
 
64
BodyProducerDialer::canDial(AsyncCall &call)
 
65
{
 
66
    if (!Parent::canDial(call))
 
67
        return false;
 
68
 
 
69
    BodyProducer *producer = object;
 
70
    BodyPipe::Pointer pipe = arg1;
 
71
    if (!pipe->stillProducing(producer)) {
 
72
        debugs(call.debugSection, call.debugLevel, HERE << producer <<
 
73
               " no longer producing for " << pipe->status());
 
74
        return call.cancel("no longer producing");
 
75
    }
 
76
 
 
77
    return true;
 
78
}
 
79
 
 
80
bool
 
81
BodyConsumerDialer::canDial(AsyncCall &call)
 
82
{
 
83
    if (!Parent::canDial(call))
 
84
        return false;
 
85
 
 
86
    BodyConsumer *consumer = object;
 
87
    BodyPipe::Pointer pipe = arg1;
 
88
    if (!pipe->stillConsuming(consumer)) {
 
89
        debugs(call.debugSection, call.debugLevel, HERE << consumer <<
 
90
               " no longer consuming from " << pipe->status());
 
91
        return call.cancel("no longer consuming");
 
92
    }
 
93
 
 
94
    return true;
 
95
}
 
96
 
 
97
 
 
98
/* BodyProducer */
 
99
 
7
100
// inform the pipe that we are done and clear the Pointer
8
101
void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
9
102
{
10
 
        debugs(91,7, HERE << this << " will not produce for " << pipe <<
11
 
                "; atEof: " << atEof);
12
 
        assert(pipe != NULL); // be strict: the caller state may depend on this
13
 
        pipe->clearProducer(atEof);
14
 
        pipe = NULL;
 
103
    debugs(91,7, HERE << this << " will not produce for " << pipe <<
 
104
           "; atEof: " << atEof);
 
105
    assert(pipe != NULL); // be strict: the caller state may depend on this
 
106
    pipe->clearProducer(atEof);
 
107
    pipe = NULL;
15
108
}
16
109
 
 
110
 
 
111
 
 
112
/* BodyConsumer */
 
113
 
17
114
// inform the pipe that we are done and clear the Pointer
18
115
void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
19
116
{
20
 
        debugs(91,7, HERE << this << " will not consume from " << pipe);
21
 
        assert(pipe != NULL); // be strict: the caller state may depend on this
22
 
        pipe->clearConsumer();
23
 
        pipe = NULL;
 
117
    debugs(91,7, HERE << this << " will not consume from " << pipe);
 
118
    assert(pipe != NULL); // be strict: the caller state may depend on this
 
119
    pipe->clearConsumer();
 
120
    pipe = NULL;
24
121
}
25
122
 
 
123
 
26
124
/* BodyPipe */
27
125
 
28
126
BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
29
 
        theProducer(aProducer), theConsumer(0),
30
 
        thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
31
 
        mustAutoConsume(false), isCheckedOut(false)
 
127
        theProducer(aProducer), theConsumer(0),
 
128
        thePutSize(0), theGetSize(0),
 
129
        mustAutoConsume(false), isCheckedOut(false)
32
130
{
33
 
        // TODO: teach MemBuf to start with zero minSize
34
 
        // TODO: limit maxSize by theBodySize, when known?
35
 
        theBuf.init(2*1024, MaxCapacity); 
36
 
        debugs(91,7, HERE << "created BodyPipe" << status());
 
131
    // TODO: teach MemBuf to start with zero minSize
 
132
    // TODO: limit maxSize by theBodySize, when known?
 
133
    theBuf.init(2*1024, MaxCapacity);
 
134
    debugs(91,7, HERE << "created BodyPipe" << status());
37
135
}
38
136
 
39
137
BodyPipe::~BodyPipe()
40
138
{
41
 
        debugs(91,7, HERE << "destroying BodyPipe" << status());
42
 
        assert(!theProducer);
43
 
        assert(!theConsumer);
44
 
        theBuf.clean();
 
139
    debugs(91,7, HERE << "destroying BodyPipe" << status());
 
140
    assert(!theProducer);
 
141
    assert(!theConsumer);
 
142
    theBuf.clean();
45
143
}
46
144
 
47
145
void BodyPipe::setBodySize(uint64_t aBodySize)
48
146
{
49
 
        assert(!bodySizeKnown());
50
 
        assert(aBodySize >= 0);
51
 
        assert(thePutSize <= aBodySize);
52
 
 
53
 
        // If this assert fails, we need to add code to check for eof and inform
54
 
        // the consumer about the eof condition via scheduleBodyEndNotification,
55
 
        // because just setting a body size limit may trigger the eof condition.
56
 
        assert(!theConsumer); 
57
 
 
58
 
        theBodySize = aBodySize;
59
 
        debugs(91,7, HERE << "set body size" << status());
 
147
    assert(!bodySizeKnown());
 
148
    assert(aBodySize >= 0);
 
149
    assert(thePutSize <= aBodySize);
 
150
 
 
151
    // If this assert fails, we need to add code to check for eof and inform
 
152
    // the consumer about the eof condition via scheduleBodyEndNotification,
 
153
    // because just setting a body size limit may trigger the eof condition.
 
154
    assert(!theConsumer);
 
155
 
 
156
    theBodySize = aBodySize;
 
157
    debugs(91,7, HERE << "set body size" << status());
60
158
}
61
159
 
62
160
uint64_t BodyPipe::bodySize() const
63
161
{
64
 
        assert(bodySizeKnown());
65
 
        return static_cast<uint64_t>(theBodySize);
 
162
    assert(bodySizeKnown());
 
163
    return static_cast<uint64_t>(theBodySize);
66
164
}
67
165
 
68
166
bool BodyPipe::expectMoreAfter(uint64_t offset) const
69
167
{
70
 
        assert(theGetSize <= offset);
71
 
        return offset < thePutSize || // buffer has more now or
72
 
                (!productionEnded() && mayNeedMoreData()); // buffer will have more
 
168
    assert(theGetSize <= offset);
 
169
    return offset < thePutSize || // buffer has more now or
 
170
           (!productionEnded() && mayNeedMoreData()); // buffer will have more
73
171
}
74
172
 
75
173
bool BodyPipe::exhausted() const
76
174
{
77
 
        return !expectMoreAfter(theGetSize);
 
175
    return !expectMoreAfter(theGetSize);
78
176
}
79
177
 
80
178
uint64_t BodyPipe::unproducedSize() const
81
179
{
82
 
        return bodySize() - thePutSize; // bodySize() asserts that size is known
 
180
    return bodySize() - thePutSize; // bodySize() asserts that size is known
83
181
}
84
182
 
85
183
void
86
184
BodyPipe::clearProducer(bool atEof)
87
185
{
88
 
        if (theProducer) {
89
 
                debugs(91,7, HERE << "clearing BodyPipe producer" << status());
90
 
                theProducer = NULL;
91
 
                if (atEof) {
92
 
                        if (!bodySizeKnown())
93
 
                                theBodySize = thePutSize;
94
 
                        else
95
 
                        if (bodySize() != thePutSize)
96
 
                                debugs(91,3, HERE << "aborting on premature eof" << status());
97
 
                } else {
98
 
                        // asserta that we can detect the abort if the consumer joins later
99
 
                        assert(!bodySizeKnown() || bodySize() != thePutSize);
100
 
                }
101
 
                scheduleBodyEndNotification();
102
 
        }
 
186
    if (theProducer) {
 
187
        debugs(91,7, HERE << "clearing BodyPipe producer" << status());
 
188
        theProducer = NULL;
 
189
        if (atEof) {
 
190
            if (!bodySizeKnown())
 
191
                theBodySize = thePutSize;
 
192
            else
 
193
                if (bodySize() != thePutSize)
 
194
                    debugs(91,3, HERE << "aborting on premature eof" << status());
 
195
        } else {
 
196
            // asserta that we can detect the abort if the consumer joins later
 
197
            assert(!bodySizeKnown() || bodySize() != thePutSize);
 
198
        }
 
199
        scheduleBodyEndNotification();
 
200
    }
103
201
}
104
202
 
105
203
size_t
106
204
BodyPipe::putMoreData(const char *buf, size_t size)
107
205
{
108
 
        if (bodySizeKnown())
109
 
                size = XMIN((uint64_t)size, unproducedSize());
 
206
    if (bodySizeKnown())
 
207
        size = min((uint64_t)size, unproducedSize());
110
208
 
111
 
        const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
112
 
        if ((size = XMIN(size, spaceSize))) {
113
 
                theBuf.append(buf, size);
114
 
                postAppend(size);
115
 
                return size;
116
 
        }
117
 
        return 0;
 
209
    const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
 
210
    if ((size = min(size, spaceSize))) {
 
211
        theBuf.append(buf, size);
 
212
        postAppend(size);
 
213
        return size;
 
214
    }
 
215
    return 0;
118
216
}
119
217
 
120
218
bool
121
219
BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
122
220
{
123
 
        assert(!theConsumer);
124
 
        assert(aConsumer);
125
 
 
126
 
        // TODO: convert this into an exception and remove IfNotLate suffix
127
 
        // If there is something consumed already, we are in an auto-consuming mode
128
 
        // and it is too late to attach a real consumer to the pipe.
129
 
        if (theGetSize > 0) {
130
 
                assert(mustAutoConsume);
131
 
                return false;
132
 
        }
133
 
 
134
 
        theConsumer = aConsumer;
135
 
        debugs(91,7, HERE << "set consumer" << status());
136
 
        if (theBuf.hasContent())
137
 
                scheduleBodyDataNotification();
138
 
        if (!theProducer)
139
 
                scheduleBodyEndNotification();
140
 
 
141
 
        return true;
 
221
    assert(!theConsumer);
 
222
    assert(aConsumer);
 
223
 
 
224
    // TODO: convert this into an exception and remove IfNotLate suffix
 
225
    // If there is something consumed already, we are in an auto-consuming mode
 
226
    // and it is too late to attach a real consumer to the pipe.
 
227
    if (theGetSize > 0) {
 
228
        assert(mustAutoConsume);
 
229
        return false;
 
230
    }
 
231
 
 
232
    theConsumer = aConsumer;
 
233
    debugs(91,7, HERE << "set consumer" << status());
 
234
    if (theBuf.hasContent())
 
235
        scheduleBodyDataNotification();
 
236
    if (!theProducer)
 
237
        scheduleBodyEndNotification();
 
238
 
 
239
    return true;
142
240
}
143
241
 
144
242
// When BodyPipe consumer is gone, all events for that consumer must not
145
243
// reach the new consumer (if any). Otherwise, the calls may go out of order
146
 
// (if _some_ calls are dropped due to the ultimate destination being 
 
244
// (if _some_ calls are dropped due to the ultimate destination being
147
245
// temporary NULL). The code keeps track of the number of outstanding
148
246
// events and skips that number if consumer leaves. TODO: when AscyncCall
149
247
// support is improved, should we just schedule calls directly to consumer?
150
248
void
151
 
BodyPipe::clearConsumer() {
152
 
        if (theConsumer) {
153
 
                debugs(91,7, HERE << "clearing consumer" << status());
154
 
                theConsumer = NULL;
155
 
                theCCallsToSkip = theCCallsPending; // skip all pending consumer calls
156
 
                if (consumedSize() && !exhausted())
157
 
                        AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
158
 
        }
 
249
BodyPipe::clearConsumer()
 
250
{
 
251
    if (theConsumer) {
 
252
        debugs(91,7, HERE << "clearing consumer" << status());
 
253
        theConsumer = NULL;
 
254
        if (consumedSize() && !exhausted()) {
 
255
            AsyncCall::Pointer call= asyncCall(91, 7,
 
256
                                               "BodyProducer::noteBodyConsumerAborted",
 
257
                                               BodyProducerDialer(theProducer,
 
258
                                                                  &BodyProducer::noteBodyConsumerAborted, this));
 
259
            ScheduleCallHere(call);
 
260
        }
 
261
    }
159
262
}
160
263
 
161
264
size_t
162
265
BodyPipe::getMoreData(MemBuf &buf)
163
266
{
164
 
        if (!theBuf.hasContent())
165
 
                return 0; // did not touch the possibly uninitialized buf
 
267
    if (!theBuf.hasContent())
 
268
        return 0; // did not touch the possibly uninitialized buf
166
269
 
167
 
        if (buf.isNull())
168
 
                buf.init();
169
 
        const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
170
 
        buf.append(theBuf.content(), size);
171
 
        theBuf.consume(size);
172
 
        postConsume(size);
173
 
        return size; // cannot be zero if we called buf.init above
 
270
    if (buf.isNull())
 
271
        buf.init();
 
272
    const size_t size = min(theBuf.contentSize(), buf.potentialSpaceSize());
 
273
    buf.append(theBuf.content(), size);
 
274
    theBuf.consume(size);
 
275
    postConsume(size);
 
276
    return size; // cannot be zero if we called buf.init above
174
277
}
175
278
 
176
279
void
177
280
BodyPipe::consume(size_t size)
178
281
{
179
 
        theBuf.consume(size);
180
 
        postConsume(size);
181
 
}
182
 
 
183
 
void
184
 
BodyPipe::enableAutoConsumption() {
185
 
        mustAutoConsume = true;
186
 
        debugs(91,5, HERE << "enabled auto consumption" << status());
187
 
        if (!theConsumer && theBuf.hasContent())
188
 
                scheduleBodyDataNotification();
 
282
    theBuf.consume(size);
 
283
    postConsume(size);
 
284
}
 
285
 
 
286
// In the AutoConsumption  mode the consumer has gone but the producer continues
 
287
// producing data. We are using a BodySink BodyConsumer which just discards the produced data.
 
288
void
 
289
BodyPipe::enableAutoConsumption()
 
290
{
 
291
    mustAutoConsume = true;
 
292
    debugs(91,5, HERE << "enabled auto consumption" << status());
 
293
    if (!theConsumer && theBuf.hasContent())
 
294
        startAutoConsumption();
 
295
}
 
296
 
 
297
// start auto consumption by creating body sink
 
298
void
 
299
BodyPipe::startAutoConsumption()
 
300
{
 
301
    Must(mustAutoConsume);
 
302
    Must(!theConsumer);
 
303
    theConsumer = new BodySink;
 
304
    debugs(91,7, HERE << "starting auto consumption" << status());
 
305
    scheduleBodyDataNotification();
189
306
}
190
307
 
191
308
MemBuf &
192
 
BodyPipe::checkOut() {
193
 
        assert(!isCheckedOut);
194
 
        isCheckedOut = true;
195
 
        return theBuf;  
 
309
BodyPipe::checkOut()
 
310
{
 
311
    assert(!isCheckedOut);
 
312
    isCheckedOut = true;
 
313
    return theBuf;
196
314
}
197
315
 
198
316
void
199
317
BodyPipe::checkIn(Checkout &checkout)
200
318
{
201
 
        assert(isCheckedOut);
202
 
        isCheckedOut = false;
203
 
        const size_t currentSize = theBuf.contentSize();
204
 
        if (checkout.checkedOutSize > currentSize)
205
 
                postConsume(checkout.checkedOutSize - currentSize);
206
 
        else
207
 
        if (checkout.checkedOutSize < currentSize)
208
 
                postAppend(currentSize - checkout.checkedOutSize);
 
319
    assert(isCheckedOut);
 
320
    isCheckedOut = false;
 
321
    const size_t currentSize = theBuf.contentSize();
 
322
    if (checkout.checkedOutSize > currentSize)
 
323
        postConsume(checkout.checkedOutSize - currentSize);
 
324
    else
 
325
        if (checkout.checkedOutSize < currentSize)
 
326
            postAppend(currentSize - checkout.checkedOutSize);
209
327
}
210
328
 
211
329
void
212
330
BodyPipe::undoCheckOut(Checkout &checkout)
213
331
{
214
 
        assert(isCheckedOut);
215
 
        const size_t currentSize = theBuf.contentSize();
216
 
        // We can only undo if size did not change, and even that carries
217
 
        // some risk. If this becomes a problem, the code checking out
218
 
        // raw buffers should always check them in (possibly unchanged)
219
 
        // instead of relying on the automated undo mechanism of Checkout.
220
 
        // The code can always use a temporary buffer to accomplish that.
221
 
        assert(checkout.checkedOutSize == currentSize);
 
332
    assert(isCheckedOut);
 
333
    const size_t currentSize = theBuf.contentSize();
 
334
    // We can only undo if size did not change, and even that carries
 
335
    // some risk. If this becomes a problem, the code checking out
 
336
    // raw buffers should always check them in (possibly unchanged)
 
337
    // instead of relying on the automated undo mechanism of Checkout.
 
338
    // The code can always use a temporary buffer to accomplish that.
 
339
    assert(checkout.checkedOutSize == currentSize);
222
340
}
223
341
 
224
342
// TODO: Optimize: inform consumer/producer about more data/space only if
225
343
// they used the data/space since we notified them last time.
226
344
 
227
345
void
228
 
BodyPipe::postConsume(size_t size) {
229
 
        assert(!isCheckedOut);
230
 
        theGetSize += size;
231
 
        debugs(91,7, HERE << "consumed " << size << " bytes" << status());
232
 
        if (mayNeedMoreData())
233
 
                AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
 
346
BodyPipe::postConsume(size_t size)
 
347
{
 
348
    assert(!isCheckedOut);
 
349
    theGetSize += size;
 
350
    debugs(91,7, HERE << "consumed " << size << " bytes" << status());
 
351
    if (mayNeedMoreData()) {
 
352
        AsyncCall::Pointer call=  asyncCall(91, 7,
 
353
                                            "BodyProducer::noteMoreBodySpaceAvailable",
 
354
                                            BodyProducerDialer(theProducer,
 
355
                                                               &BodyProducer::noteMoreBodySpaceAvailable, this));
 
356
        ScheduleCallHere(call);
 
357
    }
234
358
}
235
359
 
236
360
void
237
 
BodyPipe::postAppend(size_t size) {
238
 
        assert(!isCheckedOut);
239
 
        thePutSize += size;
240
 
        debugs(91,7, HERE << "added " << size << " bytes" << status());
241
 
 
242
 
        // We should not consume here even if mustAutoConsume because the
243
 
        // caller may not be ready for the data to be consumed during this call.
244
 
        scheduleBodyDataNotification();
245
 
 
246
 
        if (!mayNeedMoreData())
247
 
                clearProducer(true); // reached end-of-body
 
361
BodyPipe::postAppend(size_t size)
 
362
{
 
363
    assert(!isCheckedOut);
 
364
    thePutSize += size;
 
365
    debugs(91,7, HERE << "added " << size << " bytes" << status());
 
366
 
 
367
    if (mustAutoConsume && !theConsumer && size > 0)
 
368
        startAutoConsumption();
 
369
 
 
370
    // We should not consume here even if mustAutoConsume because the
 
371
    // caller may not be ready for the data to be consumed during this call.
 
372
    scheduleBodyDataNotification();
 
373
 
 
374
    if (!mayNeedMoreData())
 
375
        clearProducer(true); // reached end-of-body
248
376
}
249
377
 
250
378
 
251
379
void
252
380
BodyPipe::scheduleBodyDataNotification()
253
381
{
254
 
        if (theConsumer || mustAutoConsume) {
255
 
                ++theCCallsPending;
256
 
                AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
257
 
        }
 
382
    if (theConsumer) {
 
383
        AsyncCall::Pointer call = asyncCall(91, 7,
 
384
                                            "BodyConsumer::noteMoreBodyDataAvailable",
 
385
                                            BodyConsumerDialer(theConsumer,
 
386
                                                               &BodyConsumer::noteMoreBodyDataAvailable, this));
 
387
        ScheduleCallHere(call);
 
388
    }
258
389
}
259
390
 
260
391
void
261
392
BodyPipe::scheduleBodyEndNotification()
262
393
{
263
 
        if (theConsumer) {
264
 
                ++theCCallsPending;
265
 
                if (bodySizeKnown() && bodySize() == thePutSize)
266
 
                        AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
267
 
                else
268
 
                        AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
269
 
        }
270
 
}
271
 
 
272
 
void BodyPipe::tellMoreBodySpaceAvailable()
273
 
{
274
 
        if (theProducer != NULL)
275
 
                theProducer->noteMoreBodySpaceAvailable(*this);
276
 
}
277
 
 
278
 
void BodyPipe::tellBodyConsumerAborted()
279
 
{
280
 
        if (theProducer != NULL)
281
 
                theProducer->noteBodyConsumerAborted(*this);
282
 
}
283
 
 
284
 
void BodyPipe::tellMoreBodyDataAvailable()
285
 
{
286
 
        if (skipCCall())
287
 
                return;
288
 
 
289
 
        if (theConsumer != NULL)
290
 
                theConsumer->noteMoreBodyDataAvailable(*this);
291
 
        else
292
 
        if (mustAutoConsume && theBuf.hasContent())
293
 
                consume(theBuf.contentSize());
294
 
}
295
 
 
296
 
void BodyPipe::tellBodyProductionEnded()
297
 
{
298
 
        if (skipCCall())
299
 
                return;
300
 
 
301
 
        if (theConsumer != NULL)
302
 
                theConsumer->noteBodyProductionEnded(*this);
303
 
}
304
 
 
305
 
void BodyPipe::tellBodyProducerAborted()
306
 
{
307
 
        if (skipCCall())
308
 
                return;
309
 
 
310
 
        if (theConsumer != NULL)
311
 
                theConsumer->noteBodyProducerAborted(*this);
312
 
}
313
 
 
314
 
// skips calls destined for the previous consumer; see BodyPipe::clearConsumer
315
 
bool BodyPipe::skipCCall()
316
 
{
317
 
        assert(theCCallsPending > 0);
318
 
        --theCCallsPending;
319
 
 
320
 
        if (theCCallsToSkip <= 0)
321
 
                return false;
322
 
 
323
 
        --theCCallsToSkip;
324
 
        debugs(91,5, HERE << "skipped call");
325
 
        return true;
 
394
    if (theConsumer) {
 
395
        if (bodySizeKnown() && bodySize() == thePutSize) {
 
396
            AsyncCall::Pointer call = asyncCall(91, 7,
 
397
                                                "BodyConsumer::noteBodyProductionEnded",
 
398
                                                BodyConsumerDialer(theConsumer,
 
399
                                                                   &BodyConsumer::noteBodyProductionEnded, this));
 
400
            ScheduleCallHere(call);
 
401
        } else {
 
402
            AsyncCall::Pointer call = asyncCall(91, 7,
 
403
                                                "BodyConsumer::noteBodyProducerAborted",
 
404
                                                BodyConsumerDialer(theConsumer,
 
405
                                                                   &BodyConsumer::noteBodyProducerAborted, this));
 
406
            ScheduleCallHere(call);
 
407
        }
 
408
    }
326
409
}
327
410
 
328
411
// a short temporary string describing buffer status for debugging
333
416
 
334
417
    buf.append(" [", 2);
335
418
 
336
 
        buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
 
419
    buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
337
420
    if (theBodySize >= 0)
338
421
        buf.Printf("<=%"PRId64, theBodySize);
339
 
        else
340
 
                buf.append("<=?", 3);
341
 
 
342
 
        buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
343
 
 
344
 
        buf.Printf(" pipe%p", this);
 
422
    else
 
423
        buf.append("<=?", 3);
 
424
 
 
425
    buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
 
426
 
 
427
    buf.Printf(" pipe%p", this);
345
428
    if (theProducer)
346
429
        buf.Printf(" prod%p", theProducer);
347
430
    if (theConsumer)
348
431
        buf.Printf(" cons%p", theConsumer);
349
432
 
350
 
        if (mustAutoConsume)
351
 
                buf.append(" A", 2);
352
 
        if (isCheckedOut)
353
 
                buf.append(" L", 2); // Locked
 
433
    if (mustAutoConsume)
 
434
        buf.append(" A", 2);
 
435
    if (isCheckedOut)
 
436
        buf.append(" L", 2); // Locked
354
437
 
355
438
    buf.append("]", 1);
356
439
 
363
446
/* BodyPipeCheckout */
364
447
 
365
448
BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
366
 
        buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
367
 
        checkedOutSize(buf.contentSize()), checkedIn(false)
 
449
        buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
 
450
        checkedOutSize(buf.contentSize()), checkedIn(false)
368
451
{
369
452
}
370
453
 
371
454
BodyPipeCheckout::~BodyPipeCheckout()
372
455
{
373
 
        if (!checkedIn) {
374
 
//              pipe.undoCheckOut(*this);
375
 
                debugs(91, 2, "BodyPipeCheckout will gone without the checkin performed"); 
376
 
                pipe.checkIn(*this);
377
 
        }
 
456
    if (!checkedIn)
 
457
        pipe.undoCheckOut(*this);
378
458
}
379
459
 
380
460
void
381
461
BodyPipeCheckout::checkIn()
382
462
{
383
 
        assert(!checkedIn);
384
 
        pipe.checkIn(*this);
385
 
        checkedIn = true;
 
463
    assert(!checkedIn);
 
464
    pipe.checkIn(*this);
 
465
    checkedIn = true;
386
466
}
387
467
 
388
468
 
389
469
BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
390
 
        buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
391
 
        checkedIn(c.checkedIn)
 
470
        buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
 
471
        checkedIn(c.checkedIn)
392
472
{
393
 
        assert(false); // prevent copying
 
473
    assert(false); // prevent copying
394
474
}
395
475
 
396
476
BodyPipeCheckout &
397
477
BodyPipeCheckout::operator =(const BodyPipeCheckout &)
398
478
{
399
 
        assert(false); // prevent assignment
400
 
        return *this;
 
479
    assert(false); // prevent assignment
 
480
    return *this;
401
481
}