~ubuntu-branches/debian/wheezy/apt-cacher-ng/wheezy

« back to all changes in this revision

Viewing changes to source/dlcon.cc

  • Committer: Bazaar Package Importer
  • Author(s): Eduard Bloch
  • Date: 2011-03-28 00:49:59 UTC
  • mfrom: (30.1.10 sid)
  • Revision ID: james.westby@ubuntu.com-20110328004959-qpk7249q698djveb
Tags: 0.6.1-1
* New upstream release
* Improved BindInterfaces description (closes: #545337)

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
#include "acfg.h"
7
7
#include "dlcon.h"
8
8
 
 
9
#include "tcpconnect.h"
9
10
#include "fileitem.h"
10
11
#include <errno.h>
11
12
#include "dljob.h"
18
19
 
19
20
 
20
21
dlcon::dlcon(bool bManualExecution, string *xff) :
21
 
m_bSingleRun(bManualExecution)
 
22
m_bStopWhenIdle(bManualExecution)
22
23
{
23
24
        LOGSTART("dlcon::dlcon");
24
25
        m_wakepipe[0]=m_wakepipe[1]=-1;
40
41
        if (m_wakepipe[1]>=0)
41
42
                POKE(m_wakepipe[1]);
42
43
}
 
44
 
43
45
void dlcon::AddJob(tFileItemPtr m_pItem, 
44
46
                acfg::tHostiVec *pBackends, const MYSTD::string & sPatSuffix)
45
47
{
46
 
        //lockguard g(m_pItem.get());
47
48
        EnqJob(new tDlJob(this, m_pItem, pBackends, sPatSuffix));
48
49
}
 
50
 
49
51
void dlcon::AddJob(tFileItemPtr m_pItem, tHttpUrl hi)
50
52
{
51
 
        //lockguard g(m_pItem.get());
52
53
        EnqJob(new tDlJob(this, m_pItem, hi.sHost, hi.sPath));
53
54
}
54
55
 
56
57
{
57
58
        setLockGuard;
58
59
        
59
 
        // stop all activity as far as possible
60
 
        m_bSingleRun=true;
61
 
        
 
60
        // stop all activity as soon as possible
 
61
        m_bStopWhenIdle=true;
 
62
 
62
63
        /* forget all but the first, those have to be fetched by the own downloader. If first
63
64
         * item is being downloaded it will bail out RSN because of getting into NOMOREUSERS state. */
64
65
        while(m_qToReceive.size()>1)
72
73
dlcon::~dlcon()
73
74
{
74
75
        LOGSTART("dlcon::~dlcon, Destroying dlcon");
75
 
        
76
76
        checkforceclose(m_wakepipe[0]);
77
 
 
78
77
        checkforceclose(m_wakepipe[1]);
79
 
                
80
78
}
81
 
        
 
79
 
82
80
void dlcon::WorkLoop()
83
81
{
84
82
        LOGSTART("dlcon::WorkLoop");
85
 
    string sCurrentlyConnectedHost, sErrorMsg;
 
83
    string sErrorMsg;
86
84
    tSS sendBuf;
87
85
    
88
86
    // tmp stuff
89
 
    tDlJob *pj=NULL;
90
 
    int r, nTolErrorCount(0);
 
87
    int nTolErrorCount(0);
91
88
          
92
89
        if (!m_InBuf.init(acfg::dlbufsize))
93
90
        {
104
101
        fd_set rfds, wfds;
105
102
        struct timeval tv;
106
103
               
107
 
        bool bStopRequesting=false; 
108
 
  
109
 
        while (true)
 
104
        bool bStopAddingRequests=false;
 
105
 
 
106
        // keep related state vars together
 
107
        struct
 
108
        {
 
109
                tTcpHandlePtr ptr;
 
110
                int state; // negative = not connected, otherwise count of requests in the pipeline
 
111
                int fd;
 
112
                inline void reset()
 
113
                {
 
114
                        ptr.reset();
 
115
                        state=fd=-1;
 
116
                }
 
117
                inline void recycle()
 
118
                {
 
119
                        tcpconnect::RecycleIdleConnection(ptr);
 
120
                        state = fd = -1;
 
121
                }
 
122
        } con;
 
123
        con.reset();
 
124
 
 
125
        next_dlcon_cycle:
 
126
        for(;;)
110
127
        {
111
128
                int nMaxFd=m_wakepipe[0];
112
129
                FD_ZERO(&rfds);
113
130
                FD_ZERO(&wfds);
114
131
        FD_SET(m_wakepipe[0], &rfds);
115
 
                
 
132
        tDlJob *pCurJob=NULL;
 
133
 
116
134
                {
 
135
                // needs to protect against concurrent queue modification
117
136
                        setLockGuard;
118
137
                        
119
 
                        pj=NULL;
120
 
                        
121
138
                        dbgline;
122
139
                        if(m_qToReceive.empty())
123
140
                        {
124
 
                                if(m_bSingleRun)
 
141
                                dbgline;
 
142
                                // if we wait then it might time out, but others may make use of it ATM
 
143
                                if(0 == con.state)
 
144
                                        con.recycle();
 
145
 
 
146
                                // received a signal to stop the work as soon as the process is completed?
 
147
                                if(m_bStopWhenIdle)
125
148
                                        return;
126
149
                                
127
150
                                dbgline;
128
 
                                goto do_select; // wakepipe is in the fds set, just wait for parent there
 
151
 
 
152
                                // and just wait for parent notification there
 
153
                                goto ready_for_select;
129
154
                        }
130
155
 
131
 
                        pj = m_qToReceive.front();
132
 
 
133
 
 
134
 
                        if(pj->bSuggestReconnect)
 
156
                        pCurJob = m_qToReceive.front();
 
157
 
 
158
                        if(pCurJob->bSuggestReconnect)
135
159
                        {
 
160
                                pCurJob->bSuggestReconnect=false;
136
161
                                ldbg("found host change flag, prepare reconnection");
137
 
                                pj->bSuggestReconnect=false;
 
162
                                if(0 == con.state)
 
163
                                        con.recycle();
 
164
                                else
 
165
                                        con.reset();
138
166
                                nTolErrorCount=0; // new host, new luck...
139
 
                                sCurrentlyConnectedHost.clear();
140
 
                                dbgline;
141
167
                        }
142
 
                        if(sCurrentlyConnectedHost.empty())
 
168
 
 
169
                        /* fresh state and/or going for reconnection. Pick up a working hostname and
 
170
                    * connect to it.
 
171
                    * */
 
172
 
 
173
                        if(con.state<0)
143
174
                        {
144
175
                                dbgline;
145
176
                                // set this right now... no matter how the rest looks, current
147
178
                                //
148
179
                                m_InBuf.clear();
149
180
                                sendBuf.clear();
150
 
                                bStopRequesting=false;
 
181
                                bStopAddingRequests=false;
151
182
                                // just be sure about that
152
 
                                _Disconnect();
153
 
                                                                
154
 
                                /* fresh state and/or going for reconnection. Pick up a working hostname and
155
 
                            * connect to it.
156
 
                            * */
157
 
    
158
 
                                if(!pj->FindConfig()) // bad, no usable host in the first item...
 
183
                                con.reset();
 
184
 
 
185
                                // bring job queue into a fresh state
 
186
                                for(dljIter it=m_qToReceive.begin(); it!=m_qToReceive.end(); )
159
187
                                {
160
 
                                        pj->MarkFailed(sErrorMsg.empty() ? "503 No usable download host found" : sErrorMsg);
 
188
                                        tDlJob *p=*it;
 
189
                                        p->bSuggestReconnect=p->bRequestEnqueued=false;
161
190
                                        dbgline;
162
 
                                        
163
 
                                        delete pj;
 
191
                                        if(p->HasBrokenStorage())
 
192
                                        {
 
193
                                                if(p == pCurJob)
 
194
                                                        pCurJob=NULL;
 
195
                                                delete p;
 
196
                                                m_qToReceive.erase(it++);
 
197
                                                continue;
 
198
                                        }
 
199
                                        ++it;
 
200
                                }
 
201
                                if(!pCurJob) // lost front job, check the state
 
202
                                        continue;
 
203
 
 
204
                                if(!pCurJob->FindConfig()) // bad, no usable host in the first item...
 
205
                                {
 
206
                                        pCurJob->UnregDownloader(sErrorMsg.empty()
 
207
                                                        ? "503 No usable download host found"
 
208
                                                                        : sErrorMsg);
 
209
                                        delete pCurJob;
164
210
                                        m_qToReceive.pop_front();
165
211
                                        continue;
166
212
                                }
167
213
                                
168
 
                                const string sTargetHost = pj->GetPeerName();
169
 
 
170
 
                                acfg::tHostiVec::IHookHandler *pobs = pj->GetConnStateObserver();
 
214
                                // can do internal work without locking queue (against modifications)
 
215
                                __lockguard.unLock();
 
216
 
 
217
                                const string sTargetHost = pCurJob->GetPeerName();
 
218
                                ldbg("new target host: "<<sTargetHost);
 
219
 
 
220
 
 
221
                                acfg::tHostiVec::IHookHandler *pobs = pCurJob->GetConnStateObserver();
171
222
                                if(pobs)
172
223
                                        pobs->JobConnect();
173
224
 
174
 
                                __lockguard.unLock();
175
 
                                // might override it with proxy stuff
176
 
                                bool bOk=_Connect(sTargetHost, sErrorMsg);
177
 
 
178
 
                                ldbg("Connection ok? "<<bOk<<", remember host? "<<sTargetHost);
179
 
 
180
 
                                m_pConnStateObserver=pobs;
181
 
 
182
 
                if(bOk)
183
 
                                        sCurrentlyConnectedHost=sTargetHost;
184
 
                                
185
 
                dbgline;
186
 
                
 
225
                                con.ptr = tcpconnect::CreateConnected(sTargetHost, sErrorMsg);
 
226
 
 
227
                                ldbg("Connection state: "<<(con.ptr?"ok":"failed"));
 
228
 
 
229
                                if(con.ptr)
 
230
                                {
 
231
                        dbgline;
 
232
                                        con.ptr->SetStateObserver(pobs);
 
233
                                        con.state = 0;
 
234
                                        con.fd = con.ptr->GetFD();
 
235
                                }
 
236
                                else
 
237
                                        pCurJob->BlacklistBackend();
 
238
 
 
239
                // queue modification possible again
187
240
                                __lockguard.reLock();
188
241
                                dbgline;
189
242
                                
190
 
                        
191
 
                                if(!bOk)
192
 
                                        pj->BlacklistBackend();
193
 
                                
194
 
                                // weed out those with no way out, either in error state already, or 
195
 
                                // ran out of backends (and notify users in this case)
196
 
                                dljIter it=m_qToReceive.begin();
197
 
                                bool bPrimaryFailed=false;
198
 
                                while(it!=m_qToReceive.end())
 
243
                                // recheck the queue, drop those without any possible download strategy
 
244
                                for(dljIter it=m_qToReceive.begin(); it!=m_qToReceive.end(); )
199
245
                                {
200
246
                                        tDlJob *p=*it;
201
 
                                        p->bSuggestReconnect=p->bRequestPrepared=false;
202
 
                                        dbgline;
203
 
                                        
204
 
                                        bool bDropIt=p->HasBrokenStorage();
205
 
                                        if(!bDropIt)
206
 
                                        {
207
 
                                                dbgline;
208
 
                                                
209
 
                                                if( ! p->FindConfig())
210
 
                                                {
211
 
                                                        dbgline;
212
 
                                                        bDropIt=true;
213
 
                                                        p->MarkFailed(sErrorMsg); // sensible only for the first... whatever
214
 
                                                }
215
 
                                        }
216
 
                                        // ok, drop it for sure
217
 
                                        if(bDropIt)
218
 
                                        {
219
 
                                                if(pj == p)
220
 
                                                        bPrimaryFailed=true;
221
 
                                                
222
 
                                                delete *it;
223
 
                                                dljIter itmp=it++;
224
 
                                                m_qToReceive.erase(itmp);
225
 
                                                
226
 
                                                dbgline;
 
247
                                        if(!p->FindConfig()) // oh no, no more potential mirrors. Must tell the user.
 
248
                                        {
 
249
                                                p->UnregDownloader(sErrorMsg);
 
250
                                                if(pCurJob == p)
 
251
                                                        pCurJob = NULL;
 
252
                                                delete p;
 
253
                                                m_qToReceive.erase(it++);
227
254
                                                continue;
228
255
                                        }
229
 
                                        it++;
 
256
                                        ++it;
230
257
                                }
231
 
                                if(bPrimaryFailed)
 
258
                                // ooops, lost current job?
 
259
                                if(!pCurJob)
232
260
                                {
233
 
                                        ldbg("Resetting sCurrentlyConnectedHost, primary source failed");
234
 
                                        sCurrentlyConnectedHost.clear();
 
261
                                        // can shift the pointer to the next if the connection is usable?
 
262
                                        if(con.ptr && !m_qToReceive.empty()
 
263
                                                        && m_qToReceive.front()->GetPeerName() == con.ptr->GetHostname())
 
264
                                        {
 
265
                                                pCurJob=m_qToReceive.front();
 
266
                                        }
 
267
                                        else // this was a fresh connection, reuse it
 
268
                                                con.recycle();
 
269
                                        continue;
235
270
                                }
236
271
                                
237
 
                                for(it=m_qToReceive.begin(); it!=m_qToReceive.end(); it++)
 
272
                                for(dljIter it=m_qToReceive.begin(); it!=m_qToReceive.end(); it++)
238
273
                                {
239
274
                                        dbgline;
240
 
                                        if(! (*it)->AppendRequest(sTargetHost, sendBuf))
 
275
                                        tDlJob *p=*it;
 
276
                                        if(p->AppendRequest(sTargetHost, sendBuf, con.ptr->GetProxyData()))
 
277
                                                con.state++;
 
278
                                        else
241
279
                                        {
242
 
                                                // refused -> needs to reconnect later
243
 
                                                (*it)->bSuggestReconnect=true;
244
 
                                                bStopRequesting=true;  // MUST set this
 
280
                                                // refused? -> needs to reconnect later
 
281
                                                p->bSuggestReconnect=true;
 
282
                                                bStopAddingRequests=true;  // MUST set this
245
283
                                                dbgline;
246
284
                                                break;
247
285
                                        }
248
286
                                }
249
287
                                
250
 
                                if(!bOk) // failed somewhere above, recheck
 
288
                                if(!con.ptr) // failed above? will recheck the connection state
251
289
                                        continue;
 
290
 
252
291
                        } // (re)connect done
253
292
 
254
293
                        dbgline;
256
295
          
257
296
                } // end of locked section      
258
297
                
259
 
                                        
260
 
                if(m_conFd>=0)
 
298
                ready_for_select:
 
299
 
 
300
                if(con.fd>=0)
261
301
                {
262
 
                        FD_SET(m_conFd, &rfds);
263
 
                        nMaxFd=std::max(m_conFd, nMaxFd);
 
302
                        FD_SET(con.fd, &rfds);
 
303
                        nMaxFd=std::max(con.fd, nMaxFd);
264
304
                        
265
305
                        if(sendBuf.size())
266
306
                        {
267
307
                                ldbg("Needs to send " << sendBuf.size() << " bytes")
268
 
                                FD_SET(m_conFd, &wfds);
269
 
                                nMaxFd=std::max(m_conFd, nMaxFd);
 
308
                                FD_SET(con.fd, &wfds);
 
309
                                nMaxFd=std::max(con.fd, nMaxFd);
270
310
                        }
271
311
                }
272
312
 
273
 
                do_select:
274
 
                
275
313
                ldbg("select dlcon");
276
314
                tv.tv_sec = acfg::nettimeout;
277
315
                tv.tv_usec = 0;
278
316
                
279
 
                r=select(nMaxFd+1, &rfds, &wfds, NULL, &tv);
 
317
                int r=select(nMaxFd+1, &rfds, &wfds, NULL, &tv);
280
318
                
281
319
                if (r<0)
282
320
                {
288
326
                    sErrorMsg=string("500 Internal malfunction, ")+fer;
289
327
                    goto drop_and_restart_stream;
290
328
                }
291
 
                else if(r==0)
 
329
                else if(r==0) // must be a timeout
292
330
                {
293
 
                        if(!sCurrentlyConnectedHost.empty()) 
294
 
            {
295
 
                                // catch errno ASAP
296
 
                                aclog::errnoFmter fer;
297
 
                ldbg("Select timeout for sCurrentlyConnectedHost:" << sCurrentlyConnectedHost << ". Trigger reconnection..., m_sConnectedHost.clear()");
298
 
                sCurrentlyConnectedHost.clear();
299
 
                
300
 
                if(pj && pj->HasStarted())
301
 
                {
302
 
                        aclog::err("Warning, disconnected during package download");
303
 
                        sErrorMsg=string("500 Connection abort, ")+fer;
304
 
                        goto drop_and_restart_stream;
305
 
                }
306
 
            }
307
 
                        continue;
 
331
                        if(con.state>0)
 
332
                        {
 
333
                                sErrorMsg=aclog::errnoFmter("500 Connection abort, ");
 
334
                                aclog::err("Warning, disconnected during package download");
 
335
                        }
 
336
                        goto drop_and_restart_stream;
308
337
                }
309
338
 
310
339
                if (FD_ISSET(m_wakepipe[0], &rfds))
311
340
                {
 
341
                        dbgline;
312
342
                        for(int tmp; read(m_wakepipe[0], &tmp, 1) > 0; ) ;
313
343
                        
314
344
                        setLockGuard;
315
345
                        // got new stuff? and needs to prepare requests? and queue is not empty?
316
346
                        // walk around and send requests for them, if possible
317
 
                        if(     !bStopRequesting &&
318
 
                                !sCurrentlyConnectedHost.empty() &&
319
 
                                !m_qToReceive.empty())
 
347
                        if(     !bStopAddingRequests && con.ptr && !m_qToReceive.empty())
320
348
                        {
321
349
                                ldbg("Preparing requests for new stuff...");
322
350
                                
323
 
                                // walks backward from the end to find a good position where previous request-sending stoped,
324
 
                                // instead of just starting from the beginning.
325
 
                                
326
 
                                // WARNING!!! Optimistic code, relies on condition checks above
 
351
                                // walks backward from the end to find a good position where previous
 
352
                                // request-sending was interrupted instead of just starting from the beginning.
327
353
                                
328
354
                                dljIter it=m_qToReceive.end();
329
355
                                for(it--; it!=m_qToReceive.begin(); it--)
330
 
                                        if( (*it)->bRequestPrepared )
 
356
                                        if( (*it)->bRequestEnqueued )
331
357
                                                break;
332
358
                                
333
359
                                for(; it!=m_qToReceive.end(); it++)
334
360
                                {
335
361
                                        tDlJob *pjn=*it;
336
362
                                        // one position to far in backwalk
337
 
                                        if(pjn->bRequestPrepared)
 
363
                                        if(pjn->bRequestEnqueued)
338
364
                                                continue;
339
365
                                        
340
366
                                        // found the first unseen, do stuff
341
367
                                        
342
 
                                        pjn->bRequestPrepared=true;
343
 
                                        if(!pjn->FindConfig() || !pjn->AppendRequest(sCurrentlyConnectedHost, sendBuf)) 
 
368
                                        pjn->bRequestEnqueued=true;
 
369
                                        if(!pjn->FindConfig() || !pjn->AppendRequest(con.ptr->GetHostname(),
 
370
                                                        sendBuf, con.ptr->GetProxyData()))
344
371
                                        {
345
 
                                                // bad stuff, check later
346
 
                                                bStopRequesting=true;
 
372
                                                // bad stuff, to be reported later
 
373
                                                bStopAddingRequests=true;
347
374
                                                pjn->bSuggestReconnect=true;
348
375
                                                break;
349
376
                                        }
 
377
                                        else
 
378
                                                con.state++;
350
379
                                }
351
380
                        }
352
381
                }
353
382
 
354
 
                if (m_conFd>=0 && sendBuf.size() && FD_ISSET(m_conFd, &wfds))
 
383
                if (con.fd>=0 && sendBuf.size() && FD_ISSET(con.fd, &wfds))
355
384
                {
356
 
                        FD_CLR(m_conFd, &wfds);
357
 
                        
 
385
                        FD_CLR(con.fd, &wfds);
 
386
 
358
387
                        ldbg("Sending data...\n" << sendBuf);
359
 
                        int s=::send(m_conFd, sendBuf.data(), sendBuf.length(), MSG_NOSIGNAL);
 
388
                        int s=::send(con.fd, sendBuf.data(), sendBuf.length(), MSG_NOSIGNAL);
360
389
                        ldbg("Sent " << s << " bytes from " << sendBuf.length());
361
390
                        if (s<0)
362
391
                        {
371
400
                sendBuf.drop(s);
372
401
                }
373
402
            
374
 
                if (m_conFd>=0 && FD_ISSET(m_conFd, &rfds))
 
403
                if (con.fd>=0 && FD_ISSET(con.fd, &rfds))
375
404
                {
376
 
                        if(!pj) // huh, sends something without request? Maybe disconnect hang (zero read)?
 
405
                        if(!pCurJob) // huh, sends something without request? Maybe disconnect hang (zero read)?
377
406
                        {
378
407
                                ldbg("BUG: not expected data");
379
408
                                // just disconnect, and maybe reconnect
380
 
                                sCurrentlyConnectedHost.clear();
 
409
                                con.reset();
381
410
                                continue;
382
411
                        }
383
 
                        int r = m_InBuf.sysread(m_conFd);
 
412
                        int r = m_InBuf.sysread(con.fd);
384
413
                        if (r <= 0)
385
414
                        {
386
 
                                if(r==-EAGAIN)
387
 
                                        continue; // should never happen, though.
388
 
                                
389
 
                                // pickup the error code and make sure it's closed ASAP
390
 
                                aclog::errnoFmter f;
391
 
                                _Disconnect();
392
 
                                
393
 
                                if(pj)
 
415
                                // pickup the error code for later and kill current connection ASAP
 
416
                                sErrorMsg=aclog::errnoFmter("502 ");
 
417
                                con.reset();
 
418
                                
 
419
                                if(pCurJob)
394
420
                                {
395
421
                                        /*
396
422
                                         * disconnected? Maybe because of end of Keep-Alive sequence, or
401
427
                                        if(++nTolErrorCount < MAX_RETRIES)
402
428
                                        {
403
429
                                                
404
 
                                                if( ! pj->HasStarted())
 
430
                                                if( ! pCurJob->HasStarted())
405
431
                                                {
406
 
                                                        ldbg("MAX_RETRIES not reached, resetting sCurrentlyConnectedHost");
407
 
                                                        sCurrentlyConnectedHost.clear();
 
432
                                                        ldbg("MAX_RETRIES not reached, just reconnect");
 
433
                                                        con.reset();
408
434
                                                        continue;
409
435
                                                }
410
436
                                        }
411
437
                                        else
412
438
                                        {   // too many retries for this host :-(
413
 
                                                pj->BlacklistBackend();
 
439
                                                pCurJob->BlacklistBackend();
414
440
                                                nTolErrorCount=0;
415
441
                                        }
416
442
                                }
417
443
                                
418
 
                                sErrorMsg=string("502 ")+f;
419
444
                goto drop_and_restart_stream;
420
445
                        }
421
446
        }
422
447
 
423
 
                while( ! m_InBuf.empty() && pj)
 
448
                while( ! m_InBuf.empty() && pCurJob)
424
449
                {
425
 
                        ldbg("Processing input of " << pj->RemoteUri() );
426
 
                        tDlJob::tDlResult res=pj->ProcessIncomming(m_InBuf, sErrorMsg);
427
 
                        ldbg("... input processing result: " << res);
 
450
                        ldbg("Processing job for " << pCurJob->RemoteUri() );
 
451
                        tDlJob::tDlResult res=pCurJob->ProcessIncomming(m_InBuf, sErrorMsg);
 
452
                        ldbg("... incomming data processing result: " << res);
428
453
                        switch(res)
429
454
                        {
430
455
                                case(tDlJob::R_MOREINPUT):
431
456
                                case(tDlJob::R_NOOP):
432
 
                                        goto leave_recv_loop; // will get more in the next loop
 
457
                                        goto next_dlcon_cycle; // will get more in the next loop
433
458
                                case(tDlJob::R_SKIPITEM): // will restart the stream
434
459
                                // item was ours but download aborted... might do more fine recovery in the future
435
460
                                case(tDlJob::R_ERROR_LOCAL):
440
465
                                case(tDlJob::R_DONE):
441
466
                                {
442
467
                                        nTolErrorCount=0; // well done and can sleep now, should be resumed carefully later
 
468
                                        con.state--;
443
469
                                        
444
470
                                        setLockGuard;
445
 
                                        delete pj;
 
471
                                        delete pCurJob;
446
472
                                        m_qToReceive.pop_front();
447
 
                                        pj = m_qToReceive.empty() ? NULL : m_qToReceive.front();
 
473
                                        pCurJob = m_qToReceive.empty() ? NULL : m_qToReceive.front();
448
474
                                        ldbg("Remaining dlitems: " << m_qToReceive.size());
449
475
                                        continue;
450
476
                                }
451
477
                                break;
452
478
                        }
453
479
        }
454
 
                leave_recv_loop:
455
480
                
456
481
                continue;
457
482
                
458
483
                drop_and_restart_stream:
459
 
                if(pj)
 
484
 
 
485
                ldbg("Resetting pCurJob, resetting connection");
 
486
                con.reset();
 
487
                if(pCurJob)
460
488
                {
461
 
                        ldbg("Resetting pj, resetting sCurrentlyConnectedHost");
462
 
                        pj->MarkFailed(sErrorMsg);
463
 
                        sCurrentlyConnectedHost.clear();
 
489
                        pCurJob->UnregDownloader(sErrorMsg);
464
490
                        setLockGuard;
465
 
                        delete pj;
 
491
                        delete pCurJob;
466
492
                        m_qToReceive.pop_front();
467
 
                        pj=NULL;
 
493
                        pCurJob=NULL;
468
494
                        continue;
469
495
                }
470
496
        }
471
497
}
472