3
3
#include "BodyPipe.h"
4
#include "TextException.h"
5
6
CBDATA_CLASS_INIT(BodyPipe);
8
// BodySink is a BodyConsumer class which just consume and drops
9
// data from a BodyPipe
10
class BodySink: public BodyConsumer
14
BodySink():AsyncJob("BodySink"), done(false) {}
15
virtual ~BodySink() {}
17
virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
18
size_t contentSize = bp->buf().contentSize();
19
bp->consume(contentSize);
21
virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
22
stopConsumingFrom(bp);
25
virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
26
stopConsumingFrom(bp);
29
bool doneAll() const {return done && AsyncJob::doneAll();}
30
CBDATA_CLASS2(BodySink);
33
CBDATA_CLASS_INIT(BodySink);
35
// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
36
// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
37
// the BodyPipe passed as argument
38
class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
41
typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
43
BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
44
BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
46
virtual bool canDial(AsyncCall &call);
49
// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
50
// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
51
// of the BodyPipe passed as argument
52
class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
55
typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
57
BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
58
BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
60
virtual bool canDial(AsyncCall &call);
64
BodyProducerDialer::canDial(AsyncCall &call)
66
if (!Parent::canDial(call))
69
BodyProducer *producer = object;
70
BodyPipe::Pointer pipe = arg1;
71
if (!pipe->stillProducing(producer)) {
72
debugs(call.debugSection, call.debugLevel, HERE << producer <<
73
" no longer producing for " << pipe->status());
74
return call.cancel("no longer producing");
81
BodyConsumerDialer::canDial(AsyncCall &call)
83
if (!Parent::canDial(call))
86
BodyConsumer *consumer = object;
87
BodyPipe::Pointer pipe = arg1;
88
if (!pipe->stillConsuming(consumer)) {
89
debugs(call.debugSection, call.debugLevel, HERE << consumer <<
90
" no longer consuming from " << pipe->status());
91
return call.cancel("no longer consuming");
7
100
// inform the pipe that we are done and clear the Pointer
8
101
void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
10
debugs(91,7, HERE << this << " will not produce for " << pipe <<
11
"; atEof: " << atEof);
12
assert(pipe != NULL); // be strict: the caller state may depend on this
13
pipe->clearProducer(atEof);
103
debugs(91,7, HERE << this << " will not produce for " << pipe <<
104
"; atEof: " << atEof);
105
assert(pipe != NULL); // be strict: the caller state may depend on this
106
pipe->clearProducer(atEof);
17
114
// inform the pipe that we are done and clear the Pointer
18
115
void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
20
debugs(91,7, HERE << this << " will not consume from " << pipe);
21
assert(pipe != NULL); // be strict: the caller state may depend on this
22
pipe->clearConsumer();
117
debugs(91,7, HERE << this << " will not consume from " << pipe);
118
assert(pipe != NULL); // be strict: the caller state may depend on this
119
pipe->clearConsumer();
28
126
BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
29
theProducer(aProducer), theConsumer(0),
30
thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
31
mustAutoConsume(false), isCheckedOut(false)
127
theProducer(aProducer), theConsumer(0),
128
thePutSize(0), theGetSize(0),
129
mustAutoConsume(false), isCheckedOut(false)
33
// TODO: teach MemBuf to start with zero minSize
34
// TODO: limit maxSize by theBodySize, when known?
35
theBuf.init(2*1024, MaxCapacity);
36
debugs(91,7, HERE << "created BodyPipe" << status());
131
// TODO: teach MemBuf to start with zero minSize
132
// TODO: limit maxSize by theBodySize, when known?
133
theBuf.init(2*1024, MaxCapacity);
134
debugs(91,7, HERE << "created BodyPipe" << status());
39
137
BodyPipe::~BodyPipe()
41
debugs(91,7, HERE << "destroying BodyPipe" << status());
139
debugs(91,7, HERE << "destroying BodyPipe" << status());
140
assert(!theProducer);
141
assert(!theConsumer);
47
145
void BodyPipe::setBodySize(uint64_t aBodySize)
49
assert(!bodySizeKnown());
50
assert(aBodySize >= 0);
51
assert(thePutSize <= aBodySize);
53
// If this assert fails, we need to add code to check for eof and inform
54
// the consumer about the eof condition via scheduleBodyEndNotification,
55
// because just setting a body size limit may trigger the eof condition.
58
theBodySize = aBodySize;
59
debugs(91,7, HERE << "set body size" << status());
147
assert(!bodySizeKnown());
148
assert(aBodySize >= 0);
149
assert(thePutSize <= aBodySize);
151
// If this assert fails, we need to add code to check for eof and inform
152
// the consumer about the eof condition via scheduleBodyEndNotification,
153
// because just setting a body size limit may trigger the eof condition.
154
assert(!theConsumer);
156
theBodySize = aBodySize;
157
debugs(91,7, HERE << "set body size" << status());
62
160
uint64_t BodyPipe::bodySize() const
64
assert(bodySizeKnown());
65
return static_cast<uint64_t>(theBodySize);
162
assert(bodySizeKnown());
163
return static_cast<uint64_t>(theBodySize);
68
166
bool BodyPipe::expectMoreAfter(uint64_t offset) const
70
assert(theGetSize <= offset);
71
return offset < thePutSize || // buffer has more now or
72
(!productionEnded() && mayNeedMoreData()); // buffer will have more
168
assert(theGetSize <= offset);
169
return offset < thePutSize || // buffer has more now or
170
(!productionEnded() && mayNeedMoreData()); // buffer will have more
75
173
bool BodyPipe::exhausted() const
77
return !expectMoreAfter(theGetSize);
175
return !expectMoreAfter(theGetSize);
80
178
uint64_t BodyPipe::unproducedSize() const
82
return bodySize() - thePutSize; // bodySize() asserts that size is known
180
return bodySize() - thePutSize; // bodySize() asserts that size is known
86
184
BodyPipe::clearProducer(bool atEof)
89
debugs(91,7, HERE << "clearing BodyPipe producer" << status());
93
theBodySize = thePutSize;
95
if (bodySize() != thePutSize)
96
debugs(91,3, HERE << "aborting on premature eof" << status());
98
// asserta that we can detect the abort if the consumer joins later
99
assert(!bodySizeKnown() || bodySize() != thePutSize);
101
scheduleBodyEndNotification();
187
debugs(91,7, HERE << "clearing BodyPipe producer" << status());
190
if (!bodySizeKnown())
191
theBodySize = thePutSize;
193
if (bodySize() != thePutSize)
194
debugs(91,3, HERE << "aborting on premature eof" << status());
196
// asserta that we can detect the abort if the consumer joins later
197
assert(!bodySizeKnown() || bodySize() != thePutSize);
199
scheduleBodyEndNotification();
106
204
BodyPipe::putMoreData(const char *buf, size_t size)
109
size = XMIN((uint64_t)size, unproducedSize());
207
size = min((uint64_t)size, unproducedSize());
111
const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
112
if ((size = XMIN(size, spaceSize))) {
113
theBuf.append(buf, size);
209
const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
210
if ((size = min(size, spaceSize))) {
211
theBuf.append(buf, size);
121
219
BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
123
assert(!theConsumer);
126
// TODO: convert this into an exception and remove IfNotLate suffix
127
// If there is something consumed already, we are in an auto-consuming mode
128
// and it is too late to attach a real consumer to the pipe.
129
if (theGetSize > 0) {
130
assert(mustAutoConsume);
134
theConsumer = aConsumer;
135
debugs(91,7, HERE << "set consumer" << status());
136
if (theBuf.hasContent())
137
scheduleBodyDataNotification();
139
scheduleBodyEndNotification();
221
assert(!theConsumer);
224
// TODO: convert this into an exception and remove IfNotLate suffix
225
// If there is something consumed already, we are in an auto-consuming mode
226
// and it is too late to attach a real consumer to the pipe.
227
if (theGetSize > 0) {
228
assert(mustAutoConsume);
232
theConsumer = aConsumer;
233
debugs(91,7, HERE << "set consumer" << status());
234
if (theBuf.hasContent())
235
scheduleBodyDataNotification();
237
scheduleBodyEndNotification();
144
242
// When BodyPipe consumer is gone, all events for that consumer must not
145
243
// reach the new consumer (if any). Otherwise, the calls may go out of order
146
// (if _some_ calls are dropped due to the ultimate destination being
244
// (if _some_ calls are dropped due to the ultimate destination being
147
245
// temporary NULL). The code keeps track of the number of outstanding
148
246
// events and skips that number if consumer leaves. TODO: when AscyncCall
149
247
// support is improved, should we just schedule calls directly to consumer?
151
BodyPipe::clearConsumer() {
153
debugs(91,7, HERE << "clearing consumer" << status());
155
theCCallsToSkip = theCCallsPending; // skip all pending consumer calls
156
if (consumedSize() && !exhausted())
157
AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
249
BodyPipe::clearConsumer()
252
debugs(91,7, HERE << "clearing consumer" << status());
254
if (consumedSize() && !exhausted()) {
255
AsyncCall::Pointer call= asyncCall(91, 7,
256
"BodyProducer::noteBodyConsumerAborted",
257
BodyProducerDialer(theProducer,
258
&BodyProducer::noteBodyConsumerAborted, this));
259
ScheduleCallHere(call);
162
265
BodyPipe::getMoreData(MemBuf &buf)
164
if (!theBuf.hasContent())
165
return 0; // did not touch the possibly uninitialized buf
267
if (!theBuf.hasContent())
268
return 0; // did not touch the possibly uninitialized buf
169
const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
170
buf.append(theBuf.content(), size);
171
theBuf.consume(size);
173
return size; // cannot be zero if we called buf.init above
272
const size_t size = min(theBuf.contentSize(), buf.potentialSpaceSize());
273
buf.append(theBuf.content(), size);
274
theBuf.consume(size);
276
return size; // cannot be zero if we called buf.init above
177
280
BodyPipe::consume(size_t size)
179
theBuf.consume(size);
184
BodyPipe::enableAutoConsumption() {
185
mustAutoConsume = true;
186
debugs(91,5, HERE << "enabled auto consumption" << status());
187
if (!theConsumer && theBuf.hasContent())
188
scheduleBodyDataNotification();
282
theBuf.consume(size);
286
// In the AutoConsumption mode the consumer has gone but the producer continues
287
// producing data. We are using a BodySink BodyConsumer which just discards the produced data.
289
BodyPipe::enableAutoConsumption()
291
mustAutoConsume = true;
292
debugs(91,5, HERE << "enabled auto consumption" << status());
293
if (!theConsumer && theBuf.hasContent())
294
startAutoConsumption();
297
// start auto consumption by creating body sink
299
BodyPipe::startAutoConsumption()
301
Must(mustAutoConsume);
303
theConsumer = new BodySink;
304
debugs(91,7, HERE << "starting auto consumption" << status());
305
scheduleBodyDataNotification();
192
BodyPipe::checkOut() {
193
assert(!isCheckedOut);
311
assert(!isCheckedOut);
199
317
BodyPipe::checkIn(Checkout &checkout)
201
assert(isCheckedOut);
202
isCheckedOut = false;
203
const size_t currentSize = theBuf.contentSize();
204
if (checkout.checkedOutSize > currentSize)
205
postConsume(checkout.checkedOutSize - currentSize);
207
if (checkout.checkedOutSize < currentSize)
208
postAppend(currentSize - checkout.checkedOutSize);
319
assert(isCheckedOut);
320
isCheckedOut = false;
321
const size_t currentSize = theBuf.contentSize();
322
if (checkout.checkedOutSize > currentSize)
323
postConsume(checkout.checkedOutSize - currentSize);
325
if (checkout.checkedOutSize < currentSize)
326
postAppend(currentSize - checkout.checkedOutSize);
212
330
BodyPipe::undoCheckOut(Checkout &checkout)
214
assert(isCheckedOut);
215
const size_t currentSize = theBuf.contentSize();
216
// We can only undo if size did not change, and even that carries
217
// some risk. If this becomes a problem, the code checking out
218
// raw buffers should always check them in (possibly unchanged)
219
// instead of relying on the automated undo mechanism of Checkout.
220
// The code can always use a temporary buffer to accomplish that.
221
assert(checkout.checkedOutSize == currentSize);
332
assert(isCheckedOut);
333
const size_t currentSize = theBuf.contentSize();
334
// We can only undo if size did not change, and even that carries
335
// some risk. If this becomes a problem, the code checking out
336
// raw buffers should always check them in (possibly unchanged)
337
// instead of relying on the automated undo mechanism of Checkout.
338
// The code can always use a temporary buffer to accomplish that.
339
assert(checkout.checkedOutSize == currentSize);
224
342
// TODO: Optimize: inform consumer/producer about more data/space only if
225
343
// they used the data/space since we notified them last time.
228
BodyPipe::postConsume(size_t size) {
229
assert(!isCheckedOut);
231
debugs(91,7, HERE << "consumed " << size << " bytes" << status());
232
if (mayNeedMoreData())
233
AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
346
BodyPipe::postConsume(size_t size)
348
assert(!isCheckedOut);
350
debugs(91,7, HERE << "consumed " << size << " bytes" << status());
351
if (mayNeedMoreData()) {
352
AsyncCall::Pointer call= asyncCall(91, 7,
353
"BodyProducer::noteMoreBodySpaceAvailable",
354
BodyProducerDialer(theProducer,
355
&BodyProducer::noteMoreBodySpaceAvailable, this));
356
ScheduleCallHere(call);
237
BodyPipe::postAppend(size_t size) {
238
assert(!isCheckedOut);
240
debugs(91,7, HERE << "added " << size << " bytes" << status());
242
// We should not consume here even if mustAutoConsume because the
243
// caller may not be ready for the data to be consumed during this call.
244
scheduleBodyDataNotification();
246
if (!mayNeedMoreData())
247
clearProducer(true); // reached end-of-body
361
BodyPipe::postAppend(size_t size)
363
assert(!isCheckedOut);
365
debugs(91,7, HERE << "added " << size << " bytes" << status());
367
if (mustAutoConsume && !theConsumer && size > 0)
368
startAutoConsumption();
370
// We should not consume here even if mustAutoConsume because the
371
// caller may not be ready for the data to be consumed during this call.
372
scheduleBodyDataNotification();
374
if (!mayNeedMoreData())
375
clearProducer(true); // reached end-of-body
252
380
BodyPipe::scheduleBodyDataNotification()
254
if (theConsumer || mustAutoConsume) {
256
AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
383
AsyncCall::Pointer call = asyncCall(91, 7,
384
"BodyConsumer::noteMoreBodyDataAvailable",
385
BodyConsumerDialer(theConsumer,
386
&BodyConsumer::noteMoreBodyDataAvailable, this));
387
ScheduleCallHere(call);
261
392
BodyPipe::scheduleBodyEndNotification()
265
if (bodySizeKnown() && bodySize() == thePutSize)
266
AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
268
AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
272
void BodyPipe::tellMoreBodySpaceAvailable()
274
if (theProducer != NULL)
275
theProducer->noteMoreBodySpaceAvailable(*this);
278
void BodyPipe::tellBodyConsumerAborted()
280
if (theProducer != NULL)
281
theProducer->noteBodyConsumerAborted(*this);
284
void BodyPipe::tellMoreBodyDataAvailable()
289
if (theConsumer != NULL)
290
theConsumer->noteMoreBodyDataAvailable(*this);
292
if (mustAutoConsume && theBuf.hasContent())
293
consume(theBuf.contentSize());
296
void BodyPipe::tellBodyProductionEnded()
301
if (theConsumer != NULL)
302
theConsumer->noteBodyProductionEnded(*this);
305
void BodyPipe::tellBodyProducerAborted()
310
if (theConsumer != NULL)
311
theConsumer->noteBodyProducerAborted(*this);
314
// skips calls destined for the previous consumer; see BodyPipe::clearConsumer
315
bool BodyPipe::skipCCall()
317
assert(theCCallsPending > 0);
320
if (theCCallsToSkip <= 0)
324
debugs(91,5, HERE << "skipped call");
395
if (bodySizeKnown() && bodySize() == thePutSize) {
396
AsyncCall::Pointer call = asyncCall(91, 7,
397
"BodyConsumer::noteBodyProductionEnded",
398
BodyConsumerDialer(theConsumer,
399
&BodyConsumer::noteBodyProductionEnded, this));
400
ScheduleCallHere(call);
402
AsyncCall::Pointer call = asyncCall(91, 7,
403
"BodyConsumer::noteBodyProducerAborted",
404
BodyConsumerDialer(theConsumer,
405
&BodyConsumer::noteBodyProducerAborted, this));
406
ScheduleCallHere(call);
328
411
// a short temporary string describing buffer status for debugging