~ubuntu-branches/ubuntu/saucy/apt-cacher-ng/saucy

« back to all changes in this revision

Viewing changes to source/dljob.cc

  • Committer: Bazaar Package Importer
  • Author(s): Eduard Bloch
  • Date: 2011-08-07 17:53:21 UTC
  • mfrom: (24.2.14 sid)
  • Revision ID: james.westby@ubuntu.com-20110807175321-k80jrotzovldgu3c
* New upstream version
  + fixes various regressions
* removed dependency on libfuse2 (now used via dlopen, moved to Suggests)
* support new build-* targets in rules

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#include "debug.h"
2
 
#include "meta.h"
3
 
#include "dljob.h"
4
 
#include "dlcon.h"
5
 
 
6
 
#include <cstdio>
7
 
 
8
 
using namespace MYSTD;
9
 
 
10
 
tDlJob::tDlJob(dlcon *p, tFileItemPtr pFi, const string & sHost,
11
 
                const string & sPath) :
12
 
                        m_pHostiVec(NULL),
13
 
                        m_pStorage(pFi), 
14
 
                        m_parent(p),
15
 
                        m_fileUri(sHost, sPath)
16
 
{
17
 
        _Init();
18
 
}
19
 
 
20
 
tDlJob::tDlJob(dlcon *p, tFileItemPtr pFi, acfg::tRepoData * pBackends,
21
 
                const MYSTD::string & sPath) :
22
 
        m_pHostiVec(pBackends), m_pStorage(pFi),
23
 
                        m_parent(p),
24
 
                        m_fileUri("", sPath)
25
 
{
26
 
        _Init();
27
 
}
28
 
 
29
 
tDlJob::~tDlJob()
30
 
{
31
 
        if(m_pStorage)
32
 
        {
33
 
                m_pStorage->DelDownloaderRef("567 Unknown download error occured");
34
 
                m_pStorage.reset();
35
 
        }
36
 
}
37
 
 
38
 
acfg::tRepoData::IHookHandler * tDlJob::GetConnStateObserver()
39
 
{
40
 
        return m_pHostiVec ? m_pHostiVec->m_pHooks : NULL;
41
 
}
42
 
 
43
 
void tDlJob::_Init()
44
 
{
45
 
        m_nRest=0;
46
 
        m_bReconnectASAP=false;
47
 
        bSuggestReconnect=false;
48
 
        bRequestEnqueued=false;
49
 
        m_DlState=STATE_GETHEADER;
50
 
        m_pBackend=NULL;
51
 
        
52
 
        
53
 
        if(m_pStorage)
54
 
                m_pStorage->AddDownloaderRef();
55
 
}
56
 
 
57
 
 
58
 
void tDlJob::UnregDownloader(const string & sError)
59
 
{
60
 
        if(! m_pStorage.get())
61
 
                return;
62
 
        m_pStorage->DelDownloaderRef(sError);
63
 
        m_pStorage.reset();
64
 
}
65
 
 
66
 
bool tDlJob::FindConfig()
67
 
{
68
 
        // using backends? Find one which is not blacklisted
69
 
        
70
 
        if(m_pHostiVec)
71
 
        {
72
 
                // keep the existing one if possible
73
 
                if(m_pBackend && m_parent->m_MirrorHostBlacklist.find(m_pBackend->sHost) 
74
 
                                        == m_parent->m_MirrorHostBlacklist.end())
75
 
                        return true;
76
 
                
77
 
                for(UINT i=0; i<m_pHostiVec->size(); i++)
78
 
                {
79
 
                        tHttpUrl *beTest = & m_pHostiVec->at(i);
80
 
                        if(beTest && m_parent->m_MirrorHostBlacklist.find(beTest->sHost) 
81
 
                                        == m_parent->m_MirrorHostBlacklist.end())
82
 
                        {
83
 
                                m_pBackend=beTest;
84
 
                                return true;
85
 
                        }
86
 
                }
87
 
                return false;
88
 
        }
89
 
        else
90
 
        {
91
 
                // not in blacklist -> OK
92
 
                return (m_parent->m_MirrorHostBlacklist.find(m_fileUri.sHost)
93
 
                                == m_parent->m_MirrorHostBlacklist.end());
94
 
        }
95
 
}
96
 
 
97
 
void tDlJob::BlacklistBackend()
98
 
{
99
 
        m_parent->m_MirrorHostBlacklist.insert(m_pBackend 
100
 
                        ? m_pBackend->sHost
101
 
                    : m_fileUri.sHost);
102
 
}
103
 
 
104
 
// needs connectedHost, blacklist, output buffer from the parent, proxy mode?
105
 
bool tDlJob::AppendRequest(string sForThisHostOnly, tSS &head, const tHttpUrl * pProxy)
106
 
{
107
 
        LOGSTART2("tDlJob::AppendRequest", "hostfilter: " << sForThisHostOnly);
108
 
        if(! m_pStorage)
109
 
                return R_ERROR_LOCAL;
110
 
        
111
 
        //const string sHost = GetPeerName();
112
 
        
113
 
        // just make sure that both are set or both unset
114
 
        if( (0==m_pBackend) != (0==m_pHostiVec))
115
 
                return false;
116
 
        
117
 
        // host filter was set and mismatched?
118
 
        if( ! pProxy && ! sForThisHostOnly.empty() && sForThisHostOnly != GetPeerName())
119
 
        {
120
 
                bSuggestReconnect=true;
121
 
                return false;
122
 
        }
123
 
 
124
 
        head << (m_pStorage->m_bHeadOnly ? "HEAD " : "GET ")
125
 
                        << (pProxy ? RemoteUri() : RemotePath()) << " HTTP/1.1\r\n"
126
 
        << acfg::agentheader
127
 
        << "Host: " << GetPeerName() << "\r\n";
128
 
        
129
 
        if (pProxy) // also add authorization if there is any
130
 
        {
131
 
                ldbg("using proxy");
132
 
                head << pProxy->sPath << "Proxy-Connection: keep-alive\r\n";
133
 
        }
134
 
 
135
 
        if (m_pStorage->m_nSizeSeen > 0)
136
 
        {
137
 
                bool bSetRange(false), bSetIfRange(false);
138
 
                
139
 
                lockguard g(m_pStorage.get());
140
 
                const header &pHead = m_pStorage->GetHeaderUnlocked();
141
 
                const char *p=pHead.h[header::LAST_MODIFIED];
142
 
                
143
 
                if (m_pStorage->m_bCheckFreshness)
144
 
                {
145
 
                        if (p)
146
 
                        {
147
 
                                bSetIfRange=true;
148
 
                                bSetRange=true;
149
 
                        }
150
 
                        // else no date available, cannot rely on anything
151
 
                }
152
 
                else
153
 
                { 
154
 
                        /////////////// this was protection against broken stuff in the pool ////
155
 
                        // static file type, date does not matter. check known content length, not risking "range not satisfiable" result
156
 
                        //
157
 
                        //off_t nContLen=atol(h.get("Content-Length"));
158
 
                        //if (nContLen>0 && j->m_pStorage->m_nFileSize < nContLen)
159
 
                        bSetRange=true;
160
 
                }
161
 
 
162
 
        /* use APT's trick - set the starting position one byte lower -
163
 
         * this way the server has to send at least one byte if the assumed
164
 
         * position is correct, and we never get a 416 error (one byte
165
 
         * waste is acceptable).
166
 
         * */
167
 
                if (bSetRange)
168
 
                        head << "Range: bytes=" << (m_pStorage->m_nSizeSeen - 1) << "-\r\n";
169
 
 
170
 
                if (bSetIfRange)
171
 
                        head << "If-Range: " << p << "\r\n";
172
 
        }
173
 
        
174
 
        if(m_pStorage->m_bCheckFreshness)
175
 
                head << "Cache-Control: no-store,no-cache,max-age=0\r\n";
176
 
 
177
 
        if(acfg::exporigin && !m_parent->m_sXForwardedFor.empty())
178
 
                head << "X-Forwarded-For: " << m_parent->m_sXForwardedFor << "\r\n";
179
 
        
180
 
        if(!acfg::requestapx.empty())
181
 
                head.addEscaped(acfg::requestapx.c_str());
182
 
 
183
 
        head << "Accept: */*\r\nConnection: Keep-Alive\r\n\r\n";
184
 
                        
185
 
        ldbg("Request cooked, buffer contents: " << head);
186
 
        
187
 
        bRequestEnqueued=true;
188
 
        return true;
189
 
}
190
 
 
191
 
inline string tDlJob::RemoteUri()
192
 
{
193
 
        return (m_pBackend ?  m_pBackend->ToURI() + m_fileUri.sPath
194
 
                                : m_fileUri.ToURI());
195
 
}
196
 
 
197
 
inline string tDlJob::RemotePath()
198
 
{
199
 
        if(m_pBackend)
200
 
                return m_pBackend->sPath + m_fileUri.sPath;
201
 
        
202
 
        return m_fileUri.sPath;
203
 
}
204
 
 
205
 
tDlJob::tDlResult tDlJob::NewDataHandler(acbuf & inBuf, string &sErrorRet)
206
 
{
207
 
        LOGSTART("tDlJob::NewDataHandler");
208
 
        while (true)
209
 
        {
210
 
                off_t nToStore = min((off_t)inBuf.size(), m_nRest);
211
 
                ldbg("To store: " <<nToStore);
212
 
                if (0==nToStore)
213
 
                        break;
214
 
 
215
 
                if (!m_pStorage->StoreFileData(inBuf.rptr(), nToStore))
216
 
                        return R_ERROR_LOCAL;
217
 
 
218
 
                m_nRest-=nToStore;
219
 
                inBuf.drop(nToStore);
220
 
        }
221
 
 
222
 
        ldbg("Rest: " << m_nRest );
223
 
 
224
 
        if (m_nRest==0)
225
 
                m_DlState = (STATE_GETDATA==m_DlState) ? STATE_FINISHJOB
226
 
                                : STATE_GETCHUNKHEAD;
227
 
        else
228
 
                return R_MOREINPUT; // will come back
229
 
 
230
 
        return R_NEXTSTATE;
231
 
}
232
 
 
233
 
/*!
234
 
 * 
235
 
 * Process incoming traffic and write it down to disk/downloaders.
236
 
 */
237
 
tDlJob::tDlResult tDlJob::ProcessIncomming(acbuf & inBuf, string & sErrorRet)
238
 
{
239
 
        LOGSTART("tDlJob::ProcessIncomming");
240
 
        if(! m_pStorage)
241
 
                return R_ERROR_LOCAL;
242
 
        
243
 
        Reswitch:
244
 
        ldbg("switch: " << m_DlState);
245
 
        
246
 
        switch (m_DlState)
247
 
        {
248
 
        
249
 
        case (STATE_GETHEADER):
250
 
        {
251
 
                ldbg("STATE_GETHEADER");
252
 
                header h;
253
 
                if(inBuf.size()==0)
254
 
                        return R_MOREINPUT;
255
 
                dbgline;
256
 
                int l=h.LoadFromBuf(inBuf.rptr(), inBuf.size());
257
 
                if (0==l)
258
 
                        return R_MOREINPUT;
259
 
                else if(l>0)
260
 
                {
261
 
                        dbgline;
262
 
                        ldbg("contents: " << std::string(inBuf.rptr(), l));
263
 
                        inBuf.drop(l);
264
 
                        if (h.type!=header::ANSWER)
265
 
                        {
266
 
                                sErrorRet="500 Unexpected response type";
267
 
                                return R_ERROR_REMOTE;
268
 
                        }
269
 
                }
270
 
                else
271
 
                {
272
 
                        dbgline;
273
 
                        sErrorRet="500 Invalid header";
274
 
            return R_ERROR_REMOTE;
275
 
                }
276
 
                
277
 
                ldbg("GOT, parsed: " << h.frontLine);
278
 
 
279
 
                const char *p=h.h[header::CONNECTION] ? h.h[header::CONNECTION] : h.h[header::PROXY_CONNECTION];  
280
 
                if(p && 0==strcasecmp(p, "close") )
281
 
                {
282
 
                        ldbg("Peer wants to close connection after request");
283
 
                        m_bReconnectASAP=true;
284
 
                }
285
 
                if(m_pStorage->m_bHeadOnly)
286
 
                        m_DlState=STATE_FINISHJOB;
287
 
                else if ( NULL != (p=h.h[header::TRANSFER_ENCODING])
288
 
                                && 0==strcasecmp(p, "chunked"))
289
 
                        m_DlState=STATE_GETCHUNKHEAD;
290
 
                else
291
 
                {
292
 
                        dbgline;
293
 
                        p=h.h[header::CONTENT_LENGTH];
294
 
                        if (!p)
295
 
            {
296
 
                                sErrorRet="500 Missing Content-Length";
297
 
                return R_ERROR_REMOTE;
298
 
            }
299
 
                        // may support such endless stuff in the future but that's too unreliable for now
300
 
                        m_nRest=atoofft(p);
301
 
                        m_DlState=STATE_GETDATA;
302
 
                }
303
 
 
304
 
        h.set(header::XORIG, RemoteUri());
305
 
 
306
 
        if ( ! m_pStorage->DownloadStartedStoreHeader(h, inBuf.rptr()))
307
 
        {
308
 
            ldbg("Item dl'ed by others or in error state --> drop it, reconnect");
309
 
            m_DlState=STATE_GETDATA; // XXX maybe introduce a new error state, this is a kludge
310
 
            return R_SKIPITEM;
311
 
        }
312
 
                goto Reswitch;
313
 
 
314
 
        }
315
 
        case (STATE_GETDATA_CHUNKED): // fall through, just send it back to header parser hereafter
316
 
                ldbg("STATE_GETDATA_CHUNKED (to STATE_GETDATA)");
317
 
        case (STATE_GETDATA):
318
 
        {
319
 
                ldbg("STATE_GETDATA");
320
 
                tDlResult res = NewDataHandler(inBuf, sErrorRet);
321
 
                if(res != R_NEXTSTATE)
322
 
                        return res;
323
 
                goto Reswitch;
324
 
        }
325
 
        case (STATE_FINISHJOB):
326
 
        {
327
 
                ldbg("STATE_FINISHJOB");
328
 
                m_DlState=STATE_GETHEADER;
329
 
                m_pStorage->StoreFileData(NULL, 0);
330
 
        return m_bReconnectASAP ? R_SKIPITEM : R_DONE;
331
 
        }
332
 
        case (STATE_GETCHUNKHEAD):
333
 
        {
334
 
                ldbg("STATE_GETCHUNKHEAD");
335
 
                const char *p=inBuf.c_str();
336
 
                const char *e=strstr(p, "\r\n");
337
 
                if(e==p)
338
 
                { // came back from reading, drop remaining junk? 
339
 
                        inBuf.drop(2);
340
 
                        p+=2;
341
 
                        e=strstr(p, "\r\n");
342
 
                }
343
 
                dbgline;
344
 
                if(!e)
345
 
                {
346
 
                        inBuf.move();
347
 
                        return R_MOREINPUT; // get more data
348
 
                }
349
 
                unsigned int len(0);
350
 
                int n = sscanf(p, "%x", &len); 
351
 
                
352
 
                unsigned int nCheadSize=e-p+2;
353
 
                if(n==1 && len>0)
354
 
                {
355
 
                        ldbg("ok, skip " << nCheadSize <<" bytes, " <<p);
356
 
                        inBuf.drop(nCheadSize);
357
 
                        m_nRest=len;
358
 
                        m_DlState=STATE_GETDATA_CHUNKED;
359
 
                }
360
 
                else if(n==1)
361
 
                {
362
 
                        // skip the additional \r\n of the null-sized part here as well
363
 
                        ldbg("looks like the end, but needs to get everything into buffer to change the state reliably");
364
 
                        if(inBuf.size() < nCheadSize+2 )
365
 
                        {
366
 
                                inBuf.move();
367
 
                                return R_MOREINPUT;
368
 
                        }
369
 
                        if( ! (e[2]=='\r' && e[3]=='\n'))
370
 
                        {
371
 
                                aclog::err(m_pStorage->m_sKey+" -- error in chunk format detected");
372
 
                                return R_ERROR_REMOTE;
373
 
                        }
374
 
                        
375
 
                        inBuf.drop(nCheadSize+2);
376
 
                        m_DlState=STATE_FINISHJOB;
377
 
                }
378
 
                else
379
 
                        return R_ERROR_REMOTE; // that's bad...
380
 
                goto Reswitch;
381
 
                break;
382
 
        }
383
 
        
384
 
        }
385
 
        return R_ERROR_REMOTE;
386
 
}