41
42
typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
43
BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
44
BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
44
BodyProducerDialer(const BodyProducer::Pointer &aProducer,
45
Parent::Method aHandler, BodyPipe::Pointer bp):
46
Parent(aProducer, aHandler, bp) {}
46
48
virtual bool canDial(AsyncCall &call);
55
57
typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
57
BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
58
BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
59
BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
60
Parent::Method aHandler, BodyPipe::Pointer bp):
61
Parent(aConsumer, aHandler, bp) {}
60
63
virtual bool canDial(AsyncCall &call);
66
69
if (!Parent::canDial(call))
69
BodyProducer *producer = object;
72
const BodyProducer::Pointer &producer = job;
70
73
BodyPipe::Pointer pipe = arg1;
71
74
if (!pipe->stillProducing(producer)) {
72
75
debugs(call.debugSection, call.debugLevel, HERE << producer <<
83
86
if (!Parent::canDial(call))
86
BodyConsumer *consumer = object;
89
const BodyConsumer::Pointer &consumer = job;
87
90
BodyPipe::Pointer pipe = arg1;
88
91
if (!pipe->stillConsuming(consumer)) {
89
92
debugs(call.debugSection, call.debugLevel, HERE << consumer <<
126
129
BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
127
130
theProducer(aProducer), theConsumer(0),
128
131
thePutSize(0), theGetSize(0),
129
mustAutoConsume(false), isCheckedOut(false)
132
mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
131
134
// TODO: teach MemBuf to start with zero minSize
132
135
// TODO: limit maxSize by theBodySize, when known?
218
BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
221
BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
220
223
assert(!theConsumer);
224
assert(aConsumer.set()); // but might be invalid
223
226
// TODO: convert this into an exception and remove IfNotLate suffix
224
227
// If there is something consumed already, we are in an auto-consuming mode
241
// When BodyPipe consumer is gone, all events for that consumer must not
242
// reach the new consumer (if any). Otherwise, the calls may go out of order
243
// (if _some_ calls are dropped due to the ultimate destination being
244
// temporarily NULL). The code keeps track of the number of outstanding
245
// events and skips that number if consumer leaves. TODO: when AsyncCall
246
// support is improved, should we just schedule calls directly to consumer?
248
247
BodyPipe::clearConsumer()
249
if (theConsumer.set()) {
251
250
debugs(91,7, HERE << "clearing consumer" << status());
253
if (consumedSize() && !exhausted()) {
254
AsyncCall::Pointer call= asyncCall(91, 7,
255
"BodyProducer::noteBodyConsumerAborted",
256
BodyProducerDialer(theProducer,
257
&BodyProducer::noteBodyConsumerAborted, this));
258
ScheduleCallHere(call);
252
// do not abort if we have not consumed so that HTTP or ICAP can retry
253
// benign xaction failures due to persistent connection race conditions
255
expectNoConsumption();
260
BodyPipe::expectNoConsumption()
263
if (!abortedConsumption && !exhausted()) {
264
AsyncCall::Pointer call= asyncCall(91, 7,
265
"BodyProducer::noteBodyConsumerAborted",
266
BodyProducerDialer(theProducer,
267
&BodyProducer::noteBodyConsumerAborted, this));
268
ScheduleCallHere(call);
269
abortedConsumption = true;
334
344
// raw buffers should always check them in (possibly unchanged)
335
345
// instead of relying on the automated undo mechanism of Checkout.
336
346
// The code can always use a temporary buffer to accomplish that.
337
assert(checkout.checkedOutSize == currentSize);
347
Must(checkout.checkedOutSize == currentSize);
340
350
// TODO: Optimize: inform consumer/producer about more data/space only if
378
388
BodyPipe::scheduleBodyDataNotification()
390
if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
381
391
AsyncCall::Pointer call = asyncCall(91, 7,
382
392
"BodyConsumer::noteMoreBodyDataAvailable",
383
393
BodyConsumerDialer(theConsumer,
390
400
BodyPipe::scheduleBodyEndNotification()
402
if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
393
403
if (bodySizeKnown() && bodySize() == thePutSize) {
394
404
AsyncCall::Pointer call = asyncCall(91, 7,
395
405
"BodyConsumer::noteBodyProductionEnded",
423
433
outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
425
435
outputBuffer.Printf(" pipe%p", this);
427
outputBuffer.Printf(" prod%p", theProducer);
429
outputBuffer.Printf(" cons%p", theConsumer);
436
if (theProducer.set())
437
outputBuffer.Printf(" prod%p", theProducer.get());
438
if (theConsumer.set())
439
outputBuffer.Printf(" cons%p", theConsumer.get());
431
441
if (mustAutoConsume)
432
442
outputBuffer.append(" A", 2);
443
if (abortedConsumption)
444
outputBuffer.append(" !C", 3);
433
445
if (isCheckedOut)
434
446
outputBuffer.append(" L", 2); // Locked
452
464
BodyPipeCheckout::~BodyPipeCheckout()
455
pipe.undoCheckOut(*this);
467
// Do not pipe.undoCheckOut(*this) because it asserts or throws
468
// TODO: consider implementing the long-term solution discussed at
469
// http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
470
debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");