~clint-fewbar/ubuntu/precise/squid3/ignore-sighup-early

« back to all changes in this revision

Viewing changes to src/adaptation/ecap/XactionRep.cc

  • Committer: Bazaar Package Importer
  • Author(s): Luigi Gangitano
  • Date: 2010-05-04 11:15:49 UTC
  • mfrom: (1.3.1 upstream)
  • mto: (20.3.1 squeeze) (21.2.1 sid)
  • mto: This revision was merged to the branch mainline in revision 21.
  • Revision ID: james.westby@ubuntu.com-20100504111549-1apjh2g5sndki4te
Tags: upstream-3.1.3
ImportĀ upstreamĀ versionĀ 3.1.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include "squid.h"
 
2
#include <libecap/common/area.h>
 
3
#include <libecap/common/delay.h>
 
4
#include <libecap/adapter/xaction.h>
 
5
#include "TextException.h"
 
6
#include "HttpRequest.h"
 
7
#include "HttpReply.h"
 
8
#include "SquidTime.h"
 
9
#include "adaptation/ecap/XactionRep.h"
 
10
 
 
11
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
 
12
 
 
13
 
 
14
Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
 
15
        HttpMsg *virginHeader, HttpRequest *virginCause,
 
16
        const Adaptation::ServicePointer &aService):
 
17
        AsyncJob("Adaptation::Ecap::XactionRep"),
 
18
        Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator),
 
19
        theService(aService),
 
20
        theVirginRep(virginHeader), theCauseRep(NULL),
 
21
        proxyingVb(opUndecided), proxyingAb(opUndecided),
 
22
        adaptHistoryId(-1),
 
23
        canAccessVb(false),
 
24
        abProductionFinished(false), abProductionAtEnd(false)
 
25
{
 
26
    if (virginCause)
 
27
        theCauseRep = new MessageRep(virginCause);
 
28
}
 
29
 
 
30
Adaptation::Ecap::XactionRep::~XactionRep()
 
31
{
 
32
    assert(!theMaster);
 
33
    delete theCauseRep;
 
34
    theAnswerRep.reset();
 
35
}
 
36
 
 
37
void
 
38
Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
 
39
{
 
40
    Must(!theMaster);
 
41
    Must(x != NULL);
 
42
    theMaster = x;
 
43
}
 
44
 
 
45
Adaptation::Service &
 
46
Adaptation::Ecap::XactionRep::service()
 
47
{
 
48
    Must(theService != NULL);
 
49
    return *theService;
 
50
}
 
51
 
 
52
void
 
53
Adaptation::Ecap::XactionRep::start()
 
54
{
 
55
    Must(theMaster);
 
56
 
 
57
    if (theVirginRep.raw().body_pipe != NULL)
 
58
        canAccessVb = true; /// assumes nobody is consuming; \todo check
 
59
    else
 
60
        proxyingVb = opNever;
 
61
 
 
62
    const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
 
63
                                 theCauseRep->raw().header : theVirginRep.raw().header);
 
64
    Must(request);
 
65
    Adaptation::History::Pointer ah = request->adaptLogHistory();
 
66
    if (ah != NULL) {
 
67
        // retrying=false because ecap never retries transactions
 
68
        adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
 
69
    }
 
70
 
 
71
    theMaster->start();
 
72
}
 
73
 
 
74
void
 
75
Adaptation::Ecap::XactionRep::swanSong()
 
76
{
 
77
    // clear body_pipes, if any
 
78
    // this code does not maintain proxying* and canAccessVb states; should it?
 
79
 
 
80
    if (theAnswerRep != NULL) {
 
81
        BodyPipe::Pointer body_pipe = answer().body_pipe;
 
82
        if (body_pipe != NULL) {
 
83
            Must(body_pipe->stillProducing(this));
 
84
            stopProducingFor(body_pipe, false);
 
85
        }
 
86
    }
 
87
 
 
88
    {
 
89
        BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
 
90
        if (body_pipe != NULL) {
 
91
            Must(body_pipe->stillConsuming(this));
 
92
            stopConsumingFrom(body_pipe);
 
93
        }
 
94
    }
 
95
 
 
96
    terminateMaster();
 
97
 
 
98
    const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
 
99
                                 theCauseRep->raw().header : theVirginRep.raw().header);
 
100
    Must(request);
 
101
    Adaptation::History::Pointer ah = request->adaptLogHistory();
 
102
    if (ah != NULL && adaptHistoryId >= 0)
 
103
        ah->recordXactFinish(adaptHistoryId);
 
104
 
 
105
    Adaptation::Initiate::swanSong();
 
106
}
 
107
 
 
108
libecap::Message &
 
109
Adaptation::Ecap::XactionRep::virgin()
 
110
{
 
111
    return theVirginRep;
 
112
}
 
113
 
 
114
const libecap::Message &
 
115
Adaptation::Ecap::XactionRep::cause()
 
116
{
 
117
    Must(theCauseRep != NULL);
 
118
    return *theCauseRep;
 
119
}
 
120
 
 
121
libecap::Message &
 
122
Adaptation::Ecap::XactionRep::adapted()
 
123
{
 
124
    Must(theAnswerRep != NULL);
 
125
    return *theAnswerRep;
 
126
}
 
127
 
 
128
Adaptation::Message &
 
129
Adaptation::Ecap::XactionRep::answer()
 
130
{
 
131
    MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
 
132
    Must(rep);
 
133
    return rep->raw();
 
134
}
 
135
 
 
136
void
 
137
Adaptation::Ecap::XactionRep::terminateMaster()
 
138
{
 
139
    if (theMaster) {
 
140
        AdapterXaction x = theMaster;
 
141
        theMaster.reset();
 
142
        x->stop();
 
143
    }
 
144
}
 
145
 
 
146
bool
 
147
Adaptation::Ecap::XactionRep::doneAll() const
 
148
{
 
149
    return proxyingVb >= opComplete && proxyingAb >= opComplete &&
 
150
           Adaptation::Initiate::doneAll();
 
151
}
 
152
 
 
153
// stops receiving virgin and enables auto-consumption
 
154
void
 
155
Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
 
156
{
 
157
    debugs(93,4, HERE << "because " << reason << "; status:" << status());
 
158
    Must(proxyingVb = opOn);
 
159
 
 
160
    BodyPipePointer &p = theVirginRep.raw().body_pipe;
 
161
    Must(p != NULL);
 
162
    Must(p->stillConsuming(this));
 
163
    stopConsumingFrom(p);
 
164
    p->enableAutoConsumption();
 
165
    proxyingVb = opComplete;
 
166
    canAccessVb = false;
 
167
 
 
168
    // called from adapter handler so does not inform adapter
 
169
}
 
170
 
 
171
void
 
172
Adaptation::Ecap::XactionRep::useVirgin()
 
173
{
 
174
    debugs(93,3, HERE << status());
 
175
    Must(proxyingAb == opUndecided);
 
176
    proxyingAb = opNever;
 
177
 
 
178
    BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
 
179
 
 
180
    HttpMsg *clone = theVirginRep.raw().header->clone();
 
181
    // check that clone() copies the pipe so that we do not have to
 
182
    Must(!vbody_pipe == !clone->body_pipe);
 
183
 
 
184
    if (proxyingVb == opOn) {
 
185
        Must(vbody_pipe->stillConsuming(this));
 
186
        // if libecap consumed, we cannot shortcircuit
 
187
        Must(!vbody_pipe->consumedSize());
 
188
        stopConsumingFrom(vbody_pipe);
 
189
        canAccessVb = false;
 
190
        proxyingVb = opComplete;
 
191
    } else if (proxyingVb == opUndecided) {
 
192
        vbody_pipe = NULL; // it is not our pipe anymore
 
193
        proxyingVb = opNever;
 
194
    }
 
195
 
 
196
    sendAnswer(clone);
 
197
    Must(done());
 
198
}
 
199
 
 
200
void
 
201
Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
 
202
{
 
203
    debugs(93,3, HERE << status());
 
204
    Must(m);
 
205
    theAnswerRep = m;
 
206
    Must(proxyingAb == opUndecided);
 
207
 
 
208
    HttpMsg *msg = answer().header;
 
209
    if (!theAnswerRep->body()) { // final, bodyless answer
 
210
        proxyingAb = opNever;
 
211
        sendAnswer(msg);
 
212
    } else { // got answer headers but need to handle body
 
213
        proxyingAb = opOn;
 
214
        Must(!msg->body_pipe); // only host can set body pipes
 
215
        MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
 
216
        Must(rep);
 
217
        rep->tieBody(this); // sets us as a producer
 
218
        Must(msg->body_pipe != NULL); // check tieBody
 
219
 
 
220
        sendAnswer(msg);
 
221
 
 
222
        debugs(93,4, HERE << "adapter will produce body" << status());
 
223
        theMaster->abMake(); // libecap will produce
 
224
    }
 
225
}
 
226
 
 
227
void
 
228
Adaptation::Ecap::XactionRep::vbDiscard()
 
229
{
 
230
    Must(proxyingVb == opUndecided);
 
231
    // if adapter does not need vb, we do not need to send it
 
232
    dropVirgin("vbDiscard");
 
233
    Must(proxyingVb == opNever);
 
234
}
 
235
 
 
236
void
 
237
Adaptation::Ecap::XactionRep::vbMake()
 
238
{
 
239
    Must(proxyingVb == opUndecided);
 
240
    BodyPipePointer &p = theVirginRep.raw().body_pipe;
 
241
    Must(p != NULL);
 
242
    Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
 
243
    proxyingVb = opOn;
 
244
}
 
245
 
 
246
void
 
247
Adaptation::Ecap::XactionRep::vbStopMaking()
 
248
{
 
249
    // if adapter does not need vb, we do not need to receive it
 
250
    if (proxyingVb == opOn)
 
251
        dropVirgin("vbStopMaking");
 
252
    Must(proxyingVb == opComplete);
 
253
}
 
254
 
 
255
void
 
256
Adaptation::Ecap::XactionRep::vbMakeMore()
 
257
{
 
258
    Must(proxyingVb == opOn); // cannot make more if done proxying
 
259
    // we cannot guarantee more vb, but we can check that there is a chance
 
260
    Must(!theVirginRep.raw().body_pipe->exhausted());
 
261
}
 
262
 
 
263
libecap::Area
 
264
Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
 
265
{
 
266
    Must(canAccessVb);
 
267
    // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
 
268
 
 
269
    const BodyPipePointer &p = theVirginRep.raw().body_pipe;
 
270
    Must(p != NULL);
 
271
 
 
272
    // TODO: make MemBuf use size_t?
 
273
    const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
 
274
 
 
275
    // convert to Squid types; XXX: check for overflow
 
276
    const uint64_t offset = static_cast<uint64_t>(o);
 
277
    Must(offset <= haveSize); // equal iff at the end of content
 
278
 
 
279
    // nsize means no size limit: all content starting from offset
 
280
    const size_t size = s == libecap::nsize ?
 
281
                        haveSize - offset : static_cast<size_t>(s);
 
282
 
 
283
    // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
 
284
    return libecap::Area::FromTempBuffer(p->buf().content() + offset,
 
285
                                         min(static_cast<size_t>(haveSize - offset), size));
 
286
}
 
287
 
 
288
void
 
289
Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
 
290
{
 
291
    Must(canAccessVb);
 
292
    // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
 
293
    // consume() requirements change, we would have to return empty vbContent
 
294
    // until the adapter registers as a consumer
 
295
 
 
296
    BodyPipePointer &p = theVirginRep.raw().body_pipe;
 
297
    Must(p != NULL);
 
298
    const size_t size = static_cast<size_t>(n); // XXX: check for overflow
 
299
    const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
 
300
    p->consume(min(size, haveSize));
 
301
}
 
302
 
 
303
void
 
304
Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
 
305
{
 
306
    Must(proxyingAb == opOn && !abProductionFinished);
 
307
    abProductionFinished = true;
 
308
    abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
 
309
    debugs(93,5, HERE << "adapted body production ended");
 
310
    moveAbContent();
 
311
}
 
312
 
 
313
void
 
314
Adaptation::Ecap::XactionRep::noteAbContentAvailable()
 
315
{
 
316
    Must(proxyingAb == opOn && !abProductionFinished);
 
317
    moveAbContent();
 
318
}
 
319
 
 
320
#if 0 /* XXX: implement */
 
321
void
 
322
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
 
323
{
 
324
    Must(answer().body_pipe != NULL);
 
325
    if (size.known())
 
326
        answer().body_pipe->setBodySize(size.value());
 
327
    // else the piped body size is unknown by default
 
328
}
 
329
#endif
 
330
 
 
331
void
 
332
Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
 
333
{
 
334
    debugs(93,3, HERE << "adapter needs time: " <<
 
335
           d.state << '/' << d.progress);
 
336
    // XXX: set timeout?
 
337
}
 
338
 
 
339
void
 
340
Adaptation::Ecap::XactionRep::adaptationAborted()
 
341
{
 
342
    tellQueryAborted(true); // should eCAP support retries?
 
343
    mustStop("adaptationAborted");
 
344
}
 
345
 
 
346
bool
 
347
Adaptation::Ecap::XactionRep::callable() const
 
348
{
 
349
    return !done();
 
350
}
 
351
 
 
352
void
 
353
Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
 
354
{
 
355
    Must(proxyingAb == opOn);
 
356
    moveAbContent();
 
357
}
 
358
 
 
359
void
 
360
Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
 
361
{
 
362
    Must(proxyingAb == opOn);
 
363
    stopProducingFor(answer().body_pipe, false);
 
364
    Must(theMaster);
 
365
    theMaster->abStopMaking();
 
366
    proxyingAb = opComplete;
 
367
}
 
368
 
 
369
void
 
370
Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
 
371
{
 
372
    Must(proxyingVb == opOn);
 
373
    Must(theMaster);
 
374
    theMaster->noteVbContentAvailable();
 
375
}
 
376
 
 
377
void
 
378
Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
 
379
{
 
380
    Must(proxyingVb == opOn);
 
381
    Must(theMaster);
 
382
    theMaster->noteVbContentDone(true);
 
383
    proxyingVb = opComplete;
 
384
}
 
385
 
 
386
void
 
387
Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
 
388
{
 
389
    Must(proxyingVb == opOn);
 
390
    Must(theMaster);
 
391
    theMaster->noteVbContentDone(false);
 
392
    proxyingVb = opComplete;
 
393
}
 
394
 
 
395
void
 
396
Adaptation::Ecap::XactionRep::noteInitiatorAborted()
 
397
{
 
398
    mustStop("initiator aborted");
 
399
}
 
400
 
 
401
// get content from the adapter and put it into the adapted pipe
 
402
void
 
403
Adaptation::Ecap::XactionRep::moveAbContent()
 
404
{
 
405
    Must(proxyingAb == opOn);
 
406
    const libecap::Area c = theMaster->abContent(0, libecap::nsize);
 
407
    debugs(93,5, HERE << "up to " << c.size << " bytes");
 
408
    if (c.size == 0 && abProductionFinished) { // no ab now and in the future
 
409
        stopProducingFor(answer().body_pipe, abProductionAtEnd);
 
410
        proxyingAb = opComplete;
 
411
        debugs(93,5, HERE << "last adapted body data retrieved");
 
412
    } else if (c.size > 0) {
 
413
        if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
 
414
            theMaster->abContentShift(used);
 
415
    }
 
416
}
 
417
 
 
418
const char *
 
419
Adaptation::Ecap::XactionRep::status() const
 
420
{
 
421
    static MemBuf buf;
 
422
    buf.reset();
 
423
 
 
424
    buf.append(" [", 2);
 
425
 
 
426
    if (proxyingVb == opOn) {
 
427
        const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
 
428
        if (!canAccessVb)
 
429
            buf.append("x", 1);
 
430
        if (vp != NULL && vp->stillConsuming(this)) {
 
431
            buf.append("Vb", 2);
 
432
            buf.append(vp->status(), strlen(vp->status())); // XXX
 
433
        } else
 
434
            buf.append("V.", 2);
 
435
    }
 
436
 
 
437
    if (proxyingAb == opOn) {
 
438
        MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
 
439
        Must(rep);
 
440
        const BodyPipePointer &ap = rep->raw().body_pipe;
 
441
        if (ap != NULL && ap->stillProducing(this)) {
 
442
            buf.append(" Ab", 3);
 
443
            buf.append(ap->status(), strlen(ap->status())); // XXX
 
444
        } else
 
445
            buf.append(" A.", 3);
 
446
    }
 
447
 
 
448
    buf.Printf(" ecapx%d]", id);
 
449
 
 
450
    buf.terminate();
 
451
 
 
452
    return buf.content();
 
453
}