2
#include <libecap/common/area.h>
3
#include <libecap/common/delay.h>
4
#include <libecap/adapter/xaction.h>
5
#include "TextException.h"
6
#include "HttpRequest.h"
9
#include "adaptation/ecap/XactionRep.h"
11
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
14
/// Builds the host-side transaction: passes the same job name to both base
/// classes, wraps virginHeader in theVirginRep, and starts with both proxying
/// decisions open (opUndecided) and adapted-body production not finished.
Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
15
HttpMsg *virginHeader, HttpRequest *virginCause,
16
const Adaptation::ServicePointer &aService):
17
AsyncJob("Adaptation::Ecap::XactionRep"),
18
Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator),
20
theVirginRep(virginHeader), theCauseRep(NULL),
21
proxyingVb(opUndecided), proxyingAb(opUndecided),
24
abProductionFinished(false), abProductionAtEnd(false)
27
// wrap the cause request in its own MessageRep
// NOTE(review): any guard on virginCause is not visible in this listing -- confirm
theCauseRep = new MessageRep(virginCause);
30
Adaptation::Ecap::XactionRep::~XactionRep()
38
Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
46
/// Checks via Must() that theService is set.
/// The rest of the body (including the return) is not visible in this listing.
Adaptation::Ecap::XactionRep::service()
48
Must(theService != NULL);
53
/// Begins the transaction: flags the virgin body as accessible when the virgin
/// message has a body pipe, then records the transaction start in the
/// adaptation history and remembers the returned id in adaptHistoryId.
Adaptation::Ecap::XactionRep::start()
57
if (theVirginRep.raw().body_pipe != NULL)
58
canAccessVb = true; /// assumes nobody is consuming; \todo check
62
// the history is looked up on the cause header when theCauseRep is set,
// otherwise on the virgin header
// NOTE(review): request and ah are dereferenced below; any null checks are
// not visible in this listing -- confirm
const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
63
theCauseRep->raw().header : theVirginRep.raw().header);
65
Adaptation::History::Pointer ah = request->adaptLogHistory();
67
// retrying=false because ecap never retries transactions
68
adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
75
/// Final cleanup: detaches from the adapted and virgin body pipes (if any),
/// finishes this transaction's adaptation-history record, then chains to
/// Adaptation::Initiate::swanSong().
Adaptation::Ecap::XactionRep::swanSong()
77
// clear body_pipes, if any
78
// this code does not maintain proxying* and canAccessVb states; should it?
80
// adapted side: we must still be the registered producer before stopping
if (theAnswerRep != NULL) {
81
BodyPipe::Pointer body_pipe = answer().body_pipe;
82
if (body_pipe != NULL) {
83
Must(body_pipe->stillProducing(this));
84
stopProducingFor(body_pipe, false);
89
// virgin side: we must still be the registered consumer before stopping
BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
90
if (body_pipe != NULL) {
91
Must(body_pipe->stillConsuming(this));
92
stopConsumingFrom(body_pipe);
98
// same header selection as in start(): cause header if present, else virgin
const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
99
theCauseRep->raw().header : theVirginRep.raw().header);
101
Adaptation::History::Pointer ah = request->adaptLogHistory();
102
// a negative adaptHistoryId presumably means no start was recorded -- confirm
if (ah != NULL && adaptHistoryId >= 0)
103
ah->recordXactFinish(adaptHistoryId);
105
Adaptation::Initiate::swanSong();
109
Adaptation::Ecap::XactionRep::virgin()
114
/// Declared to return a const libecap::Message reference; requires theCauseRep
/// to be set. The return statement is not visible in this listing.
const libecap::Message &
115
Adaptation::Ecap::XactionRep::cause()
117
Must(theCauseRep != NULL);
122
/// Returns the message held by theAnswerRep; requires that it has been set.
Adaptation::Ecap::XactionRep::adapted()
124
Must(theAnswerRep != NULL);
125
return *theAnswerRep;
128
/// Declared to return an Adaptation::Message reference for the answer.
/// Downcasts the stored answer to the host's MessageRep; the checks and the
/// return that follow are not visible in this listing.
Adaptation::Message &
129
Adaptation::Ecap::XactionRep::answer()
131
MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
137
/// Takes a local copy of theMaster; what is done with it afterwards is not
/// visible in this listing.
Adaptation::Ecap::XactionRep::terminateMaster()
140
AdapterXaction x = theMaster;
147
/// True once both virgin-body and adapted-body proxying have reached at least
/// opComplete and the Adaptation::Initiate base also reports completion.
/// The >= comparisons depend on the enumerator order, which is declared
/// outside this listing.
Adaptation::Ecap::XactionRep::doneAll() const
149
return proxyingVb >= opComplete && proxyingAb >= opComplete &&
150
Adaptation::Initiate::doneAll();
153
// Stops receiving the virgin body and enables auto-consumption on its pipe.
// `reason` is used only for the debug trace. Requires that virgin-body
// proxying is on (opOn) and that we are still the pipe's consumer; leaves
// proxyingVb at opComplete.
155
Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
157
debugs(93,4, HERE << "because " << reason << "; status:" << status());
158
// compare, do not assign: "=" overwrote proxyingVb with opOn and made this
// precondition check pass regardless of the actual state
Must(proxyingVb == opOn);
160
BodyPipePointer &p = theVirginRep.raw().body_pipe;
162
// NOTE(review): p is dereferenced here; any null check is not visible in
// this listing -- confirm
Must(p->stillConsuming(this));
163
stopConsumingFrom(p);
164
p->enableAutoConsumption();
165
proxyingVb = opComplete;
168
// called from adapter handler so does not inform adapter
172
/// Handles the "use the virgin message" outcome: marks adapted-body proxying
/// as never needed, clones the virgin header, and gives up our hold on the
/// virgin body pipe. Requires that no adapted-body decision was made before.
Adaptation::Ecap::XactionRep::useVirgin()
174
debugs(93,3, HERE << status());
175
Must(proxyingAb == opUndecided);
176
proxyingAb = opNever;
178
BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
180
HttpMsg *clone = theVirginRep.raw().header->clone();
181
// check that clone() copies the pipe so that we do not have to
182
// (both sides must agree on whether a pipe exists)
Must(!vbody_pipe == !clone->body_pipe);
184
// already consuming: stop, but only if nothing was consumed yet
if (proxyingVb == opOn) {
185
Must(vbody_pipe->stillConsuming(this));
186
// if libecap consumed, we cannot shortcircuit
187
Must(!vbody_pipe->consumedSize());
188
stopConsumingFrom(vbody_pipe);
190
proxyingVb = opComplete;
191
// never started consuming: just drop our reference to the pipe
} else if (proxyingVb == opUndecided) {
192
vbody_pipe = NULL; // it is not our pipe anymore
193
proxyingVb = opNever;
201
/// Handles the "use this adapted message" outcome. Requires that no
/// adapted-body decision was made before. A bodyless answer is final
/// (proxyingAb becomes opNever); otherwise this object is tied to the answer
/// as body producer and the adapter is asked to start making the body.
/// NOTE(review): where `m` is stored into theAnswerRep is not visible in this
/// listing -- confirm.
Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
203
debugs(93,3, HERE << status());
206
Must(proxyingAb == opUndecided);
208
HttpMsg *msg = answer().header;
209
if (!theAnswerRep->body()) { // final, bodyless answer
210
proxyingAb = opNever;
212
} else { // got answer headers but need to handle body
214
Must(!msg->body_pipe); // only host can set body pipes
215
// NOTE(review): rep is dereferenced below; any null check on the cast
// result is not visible in this listing -- confirm
MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
217
rep->tieBody(this); // sets us as a producer
218
Must(msg->body_pipe != NULL); // check tieBody
222
debugs(93,4, HERE << "adapter will produce body" << status());
223
theMaster->abMake(); // libecap will produce
228
/// Drops the virgin body before any decision to receive it was made.
/// NOTE(review): this requires proxyingVb == opUndecided on entry and expects
/// opNever on exit, but dropVirgin() in this file checks for opOn, requires
/// that we are still consuming the pipe, and sets opComplete. The pre- and
/// post-conditions here look inconsistent with that helper -- confirm intent.
Adaptation::Ecap::XactionRep::vbDiscard()
230
Must(proxyingVb == opUndecided);
231
// if adapter does not need vb, we do not need to send it
232
dropVirgin("vbDiscard");
233
Must(proxyingVb == opNever);
237
/// Starts receiving the virgin body: requires that no virgin-body decision
/// was made yet and registers this object as the virgin pipe's consumer.
/// NOTE(review): the state change that follows (presumably to opOn) is not
/// visible in this listing -- confirm.
Adaptation::Ecap::XactionRep::vbMake()
239
Must(proxyingVb == opUndecided);
240
BodyPipePointer &p = theVirginRep.raw().body_pipe;
242
Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
247
/// Stops receiving the virgin body if we are currently receiving it; on exit
/// virgin-body proxying must be complete (so calling this in any state other
/// than opOn or opComplete fails the final check).
Adaptation::Ecap::XactionRep::vbStopMaking()
249
// if adapter does not need vb, we do not need to receive it
250
if (proxyingVb == opOn)
251
dropVirgin("vbStopMaking");
252
Must(proxyingVb == opComplete);
256
/// Validates a request for more virgin body: we must be actively proxying it
/// and the virgin pipe must not be exhausted. No other action is visible in
/// this listing.
Adaptation::Ecap::XactionRep::vbMakeMore()
258
Must(proxyingVb == opOn); // cannot make more if done proxying
259
// we cannot guarantee more vb, but we can check that there is a chance
260
Must(!theVirginRep.raw().body_pipe->exhausted());
264
/// Returns an Area built from the virgin pipe's buffered content, starting at
/// offset `o` and spanning at most `s` bytes; `s == libecap::nsize` means
/// "everything buffered from `o` on". `o` must not exceed the buffered size.
Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
267
// We may not be proxyingVb yet. It should be OK, but see vbContentShift().
269
const BodyPipePointer &p = theVirginRep.raw().body_pipe;
272
// TODO: make MemBuf use size_t?
273
const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
275
// convert to Squid types; XXX: check for overflow
276
const uint64_t offset = static_cast<uint64_t>(o);
277
Must(offset <= haveSize); // equal iff at the end of content
279
// nsize means no size limit: all content starting from offset
280
const size_t size = s == libecap::nsize ?
281
haveSize - offset : static_cast<size_t>(s);
283
// XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
284
// the length is clamped so we never read past the buffered content
return libecap::Area::FromTempBuffer(p->buf().content() + offset,
285
min(static_cast<size_t>(haveSize - offset), size));
289
/// Consumes up to `n` bytes from the virgin pipe; the amount is capped at
/// what the pipe buffer currently holds.
Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
292
// We may not be proxyingVb yet. It should be OK now, but if BodyPipe
293
// consume() requirements change, we would have to return empty vbContent
294
// until the adapter registers as a consumer
296
BodyPipePointer &p = theVirginRep.raw().body_pipe;
298
const size_t size = static_cast<size_t>(n); // XXX: check for overflow
299
const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
300
p->consume(min(size, haveSize));
304
/// Records that adapted-body production has finished and remembers `atEnd`
/// for later use. Valid only while adapted-body proxying is on and production
/// has not already been marked finished.
Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
306
Must(proxyingAb == opOn && !abProductionFinished);
307
abProductionFinished = true;
308
abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
309
debugs(93,5, HERE << "adapted body production ended");
314
/// Valid only while adapted-body proxying is on and production has not been
/// marked finished. What is done after the check is not visible in this
/// listing.
Adaptation::Ecap::XactionRep::noteAbContentAvailable()
316
Must(proxyingAb == opOn && !abProductionFinished);
320
#if 0 /* XXX: implement */
322
/// Sets the adapted body pipe's size from `size`; requires that the answer
/// already has a body pipe. This definition follows an `#if 0` marked
/// "XXX: implement" in this listing.
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
324
Must(answer().body_pipe != NULL);
326
// NOTE(review): the trailing "else" comment suggests a guarding condition
// that is not visible in this listing -- confirm
answer().body_pipe->setBodySize(size.value());
327
// else the piped body size is unknown by default
332
/// Logs the delay state and progress; no other action is visible in this
/// listing.
Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
334
debugs(93,3, HERE << "adapter needs time: " <<
335
d.state << '/' << d.progress);
340
/// Reports the aborted query via tellQueryAborted() and then requests that
/// this job stop.
Adaptation::Ecap::XactionRep::adaptationAborted()
342
tellQueryAborted(true); // should eCAP support retries?
343
mustStop("adaptationAborted");
347
Adaptation::Ecap::XactionRep::callable() const
353
/// Valid only while adapted-body proxying is on. What is done after the check
/// is not visible in this listing.
Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
355
Must(proxyingAb == opOn);
360
/// Stops producing for the answer's body pipe, tells the adapter to stop
/// making the adapted body, and marks adapted-body proxying complete.
/// Valid only while adapted-body proxying is on.
Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
362
Must(proxyingAb == opOn);
363
stopProducingFor(answer().body_pipe, false);
365
theMaster->abStopMaking();
366
proxyingAb = opComplete;
370
/// Forwards a "virgin content available" notice to the adapter.
/// Valid only while virgin-body proxying is on.
Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
372
Must(proxyingVb == opOn);
374
theMaster->noteVbContentAvailable();
378
/// Tells the adapter that virgin content is done, passing true, and marks
/// virgin-body proxying complete. Valid only while virgin-body proxying is on.
Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
380
Must(proxyingVb == opOn);
382
theMaster->noteVbContentDone(true);
383
proxyingVb = opComplete;
387
/// Tells the adapter that virgin content is done, passing false, and marks
/// virgin-body proxying complete. Valid only while virgin-body proxying is on.
Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
389
Must(proxyingVb == opOn);
391
theMaster->noteVbContentDone(false);
392
proxyingVb = opComplete;
396
/// Requests that this job stop, giving "initiator aborted" as the reason.
Adaptation::Ecap::XactionRep::noteInitiatorAborted()
398
mustStop("initiator aborted");
401
// get content from the adapter and put it into the adapted pipe
// Valid only while adapted-body proxying is on. If the adapter has nothing
// now and has already finished production, we stop producing and mark
// adapted-body proxying complete; otherwise any available bytes are offered
// to the pipe and the adapter is told how many were accepted.
403
Adaptation::Ecap::XactionRep::moveAbContent()
405
Must(proxyingAb == opOn);
406
// ask for everything the adapter has from offset 0
const libecap::Area c = theMaster->abContent(0, libecap::nsize);
407
debugs(93,5, HERE << "up to " << c.size << " bytes");
408
if (c.size == 0 && abProductionFinished) { // no ab now and in the future
409
stopProducingFor(answer().body_pipe, abProductionAtEnd);
410
proxyingAb = opComplete;
411
debugs(93,5, HERE << "last adapted body data retrieved");
412
} else if (c.size > 0) {
413
// shift only by what the pipe actually accepted; skip the call when it
// accepted nothing
if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
414
theMaster->abContentShift(used);
419
/// Builds a status string in `buf` and returns its content. The visible
/// parts append the virgin pipe's status while we are consuming it, " Ab"
/// plus the adapted pipe's status while we are producing for it, and finally
/// the transaction id.
/// NOTE(review): the declaration of `buf` and several branches are not
/// visible in this listing; the lifetime of the returned pointer depends on
/// how `buf` is declared -- confirm.
Adaptation::Ecap::XactionRep::status() const
426
if (proxyingVb == opOn) {
427
const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
430
if (vp != NULL && vp->stillConsuming(this)) {
432
buf.append(vp->status(), strlen(vp->status())); // XXX
437
if (proxyingAb == opOn) {
438
MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
440
const BodyPipePointer &ap = rep->raw().body_pipe;
441
if (ap != NULL && ap->stillProducing(this)) {
442
buf.append(" Ab", 3);
443
buf.append(ap->status(), strlen(ap->status())); // XXX
445
buf.append(" A.", 3);
448
buf.Printf(" ecapx%d]", id);
452
return buf.content();