10
tDlJob::tDlJob(dlcon *p, tFileItemPtr pFi, const string & sHost,
11
const string & sPath) :
15
m_fileUri(sHost, sPath)
20
tDlJob::tDlJob(dlcon *p, tFileItemPtr pFi, acfg::tRepoData * pBackends,
21
const MYSTD::string & sPath) :
22
m_pHostiVec(pBackends), m_pStorage(pFi),
33
m_pStorage->DelDownloaderRef("567 Unknown download error occured");
38
acfg::tRepoData::IHookHandler * tDlJob::GetConnStateObserver()
40
return m_pHostiVec ? m_pHostiVec->m_pHooks : NULL;
46
m_bReconnectASAP=false;
47
bSuggestReconnect=false;
48
bRequestEnqueued=false;
49
m_DlState=STATE_GETHEADER;
54
m_pStorage->AddDownloaderRef();
58
void tDlJob::UnregDownloader(const string & sError)
60
if(! m_pStorage.get())
62
m_pStorage->DelDownloaderRef(sError);
66
bool tDlJob::FindConfig()
68
// using backends? Find one which is not blacklisted
72
// keep the existing one if possible
73
if(m_pBackend && m_parent->m_MirrorHostBlacklist.find(m_pBackend->sHost)
74
== m_parent->m_MirrorHostBlacklist.end())
77
for(UINT i=0; i<m_pHostiVec->size(); i++)
79
tHttpUrl *beTest = & m_pHostiVec->at(i);
80
if(beTest && m_parent->m_MirrorHostBlacklist.find(beTest->sHost)
81
== m_parent->m_MirrorHostBlacklist.end())
91
// not in blacklist -> OK
92
return (m_parent->m_MirrorHostBlacklist.find(m_fileUri.sHost)
93
== m_parent->m_MirrorHostBlacklist.end());
97
void tDlJob::BlacklistBackend()
99
m_parent->m_MirrorHostBlacklist.insert(m_pBackend
104
// needs connectedHost, blacklist, output buffer from the parent, proxy mode?
105
bool tDlJob::AppendRequest(string sForThisHostOnly, tSS &head, const tHttpUrl * pProxy)
107
LOGSTART2("tDlJob::AppendRequest", "hostfilter: " << sForThisHostOnly);
109
return R_ERROR_LOCAL;
111
//const string sHost = GetPeerName();
113
// just make sure that both are set or both unset
114
if( (0==m_pBackend) != (0==m_pHostiVec))
117
// host filter was set and mismatched?
118
if( ! pProxy && ! sForThisHostOnly.empty() && sForThisHostOnly != GetPeerName())
120
bSuggestReconnect=true;
124
head << (m_pStorage->m_bHeadOnly ? "HEAD " : "GET ")
125
<< (pProxy ? RemoteUri() : RemotePath()) << " HTTP/1.1\r\n"
127
<< "Host: " << GetPeerName() << "\r\n";
129
if (pProxy) // also add authorization if there is any
132
head << pProxy->sPath << "Proxy-Connection: keep-alive\r\n";
135
if (m_pStorage->m_nSizeSeen > 0)
137
bool bSetRange(false), bSetIfRange(false);
139
lockguard g(m_pStorage.get());
140
const header &pHead = m_pStorage->GetHeaderUnlocked();
141
const char *p=pHead.h[header::LAST_MODIFIED];
143
if (m_pStorage->m_bCheckFreshness)
150
// else no date available, cannot rely on anything
154
/////////////// this was protection against broken stuff in the pool ////
155
// static file type, date does not matter. check known content length, not risking "range not satisfiable" result
157
//off_t nContLen=atol(h.get("Content-Length"));
158
//if (nContLen>0 && j->m_pStorage->m_nFileSize < nContLen)
162
/* use APT's trick - set the starting position one byte lower -
163
* this way the server has to send at least one byte if the assumed
164
* position is correct, and we never get a 416 error (one byte
165
* waste is acceptable).
168
head << "Range: bytes=" << (m_pStorage->m_nSizeSeen - 1) << "-\r\n";
171
head << "If-Range: " << p << "\r\n";
174
if(m_pStorage->m_bCheckFreshness)
175
head << "Cache-Control: no-store,no-cache,max-age=0\r\n";
177
if(acfg::exporigin && !m_parent->m_sXForwardedFor.empty())
178
head << "X-Forwarded-For: " << m_parent->m_sXForwardedFor << "\r\n";
180
if(!acfg::requestapx.empty())
181
head.addEscaped(acfg::requestapx.c_str());
183
head << "Accept: */*\r\nConnection: Keep-Alive\r\n\r\n";
185
ldbg("Request cooked, buffer contents: " << head);
187
bRequestEnqueued=true;
191
inline string tDlJob::RemoteUri()
193
return (m_pBackend ? m_pBackend->ToURI() + m_fileUri.sPath
194
: m_fileUri.ToURI());
197
inline string tDlJob::RemotePath()
200
return m_pBackend->sPath + m_fileUri.sPath;
202
return m_fileUri.sPath;
205
tDlJob::tDlResult tDlJob::NewDataHandler(acbuf & inBuf, string &sErrorRet)
207
LOGSTART("tDlJob::NewDataHandler");
210
off_t nToStore = min((off_t)inBuf.size(), m_nRest);
211
ldbg("To store: " <<nToStore);
215
if (!m_pStorage->StoreFileData(inBuf.rptr(), nToStore))
216
return R_ERROR_LOCAL;
219
inBuf.drop(nToStore);
222
ldbg("Rest: " << m_nRest );
225
m_DlState = (STATE_GETDATA==m_DlState) ? STATE_FINISHJOB
226
: STATE_GETCHUNKHEAD;
228
return R_MOREINPUT; // will come back
235
* Process incoming traffic and write it down to disk/downloaders.
237
tDlJob::tDlResult tDlJob::ProcessIncomming(acbuf & inBuf, string & sErrorRet)
239
LOGSTART("tDlJob::ProcessIncomming");
241
return R_ERROR_LOCAL;
244
ldbg("switch: " << m_DlState);
249
case (STATE_GETHEADER):
251
ldbg("STATE_GETHEADER");
256
int l=h.LoadFromBuf(inBuf.rptr(), inBuf.size());
262
ldbg("contents: " << std::string(inBuf.rptr(), l));
264
if (h.type!=header::ANSWER)
266
sErrorRet="500 Unexpected response type";
267
return R_ERROR_REMOTE;
273
sErrorRet="500 Invalid header";
274
return R_ERROR_REMOTE;
277
ldbg("GOT, parsed: " << h.frontLine);
279
const char *p=h.h[header::CONNECTION] ? h.h[header::CONNECTION] : h.h[header::PROXY_CONNECTION];
280
if(p && 0==strcasecmp(p, "close") )
282
ldbg("Peer wants to close connection after request");
283
m_bReconnectASAP=true;
285
if(m_pStorage->m_bHeadOnly)
286
m_DlState=STATE_FINISHJOB;
287
else if ( NULL != (p=h.h[header::TRANSFER_ENCODING])
288
&& 0==strcasecmp(p, "chunked"))
289
m_DlState=STATE_GETCHUNKHEAD;
293
p=h.h[header::CONTENT_LENGTH];
296
sErrorRet="500 Missing Content-Length";
297
return R_ERROR_REMOTE;
299
// may support such endless stuff in the future but that's too unreliable for now
301
m_DlState=STATE_GETDATA;
304
h.set(header::XORIG, RemoteUri());
306
if ( ! m_pStorage->DownloadStartedStoreHeader(h, inBuf.rptr()))
308
ldbg("Item dl'ed by others or in error state --> drop it, reconnect");
309
m_DlState=STATE_GETDATA; // XXX maybe introduce a new error state, this is a kludge
315
case (STATE_GETDATA_CHUNKED): // fall through, just send it back to header parser hereafter
316
ldbg("STATE_GETDATA_CHUNKED (to STATE_GETDATA)");
317
case (STATE_GETDATA):
319
ldbg("STATE_GETDATA");
320
tDlResult res = NewDataHandler(inBuf, sErrorRet);
321
if(res != R_NEXTSTATE)
325
case (STATE_FINISHJOB):
327
ldbg("STATE_FINISHJOB");
328
m_DlState=STATE_GETHEADER;
329
m_pStorage->StoreFileData(NULL, 0);
330
return m_bReconnectASAP ? R_SKIPITEM : R_DONE;
332
case (STATE_GETCHUNKHEAD):
334
ldbg("STATE_GETCHUNKHEAD");
335
const char *p=inBuf.c_str();
336
const char *e=strstr(p, "\r\n");
338
{ // came back from reading, drop remaining junk?
347
return R_MOREINPUT; // get more data
350
int n = sscanf(p, "%x", &len);
352
unsigned int nCheadSize=e-p+2;
355
ldbg("ok, skip " << nCheadSize <<" bytes, " <<p);
356
inBuf.drop(nCheadSize);
358
m_DlState=STATE_GETDATA_CHUNKED;
362
// skip the additional \r\n of the null-sized part here as well
363
ldbg("looks like the end, but needs to get everything into buffer to change the state reliably");
364
if(inBuf.size() < nCheadSize+2 )
369
if( ! (e[2]=='\r' && e[3]=='\n'))
371
aclog::err(m_pStorage->m_sKey+" -- error in chunk format detected");
372
return R_ERROR_REMOTE;
375
inBuf.drop(nCheadSize+2);
376
m_DlState=STATE_FINISHJOB;
379
return R_ERROR_REMOTE; // that's bad...
385
return R_ERROR_REMOTE;