104
101
fd_set rfds, wfds;
105
102
struct timeval tv;
107
bool bStopRequesting=false;
104
bool bStopAddingRequests=false;
106
// keep related state vars together
110
int state; // negative = not connected, otherwise count of requests in the pipeline
117
inline void recycle()
119
tcpconnect::RecycleIdleConnection(ptr);
111
128
int nMaxFd=m_wakepipe[0];
114
131
FD_SET(m_wakepipe[0], &rfds);
132
tDlJob *pCurJob=NULL;
135
// needs to protect against concurrent queue modification
122
139
if(m_qToReceive.empty())
142
// if we wait then it might time out, but others may make use of it ATM
146
// received a signal to stop the work as soon as the process is completed?
128
goto do_select; // wakepipe is in the fds set, just wait for parent there
152
// and just wait for parent notification there
153
goto ready_for_select;
131
pj = m_qToReceive.front();
134
if(pj->bSuggestReconnect)
156
pCurJob = m_qToReceive.front();
158
if(pCurJob->bSuggestReconnect)
160
pCurJob->bSuggestReconnect=false;
136
161
ldbg("found host change flag, prepare reconnection");
137
pj->bSuggestReconnect=false;
138
166
nTolErrorCount=0; // new host, new luck...
139
sCurrentlyConnectedHost.clear();
142
if(sCurrentlyConnectedHost.empty())
169
/* fresh state and/or going for reconnection. Pick up a working hostname and
145
176
// set this right now... no matter how the rest looks, current
150
bStopRequesting=false;
181
bStopAddingRequests=false;
151
182
// just be sure about that
154
/* fresh state and/or going for reconnection. Pick up a working hostname and
158
if(!pj->FindConfig()) // bad, no usable host in the first item...
185
// bring job queue into a fresh state
186
for(dljIter it=m_qToReceive.begin(); it!=m_qToReceive.end(); )
160
pj->MarkFailed(sErrorMsg.empty() ? "503 No usable download host found" : sErrorMsg);
189
p->bSuggestReconnect=p->bRequestEnqueued=false;
191
if(p->HasBrokenStorage())
196
m_qToReceive.erase(it++);
201
if(!pCurJob) // lost front job, check the state
204
if(!pCurJob->FindConfig()) // bad, no usable host in the first item...
206
pCurJob->UnregDownloader(sErrorMsg.empty()
207
? "503 No usable download host found"
164
210
m_qToReceive.pop_front();
168
const string sTargetHost = pj->GetPeerName();
170
acfg::tHostiVec::IHookHandler *pobs = pj->GetConnStateObserver();
214
// can do internal work without locking queue (against modifications)
215
__lockguard.unLock();
217
const string sTargetHost = pCurJob->GetPeerName();
218
ldbg("new target host: "<<sTargetHost);
221
acfg::tHostiVec::IHookHandler *pobs = pCurJob->GetConnStateObserver();
172
223
pobs->JobConnect();
174
__lockguard.unLock();
175
// might override it with proxy stuff
176
bool bOk=_Connect(sTargetHost, sErrorMsg);
178
ldbg("Connection ok? "<<bOk<<", remember host? "<<sTargetHost);
180
m_pConnStateObserver=pobs;
183
sCurrentlyConnectedHost=sTargetHost;
225
con.ptr = tcpconnect::CreateConnected(sTargetHost, sErrorMsg);
227
ldbg("Connection state: "<<(con.ptr?"ok":"failed"));
232
con.ptr->SetStateObserver(pobs);
234
con.fd = con.ptr->GetFD();
237
pCurJob->BlacklistBackend();
239
// queue modification possible again
187
240
__lockguard.reLock();
192
pj->BlacklistBackend();
194
// weed out those with no way out, either in error state already, or
195
// ran out of backends (and notify users in this case)
196
dljIter it=m_qToReceive.begin();
197
bool bPrimaryFailed=false;
198
while(it!=m_qToReceive.end())
243
// recheck the queue, drop those without any possible download strategy
244
for(dljIter it=m_qToReceive.begin(); it!=m_qToReceive.end(); )
201
p->bSuggestReconnect=p->bRequestPrepared=false;
204
bool bDropIt=p->HasBrokenStorage();
209
if( ! p->FindConfig())
213
p->MarkFailed(sErrorMsg); // sensible only for the first... whatever
216
// ok, drop it for sure
224
m_qToReceive.erase(itmp);
247
if(!p->FindConfig()) // oh no, no more potential mirrors. Must tell the user.
249
p->UnregDownloader(sErrorMsg);
253
m_qToReceive.erase(it++);
258
// ooops, lost current job?
233
ldbg("Resetting sCurrentlyConnectedHost, primary source failed");
234
sCurrentlyConnectedHost.clear();
261
// can shift the pointer to the next if the connection is usable?
262
if(con.ptr && !m_qToReceive.empty()
263
&& m_qToReceive.front()->GetPeerName() == con.ptr->GetHostname())
265
pCurJob=m_qToReceive.front();
267
else // this was a fresh connection, reuse it
237
for(it=m_qToReceive.begin(); it!=m_qToReceive.end(); it++)
272
for(dljIter it=m_qToReceive.begin(); it!=m_qToReceive.end(); it++)
240
if(! (*it)->AppendRequest(sTargetHost, sendBuf))
276
if(p->AppendRequest(sTargetHost, sendBuf, con.ptr->GetProxyData()))
242
// refused -> needs to reconnect later
243
(*it)->bSuggestReconnect=true;
244
bStopRequesting=true; // MUST set this
280
// refused? -> needs to reconnect later
281
p->bSuggestReconnect=true;
282
bStopAddingRequests=true; // MUST set this
250
if(!bOk) // failed somewhere above, recheck
288
if(!con.ptr) // failed above? will recheck the connection state
252
291
} // (re)connect done
257
296
} // end of locked section
262
FD_SET(m_conFd, &rfds);
263
nMaxFd=std::max(m_conFd, nMaxFd);
302
FD_SET(con.fd, &rfds);
303
nMaxFd=std::max(con.fd, nMaxFd);
265
305
if(sendBuf.size())
267
307
ldbg("Needs to send " << sendBuf.size() << " bytes")
268
FD_SET(m_conFd, &wfds);
269
nMaxFd=std::max(m_conFd, nMaxFd);
308
FD_SET(con.fd, &wfds);
309
nMaxFd=std::max(con.fd, nMaxFd);
275
313
ldbg("select dlcon");
276
314
tv.tv_sec = acfg::nettimeout;
279
r=select(nMaxFd+1, &rfds, &wfds, NULL, &tv);
317
int r=select(nMaxFd+1, &rfds, &wfds, NULL, &tv);
288
326
sErrorMsg=string("500 Internal malfunction, ")+fer;
289
327
goto drop_and_restart_stream;
329
else if(r==0) // must be a timeout
293
if(!sCurrentlyConnectedHost.empty())
296
aclog::errnoFmter fer;
297
ldbg("Select timeout for sCurrentlyConnectedHost:" << sCurrentlyConnectedHost << ". Trigger reconnection..., m_sConnectedHost.clear()");
298
sCurrentlyConnectedHost.clear();
300
if(pj && pj->HasStarted())
302
aclog::err("Warning, disconnected during package download");
303
sErrorMsg=string("500 Connection abort, ")+fer;
304
goto drop_and_restart_stream;
333
sErrorMsg=aclog::errnoFmter("500 Connection abort, ");
334
aclog::err("Warning, disconnected during package download");
336
goto drop_and_restart_stream;
310
339
if (FD_ISSET(m_wakepipe[0], &rfds))
312
342
for(int tmp; read(m_wakepipe[0], &tmp, 1) > 0; ) ;
315
345
// got new stuff? and needs to prepare requests? and queue is not empty?
316
346
// walk around and send requests for them, if possible
317
if( !bStopRequesting &&
318
!sCurrentlyConnectedHost.empty() &&
319
!m_qToReceive.empty())
347
if( !bStopAddingRequests && con.ptr && !m_qToReceive.empty())
321
349
ldbg("Preparing requests for new stuff...");
323
// walks backward from the end to find a good position where previous request-sending stopped,
324
// instead of just starting from the beginning.
326
// WARNING!!! Optimistic code, relies on condition checks above
351
// walks backward from the end to find a good position where previous
352
// request-sending was interrupted instead of just starting from the beginning.
328
354
dljIter it=m_qToReceive.end();
329
355
for(it--; it!=m_qToReceive.begin(); it--)
330
if( (*it)->bRequestPrepared )
356
if( (*it)->bRequestEnqueued )
333
359
for(; it!=m_qToReceive.end(); it++)
336
362
// one position too far in backwalk
337
if(pjn->bRequestPrepared)
363
if(pjn->bRequestEnqueued)
340
366
// found the first unseen, do stuff
342
pjn->bRequestPrepared=true;
343
if(!pjn->FindConfig() || !pjn->AppendRequest(sCurrentlyConnectedHost, sendBuf))
368
pjn->bRequestEnqueued=true;
369
if(!pjn->FindConfig() || !pjn->AppendRequest(con.ptr->GetHostname(),
370
sendBuf, con.ptr->GetProxyData()))
345
// bad stuff, check later
346
bStopRequesting=true;
372
// bad stuff, to be reported later
373
bStopAddingRequests=true;
347
374
pjn->bSuggestReconnect=true;
354
if (m_conFd>=0 && sendBuf.size() && FD_ISSET(m_conFd, &wfds))
383
if (con.fd>=0 && sendBuf.size() && FD_ISSET(con.fd, &wfds))
356
FD_CLR(m_conFd, &wfds);
385
FD_CLR(con.fd, &wfds);
358
387
ldbg("Sending data...\n" << sendBuf);
359
int s=::send(m_conFd, sendBuf.data(), sendBuf.length(), MSG_NOSIGNAL);
388
int s=::send(con.fd, sendBuf.data(), sendBuf.length(), MSG_NOSIGNAL);
360
389
ldbg("Sent " << s << " bytes from " << sendBuf.length());
374
if (m_conFd>=0 && FD_ISSET(m_conFd, &rfds))
403
if (con.fd>=0 && FD_ISSET(con.fd, &rfds))
376
if(!pj) // huh, sends something without request? Maybe disconnect hang (zero read)?
405
if(!pCurJob) // huh, sends something without request? Maybe disconnect hang (zero read)?
378
407
ldbg("BUG: not expected data");
379
408
// just disconnect, and maybe reconnect
380
sCurrentlyConnectedHost.clear();
383
int r = m_InBuf.sysread(m_conFd);
412
int r = m_InBuf.sysread(con.fd);
387
continue; // should never happen, though.
389
// pickup the error code and make sure it's closed ASAP
415
// pickup the error code for later and kill current connection ASAP
416
sErrorMsg=aclog::errnoFmter("502 ");
396
422
* disconnected? Maybe because of end of Keep-Alive sequence, or
401
427
if(++nTolErrorCount < MAX_RETRIES)
404
if( ! pj->HasStarted())
430
if( ! pCurJob->HasStarted())
406
ldbg("MAX_RETRIES not reached, resetting sCurrentlyConnectedHost");
407
sCurrentlyConnectedHost.clear();
432
ldbg("MAX_RETRIES not reached, just reconnect");
412
438
{ // too many retries for this host :-(
413
pj->BlacklistBackend();
439
pCurJob->BlacklistBackend();
414
440
nTolErrorCount=0;
418
sErrorMsg=string("502 ")+f;
419
444
goto drop_and_restart_stream;
423
while( ! m_InBuf.empty() && pj)
448
while( ! m_InBuf.empty() && pCurJob)
425
ldbg("Processing input of " << pj->RemoteUri() );
426
tDlJob::tDlResult res=pj->ProcessIncomming(m_InBuf, sErrorMsg);
427
ldbg("... input processing result: " << res);
450
ldbg("Processing job for " << pCurJob->RemoteUri() );
451
tDlJob::tDlResult res=pCurJob->ProcessIncomming(m_InBuf, sErrorMsg);
452
ldbg("... incomming data processing result: " << res);
430
455
case(tDlJob::R_MOREINPUT):
431
456
case(tDlJob::R_NOOP):
432
goto leave_recv_loop; // will get more in the next loop
457
goto next_dlcon_cycle; // will get more in the next loop
433
458
case(tDlJob::R_SKIPITEM): // will restart the stream
434
459
// item was ours but download aborted... might do more fine recovery in the future
435
460
case(tDlJob::R_ERROR_LOCAL):
440
465
case(tDlJob::R_DONE):
442
467
nTolErrorCount=0; // well done and can sleep now, should be resumed carefully later
446
472
m_qToReceive.pop_front();
447
pj = m_qToReceive.empty() ? NULL : m_qToReceive.front();
473
pCurJob = m_qToReceive.empty() ? NULL : m_qToReceive.front();
448
474
ldbg("Remaining dlitems: " << m_qToReceive.size());
458
483
drop_and_restart_stream:
485
ldbg("Resetting pCurJob, resetting connection");
461
ldbg("Resetting pj, resetting sCurrentlyConnectedHost");
462
pj->MarkFailed(sErrorMsg);
463
sCurrentlyConnectedHost.clear();
489
pCurJob->UnregDownloader(sErrorMsg);
466
492
m_qToReceive.pop_front();